HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast


HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

Jim Chen
Hi,
  I am using Flink 1.10.1 SQL to write to HBase 1.4.3. When inserting into HBase, I get an error from validateSchemaAndApplyImplicitCast, meaning that the query schema and the sink schema are inconsistent.
  The main difference is that the row fields in the query schema are all anonymous expressions (EXPR$0, ...), while the sink schema is ROW(device_id, ...). I don't know how to write the SQL so that the query matches HBase's sink schema.
  I tried writing something like SELECT device_id AS rowkey, ROW(device_id AS ...) AS f1, but an alias is not allowed inside the ROW constructor.
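A stripped-down sketch of the pattern that fails (same shape as the full job below, just fewer columns; the table names here are made up):

    -- sink: the row field has named columns
    CREATE TABLE hbase_sink (
      rowkey STRING,
      f1 ROW<device_id STRING, pass_id STRING>
    ) WITH ( ... );

    -- query: the fields built by ROW(...) come out as EXPR$0, EXPR$1
    INSERT INTO hbase_sink
    SELECT rowkey, ROW(device_id, pass_id) AS f1
    FROM some_source;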

The error message is attached as a screenshot (image.png).

Sample code:
HBase sink DDL:
String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
                "  rowkey STRING,\n" +
                "  f1 ROW< \n" +
                "        device_id STRING,\n" +
                "        pass_id STRING,\n" +
                "        first_date STRING,\n" +
                "        first_channel_id STRING,\n" +
                "        first_app_version STRING,\n" +
                "        first_server_time STRING,\n" +
                "        first_server_hour STRING,\n" +
                "        first_ip_location STRING,\n" +
                "        first_login_time STRING,\n" +
                "        sys_can_uninstall STRING,\n" +
                "        update_date STRING,\n" +
                "        server_time BIGINT,\n" +
                "        last_pass_id STRING,\n" +
                "        last_channel_id STRING,\n" +
                "        last_app_version STRING,\n" +
                "        last_date STRING,\n" +
                "        os STRING,\n" +
                "        attribution_channel_id STRING,\n" +
                "        attribution_first_date STRING,\n" +
                "        p_product STRING,\n" +
                "        p_project STRING,\n" +
                "        p_dt STRING\n" +
                "        >\n" +
                ") WITH (\n" +
                "  'connector.type' = 'hbase',\n" +
                "  'connector.version' = '1.4.3',\n" + // 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行
                "  'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" +
                "  'connector.zookeeper.quorum' = '"+ zookeeperServers +"',\n" +
                "  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
                "  'connector.write.buffer-flush.max-size' = '2mb',\n" +
                "  'connector.write.buffer-flush.max-rows' = '1000',\n" +
                "  'connector.write.buffer-flush.interval' = '2s'\n" +
                ")";

Insert SQL:

String bodyAndLocalSql = "" +
//                "insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
                "SELECT CAST(rowkey AS STRING) AS rowkey, " +
                " ROW(" +
                " device_id, pass_id, first_date, first_channel_id, first_app_version, first_server_time, first_server_hour, first_ip_location, first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, last_channel_id, last_app_version, last_date, os, attribution_channel_id, attribution_first_date, p_product, p_project, p_dt " +
                ") AS f1" +
                " FROM " +
                "(" +
                " SELECT " +
                " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " +
                " kafka.uid AS device_id " +
                ",kafka.pass_id " +

                // first_date
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
                // existing user
                " ELSE hbase.first_date END AS first_date " +

                // first_channel_id
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN kafka.wlb_channel_id" +
                // existing user
                " ELSE hbase.first_channel_id END AS first_channel_id " +

                // first_app_version
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN kafka.app_version " +
                // existing user
                " ELSE hbase.first_app_version END AS first_app_version " +

                // first_server_time
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
                // existing user
                " ELSE hbase.first_server_time END AS first_server_time " +

                // first_server_hour
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
                // existing user
                " ELSE hbase.first_server_hour END AS first_server_hour " +

                // first_ip_location
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN kafka.ip_location " +
                // existing user
                " ELSE hbase.first_ip_location END AS first_ip_location " +

                // first_login_time
                ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
                // new user
                " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
                // existing user
                " ELSE hbase.first_login_time END AS first_login_time " +

                ",kafka.sys_can_uninstall " +

                // update_date
                ",CASE WHEN hbase.pass_id = 0 " +
                " THEN CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS string) " +
                " END AS update_date " + // VARCHAR(2000)

                // server_time
                ",kafka.server_time" +

                ", kafka.pass_id AS last_pass_id" +
                ", kafka.wlb_channel_id AS last_channel_id" +
                ", kafka.app_version AS last_app_version" +
                ", CAST(FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') AS STRING) AS last_date" + // VARCHAR(2000)
                ", kafka.os" +
                ", hbase.attribution_channel_id" +
                ", hbase.attribution_first_date" +
                ", kafka.p_product" +
                ", kafka.p_project" +
                ", kafka.p_dt" +

                " FROM test_hive_catalog.test_ods.test_ods_header AS kafka " +
                " FULL JOIN test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping AS hbase " +
                " ON kafka.uid = hbase.device_id " + // TODO 这里uid,后面要改成device_id
                " WHERE kafka.is_body=1 AND kafka.is_local=1" +
                ")";


Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

Leonard Xu
Hi, Jim

Could you post the error message as text, with the entire schema of the query and the sink? I suspect some field types are mismatched.
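If it does turn out to be a type mismatch, an explicit CAST on each expression inside the ROW constructor usually makes the query type identical to the sink type. A rough sketch with a shortened field list (untested, just to show the idea):

    SELECT CAST(rowkey AS STRING) AS rowkey,
           ROW(CAST(device_id AS STRING), CAST(server_time AS BIGINT)) AS f1
    FROM ...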

Best,
Leonard Xu

On Jul 16, 2020, at 10:29, Jim Chen <[hidden email]> wrote:

> ...



Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

Danny Chan
I suspect there is some inconsistency in the nullability of the whole record field. Can you compare the two schemas and see the diff? For a table, you can get the TableSchema first and print it out.
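Something like this should print both schemas so you can diff them (assuming tEnv is the TableEnvironment the DDL was registered on; Table is org.apache.flink.table.api.Table). The printed types should include NOT NULL where it applies:

    // schema produced by the query
    Table query = tEnv.sqlQuery(bodyAndLocalSql);
    System.out.println(query.getSchema());

    // schema declared by the sink table
    Table sink = tEnv.from("test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping");
    System.out.println(sink.getSchema());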

Best,
Danny Chan
On Jul 16, 2020 at 10:56 AM +0800, Leonard Xu <[hidden email]> wrote:
> ...