git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Ask for SQL using kafka in Flink


Hi Radhya,

Can you provide which Flink version you are using? Based on the latest FLINK 1.5 release, Kafka09JsonTableSource takes:
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
*/
Also, your type definition: TypeInformation<Row> typeInfo2 = Types.ROW(... arguments seem to have different length for schema names and types.

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@xxxxxxxxx> wrote:
Hi,

Could anyone help me to solve this problem


/Exception in thread "main" java.lang.Error: Unresolved compilation problem:
        The constructor Kafka09JsonTableSource(String, Properties,
TypeInformation<Row>) is undefined
/
*--This is the code *
public class FlinkKafkaSQL {
        public static void main(String[] args) throws Exception {
            // Read parameters from command line
            final ParameterTool params = ParameterTool.fromArgs(args);

            if(params.getNumberOfParameters() < 5) {
                System.out.println("\nUsage: FlinkReadKafka " +
                                   "--read-topic <topic> " +
                                   "--write-topic <topic> " +
                                   "--bootstrap.servers <kafka brokers> " +
                                   "zookeeper.connect" +
                                   "--group.id <groupid>");
                return;
            }

            // setup streaming environment
            StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
10000));
            env.enableCheckpointing(300000); // 300 seconds
            env.getConfig().setGlobalJobParameters(params);

            StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);

            // specify JSON field names and types


            TypeInformation<Row> typeInfo2 = Types.ROW(
                    new String[] { "iotdevice", "sensorID" },
                    new TypeInformation<?>[] { Types.STRING()}
            );

            // create a new tablesource of JSON from kafka
            KafkaJsonTableSource kafkaTableSource = new
Kafka09JsonTableSource(
                    params.getRequired("read-topic"),
                    params.getProperties(),
                    typeInfo2);

            // run some SQL to filter results where a key is not null
            String sql = "SELECT sensorID " +
                         "FROM iotdevice ";
            tableEnv.registerTableSource("iotdevice", kafkaTableSource);
            Table result = tableEnv.sql(sql);

            // create a partition for the data going into kafka
            FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

            // create new tablesink of JSON to kafka
            KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                    params.getRequired("write-topic"),
                    params.getProperties(),
                    partition);

            result.writeToSink(kafkaTableSink);

            env.execute("FlinkReadWriteKafkaJSON");
        }
}


*This is the dependencies  in pom.xml*

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.3.0</version>
            </dependency>
            <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.11</artifactId>
                        <version>1.3.0</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.11</artifactId>
                        <version>1.3.0</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka-0.9</artifactId>

<version>1.3.0</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table_2.11</artifactId>
                        <version>1.3.0</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
                        <version>1.3.0</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-scala_2.11</artifactId>
                        <version>1.3.0</version>
                </dependency>
        </dependencies>


Regards.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/