How to use a dimension table in the Flink Table API with StreamExecutionEnvironment?

叶振宝
Hey, I am new to Flink and have a question. I would like to see if anyone
can help.

How do I use a dimension table in the Flink Table API with StreamExecutionEnvironment?

I use a TableFunction to handle this, but when I run the job, planning fails
with the following plan and exception:
LogicalProject(col_1=[$0])
  LogicalJoin(condition=[true], joinType=[left])
    LogicalTableScan(table=[[test]])
    LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
        at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
        at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)

SQL: select t.col_1 from test t left join lateral table(dim_test()) b on true

Main code:
public static void main(String[] args) throws Exception {
    String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(streamEnv);

    // Kafka connection settings shared by the source and the sink
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "test");

    // Source table "test", read as JSON from the Kafka topic "test"
    Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
            .forTopic("test")
            .withKafkaProperties(kafkaProps)
            .withSchema(TableSchema.builder()
                    .field("col_1", Types.STRING)
                    .field("col_2", Types.STRING).build())
            .build();
    stEnv.registerTableSource("test", tableSource);

    // Dimension table function "dim_test", backed by the MySQL table dim_test
    String[] columns = {"col", "name"};
    TypeInformation[] typeInformations = {TypeInformation.of(String.class), TypeInformation.of(String.class)};
    TableSchema tableSchema = new TableSchema(columns, typeInformations);
    Map<String, Object> context = new HashMap<>();
    context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
    context.put("mysql.driver", "com.mysql.jdbc.Driver");
    context.put("mysql.user", "test");
    context.put("mysql.password", "test");
    context.put("mysql.table", "dim_test");
    StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
    stEnv.registerFunction("dim_test", dim);

    // Sink table "test_out", written as JSON to the Kafka topic "test_out"
    String[] outColumns = {"col"};
    TypeInformation[] outType = {TypeInformation.of(String.class)};
    TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
    stEnv.registerTableSink("test_out", outColumns, outType, tableSink);

    // Run the query and write its result to the sink
    Table t = stEnv.sql(sql);
    stEnv.insertInto(t, "test_out", stEnv.queryConfig());
    streamEnv.execute();
}

MySqlDim extends TableFunction, and its eval() method is empty, like this:
public void eval(){

}
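
For comparison, here is a minimal sketch of the lookup-and-emit pattern that a dimension table function's eval() typically follows. It is written in plain Java so that it runs without Flink on the classpath, and it is not the MySqlDim class above: DimLookupSketch, its in-memory map (standing in for the MySQL table dim_test) and the emitted list (standing in for the rows passed to TableFunction's collect(...)) are hypothetical names chosen for illustration. In an actual TableFunction, eval would usually receive the join key as an argument, for example dim_test(t.col_1), and call collect(...) once per matching row. An eval() with no parameters and no collect(...) call never emits a row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DimLookupSketch {
    // Hypothetical stand-in for the MySQL dimension table: "col" -> "name".
    private final Map<String, String> dimTable = new HashMap<>();

    // Hypothetical stand-in for the rows a TableFunction would hand to collect(...).
    final List<String[]> emitted = new ArrayList<>();

    public DimLookupSketch() {
        dimTable.put("a", "alice");
        dimTable.put("b", "bob");
    }

    // Takes the join key from the stream row and emits one (col, name) row per match.
    public void eval(String key) {
        String name = dimTable.get(key);
        if (name != null) {
            emitted.add(new String[] {key, name});
        }
    }

    public static void main(String[] args) {
        DimLookupSketch dim = new DimLookupSketch();
        dim.eval("a");       // key present: one row is emitted
        dim.eval("missing"); // key absent: nothing is emitted
        for (String[] row : dim.emitted) {
            System.out.println(row[0] + "," + row[1]);
        }
    }
}
```

With a left join, a stream row whose key has no match would still be kept, with the dimension columns filled with nulls.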