
Re: DataStream[Row] convert to table exception

Sorry, I didn't see your last mail. The code actually looks good. What does `inputStream.getType` return if you print it to the console?


On 07.06.18 at 08:24, Timo Walther wrote:

Row is a very special data type for which Flink cannot generate serializers based on the generics alone. By default, the type produced by a DeserializationSchema is determined by reflection-based type analysis, which yields a GenericTypeInfo for Row. You need to override the getProducedType() method in WormholeDeserializationSchema and specify the type information manually there, for example by returning a RowTypeInfo.
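
A minimal sketch of such an override, under stated assumptions: ExampleRowSchema is a hypothetical stand-in for WormholeDeserializationSchema, the two-field schema (name, count) and the comma-separated message format are made up for illustration, and the method through which DeserializationSchema reports its type is getProducedType(). The real class would build the RowTypeInfo from the names and types derived from the JSON schema.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// Hypothetical schema class; not the actual WormholeDeserializationSchema.
class ExampleRowSchema extends DeserializationSchema[Row] {

  // Assumed fixed schema for illustration only.
  private val fieldNames: Array[String] = Array("name", "count")
  private val fieldTypes: Array[TypeInformation[_]] =
    Array(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)

  // Assumed message format: "name,count" encoded as UTF-8.
  override def deserialize(message: Array[Byte]): Row = {
    val parts = new String(message, StandardCharsets.UTF_8).split(",", -1)
    val row = new Row(fieldNames.length)
    row.setField(0, parts(0))
    row.setField(1, Int.box(parts(1).trim.toInt))
    row
  }

  override def isEndOfStream(nextElement: Row): Boolean = false

  // Report a RowTypeInfo explicitly instead of relying on reflection,
  // which would fall back to GenericTypeInfo for Row.
  override def getProducedType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)
}
```

With a schema like this, the stream returned by env.addSource should carry a RowTypeInfo, which is what the Table API asks for in the exception message.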

Hope this helps.


On 06.06.18 at 13:22, 孙森 wrote:
Hi,

I've tried to specify such a schema when reading from Kafka and converting the input stream to a table, but I got this exception:

  • Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo cannot be converted to Table. Please specify the type of the input with a RowTypeInfo

And here is the code:

  // Builds parallel arrays of field names and Flink type information from the JSON schema.
  private def getSchemaMap(jsonSchema: String) = {
    val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
    val fields = umsSchema.fields_get
    val fieldNameList = ListBuffer.empty[String]
    val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
    fields.foreach {
      field =>
        // (loop body missing from the archived message)
    }
    (fieldNameList.toArray, fieldTypeList.toArray)
  }

  // Maps a UMS field type to the corresponding Flink type information.
  private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
    umsFieldType match {
      case STRING => Types.STRING
      case INT => Types.INT
      case LONG => Types.LONG
      case FLOAT => Types.FLOAT
      case DOUBLE => Types.DOUBLE
      case BOOLEAN => Types.BOOLEAN
      case DATE => Types.SQL_DATE
      case DATETIME => Types.SQL_TIMESTAMP
      case DECIMAL => Types.DECIMAL
      // (any further cases are missing from the archived message)
    }
  }

  val myConsumer: FlinkKafkaConsumer010[Row] = new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
  val inputStream: DataStream[Row] = env.addSource(myConsumer)
  val tableEnv = TableEnvironment.getTableEnvironment(env) // <-- exception here

Thanks!
