git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Java Code for Kafka Flink SQL


Hi, 

Could anyone help me by providing some sample Java code in which Flink
reads data from Kafka and then runs SQL queries on it using the SQL API?

Also, which versions of Java, Kafka, and Flink are compatible with each other?

I am a beginner, and the code below throws many exceptions:


public class FlinkKafkaSQL {
>         public static void main(String[] args) throws Exception {
>             // Read parameters from command line
>             final ParameterTool params = ParameterTool.fromArgs(args);
>
>             if(params.getNumberOfParameters() < 5) {
>                 System.out.println("\nUsage: FlinkReadKafka " +
>                                    "--read-topic <topic> " +
>                                    "--write-topic <topic> " +
>                                    "--bootstrap.servers <kafka brokers> "
> +
>                                    "zookeeper.connect" +
>                                    "--group.id <groupid>");
>                 return;
>             }
>
>             // setup streaming environment
>             StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
> 10000));
>             env.enableCheckpointing(300000); // 300 seconds
>             env.getConfig().setGlobalJobParameters(params);
>
>             StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
>
>             // specify JSON field names and types
>
>
>             TypeInformation<Row> typeInfo2 = Types.ROW(
>                     new String[] { "iotdevice", "sensorID" },
>                     new TypeInformation<?>[] { Types.STRING()}
>             );
>
>             // create a new tablesource of JSON from kafka
>             KafkaJsonTableSource kafkaTableSource = new
> Kafka09JsonTableSource(
>                     params.getRequired("read-topic"),
>                     params.getProperties(),
>                     typeInfo2);
>
>             // run some SQL to filter results where a key is not null
>             String sql = "SELECT sensorID " +
>                          "FROM iotdevice ";
>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>             Table result = tableEnv.sql(sql);
>
>             // create a partition for the data going into kafka
>             FlinkFixedPartitioner partition =  new
> FlinkFixedPartitioner();
>
>             // create new tablesink of JSON to kafka
>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>                     params.getRequired("write-topic"),
>                     params.getProperties(),
>                     partition);
>
>             result.writeToSink(kafkaTableSink);
>
>             env.execute("FlinkReadWriteKafkaJSON");
>         }
> }
>
>
> *These are the dependencies in pom.xml:*
>
<!-- Flink 1.3.0 dependencies. Artifacts that depend on Scala carry the
     Scala version suffix (_2.11); keep the suffix identical across all of them. -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- The Kafka connector is a Scala-dependent artifact, so it needs the _2.11 suffix. -->
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>
>
>
> Regards.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
 


Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/