How flink table api to join with mysql dimtable

How flink table api to join with mysql dimtable

yelun
Hi,

I want to use Flink SQL to left join a real-time stream with a static dimension table stored in MySQL. At the moment I read the MySQL table as a DataStream and join it with another DataStream that has been registered as a Flink table. However, I found that the stream records arriving at the beginning are not joined correctly with the MySQL data, while later records are joined correctly. Is there a good way, using the Table API, to make sure the MySQL data is already loaded when the real-time stream is joined against it, and to reload the MySQL data into memory once an hour? Thanks a lot.

The following is some example code:

// JDBC input format that reads the whole MySQL dimension table.
public static JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername(DRIVER_CLASS)
    .setDBUrl(DB_URL)
    .setUsername(USER_NAME)
    .setPassword(USER_PASS)
    .setQuery(SELECT_ALL_PERSONS)
    .setRowTypeInfo(ROW_TYPE_INFO);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Real-time order stream, registered as tableA.
DataStream<Order> orderA = env.addSource(new OrderFunction());
tEnv.registerDataStream("tableA", orderA, "name, product, amount");

// MySQL rows read as a stream, registered as tableB.
DataStream<Row> mysql_table = env.createInput(inputBuilder.finish());
// Column names of the dimension table (not used below; tableB takes its field names from ROW_TYPE_INFO).
String[] dim_table_fields = {"id", "name", "age", "address"};

tEnv.registerDataStream("tableB", mysql_table);
// Note: this is an inner join as posted, although the question above asks about a left join.
Table result = tEnv.sqlQuery("SELECT tableA.name, tableA.amount, tableB.age, tableB.address FROM tableB JOIN tableA ON tableA.name = tableB.name");
tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();
env.execute();

Thanks a lot.
Re: How flink table api to join with mysql dimtable

Hequn Cheng
Hi yelun,

Currently, there is no direct way to dynamically load data and join against it in Flink SQL. As a workaround, you can implement your logic in a UDTF (user-defined table function): inside the UDTF, load the data into a cache and refresh it according to your requirements.
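
For illustration only (this code is not from the thread), the cache that such a UDTF would keep might look roughly like the sketch below. It is plain Java without any Flink dependency. The class name DimensionCache, the loader and the injected clock are all hypothetical; the loader stands in for whatever runs the JDBC query. The sketch is not thread-safe and assumes it is called from a single thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Flink class): keeps an in-memory snapshot of a
 * dimension table and reloads it once the snapshot is older than a given TTL.
 * Not thread-safe; intended to be owned by a single function instance.
 */
final class DimensionCache<K, V> {
    private final Supplier<Map<K, V>> loader; // assumption: e.g. runs the JDBC query
    private final long ttlMillis;             // e.g. 3_600_000 for an hourly reload
    private final LongSupplier clock;         // injected so the reload can be tested

    private Map<K, V> snapshot;               // null until the first lookup
    private long loadedAt;

    DimensionCache(Supplier<Map<K, V>> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value for key, or null if the key is unknown. */
    V lookup(K key) {
        long now = clock.getAsLong();
        if (snapshot == null || now - loadedAt >= ttlMillis) {
            // Copy the loaded data so later changes in the source are only
            // picked up by the next reload.
            snapshot = new HashMap<>(loader.get());
            loadedAt = now;
        }
        return snapshot.get(key);
    }
}
```

In a Flink job, a class extending TableFunction could own such a cache, look up the join key in its eval method and emit the matching row with collect; the query could then use something like LEFT JOIN LATERAL TABLE(...) ON TRUE against the stream table. Because the first lookup loads the data before returning, early stream records would see the dimension data as well. That wiring is only described here, not shown or tested.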

Best, Hequn

In reply to yelun's message of Wed, Nov 14, 2018 at 10:34 AM.