// Assign event-time timestamps and watermarks to the first stream.
// The event time comes from field f10; events may arrive up to
// "watermarkTime" seconds out of order.
DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
                        Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(
                            Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) {
                        return element.f10;
                    }
                });
// Same for the second stream; its event time is in field f6.
DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>(
                        Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(
                            Tuple7<String, String, String, String, String, String, Long> element) {
                        return element.f6;
                    }
                });
withTimestampsAndWatermarks1.print();
withTimestampsAndWatermarks2.print();
// Join the two streams on their first field (f0) over 15-second tumbling
// event-time windows and merge each matching pair into a Tuple17.
DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1
        .join(withTimestampsAndWatermarks2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String getKey(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1)
                    throws Exception {
                return t1.f0;
            }
        })
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public String getKey(Tuple7<String, String, String, String, String, String, Long> t1)
                    throws Exception {
                return t1.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {
                return new Tuple17<>(first.f0, first.f1, first.f2, first.f3, first.f4,
                        first.f5, first.f6, first.f7, first.f8, first.f9,
                        second.f1, second.f2, second.f3, second.f4, second.f5,
                        second.f6, first.f10);
            }
        });
joined.print();
OK, so now I've done it like this and the errors are resolved. But now I don't see any output when I print the joined DataStream.
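One possible reason that is not visible in the snippets above (so this is only a guess): event-time windows only fire once the watermark passes the end of the window, and a periodic assigner such as BoundedOutOfOrdernessTimestampExtractor only emits watermarks when the job runs with the event-time characteristic. A minimal sketch of the environment setup this would require, assuming the StreamExecutionEnvironment variable is called env:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical setup sketch: "env" stands for the job's execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without this, periodic watermark generation is disabled and the
// 15-second event-time windows never close, so print() shows nothing.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Optional: interval at which periodic watermarks are emitted
// (defaults to 200 ms once the event-time characteristic is set).
env.getConfig().setAutoWatermarkInterval(200L);
Another common cause with Kafka sources is an idle partition: the downstream watermark is the minimum over all parallel source instances, so a source subtask that never receives data keeps its watermark at Long.MIN_VALUE and holds every window back.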
Looks like you need to assign timestamps and emit watermarks to both streams, viz. formatStream1 and formatStream2, as described at
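For reference, the suggested pattern looks roughly like this (a minimal sketch; the element type MyEvent, its getEventTime() accessor, and the "input" stream are placeholders; the same assigner has to be applied to both join inputs):
// Minimal sketch of the advice above; MyEvent, getEventTime() and "input"
// are placeholders for the real element type and stream.
DataStream<MyEvent> withTimestamps = input
        .assignTimestampsAndWatermarks(
                // allow events to be up to 5 seconds out of order
                new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(MyEvent element) {
                        return element.getEventTime(); // epoch milliseconds
                    }
                });
A stream that never gets timestamps keeps its records at Long.MIN_VALUE, which is exactly what the exception further below complains about.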
On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <[hidden email]> wrote:
Hello Team,
I'm new to Flink and come from a Spark background. I need help completing this streaming job: I'm reading data from two different Kafka topics and want to join them.
My code:
formatStream1.join(formatStream2)
        .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
            public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        })
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
            public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {
            public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {
                return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
            }
        }).print();
Error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Data is arriving in both formatStream1 and formatStream2 (I checked by printing them), so the issue must be in the code I shared. Thanks in advance!
Thanks,
Abhijeet Kumar
Software Development Engineer, Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data!
-- Regards,
Nagarjun
Success is not final, failure is not fatal: it is the courage to continue that counts.
- Winston Churchill -