git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Kafka Avro Table Source


Hi Will,

The community is currently working on improving the Kafka Avro integration
for Flink SQL.
There's a PR [1]. If you like, you could try it out and give some feedback.

Timo (in CC) has been working Kafka Avro and should be able to help with
any specific questions.

Best, Fabian

[1] https://github.com/apache/flink/pull/6218

2018-07-03 3:02 GMT+02:00 Will Du <willddy@xxxxxxxxx>:

> Hi folks,
> I am working on using avro table source mapping to kafka source. By
> looking at the example, I think the current Flink v1.5.0 connector is not
> flexible enough. I wonder if I have to specify the avro record class to
> read from Kafka.
>
> For withSchema, the schema can get from schema registry. However, the
> class of avro seems hard coded.
>
> thanks,
> Will
>
> KafkaTableSource source = Kafka010AvroTableSource.builder()
>   // set Kafka topic
>   .forTopic("sensors")
>   // set Kafka consumer properties
>   .withKafkaProperties(kafkaProps)
>   // set Table schema
>   .withSchema(TableSchema.builder()
>     .field("sensorId", Types.LONG())
>     .field("temp", Types.DOUBLE())
>     .field("time", Types.SQL_TIMESTAMP()).build())
>   // set class of Avro record*  .forAvroRecordClass(SensorReading.class)  // ? Any way to get this without hard code class*
>   .build();
>
>
>


( ! ) Warning: include(msgfooter.php): failed to open stream: No such file or directory in /var/www/git/apache-flink-development/msg07923.html on line 110
Call Stack
#TimeMemoryFunctionLocation
10.0007364552{main}( ).../msg07923.html:0

( ! ) Warning: include(): Failed opening 'msgfooter.php' for inclusion (include_path='.:/var/www/git') in /var/www/git/apache-flink-development/msg07923.html on line 110
Call Stack
#TimeMemoryFunctionLocation
10.0007364552{main}( ).../msg07923.html:0