http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/streaming-join-implementation-tp6095p6108.html
Hi,
Right now, Flink does not give you a way to get the records that were not joined in a join. You can, however, use a co-group operation instead of a join to find out which records did not join with records from the other side and treat them separately.
Let me show an example:
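import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val input1: DataStream[A] = ...
val input2: DataStream[B] = ...

val result: DataStream[O] = input1.coGroup(input2)
  .where(_.key1)
  .equalTo(_.key2)
  .window(TumblingEventTimeWindows.of(Time.days(1)))
  .apply(new MyCoGroupFunction)

// Called once per key and window with all elements from both inputs.
class MyCoGroupFunction extends CoGroupFunction[A, B, O] {
  override def coGroup(first: java.lang.Iterable[A],
                       second: java.lang.Iterable[B],
                       out: Collector[O]): Unit = {
    if (!first.iterator().hasNext) {
      // no element from the first input matched
      out.collect(<message telling that I only have second input elements>)
    } else if (!second.iterator().hasNext) {
      // no element from the second input matched
      out.collect(<message telling that I only have first input elements>)
    } else {
      // perform the actual join using the two iterables
    }
  }
}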
The result will be a stream that contains both the join results and the elements telling you that something didn't join. You can process this stream further, for example by splitting it into one stream of proper join results and another of non-joined elements.
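To make the idea concrete without a running Flink job, here is a minimal plain-Scala sketch that models what the co-group does: both inputs are grouped by (key, tumbling window), the same three-way decision as `MyCoGroupFunction` runs once per group, and the mixed output is then split by kind. It assumes non-negative millisecond timestamps and windows aligned to multiples of the window size; `EventA`, `EventB`, `JoinOutput`, `windowedCoGroup` and `split` are hypothetical names for illustration, not Flink API.

```scala
// Plain-Scala model (no Flink dependency) of a windowed coGroup used as an
// outer join, plus splitting the mixed output by kind.
object CoGroupOuterJoinModel {
  // Tagged output: a proper join result or an element that found no partner.
  sealed trait JoinOutput[+A, +B]
  final case class Joined[+A, +B](first: A, second: B) extends JoinOutput[A, B]
  final case class OnlyFirst[+A](first: A) extends JoinOutput[A, Nothing]
  final case class OnlySecond[+B](second: B) extends JoinOutput[Nothing, B]

  // Hypothetical input types: a join key plus an event-time timestamp in ms.
  final case class EventA(key1: String, ts: Long)
  final case class EventB(key2: String, ts: Long)

  // Start of the tumbling window containing ts (non-negative ts, zero offset).
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Same decision logic as MyCoGroupFunction, applied to one (key, window) group.
  def coGroup[A, B](first: Seq[A], second: Seq[B]): Seq[JoinOutput[A, B]] =
    if (first.isEmpty) second.map(b => OnlySecond(b))
    else if (second.isEmpty) first.map(a => OnlyFirst(a))
    else for (a <- first; b <- second) yield Joined(a, b) // inner join within the group

  // Group both inputs by (key, window start) and run coGroup once per group,
  // including groups that only one side populated.
  def windowedCoGroup(as: Seq[EventA], bs: Seq[EventB], size: Long): Seq[JoinOutput[EventA, EventB]] = {
    val groupsA = as.groupBy(a => (a.key1, windowStart(a.ts, size)))
    val groupsB = bs.groupBy(b => (b.key2, windowStart(b.ts, size)))
    val groups = (groupsA.keySet ++ groupsB.keySet).toSeq.sortBy { case (key, start) => (start, key) }
    groups.flatMap(g => coGroup(groupsA.getOrElse(g, Seq.empty), groupsB.getOrElse(g, Seq.empty)))
  }

  // Split the mixed output into join results and the two kinds of unmatched elements.
  def split[A, B](out: Seq[JoinOutput[A, B]]): (Seq[(A, B)], Seq[A], Seq[B]) = {
    val joined: Seq[(A, B)] = out.collect { case Joined(a, b) => (a, b) }
    val onlyFirst: Seq[A] = out.collect { case OnlyFirst(a) => a }
    val onlySecond: Seq[B] = out.collect { case OnlySecond(b) => b }
    (joined, onlyFirst, onlySecond)
  }
}
```

In an actual job the split step would be done on the result `DataStream`, for example with one `filter` (or a `flatMap` with a pattern match) per kind of output.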
I hope this helps somewhat.
Cheers,
Aljoscha
On Thu, 14 Apr 2016 at 08:55 Balaji Rajagopalan <[hidden email]> wrote:
Let me give you a specific example. Say event1 on stream1, with key1, happens within the window 0-5 min, and event2 on stream2, with key2 (which could have matched key1), happens at 5:01, outside that join window. Now you have to correlate event2 on stream2 with event1 on stream1, which happened in the previous window. This is the corner case I mentioned before. I am not aware whether Flink can solve this problem for you; that would be nicer than solving it in the application.
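The corner case comes down to window assignment. A minimal sketch, assuming tumbling windows aligned to multiples of the window size and timestamps in milliseconds (`windowStart`, `sameWindow` and `FiveMinutes` are hypothetical helpers, not Flink API): an event at 4:59 and one at 5:01 get different window starts, so a windowed join or co-group never sees them together.

```scala
object WindowBoundary {
  // Window size for the 0-5 min example, in milliseconds.
  val FiveMinutes: Long = 5 * 60 * 1000L

  // Start of the tumbling window that contains ts (non-negative ts, zero offset).
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // A windowed join or coGroup can only pair two events with matching keys
  // if both are assigned to the same window.
  def sameWindow(ts1: Long, ts2: Long, size: Long): Boolean =
    windowStart(ts1, size) == windowStart(ts2, size)
}
```

With the co-group approach from the reply, each of the two events would at least show up as an unmatched element of its own window instead of being absent from the output, which gives the application something to correlate.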