Hi, I‘m confused about a problem, occuring a exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field." Both BillCount and Record are class object. Following is code. case class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long) val kafkaInputStream: DataStream[Record] = env.addSource(source) //source is FlinkKafkaConsumer010 source val tbDataStream : DataStream[BillCount] = kafkaInputStream.map( new MapFunction[Record, BillCount] {override def map(value: Record) = { BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id, value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime) } }) val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate) // occur error here Error : Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field. at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627) at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108) at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624) at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85) at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58) Thanks. |
Hi, I think instead of generating DataStream[BillCount], the correct way is to generate DataStream[Row], that is, kafkaInputStream.map(value -> Row.of(value.getLogis_id, value.getProvince_id, value.getCity_id, value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime) That should work. Regards, Haohui
|
Hi,
I also replied to your Stackoverflow question. I think the problem is that BillCount has the wrong type and is therefore treated as one single black box. Haohui's suggestion will no work because the row type needs information about the fields. The easiest thing is to figure out why BillCount has the wrong type. Make sure that it is defined in a statically. What type is Record? Maybe you don't need the additional MapFunction but can use the Table API for mapping. Regards, Timo Am 9/25/17 um 9:29 AM schrieb Haohui Mai:
|
Free forum by Nabble | Edit this page |