I've tried to specify such a schema when I read from Kafka and convert the input stream to a table, but I got the exception:
And the code here:
/** Parses a UMS JSON schema and returns the parallel arrays of field names and
  * Flink [[TypeInformation]] instances needed to build a Row type.
  *
  * @param jsonSchema UMS schema as a JSON string
  * @return (field names, corresponding Flink type information), index-aligned
  */
private def getSchemaMap(jsonSchema: String): (Array[String], Array[TypeInformation[_]]) = {
  val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
  // Build (name, type) pairs in one pass and split them — keeps the two
  // arrays aligned by construction instead of appending to two mutable
  // ListBuffers in lockstep. Debug printlns from the original removed.
  val (names, types) = umsSchema.fields_get
    .map(field => (field.name, fieldTypeMatch(field.`type`)))
    .unzip
  (names.toArray, types.toArray)
}
/** Maps a UMS field type to the equivalent Flink table [[TypeInformation]].
  *
  * @param umsFieldType UMS field type tag from the schema
  * @return Flink type information for that field
  * @throws UnsupportedOperationException for UMS types with no Flink mapping
  */
private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
  umsFieldType match {
    case STRING   => Types.STRING
    case INT      => Types.INT
    case LONG     => Types.LONG
    case FLOAT    => Types.FLOAT
    case DOUBLE   => Types.DOUBLE
    case BOOLEAN  => Types.BOOLEAN
    case DATE     => Types.SQL_DATE
    case DATETIME => Types.SQL_TIMESTAMP
    case DECIMAL  => Types.DECIMAL
    // NOTE(review): the full set of UmsFieldType variants is not visible here;
    // fail fast with a descriptive message instead of an opaque MatchError if
    // an unmapped variant (e.g. BINARY) ever reaches this point.
    case other =>
      throw new UnsupportedOperationException(s"Unsupported UMS field type: $other")
  }
}
}
// Kafka 0.10 source deserializing each record into a Flink Row via the UMS schema.
val myConsumer: FlinkKafkaConsumer010[Row] =
  new FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), properties)
val inputStream: DataStream[Row] = env.addSource(myConsumer)
// NOTE(review): the exception reported in the post is thrown on this line.
// The original had the annotation "<<—————exception here" fused into the
// statement, which does not compile — it is preserved as this comment instead.
val tableEnv = TableEnvironment.getTableEnvironment(env)
Free forum by Nabble | Edit this page |