How to use a self-defined JSON format when creating a table from a Kafka stream?

3 messages

How to use a self-defined JSON format when creating a table from a Kafka stream?

wanglei2@geekplus.com.cn

I want to register a table from a MySQL binlog like this: 
tEnv.sqlUpdate("CREATE TABLE `order` (\n"
+ " order_id BIGINT,\n"
+ " order_no VARCHAR\n"
+ ") WITH (\n"
+ " 'connector.type' = 'kafka',\n"
...........
+ " 'update-mode' = 'append',\n"
+ " 'format.type' = 'json',\n"
+ " 'format.derive-schema' = 'true'\n"
+ ")");
using the following log format: 
{
  "type" : "update",
  "timestamp" : 1583373066000,
  "binlog_filename" : "mysql-bin.000453",
  "binlog_position" : 923020943,
  "database" : "wms",
  "table_name" : "t_pick_order",
  "table_id" : 131936,
  "columns" : [ {
    "id" : 1,
    "name" : "order_id",
    "column_type" : -5,
    "last_value" : 4606458,
    "value" : 4606458
  }, {
    "id" : 2,
    "name" : "order_no",
    "column_type" : 12,
    "last_value" : "EDBMFSJ00001S2003050006628",
    "value" : "EDBMFSJ00001S2003050006628"
  }]
}

Surely 'format.type' = 'json' will not parse the result as I expect.
Is there any way to implement this, for example with a self-defined format class?

Thanks,
Lei





Re: How to use a self-defined JSON format when creating a table from a Kafka stream?

Jark Wu-3
Hi Lei,

Currently, Flink SQL doesn't support registering a binlog format (i.e. declaring only "order_id" and "order_no" while the JSON schema carries the extra binlog fields).
This is exactly what we want to support in FLIP-105 [1] and FLIP-95.

For now, if you want to consume such JSON data, you have to define the full schema, e.g. "type", "timestamp", and so on...
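For reference, Jark's workaround can be sketched as a DDL that declares the full binlog envelope (a sketch only, assuming Flink 1.10 DDL syntax; the table name and field types are guessed from the sample message, and "last_value"/"value" actually vary per column type, so VARCHAR below is only a placeholder):

```sql
CREATE TABLE t_pick_order_binlog (
  `type` VARCHAR,
  `timestamp` BIGINT,
  binlog_filename VARCHAR,
  binlog_position BIGINT,
  `database` VARCHAR,
  table_name VARCHAR,
  table_id BIGINT,
  `columns` ARRAY<ROW<
    id INT,
    name VARCHAR,
    column_type INT,
    last_value VARCHAR,  -- placeholder: the real type varies per column
    `value` VARCHAR
  >>
) WITH (
  'connector.type' = 'kafka',
  -- remaining connector properties as in the original snippet
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)
```

With this schema the order fields come out of the `columns` array via ROW access rather than as top-level columns.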

Btw, what Change Data Capture (CDC) tool are you using? 

Best,
Jark






Re: How to use a self-defined JSON format when creating a table from a Kafka stream?

Kurt Young
User-defined formats also sound like an interesting extension. 

Best,
Kurt
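Until such an extension exists, the flattening a user-defined format would perform is easy to prototype outside Flink. The sketch below (Python, purely illustrative; the function name and the set of wanted columns are assumptions, not any Flink API) pulls the declared columns out of one binlog envelope:

```python
import json

def extract_row(raw, wanted=("order_id", "order_no")):
    """Flatten one binlog envelope into a dict of the requested columns.

    `raw` is the Kafka record value (bytes or str) in the binlog JSON
    format shown in this thread; everything outside `columns` is dropped.
    """
    envelope = json.loads(raw)
    values = {c["name"]: c["value"] for c in envelope["columns"]}
    return {name: values[name] for name in wanted}

# Sample message, trimmed from the one posted above.
sample = b'''{"type": "update", "timestamp": 1583373066000,
  "database": "wms", "table_name": "t_pick_order",
  "columns": [
    {"id": 1, "name": "order_id", "column_type": -5, "value": 4606458},
    {"id": 2, "name": "order_no", "column_type": 12,
     "value": "EDBMFSJ00001S2003050006628"}]}'''

row = extract_row(sample)
```

A real user-defined format would do this same flattening per record inside its deserialization method, emitting one row per envelope.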

