git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Java Code for Kafka Flink SQL


Hi Rad,

at a first glance your example does not look too bad. Which exceptions do you get? Did you create your pom.xml with the provided template [1] and then added flink-table, flink-connector-kafkaXXX, flink-streaming-scala?

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html

Am 02.06.18 um 19:26 schrieb Rad Rad:
Hi,

Could any one help me by providing some sample java code which Flink
subscribes data data from kafka and then doing SQL queries using SQL APIs.

Also, what are the compatible versions for java/kafka/flink.

Since, I am beginner and there are many exceptions in my code


public class FlinkKafkaSQL {
         public static void main(String[] args) throws Exception {
             // Read parameters from command line
             final ParameterTool params = ParameterTool.fromArgs(args);

             if(params.getNumberOfParameters() < 5) {
                 System.out.println("\nUsage: FlinkReadKafka " +
                                    "--read-topic <topic> " +
                                    "--write-topic <topic> " +
                                    "--bootstrap.servers <kafka brokers> "
+
                                    "zookeeper.connect" +
                                    "--group.id <groupid>");
                 return;
             }

             // setup streaming environment
             StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
10000));
             env.enableCheckpointing(300000); // 300 seconds
             env.getConfig().setGlobalJobParameters(params);

             StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);

             // specify JSON field names and types


             TypeInformation<Row> typeInfo2 = Types.ROW(
                     new String[] { "iotdevice", "sensorID" },
                     new TypeInformation<?>[] { Types.STRING()}
             );

             // create a new tablesource of JSON from kafka
             KafkaJsonTableSource kafkaTableSource = new
Kafka09JsonTableSource(
                     params.getRequired("read-topic"),
                     params.getProperties(),
                     typeInfo2);

             // run some SQL to filter results where a key is not null
             String sql = "SELECT sensorID " +
                          "FROM iotdevice ";
             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
             Table result = tableEnv.sql(sql);

             // create a partition for the data going into kafka
             FlinkFixedPartitioner partition =  new
FlinkFixedPartitioner();

             // create new tablesink of JSON to kafka
             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                     params.getRequired("write-topic"),
                     params.getProperties(),
                     partition);

             result.writeToSink(kafkaTableSink);

             env.execute("FlinkReadWriteKafkaJSON");
         }
}


*This is the dependencies  in pom.xml*

         <dependencies>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-java</artifactId>
                 <version>1.3.0</version>
             </dependency>
             <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-streaming-java_2.11</artifactId>
                         <version>1.3.0</version>
                 </dependency>
                 <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-clients_2.11</artifactId>
                         <version>1.3.0</version>
                 </dependency>
                 <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-connector-kafka-0.9</artifactId>

<version>1.3.0</version>
                 </dependency>
                 <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-table_2.11</artifactId>
                         <version>1.3.0</version>
                 </dependency>
                 <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-core</artifactId>
                         <version>1.3.0</version>
                 </dependency>
                 <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-streaming-
scala_2.11</artifactId>
                         <version>1.3.0</version>
                 </dependency>
         </dependencies>


Regards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.
n4.nabble.com/


Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/