Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

Posted by Haohui Mai on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Exception-Table-of-atomic-type-can-only-have-a-single-field-when-transferring-DataStream-to-Table-tp15790p15791.html

Hi,

I think instead of generating DataStream[BillCount], the correct way is to generate DataStream[Row], that is,

kafkaInputStream.map(value => Row.of(value.getLogis_id, value.getProvince_id, value.getCity_id, value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime))

That should work.
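
The exception says that the Table API sees the stream's element type as atomic (a single opaque value) rather than as a composite type with named fields. Below is a minimal, untested sketch of the DataStream[Row] approach that also supplies explicit row type information, so that the six fields are visible to the Table API. It assumes the Flink 1.3-era Scala API, that kafkaInputStream and tbEnv exist as in the quoted code, and that the Record getters return values compatible with Int, Double and Long as the BillCount fields suggest. The names fieldTypes, fieldNames, rowTypeInfo and rowStream are illustrative only.

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Assumption: field types mirror the BillCount case class from the question.
val fieldTypes = Array[TypeInformation[_]](
  BasicTypeInfo.INT_TYPE_INFO,    // logisId
  BasicTypeInfo.INT_TYPE_INFO,    // provinceId
  BasicTypeInfo.INT_TYPE_INFO,    // cityId
  BasicTypeInfo.INT_TYPE_INFO,    // orderRequVari
  BasicTypeInfo.DOUBLE_TYPE_INFO, // orderRecAmount
  BasicTypeInfo.LONG_TYPE_INFO    // orderRecDate
)
val fieldNames = Array(
  "logisId", "provinceId", "cityId",
  "orderRequVari", "orderRecAmount", "orderRecDate")

// Describes Row as a composite type with six named fields.
val rowTypeInfo: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)

// Pass the type information explicitly instead of relying on implicit derivation.
// Values are boxed by hand because Row.of takes Java objects.
val rowStream: DataStream[Row] = kafkaInputStream.map((value: Record) =>
  Row.of(
    Int.box(value.getLogis_id),
    Int.box(value.getProvince_id),
    Int.box(value.getCity_id),
    Int.box(value.getOrder_require_varieties),
    Double.box(value.getOrder_rec_amount),
    Long.box(value.getStore_rec_date.getTime))
)(rowTypeInfo)

// Field names are taken from rowTypeInfo, so none are listed here.
val stream = rowStream.toTable(tbEnv)
```

Passing rowTypeInfo explicitly is a design choice in this sketch: without it, Row may be handled as a generic type, which could lead to the same exception.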

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM [hidden email] <[hidden email]> wrote:
Hi,
     I'm confused by a problem: I get the exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field."
     Both BillCount and Record are classes. The code is as follows.
        
       case class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)

       val kafkaInputStream: DataStream[Record] = env.addSource(source)   // source is a FlinkKafkaConsumer010
       val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
         new MapFunction[Record, BillCount] {
           override def map(value: Record) = {
             BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
               value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
           }
         })
       val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)   // the error occurs here

      
    Error:
    Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
        at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
        at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
        at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
        at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
        at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)


  Thanks.