package mr.yan; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import java.sql.Timestamp; import java.util.Properties; public class Sample { public static void main(String... args) throws Exception { String kafkaServer = "localhost:9092"; String kafkaGroup = "kafkaGroup"; String kafkaEventTopic = "kafkaEventTopic"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", kafkaServer); properties.setProperty("group.id", kafkaGroup); FlinkKafkaConsumer010 kafkaConsumer = new FlinkKafkaConsumer010<>("kafkaEventTopic", new SimpleStringSchema(), properties); DataStream dataStream = env .addSource(kafkaConsumer) .map(new RichMapFunction() { private ObjectMapper om; @Override public void open(Configuration parameters) throws Exception { this.om = new ObjectMapper(); } @Override public MyObject map(String value) throws Exception { return om.readValue(value, MyObject.class); } }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) { @Override public long extractTimestamp(MyObject element) { return element.getEventTs().getTime(); } }); Table table = tableEnv.fromDataStream(dataStream, "id,ip,type,eventTs.rowtime"); tableEnv.registerTable("myTable", table); String sql1 = "select distinct id, eventTs, count(*) over (partition by id order by eventTs rows between 100 preceding and current row) as cnt1 from myTable"; String sql2 = "select distinct id as r_id, eventTs as r_eventTs, count(*) over (partition by id order by eventTs rows between 50 preceding and current row) as cnt2 from myTable"; Table left = tableEnv.sqlQuery(sql1); Table right = tableEnv.sqlQuery(sql2); left.join(right).where("id = r_id && eventTs === r_eventTs").writeToSink(new RetractStreamTableSink() { private String[] fieldNames; private TypeInformation[] fieldTypes; @Override public TypeInformation getRecordType() { return new RowTypeInfo(fieldTypes); } @Override public void emitDataStream(DataStream> dataStream) { dataStream.addSink(new SinkFunction>() { @Override public void invoke(Tuple2 value, Context context) throws Exception { System.out.println(value.toString()); } }); } @Override public TupleTypeInfo> getOutputType() { return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); } @Override public String[] getFieldNames() { return fieldNames; } @Override public TypeInformation[] getFieldTypes() { return fieldTypes; } @Override public TableSink> configure(String[] fieldNames, TypeInformation[] fieldTypes) { this.fieldNames = fieldNames; this.fieldTypes = fieldTypes; return this; } }); env.execute("hello"); } public static class MyObject { private String id; private String ip; private Long type; private Timestamp eventTs; public MyObject() { } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public Long getType() { return type; } public void setType(Long type) { this.type = type; } public Timestamp getEventTs() { return eventTs; } public void setEventTs(Timestamp eventTs) { this.eventTs = eventTs; } } }