not able to join data coming from kafka

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

not able to join data coming from kafka

Rakesh Kumar

Hello Team,


public class FlinkJoinDataStream {


@SuppressWarnings("serial")

public static void main(String[] args) {


Properties props = new Properties();

props.setProperty("zookeeper.connect", "localhost:2181");

props.setProperty("bootstrap.servers", "localhost:9092");

props.setProperty("group.id", "myGroup");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


DataStream<Tuple4<Integer, Integer, String, Long>> order_details = env.addSource(new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), props)).map(new Mapper1());


DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details = env.addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props)).map(new Mapper2());

long maxOutOfOrderness=550000L;


DataStream<Tuple4<Integer, Integer, String, Long>> invoice_watermark = invoice_details.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>(){


long currentTimestamp;


@Override

public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element, long previousElementTimestamp) {

currentTimestamp = element.f3;

return currentTimestamp;

}


@Override

public Watermark getCurrentWatermark() {

return new Watermark(currentTimestamp);

}

});

invoice_watermark.print();

DataStream<Tuple4<Integer, Integer, String, Long>> order_watermark = order_details.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer,Integer,String,Long>>() {

long currentTimestamp;

@Override

public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element, long previousElementTimestamp) {

currentTimestamp = element.f3;

return currentTimestamp;

}

@Override

public Watermark getCurrentWatermark() {

return new Watermark(currentTimestamp-maxOutOfOrderness);

}

});

order_watermark.print();

DataStream<Tuple4<Integer, Integer, String, Integer>> joinedData = order_watermark.keyBy(0).join(invoice_watermark.keyBy(0))

.where(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {

@Override

public Integer getKey(

Tuple4<Integer, Integer, String, Long>value)

throws Exception {

return value.f0;

}

})

.equalTo(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {


@Override

public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {

return value.f0;

}

})

.window(TumblingEventTimeWindows.of(Time.seconds(60)))

.apply(new JoinFunction<Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer, String,Integer>>() {


@Override

public Tuple4<Integer, Integer, String,Integer> join(

Tuple4<Integer, Integer, String, Long> first,

Tuple4<Integer, Integer, String, Long> second) throws Exception {

return new Tuple4<Integer, Integer, String,Integer>(first.f0,first.f1,first.f2,second.f1);

}

});

joinedData.print();

try {

env.execute();

} catch (Exception e) {

e.printStackTrace();

}

}

private static class Mapper1 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>>{


private static final long serialVersionUID = 1L;

//{"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}

@Override

public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {

JSONObject jsonObject = new JSONObject(value);

final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");


return new Tuple4<Integer, Integer, String, Long>(

jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"),

jsonObject.getString("tstamp_trans"),

dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);

}

}

private static class Mapper2 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>>{


private static final long serialVersionUID = 1L;

//{"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"}


@Override

public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {

JSONObject jsonObject = new JSONObject(value);

final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");


return new Tuple4<Integer, Integer, String, Long>(

jsonObject.getInt("order_id"), jsonObject.getInt("invoice_status"),

jsonObject.getString("tstamp_trans"),

dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);

}

}


}


If I'm reading the same data using collection, everything is working fine:


private static List<String> createOrderRecords() {

List<String>orderRecords=new ArrayList<>();

orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}");

orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}");

return orderRecords;

}

private static List<String> createInvoiceRecords() {

List<String>invoiceRecords=new ArrayList<>();

invoiceRecords.add("{\"order_id\":312,\"invoice_status\":1,\"tstamp_trans\":\"20181130090300\"}");

invoiceRecords.add("{\"order_id\":318,\"invoice_status\":1,\"tstamp_trans\":\"20181130099000\"}");

invoiceRecords.add("{\"order_id\":317,\"invoice_status\":1,\"tstamp_trans\":\"20181130096300\"}");

invoiceRecords.add("{\"order_id\":311,\"invoice_status\":1,\"tstamp_trans\":\"20181130050300\"}");

return invoiceRecords;

}


If I'm excluding Kafka as data source and these collections as data source then thing's working fine.


Thank you,

Rakesh Kumar


Reply | Threaded
Open this post in threaded view
|

Re: not able to join data coming from kafka

Dawid Wysakowicz-2

Hi Rakesh,

Could you explain a little bit what is the actual problem? What do you expect as the ouput and what actually happens? It is hard to guess what is the problem you're facing.

Best,

Dawid

On 03/12/2018 12:19, Rakesh Kumar wrote:

Hello Team,


