I want to rigister a table from mysql binlog like this: tEnv.sqlUpdate("CREATE TABLE order(\n" using the following log format: {
"type" : "update",
"timestamp" : 1583373066000,
"binlog_filename" : "mysql-bin.000453",
"binlog_position" : 923020943,
"database" : "wms",
"table_name" : "t_pick_order",
"table_id" : 131936,
"columns" : [ {
"id" : 1,
"name" : "order_id",
"column_type" : -5,
"last_value" : 4606458,
"value" : 4606458
}, {
"id" : 2,
"name" : "order_no",
"column_type" : 12,
"last_value" : "EDBMFSJ00001S2003050006628",
"value" : "EDBMFSJ00001S2003050006628"
}]
} Surely the format.type' = 'json',\n" will not parse the result as I expected. Is there any method I can implement this? For example, using a self defined format class. Thanks, Lei wanglei2@geekplus.com.cn |
Hi Lei, Currently, Flink SQL doesn't support to register a binlog format (i.e. just define "order_id" and "order_no", but the json schema has other binlog fields). This is exactly what we want to support in FLIP-105 [1] and FLIP-95. For now, if you want to consume such json data, you have to define the full schema, e.g. "type", "timestmap", and so on... Btw, what Change Data Capture (CDC) tool are you using? Best, Jark
|
User defined formats also sounds like an interesting extension. Best, Kurt On Thu, Mar 5, 2020 at 3:06 PM Jark Wu <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |