Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

laney0606@163.com
Hi,
     I‘m confused about a problem, occuring a exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
     Both BillCount and Record are class object.  Following is code.
        
       case  class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
       val kafkaInputStream: DataStream[Record] = env.addSource(source)   //source is FlinkKafkaConsumer010 source
       val tbDataStream : DataStream[BillCount] = kafkaInputStream.map(
                  new MapFunction[Record, BillCount] {
                    override def map(value: Record) = {
                      BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
                              value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
        }
      })
     val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)       // occur error here

      
    Error :
    Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)


  Thanks.


Reply | Threaded
Open this post in threaded view
|

Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

Haohui Mai
Hi,

I think instead of generating DataStream[BillCount], the correct way is to generate DataStream[Row], that is,

kafkaInputStream.map(value -> Row.of(value.getLogis_id, value.getProvince_id, value.getCity_id, value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)

That should work.

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM [hidden email] <[hidden email]> wrote:
Hi,
     I‘m confused about a problem, occuring a exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
     Both BillCount and Record are class object.  Following is code.
        
       case  class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
       val kafkaInputStream: DataStream[Record] = env.addSource(source)   //source is FlinkKafkaConsumer010 source
       val tbDataStream : DataStream[BillCount] = kafkaInputStream.map(
                  new MapFunction[Record, BillCount] {
                    override def map(value: Record) = {
                      BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
                              value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
        }
      })
     val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)       // occur error here

      
    Error :
    Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)


  Thanks.


Reply | Threaded
Open this post in threaded view
|

Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

Timo Walther
Hi,

I also replied to your Stackoverflow question. I think the problem is that BillCount has the wrong type and is therefore treated as one single black box.

Haohui's suggestion will no work because the row type needs information about the fields.  The easiest thing is to figure out why BillCount has the wrong type. Make sure that it is defined in a statically.

What type is Record? Maybe you don't need the additional MapFunction but can use the Table API for mapping.

Regards,
Timo


Am 9/25/17 um 9:29 AM schrieb Haohui Mai:
Hi,

I think instead of generating DataStream[BillCount], the correct way is to generate DataStream[Row], that is,

kafkaInputStream.map(value -> Row.of(value.getLogis_id, value.getProvince_id, value.getCity_id, value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)

That should work.

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM [hidden email] <[hidden email]> wrote:
Hi,
     I‘m confused about a problem, occuring a exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
     Both BillCount and Record are class object.  Following is code.
        
       case  class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
       val kafkaInputStream: DataStream[Record] = env.addSource(source)   //source is FlinkKafkaConsumer010 source
       val tbDataStream : DataStream[BillCount] = kafkaInputStream.map(
                  new MapFunction[Record, BillCount] {
                    override def map(value: Record) = {
                      BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
                              value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
        }
      })
     val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)       // occur error here

      
    Error :
    Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)


  Thanks.