git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Ask for SQL using kafka in Flink


Hi, 

Could anyone help me solve this problem? 


/Exception in thread "main" java.lang.Error: Unresolved compilation problem: 
	The constructor Kafka09JsonTableSource(String, Properties,
TypeInformation<Row>) is undefined
/
*--This is the code *
/**
 * Reads JSON records from a Kafka topic, runs a SQL projection over them,
 * and writes the result as JSON to another Kafka topic (Flink 1.3 Table API).
 *
 * Required arguments: --read-topic, --write-topic, --bootstrap.servers,
 * --zookeeper.connect, --group.id (picked up via ParameterTool/getProperties()).
 */
public class FlinkKafkaSQL {
        public static void main(String[] args) throws Exception {
            // Read parameters from command line
            final ParameterTool params = ParameterTool.fromArgs(args);

            if(params.getNumberOfParameters() < 5) {
                // FIX: the original usage string was missing the "--" prefix,
                // the placeholder, and the trailing space around
                // zookeeper.connect, so concatenation printed
                // "...brokers> zookeeper.connect--group.id ...".
                System.out.println("\nUsage: FlinkReadKafka " +
                                   "--read-topic <topic> " +
                                   "--write-topic <topic> " +
                                   "--bootstrap.servers <kafka brokers> " +
                                   "--zookeeper.connect <zookeeper quorum> " +
                                   "--group.id <groupid>");
                return;
            }

            // setup streaming environment
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(4, 10000));
            env.enableCheckpointing(300000); // 300 seconds
            env.getConfig().setGlobalJobParameters(params);

            StreamTableEnvironment tableEnv =
                TableEnvironment.getTableEnvironment(env);

            // Specify JSON field names and types.
            // FIX: Types.ROW requires the name and type arrays to have the
            // same length; the original passed two names but only one type,
            // which fails when the row type is constructed.
            TypeInformation<Row> typeInfo2 = Types.ROW(
                    new String[] { "iotdevice", "sensorID" },
                    new TypeInformation<?>[] { Types.STRING(), Types.STRING() }
            );

            // create a new table source of JSON from Kafka
            KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
                    params.getRequired("read-topic"),
                    params.getProperties(),
                    typeInfo2);

            // run some SQL to project the sensorID column
            String sql = "SELECT sensorID " +
                         "FROM iotdevice ";
            tableEnv.registerTableSource("iotdevice", kafkaTableSource);
            Table result = tableEnv.sql(sql);

            // fixed partitioner for the data going into Kafka
            FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

            // create new table sink of JSON to Kafka
            KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                    params.getRequired("write-topic"),
                    params.getProperties(),
                    partition);

            result.writeToSink(kafkaTableSink);

            env.execute("FlinkReadWriteKafkaJSON");
        }
}


*These are the dependencies in pom.xml*

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>1.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>1.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<!-- NOTE(review): for Flink 1.3.0 this connector is published only
			     with a Scala suffix, matching the other artifacts above;
			     the unsuffixed coordinate does not resolve. -->
			<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
			<version>1.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table_2.11</artifactId>
			<version>1.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>1.3.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>1.3.0</version>
		</dependency>
	</dependencies>


Regards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/