Hi,

I am using Flink 1.10. When I try to sink data to an HBase table like this:

    bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
        rowkey String,
        info ROW<score double>
    ) WITH (
        'connector.type' = 'hbase',
        'connector.version' = '1.4.3',
        'connector.table-name' = 'ms:test_circle_info',
        'connector.zookeeper.quorum' = 'localhost:2181',
        'connector.zookeeper.znode.parent' = '/hbase-secure',
        'connector.write.buffer-flush.max-size' = '10mb',
        'connector.write.buffer-flush.max-rows' = '1000',
        'connector.write.buffer-flush.interval' = '2s'
    )""")

    bstEnv.sqlUpdate(
      """
        |insert into circle_weight
        |select
        |concat_ws('_',circleName,dt) rowkey,
        |active_ratio*25 score
        |from tb""")

I get the following exception. Can anybody tell me what is wrong?

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.circle_weight do not match.
    Query schema: [rowkey: STRING, score: DOUBLE]
    Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
        at com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
        at com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
Hi,

You should make sure the types of the selected fields match the types of the sink table's columns; otherwise you will get the above exception. You can wrap `active_ratio*25` in a row type, like this:

    insert into circle_weight
    select rowkey, ROW(info)
    from (
      select concat_ws('_',circleName,dt) rowkey, active_ratio*25 as info
      from tb
    ) t;

Best,
Godfrey

op <[hidden email]> wrote on Thu, Jun 11, 2020 at 3:31 PM:
In reply to this post by op
Hi,

The stack trace indicates that your query schema does not match your sink schema. The expression `active_ratio*25 score` in your query produces a DOUBLE value, not the `ROW<score double>` that you declared for the `info` column of your sink.

op <[hidden email]> wrote on Thu, Jun 11, 2020 at 3:31 PM:
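
Putting the two replies together, the insert from the question could be rewritten roughly as below. This is only a sketch, not a tested fix: it assumes the `circle_weight` DDL from the question and the source table `tb` are already registered on the table environment, and the names `CircleWeightInsert`, `insertSql`, `submit` and the job name are made up for illustration. The subquery keeps the computed DOUBLE as a plain column so that the outer `ROW(...)` only wraps a column reference.

```scala
import org.apache.flink.table.api.TableEnvironment

object CircleWeightInsert {

  // The inner query does the computation; the outer query wraps the DOUBLE
  // column in ROW(...) so the result has two columns, STRING and ROW<DOUBLE>,
  // like the sink declaration (rowkey String, info ROW<score double>).
  val insertSql: String =
    """
      |INSERT INTO circle_weight
      |SELECT rowkey, ROW(score)
      |FROM (
      |  SELECT
      |    concat_ws('_', circleName, dt) AS rowkey,
      |    active_ratio * 25 AS score
      |  FROM tb
      |) t
      |""".stripMargin

  // Assumption: `tEnv` already has `circle_weight` and `tb` registered.
  def submit(tEnv: TableEnvironment): Unit = {
    tEnv.sqlUpdate(insertSql)
    // With the Flink 1.10 API, sqlUpdate buffers the INSERT and
    // execute() submits the job; the job name here is arbitrary.
    tEnv.execute("circle_weight_insert")
  }
}
```

In the original program this would be called as `CircleWeightInsert.submit(bstEnv)` after the `CREATE TABLE` statement has run.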