public class FlinkJoinDataStream {


@SuppressWarnings("serial")

public static void main(String[] args) {


Properties props = new Properties();

props.setProperty("zookeeper.connect", "localhost:2181");

props.setProperty("bootstrap.servers", "localhost:9092");

props.setProperty("group.id", "myGroup");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


DataStream<Tuple4<Integer, Integer, String, Long>> order_details = env.addSource(new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), props)).map(new Mapper1());


DataStream<Tuple4<Integer, Integer, String, Long>> invoice_details = env.addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props)).map(new Mapper2());

long maxOutOfOrderness=550000L;


DataStream<Tuple4<Integer, Integer, String, Long>> invoice_watermark = invoice_details.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer, Integer, String, Long>>(){


long currentTimestamp;


@Override

public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element, long previousElementTimestamp) {

currentTimestamp = element.f3;

return currentTimestamp;

}


@Override

public Watermark getCurrentWatermark() {

return new Watermark(currentTimestamp);

}

});

invoice_watermark.print();

DataStream<Tuple4<Integer, Integer, String, Long>> order_watermark = order_details.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<Integer,Integer,String,Long>>() {

long currentTimestamp;

@Override

public long extractTimestamp(Tuple4<Integer, Integer, String, Long> element, long previousElementTimestamp) {

currentTimestamp = element.f3;

return currentTimestamp;

}

@Override

public Watermark getCurrentWatermark() {

return new Watermark(currentTimestamp-maxOutOfOrderness);

}

});

order_watermark.print();

DataStream<Tuple4<Integer, Integer, String, Integer>> joinedData = order_watermark.keyBy(0).join(invoice_watermark.keyBy(0))

.where(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {

@Override

public Integer getKey(

Tuple4<Integer, Integer, String, Long>value)

throws Exception {

return value.f0;

}

})

.equalTo(new KeySelector<Tuple4<Integer, Integer, String, Long>, Integer>() {


@Override

public Integer getKey(Tuple4<Integer, Integer, String, Long> value) throws Exception {

return value.f0;

}

})

.window(TumblingEventTimeWindows.of(Time.seconds(60)))

.apply(new JoinFunction<Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer, String, Long>, Tuple4<Integer, Integer, String,Integer>>() {


@Override

public Tuple4<Integer, Integer, String,Integer> join(

Tuple4<Integer, Integer, String, Long> first,

Tuple4<Integer, Integer, String, Long> second) throws Exception {

return new Tuple4<Integer, Integer, String,Integer>(first.f0,first.f1,first.f2,second.f1);

}

});

joinedData.print();

try {

env.execute();

} catch (Exception e) {

e.printStackTrace();

}

}

private static class Mapper1 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>>{


private static final long serialVersionUID = 1L;

//{"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}

@Override

public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {

JSONObject jsonObject = new JSONObject(value);

final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");


return new Tuple4<Integer, Integer, String, Long>(

jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"),

jsonObject.getString("tstamp_trans"),

dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);

}

}

private static class Mapper2 implements MapFunction<String, Tuple4<Integer, Integer, String, Long>>{


private static final long serialVersionUID = 1L;

//{"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"}


@Override

public Tuple4<Integer, Integer, String, Long> map(String value) throws Exception {

JSONObject jsonObject = new JSONObject(value);

final DateFormat dfm = new SimpleDateFormat("yyyyMMddHHmmss");


return new Tuple4<Integer, Integer, String, Long>(

jsonObject.getInt("order_id"), jsonObject.getInt("invoice_status"),

jsonObject.getString("tstamp_trans"),

dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);

}

}


}


If I'm reading the same data using collection, everything is working fine:


private static List<String> createOrderRecords() {

List<String>orderRecords=new ArrayList<>();

orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}");

orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");

orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}");

return orderRecords;

}

private static List<String> createInvoiceRecords() {

List<String>invoiceRecords=new ArrayList<>();

invoiceRecords.add("{\"order_id\":312,\"invoice_status\":1,\"tstamp_trans\":\"20181130090300\"}");

invoiceRecords.add("{\"order_id\":318,\"invoice_status\":1,\"tstamp_trans\":\"20181130099000\"}");

invoiceRecords.add("{\"order_id\":317,\"invoice_status\":1,\"tstamp_trans\":\"20181130096300\"}");

invoiceRecords.add("{\"order_id\":311,\"invoice_status\":1,\"tstamp_trans\":\"20181130050300\"}");

return invoiceRecords;

}


If I'm excluding Kafka as data source and these collections as data source then thing's working fine.


Thank you,

Rakesh Kumar



signature.asc (849 bytes) Download Attachment