How to use Hbase Connector Sink


op
Hi,
I'm on Flink 1.10. When I want to sink data to an HBase table like this:

 bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
                           rowkey String,
                           info ROW<score double>
                         ) WITH (
                           'connector.type' = 'hbase',
                           'connector.version' = '1.4.3',
                           'connector.table-name' = 'ms:test_circle_info',
                           'connector.zookeeper.quorum' = 'localhost:2181',
                           'connector.zookeeper.znode.parent' = '/hbase-secure',
                           'connector.write.buffer-flush.max-size' = '10mb',
                           'connector.write.buffer-flush.max-rows' = '1000',
                           'connector.write.buffer-flush.interval' = '2s'
                         )""")
 
    bstEnv.sqlUpdate(
      """
        |insert into circle_weight
        |select
        |concat_ws('_',circleName,dt) rowkey,
        |active_ratio*25 score
        |from tb""".stripMargin)

I get the following exception. Can anybody tell me what is wrong?

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.circle_weight do not match.
Query schema: [rowkey: STRING, score: DOUBLE]
Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
at com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)

Re: How to use Hbase Connector Sink

godfrey he
Hi,

You should make sure the types of the selected fields and the types of the sink table are the same; otherwise you will get the above exception. You can wrap `active_ratio*25 score` in a row type, like this:

insert into circle_weight
select rowkey, ROW(score) from (
  select concat_ws('_',circleName,dt) rowkey, active_ratio*25 score from tb) t;


Best,
Godfrey

op <[hidden email]> wrote on Thu, Jun 11, 2020 at 3:31 PM:

Re: How to use Hbase Connector Sink

Caizhi Weng
In reply to this post by op
Hi,

The stack trace indicates that your query schema does not match your sink schema. `active_ratio*25 score` in your query produces a plain DOUBLE value, not the `ROW<score double>` you declared in your sink.
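As a minimal sketch of the fix, using the table and column names from the quoted message: compute the DOUBLE first in a subquery, then wrap it with the ROW constructor so the query schema matches the sink's `info ROW<score DOUBLE>` column.

```sql
-- The subquery yields [rowkey: STRING, score: DOUBLE];
-- ROW(score) turns the DOUBLE into ROW<DOUBLE> to match the sink.
INSERT INTO circle_weight
SELECT rowkey, ROW(score)
FROM (
  SELECT concat_ws('_', circleName, dt) AS rowkey,
         active_ratio * 25 AS score
  FROM tb
) t;
```

Fields in an INSERT are matched to the sink by position, so no alias is needed on `ROW(score)`.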

op <[hidden email]> wrote on Thu, Jun 11, 2020 at 3:31 PM: