error while joining two datastream

error while joining two datastream

Abhijeet Kumar
Hello Team,

I'm new to Flink and come from a Spark background. I need help completing this streaming job: I'm reading data from two different Kafka topics and I want to join them.

My code:

formatStream1.join(formatStream2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
            public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        })
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
            public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

            public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {
                return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(
                        first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9,
                        second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
            }
        })
        .print();


Error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Data is arriving in both formatStream1 and formatStream2; I verified that by printing them. So the issue must be in the code I shared. Thanks in advance!

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !


Re: error while joining two datastream

Nagarjun Guraja
Looks like you need to assign timestamps and emit watermarks to both streams, viz. formatStream1 and formatStream2, as described in the Flink documentation on event time and watermarks.
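For example, something along these lines (just a sketch; it assumes env is your StreamExecutionEnvironment, that the last Long field of the tuple, e.g. f10 for the Tuple11 stream, holds the event time in epoch milliseconds, and it picks an arbitrary 10 seconds of allowed out-of-orderness):

// Make sure the job runs on event time (the default is processing time).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> stream1WithTs =
        formatStream1.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
                        Time.seconds(10)) { // allowed out-of-orderness, tune to your data

                    @Override
                    public long extractTimestamp(
                            Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
                        return element.f10; // event-time timestamp in milliseconds
                    }
                });

Do the same for formatStream2 (with its own timestamp field, e.g. f6) and build the join on the two streams returned by assignTimestampsAndWatermarks.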

--
Regards,
Nagarjun

Success is not final, failure is not fatal: it is the courage to continue that counts. 
- Winston Churchill - 

Re: error while joining two datastream

Abhijeet Kumar
DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
                        Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(
                            Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
                        return element.f10;
                    }
                });

DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>(
                        Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(
                            Tuple7<String, String, String, String, String, String, Long> element) {
                        return element.f6;
                    }
                });

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1
        .join(withTimestampsAndWatermarks2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {

            private static final long serialVersionUID = 1L;

            public String getKey(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1)
                    throws Exception {
                return t1.f0;
            }
        })
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {

            private static final long serialVersionUID = 1L;

            public String getKey(Tuple7<String, String, String, String, String, String, Long> t1)
                    throws Exception {
                return t1.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

            private static final long serialVersionUID = 1L;

            public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {
                return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(
                        first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9,
                        second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
            }
        });

joined.print();

OK, so now I did it like this. The error is resolved, but now I don't see any output when I print the joined datastream.

Re: error while joining two datastream

Piotr Nowojski
Hi,

I assume that 

withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();

actually print what you expected?

If so, the problem might be that:
a) time/watermarks are not progressing (it is the watermarks that trigger the output of your `TumblingEventTimeWindows.of(Time.seconds(15))`)
b) data are not being joined (see the sketch below), because:
  - there are no matching elements (based on your KeySelectors) to join between the two streams
  - elements are out of sync with respect to the window length (within your 15-second tumbling window there are no elements to join)
c) the streams are producing different event times/watermarks (for example, one is far ahead of the other). A windowed join will only produce results once both watermarks catch up/sync up.
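If you want to quickly rule out (b), one thing you could try (just a debugging sketch, not a fix) is to run the same join on processing-time windows. If that version prints joined records, the keys do match and the problem is on the event-time/watermark side, i.e. (a) or (c):

// Debugging sketch: same keys, but a processing-time window, so no watermarks are needed.
// If this prints joined records, the keys match and the issue is watermark progress.
withTimestampsAndWatermarks1
        .join(withTimestampsAndWatermarks2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
            @Override
            public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t) {
                return t.f0;
            }
        })
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
            @Override
            public String getKey(Tuple7<String, String, String, String, String, String, Long> t) {
                return t.f0;
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, String>() {
            @Override
            public String join(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                               Tuple7<String, String, String, String, String, String, Long> second) {
                return "joined on key " + first.f0;
            }
        })
        .print();

You can also check in the Flink web UI whether the watermark of the window operator is advancing at all.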
  
Piotrek 
