import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.*; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; import java.util.Properties; public class Test { public static void main(String... args) throws Exception { EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings); FlinkKafkaConsumer consumer1 = getKafkaConsumer("test-topic1"); FlinkKafkaConsumer consumer2 = getKafkaConsumer("test-topic2"); FlinkKafkaConsumer consumer3 = getKafkaConsumer("test-topic3"); FlinkKafkaConsumer consumer4 = getKafkaConsumer("test-topic4"); FlinkKafkaConsumer consumer5 = getKafkaConsumer("test-topic5"); DataStream stream1 = env.addSource(consumer1); DataStream stream2 = env.addSource(consumer2); DataStream stream3 = env.addSource(consumer3); DataStream stream4 = env.addSource(consumer4); DataStream stream5 = env.addSource(consumer5); bsTableEnv.registerDataStream("sample1", stream1); bsTableEnv.registerDataStream("sample2", stream2); bsTableEnv.registerDataStream("sample3", stream3); bsTableEnv.registerDataStream("sample4", stream4); bsTableEnv.registerDataStream("sample5", stream5); Table result = bsTableEnv.sqlQuery("SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0" + " FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on sample3.f0=sample4.f0"); result.printSchema(); bsTableEnv.toRetractStream(result, Row.class).print(); bsTableEnv.execute("sample job"); } private static FlinkKafkaConsumer getKafkaConsumer(String topic) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", topic); return new FlinkKafkaConsumer<>( java.util.regex.Pattern.compile(topic), new SimpleStringSchema(), properties); } }