Re: Ask for SQL using kafka in Flink


Given the popularity of Flink SQL and of Kafka as a streaming source, I think we could add some examples of using the Kafka[XXX]TableSource classes to the flink-examples/flink-examples-table module. What do you think?

Cheers
Shuyi

On Mon, Jun 4, 2018 at 12:57 AM, Timo Walther <twalthr@xxxxxxxxxx> wrote:
Hi,

As you can see in the code [1], Kafka09JsonTableSource takes a TableSchema. You can create a table schema from type information; see [2].

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala

On 02.06.18 at 18:31, Radhya Sahal wrote:

Thanks Rong,

I am using Flink 1.3.0. If I switch to Flink 1.5, how can I define the jsonSchema?

Yes, there were two names, but I now pass only one name, and I still want to
define the jsonSchema.



Rong Rong wrote
Hi Radhya,

Which Flink version are you using? As of the latest Flink 1.5 release,
Kafka09JsonTableSource takes:

/**
  * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
  *
  * @param topic       Kafka topic to consume.
  * @param properties  Properties for the Kafka consumer.
  * @param tableSchema The schema of the table.
  * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
  */

Also, in your type definition TypeInformation<Row> typeInfo2 = Types.ROW(...),
the arguments for the schema names and the types seem to have different lengths.
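
The length mismatch mentioned above can be caught before any row type is built. The following is a minimal sketch in plain Java and does not use the Flink API: the class RowFieldCheck and the method pairFields are hypothetical names introduced only for illustration, and the type of the second field (STRING) is an assumption, since the posted code declares only one type.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: pairs each field name with a type
// name and rejects arrays of different length before a row type is built.
final class RowFieldCheck {

    static Map<String, String> pairFields(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                "Expected one type per field name, got " + names.length
                + " names " + Arrays.toString(names) + " and " + types.length
                + " types " + Arrays.toString(types));
        }
        // LinkedHashMap keeps the declared field order.
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], types[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Matching lengths: one type per name (second type is an assumption).
        System.out.println(pairFields(
            new String[] {"iotdevice", "sensorID"},
            new String[] {"STRING", "STRING"}));

        // Mismatch as in the posted code: two names, one type.
        try {
            pairFields(new String[] {"iotdevice", "sensorID"},
                       new String[] {"STRING"});
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the posted program the equivalent change would be to pass one type per field name to Types.ROW.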

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:

Hi,

Could anyone help me solve this problem?


Exception in thread "main" java.lang.Error: Unresolved compilation problem:
        The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined

This is the code:

public class FlinkKafkaSQL {
    public static void main(String[] args) throws Exception {
        // Read parameters from command line
        final ParameterTool params = ParameterTool.fromArgs(args);

        if (params.getNumberOfParameters() < 5) {
            System.out.println("\nUsage: FlinkReadKafka " +
                               "--read-topic <topic> " +
                               "--write-topic <topic> " +
                               "--bootstrap.servers <kafka brokers> " +
                               "zookeeper.connect" +
                               "--group.id <groupid>");
            return;
        }

        // setup streaming environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);

        StreamTableEnvironment tableEnv =
                TableEnvironment.getTableEnvironment(env);

        // specify JSON field names and types
        TypeInformation<Row> typeInfo2 = Types.ROW(
                new String[] { "iotdevice", "sensorID" },
                new TypeInformation<?>[] { Types.STRING() }
        );

        // create a new tablesource of JSON from kafka
        KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
                params.getRequired("read-topic"),
                params.getProperties(),
                typeInfo2);

        // run some SQL to filter results where a key is not null
        String sql = "SELECT sensorID " +
                     "FROM iotdevice ";
        tableEnv.registerTableSource("iotdevice", kafkaTableSource);
        Table result = tableEnv.sql(sql);

        // create a partition for the data going into kafka
        FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

        // create new tablesink of JSON to kafka
        KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                params.getRequired("write-topic"),
                params.getProperties(),
                partition);

        result.writeToSink(kafkaTableSink);

        env.execute("FlinkReadWriteKafkaJSON");
    }
}


These are the dependencies in pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>

Regards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/






--
"So you have to trust that the dots will somehow connect in your future."