DataStream[Row] convert to Table exception


sen
Hi,

I've tried to specify such a schema when I read from Kafka and convert the input stream to a table, but I got this exception:

  • Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo cannot be converted to Table. Please specify the type of the input with a RowTypeInfo

And here is the code:


private def getSchemaMap(jsonSchema: String) = {
    val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
    val fields = umsSchema.fields_get
    val fieldNameList = ListBuffer.empty[String]
    val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
    fields.foreach {
      field =>
        fieldNameList.append(field.name)
        fieldTypeList.append(fieldTypeMatch(field.`type`))
    }
    println(fieldNameList)
    println(fieldTypeList)
    (fieldNameList.toArray, fieldTypeList.toArray)
  }


  private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
    umsFieldType match {
      case STRING => Types.STRING
      case INT => Types.INT
      case LONG => Types.LONG
      case FLOAT => Types.FLOAT
      case DOUBLE => Types.DOUBLE
      case BOOLEAN => Types.BOOLEAN
      case DATE => Types.SQL_DATE
      case DATETIME => Types.SQL_TIMESTAMP
      case DECIMAL => Types.DECIMAL
    }
  }
}


 val myConsumer: FlinkKafkaConsumer010[Row] = new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
 val inputStream: DataStream[Row] = env.addSource(myConsumer)
 val tableEnv = TableEnvironment.getTableEnvironment(env)  // <-- exception here



Thanks!
sen

Re: DataStream[Row] convert to Table exception

sen
I'm sorry, here is the whole code:



 class WormholeDeserializationSchema(schema: String) extends KeyedDeserializationSchema[Row] {
  //var keyValueTopic:KeyValueTopic = _


  override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long) = {
    val deserializationSchema = new SimpleStringSchema()
    val key = if (messageKey != null) deserializationSchema.deserialize(messageKey) else null
    val value = if (message != null) deserializationSchema.deserialize(message) else null
    val ums = UmsSchemaUtils.toUms(value)
    ums.payload_get.map(_.tuple).map(tuple => Row.of(tuple: _*)).head
  }

  override def isEndOfStream(nextElement: Row): Boolean = false

  // Provide the concrete row type so the runtime does not fall back to GenericTypeInfo.
  override def getProducedType: TypeInformation[Row] = {
    val (fieldNameArray, fieldTypeArray) = getSchemaMap(schema)
    new RowTypeInfo(fieldTypeArray, fieldNameArray)
  }

  private def getSchemaMap(jsonSchema: String) = {
    val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
    val fields = umsSchema.fields_get
    val fieldNameList = ListBuffer.empty[String]
    val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
    fields.foreach {
      field =>
        fieldNameList.append(field.name)
        fieldTypeList.append(fieldTypeMatch(field.`type`))
    }
    println(fieldNameList)
    println(fieldTypeList)
    (fieldNameList.toArray, fieldTypeList.toArray)
  }


  private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
    umsFieldType match {
      case STRING => Types.STRING
      case INT => Types.INT
      case LONG => Types.LONG
      case FLOAT => Types.FLOAT
      case DOUBLE => Types.DOUBLE
      case BOOLEAN => Types.BOOLEAN
      case DATE => Types.SQL_DATE
      case DATETIME => Types.SQL_TIMESTAMP
      case DECIMAL => Types.DECIMAL
    }
  }
}



 val inputStream: DataStream[Row] = env.addSource(myConsumer)
 val tableEnv = TableEnvironment.getTableEnvironment(env)
Thanks~

sen


Re: DataStream[Row] convert to Table exception

Timo Walther
Hi,

Row is a very special datatype for which Flink cannot generate serializers based on the generics. By default, DeserializationSchema uses reflection-based type analysis; you need to override the getProducedType() method in WormholeDeserializationSchema and specify the type information manually there.

Hope this helps.

Regards,
Timo
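
A minimal sketch of where that manually specified type information ends up mattering, assuming the Flink 1.x Table API and the myConsumer defined in the snippets above (not the author's exact code): once the source reports a proper RowTypeInfo instead of GenericTypeInfo, the stream-to-table conversion should go through.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// myConsumer is the FlinkKafkaConsumer010[Row] built with WormholeDeserializationSchema above.
val inputStream: DataStream[Row] = env.addSource(myConsumer)

// With a GenericTypeInfo[Row] input this call raises the TableException quoted above;
// with a RowTypeInfo it yields a Table carrying the schema's field names and types.
val table = tableEnv.fromDataStream(inputStream)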


Re: DataStream[Row] convert to Table exception

Timo Walther
Sorry, I didn't see your last mail. The code actually looks good. What is the result of `inputStream.getType` if you print it to the console?

Timo
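
A quick sketch of that check, using the inputStream from the snippets above:

// If getProducedType is picked up, this prints a Row(...) type with the schema's fields;
// GenericType<org.apache.flink.types.Row> means the type information was not used.
println(inputStream.getType)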


Re: DataStream[Row] convert to Table exception

Timo Walther
Yes, that's a workaround. I found the cause of the problem: it is a Scala API-specific issue.

See: https://issues.apache.org/jira/browse/FLINK-9556

Thanks for reporting it!

Regards,
Timo

On 08.06.18 at 09:43, 孙森 wrote:
Yes, I really did override the method, but it did not work. Finally, I used ds.map()(Types.ROW()); then it worked fine, but I didn't know why. The code is:
val inputStream: DataStream[Row] = env.addSource(myConsumer)(Types.ROW(fieldNameArray, flinkTypeArray))
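
For anyone hitting the same issue, a sketch of that workaround in context; the field names and types here are illustrative stand-ins for the arrays that getSchemaMap(jsonSchema) returns, and an equivalent RowTypeInfo is passed as the explicit type evidence:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Illustrative schema; the original code derives these arrays from the UMS JSON schema.
val fieldNames: Array[String] = Array("id", "name")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)

// Passing the row type explicitly to addSource's implicit TypeInformation parameter
// keeps the Scala API from falling back to GenericTypeInfo (see FLINK-9556).
// myConsumer is the FlinkKafkaConsumer010[Row] from the earlier snippets.
val rowType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
val inputStream: DataStream[Row] = env.addSource(myConsumer)(rowType)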


On June 8, 2018, at 3:15 PM, Timo Walther <[hidden email]> wrote:

Can you verify with a debugger whether you really override the method? It seems that your created type information is either not called or not used.

Regards,
Timo

On 07.06.18 at 09:03, 孙森 wrote:
Hi Timo,

Thank you for the reply. The `inputStream.getType` is GenericTypeInfo<Row>.

Thanks~

sen

