Hello Team,

public class FlinkJoinDataStream {

    /**
     * Joins order records (Kafka topic "test1") with invoice records (Kafka topic "test2") on
     * order_id using 60-second tumbling event-time windows, and prints the inputs plus the joined
     * tuples (order_id, customer_id, tstamp_trans, invoice_status).
     *
     * <p>Event time is read from field f3 of each input tuple. Flink interprets event timestamps,
     * watermarks and window sizes in milliseconds, so the mappers must emit epoch milliseconds.
     */
    @SuppressWarnings("serial")
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "myGroup");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple4<Integer, Integer, String, Long>> orderDetails = env
                .addSource(new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), props))
                .map(new Mapper1());
        DataStream<Tuple4<Integer, Integer, String, Long>> invoiceDetails = env
                .addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props))
                .map(new Mapper2());

        // Allowed lateness of events, in milliseconds (550000 ms is roughly 9 minutes).
        // It is applied to BOTH inputs: the window join only fires once the combined watermark
        // (the minimum of both inputs) passes the end of a window.
        long maxOutOfOrderness = 550000L;

        // Why the collection version "works": a finite source emits a final watermark at end of
        // input that fires every pending window. An unbounded Kafka source never does, so windows
        // fire only when real event-time progress on both inputs pushes the watermark past them.
        // NOTE(review): watermarks are generated per parallel map instance. An instance that never
        // receives records (e.g. more instances than Kafka partitions with data) holds the combined
        // watermark back. Consider matching parallelism to partitions or assigning watermarks on
        // the Kafka consumer itself. Confirm for your topic layout.
        DataStream<Tuple4<Integer, Integer, String, Long>> invoiceWatermark =
                invoiceDetails.assignTimestampsAndWatermarks(new MaxTimestampLagAssigner(maxOutOfOrderness));
        invoiceWatermark.print();

        DataStream<Tuple4<Integer, Integer, String, Long>> orderWatermark =
                orderDetails.assignTimestampsAndWatermarks(new MaxTimestampLagAssigner(maxOutOfOrderness));
        orderWatermark.print();

        // where()/equalTo() already partition both inputs by key; a preceding keyBy is a redundant shuffle.
        DataStream<Tuple4<Integer, Integer, String, Integer>> joinedData = orderWatermark
                .join(invoiceWatermark)
                .where(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {
                    @Override
                    public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .apply(new JoinFunction<Tuple4<Integer, Integer, String, Long>,
                        Tuple4<Integer, Integer, String, Long>,
                        Tuple4<Integer, Integer, String, Integer>>() {
                    @Override
                    public Tuple4<Integer, Integer, String, Integer> join(
                            Tuple4<Integer, Integer, String, Long> first,
                            Tuple4<Integer, Integer, String, Long> second) throws Exception {
                        // (order_id, customer_id, tstamp_trans) from the order, invoice_status from the invoice.
                        return new Tuple4<Integer, Integer, String, Integer>(first.f0, first.f1, first.f2, second.f1);
                    }
                });
        joinedData.print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing, so a failed job does not look like a clean exit.
            throw new IllegalStateException("Flink job failed", e);
        }
    }

    /**
     * Periodic watermark assigner that takes the event time from field f3 and keeps the watermark
     * a fixed bound behind the highest timestamp seen so far. Tracking the maximum rather than the
     * last timestamp means out-of-order records cannot pull the watermark back.
     */
    private static class MaxTimestampLagAssigner
            implements AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        private final long maxOutOfOrderness;
        private long maxTimestamp;

        MaxTimestampLagAssigner(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Start low enough that "maxTimestamp - maxOutOfOrderness" cannot underflow before
            // the first element arrives.
            this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        @Override
        public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element, long previousElementTimestamp) {
            long timestamp = element.f3;
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestamp - maxOutOfOrderness);
        }
    }
private static class Mapper1 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>>{ private static final long serialVersionUID = 1L;
//{"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}
@Override public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {
JSONObject jsonObject = new JSONObject(value); final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss"); return new Tuple4<Integer, Integer, String, Long>( jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"), jsonObject.getString("tstamp_trans"), dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000); } } private static class Mapper2 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>>{ private static final long serialVersionUID = 1L;
//{"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"} @Override public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception { JSONObject jsonObject = new JSONObject(value);
final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss"); return new Tuple4<Integer, Integer, String, Long>( jsonObject.getInt("order_id"), jsonObject.getInt("invoice_status"), jsonObject.getString("tstamp_trans"), dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000); } } } If I'm reading the same data using collection, everything is working fine: private static List<String> createOrderRecords() { List<String>orderRecords=new ArrayList<>(); orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}"); orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}"); orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}"); orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}"); orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}"); orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}"); return orderRecords; } private static List<String> createInvoiceRecords() { List<String>invoiceRecords=new ArrayList<>(); invoiceRecords.add("{\"order_id\":312,\"invoice_status\":1,\"tstamp_trans\":\"20181130090300\"}"); invoiceRecords.add("{\"order_id\":318,\"invoice_status\":1,\"tstamp_trans\":\"20181130099000\"}"); invoiceRecords.add("{\"order_id\":317,\"invoice_status\":1,\"tstamp_trans\":\"20181130096300\"}"); invoiceRecords.add("{\"order_id\":311,\"invoice_status\":1,\"tstamp_trans\":\"20181130050300\"}"); return invoiceRecords;
} If I'm excluding Kafka as data source and these collections as data source then thing's working fine. Thank you, Rakesh Kumar |
Hi Rakesh,

Could you explain in a bit more detail what the actual problem is? What output do you expect, and what actually happens? It is hard to guess what problem you're facing.

Best,
Dawid

On 03/12/2018 12:19, Rakesh Kumar wrote:
signature.asc (849 bytes) Download Attachment |
Free forum by Nabble | Edit this page |