package com.flink.streaming.flinkJoin; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Properties; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.api.java.tuple.Tuple13; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.json.JSONException; import org.json.JSONObject; @SuppressWarnings("deprecation") public class FlinkEvent { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "myGroup"); props.setProperty("auto.offset.reset", "latest"); //5 min delay long maxOutOfOrderness=300000L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(1000); FlinkKafkaConsumer order_source = new FlinkKafkaConsumer("order", new SimpleStringSchema(), props); FlinkKafkaConsumer invoice_source = new FlinkKafkaConsumer("invoice", new SimpleStringSchema(), props); DataStream> order_details = env .addSource(order_source).map(new Mapper1()); DataStream> invoice_details = env.addSource(invoice_source) .map(new Mapper2()); DataStream> order_watermark = order_details .assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks>() { private static final long serialVersionUID = 1L; long currentTimestamp; @Override public long extractTimestamp( Tuple13 element, long previousElementTimestamp) { currentTimestamp = element.f12; return currentTimestamp; } @Override public Watermark getCurrentWatermark() { return new Watermark(currentTimestamp-maxOutOfOrderness); } }); DataStream> invoice_watermark = invoice_details .assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks>() { private static final long serialVersionUID = 1L; long currentTimestamp; @Override public long extractTimestamp(Tuple4 element, long previousElementTimestamp) { currentTimestamp = element.f3; return currentTimestamp; } @Override public Watermark getCurrentWatermark() { return new Watermark(currentTimestamp); } }); DataStream> joinedData = order_watermark .keyBy(0).join(invoice_watermark.keyBy(0)) .where(new KeySelector, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer getKey( Tuple13 value) throws Exception { return value.f0; } }).equalTo(new KeySelector, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer getKey(Tuple4 value) throws Exception { return value.f0; } }).window(TumblingEventTimeWindows.of(Time.milliseconds(1))) .apply(new JoinFunction, Tuple4, Tuple12>() { private static final long serialVersionUID = 1L; @Override public Tuple12 join( Tuple13 first, Tuple4 second) throws Exception { return new Tuple12( first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9, first.f10, second.f3); } }); joinedData.print(); DataStream final_joinedData = joinedData.map( new MapFunction, String>() { private static final long serialVersionUID = 1L; @Override public String map( Tuple12 value) throws Exception { long current = System.currentTimeMillis(); long diff = current - value.f11; System.out.println("Current " + current + " difference " + (current - value.f11)); String generatedInput = "{\"order_id\":" + value.f0 + ",\"customer_id\":" + value.f1 + ",\"promotion_id\":" + value.f2 + ",\"store_id\":" + value.f3 + ",\"product_id\":" + value.f4 + ",\"warehouse_id\":" + value.f5 + ",\"unit_cost\":" + value.f6 + ",\"total_cost\":" + value.f7 + ",\"units_sold\":" + value.f8 + ",\"promotion_cost\":" + value.f9 + ",\"date_of_order\":" + value.f10 + ",\"latency\":" + diff + "}"; return generatedInput; } }); FlinkKafkaProducer myProducer = new FlinkKafkaProducer("localhost:9092", "orderSales", new SimpleStringSchema()); final_joinedData.addSink(myProducer); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } } private static boolean isJSONValid(String value) { try { new JSONObject(value); } catch (JSONException ex) { ex.printStackTrace(); return false; } return true; } private static class Mapper1 implements MapFunction> { private static final long serialVersionUID = 1L; @Override public Tuple13 map( String value) throws Exception { JSONObject jsonObject = null; if (isJSONValid(value)) jsonObject = new JSONObject(value); else return new Tuple13(); final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss"); return new Tuple13( jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"), jsonObject.getInt("promotion_id"), jsonObject.getInt("store_id"), jsonObject.getInt("product_id"), jsonObject.getInt("warehouse_id"), jsonObject.getInt("unit_cost"), jsonObject.getInt("total_cost"), jsonObject.getInt("units_sold"), jsonObject.getInt("promotion_cost"), jsonObject.getString("date_of_order"), jsonObject.getString("tstamp_trans"), dfm.parse(jsonObject.getString("tstamp_trans")).getTime()); } } private static class Mapper2 implements MapFunction> { private static final long serialVersionUID = 1L; @Override public Tuple4 map(String value) throws Exception { JSONObject jsonObject = null; if (isJSONValid(value)) jsonObject = new JSONObject(value); else return new Tuple4(); final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss"); return new Tuple4(jsonObject.getInt("order_id"), jsonObject.getString("invoice_status"), jsonObject.getString("tstamp_trans"), dfm.parse(jsonObject.getString("tstamp_trans")).getTime()); } } }