Hey, I am new to Flink and have a question; I hope someone here can help.

How do I use a dimension table in the Flink Table API with StreamExecutionEnvironment? I tried to handle this with a TableFunction, but when I debug the job it fails with the following error:

    LogicalProject(col_1=[$0])
      LogicalJoin(condition=[true], joinType=[left])
        LogicalTableScan(table=[[test]])
        LogicalTableFunctionScan(invocation=[dim_test()], rowType=[RecordType(VARCHAR(65536) col, VARCHAR(65536) name)], elementType=[class [Ljava.lang.Object;])

    This exception indicates that the query uses an unsupported SQL feature.
    Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:674)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:216)
        at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:692)
        at com.bigdata.stream.streamsql.FlinkSqlTest.main(FlinkSqlTest.java:64)

SQL:

    select t.col_1 from test t left join lateral table(dim_test()) b on true

Main code:

    public static void main(String[] args) throws Exception {
        String sql = "select t.col_1 from test t left join lateral table(dim_test()) b on true";

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(streamEnv);

        // Kafka connection settings shared by the source and the sink
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "test");

        // Source table "test", read as JSON from the Kafka topic "test"
        Kafka010JsonTableSource tableSource = Kafka010JsonTableSource.builder()
                .forTopic("test")
                .withKafkaProperties(kafkaProps)
                .withSchema(TableSchema.builder()
                        .field("col_1", Types.STRING)
                        .field("col_2", Types.STRING).build())
                .build();
        stEnv.registerTableSource("test", tableSource);

        // Schema of the dimension table
        String[] columns = {"col", "name"};
        TypeInformation[] typeInformations = {TypeInformation.of(String.class), TypeInformation.of(String.class)};
        TableSchema tableSchema = new TableSchema(columns, typeInformations);

        // MySQL connection settings for the dimension table
        Map<String, Object> context = new HashMap<>();
        context.put("mysql.url", "jdbc:mysql://localhost:3306/test");
        context.put("mysql.driver", "com.mysql.jdbc.Driver");
        context.put("mysql.user", "test");
        context.put("mysql.password", "test");
        context.put("mysql.table", "dim_test");

        // Register the dimension lookup as the table function "dim_test"
        StreamSqlDim dim = new MySqlDimFactory().getInstance(tableSchema, new StreamSqlContext(context));
        stEnv.registerFunction("dim_test", dim);

        // Sink table "test_out", written as JSON to Kafka
        String[] outColumns = {"col"};
        TypeInformation[] outType = {TypeInformation.of(String.class)};
        TableSink tableSink = new Kafka010JsonTableSink("test_out", kafkaProps);
        stEnv.registerTableSink("test_out", outColumns, outType, tableSink);

        Table t = stEnv.sql(sql);
        stEnv.insertInto(t, "test_out", stEnv.queryConfig());
        streamEnv.execute();
    }

MySqlDim extends TableFunction, and its eval() method is empty, like this:

    public void eval() {
    }
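
For comparison, the table functions shown in the Flink documentation take arguments from the left-hand row in eval() and emit result rows through collect(). The sketch below mirrors that shape without depending on Flink, so it is only an illustration: DimLookupSketch, its in-memory map (standing in for the MySQL table dim_test), and its local collect() are hypothetical names, not Flink classes. Whether passing the key, for example lateral table(dim_test(t.col_1)), resolves the planner error above is an assumption that has not been verified here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free stand-in for a dimension-lookup table function.
// This is NOT the Flink API: in Flink the class would extend
// org.apache.flink.table.functions.TableFunction and use the inherited collect().
class DimLookupSketch {
    // In-memory stand-in for the MySQL table dim_test (col -> name).
    private final Map<String, String> dimTable = new HashMap<>();
    // Rows emitted by eval(); Flink would forward them to the join instead.
    private final List<String[]> emitted = new ArrayList<>();

    public DimLookupSketch(Map<String, String> rows) {
        dimTable.putAll(rows);
    }

    // Takes the join key from the left-hand row and emits one
    // (col, name) row if the key exists in the dimension table.
    public void eval(String key) {
        String name = dimTable.get(key);
        if (name != null) {
            collect(new String[] {key, name});
        }
    }

    // Local replacement for the collect() a real table function inherits.
    private void collect(String[] row) {
        emitted.add(row);
    }

    public List<String[]> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, String> rows = new HashMap<>();
        rows.put("a", "alpha");
        DimLookupSketch dim = new DimLookupSketch(rows);
        dim.eval("a");   // key present: one row is emitted
        dim.eval("zzz"); // key absent: nothing is emitted
        for (String[] row : dim.emitted()) {
            System.out.println(row[0] + "," + row[1]);
        }
    }
}
```

The point of the sketch is the contrast with the empty, argument-free eval() above: a lookup keyed on a column of the streamed row gives the function something to correlate on, and each collect() call produces one joined row.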