/**
 * Shared builder for the JDBC input format used as a table source.
 *
 * <p>It is configured with the driver class, database URL, credentials,
 * the {@code SELECT_ALL_PERSONS} query and {@code ROW_TYPE_INFO} as the
 * row type information of the produced rows.
 */
public static JDBCInputFormatBuilder inputBuilder =
        JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(DRIVER_CLASS)
                .setDBUrl(DB_URL)
                .setUsername(USER_NAME)
                .setPassword(USER_PASS)
                .setQuery(SELECT_ALL_PERSONS)
                .setRowTypeInfo(ROW_TYPE_INFO);
// Local streaming environment and the table environment created from it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// "tableA": the stream produced by OrderFunction, registered with the
// fields name, product, amount.
DataStream<Order> orderA = env.addSource(new OrderFunction());
tEnv.registerDataStream("tableA", orderA, "name, product, amount");

// "tableB": rows read through the JDBC input format built by inputBuilder.
// NOTE(review): no field names are passed here, yet the query below reads
// tableB.name, tableB.age and tableB.address. Presumably ROW_TYPE_INFO
// carries those field names - confirm. If it does not, pass them
// explicitly as a third argument (e.g. "id, name, age, address").
DataStream<Row> mysqlTable = env.createInput(inputBuilder.finish());
tEnv.registerDataStream("tableB", mysqlTable);

// Join both tables on the name column, convert the result to a retract
// stream typed by ROW_TYPE_INFO_OUT, and print it.
Table result = tEnv.sqlQuery(
        "SELECT tableA.name,tableA.amount,tableB.age,tableB.address FROM tableB join tableA on tableA.name = tableB.name");
tEnv.toRetractStream(result, ROW_TYPE_INFO_OUT).print();

// Nothing runs until the job is submitted for execution.
env.execute();