Re: How to use Hbase Connector Sink

Posted by godfrey he on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-use-Hbase-Connector-Sink-tp35871p35872.html

Hi,

Make sure the types of the selected fields are the same as the types of the sink table's columns;
otherwise you will get the exception above. Here the sink column `info` is a ROW, but the query returns `score` as a plain DOUBLE. You can change `active_ratio*25 score` to a row type, like this:

insert into circle_weight select rowkey, ROW(info) from (
select concat_ws('_',circleName,dt) rowkey, active_ratio*25 as info from tb) t;
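
To illustrate why the original query is rejected and the ROW(...) version is accepted, here is a small toy model in plain Scala. It is only a sketch: `FieldType`, `RowType`, `SchemaCheck` and `typesMatch` are hypothetical names, not Flink classes, and it assumes the columns are compared position by position on their types only.

```scala
// Toy model only: these types are hypothetical and are NOT Flink classes.
sealed trait FieldType
case object StringType extends FieldType
case object DoubleType extends FieldType
// A nested row, described here only by the types of its fields.
final case class RowType(fieldTypes: List[FieldType]) extends FieldType

object SchemaCheck {
  // A schema is an ordered list of (column name, column type) pairs.
  type Schema = List[(String, FieldType)]

  // Assumption: columns are matched by position and only their types are compared.
  def typesMatch(query: Schema, sink: Schema): Boolean =
    query.map(_._2) == sink.map(_._2)

  // Sink table from the question: rowkey STRING, info ROW<score DOUBLE>.
  val sink: Schema = List(
    "rowkey" -> StringType,
    "info"   -> RowType(List(DoubleType)))

  // Shape of the original query: score is a top-level DOUBLE.
  val flatQuery: Schema = List(
    "rowkey" -> StringType,
    "score"  -> DoubleType)

  // Shape of the query after wrapping the value in ROW(...).
  val nestedQuery: Schema = List(
    "rowkey" -> StringType,
    "info"   -> RowType(List(DoubleType)))

  def main(args: Array[String]): Unit = {
    println(typesMatch(flatQuery, sink))   // false: DOUBLE is not ROW<DOUBLE>
    println(typesMatch(nestedQuery, sink)) // true: both second columns are rows
  }
}
```

In this model the second column of the original query is a DOUBLE while the sink expects a row containing a DOUBLE, which is the same mismatch the "Query schema" and "Sink schema" lines of the exception show.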


Best,
Godfrey

op <[hidden email]> wrote on Thu, Jun 11, 2020 at 3:31 PM:
Hi,
I am using Flink 1.10 and want to sink data to an HBase table like this:

 bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
                           rowkey String,
                           info ROW<score double>
                         ) WITH (
                           'connector.type' = 'hbase',
                           'connector.version' = '1.4.3',
                           'connector.table-name' = 'ms:test_circle_info',
                           'connector.zookeeper.quorum' = 'localhost:2181',
                           'connector.zookeeper.znode.parent' = '/hbase-secure',
                           'connector.write.buffer-flush.max-size' = '10mb',
                           'connector.write.buffer-flush.max-rows' = '1000',
                           'connector.write.buffer-flush.interval' = '2s'
                         )""")
 
    bstEnv.sqlUpdate(
      """
        |insert into circle_weight
        |select
        |concat_ws('_',circleName,dt) rowkey,
        |active_ratio*25 score
        |from tb""".stripMargin)

But I get the following exception. Can anybody tell me what is wrong?

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.circle_weight do not match.
Query schema: [rowkey: STRING, score: DOUBLE]
Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
at com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)