hi, I want to use flink sql to left join static dimension table from mysql currently, so I converted the mysql table into data stream to join with datastream which has converted to flink table. While I found that the real-time stream data is not joined correctly with mysql data at the beginning, but the latter stream can be joined correctly. So I want to ask that is there any good way to make real-time stream can join with mysql data with table api which has loaded and supporting dynamicly loading mysql data into memory once each hour. Thanks a lot. The following is the some example code: public static JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat() .setDrivername(DRIVER_CLASS) .setDBUrl(DB_URL) .setUsername(USER_NAME) .setPassword(USER_PASS) .setQuery(SELECT_ALL_PERSONS) .setRowTypeInfo(ROW_TYPE_INFO); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); DataStream<Order> orderA = env.addSource(new OrderFunction()); tEnv.registerDataStream("tableA", orderA, "name, product, amount"); DataStream<Row> mysql_table = env.createInput(inputBuilder.finish()); String[] dim_table_fileds = {"id","name","age","address"}; tEnv.registerDataStream("tableB",mysql_table); Table result = tEnv.sqlQuery("SELECT tableA.name,tableA.amount,tableB.age,tableB.address FROM tableB join tableA on tableA.name = tableB.name" ); tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print(); env.execute(); Thanks a lot.
|
Hi yelun, Currently, there are no direct ways to dynamically load data and do join in Flink-SQL, as a workaround you can implement your logic with an udtf. In the udtf, you can load the data into a cache and update it according to your requirement. Best, Hequn On Wed, Nov 14, 2018 at 10:34 AM yelun <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |