Re: How to perform this join operation?

Posted by Till Rohrmann
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-perform-this-join-operation-tp6088p6268.html

Hi Elias,

sorry for the late reply. You're right that with the windowed join you would have to deal with pairs where the timestamp of (x,y) is not necessarily earlier than the timestamp of z. Moreover, by using sliding windows you would receive duplicates as you've described. Using tumbling windows would mean that you lose join matches if (x,y) lives in an earlier window. Thus, in order to solve your problem you would have to write a custom stream operator.

The stream operator would do the following: it collects the inputs from (x, y) and z, which are already keyed, so we know that x = z holds true. Using a priority queue per input we order the elements, because we don't know in which order they arrive at the operator. Whenever we receive a watermark indicating that no earlier events can arrive anymore, we go through the two priority queues and join the elements. The queues are part of the operator's state, so that we don't lose information in case of a recovery.
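To give you an idea, here is a stripped-down sketch of that logic. Instead of a hand-written operator with priority queues it uses a CoProcessFunction with keyed list state and event-time timers, which expresses the same buffer-until-the-watermark pattern; all class, type and state names are made up for the example:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative types: the first input carries (x, y) pairs, the second input carries z values.
// Both inputs are keyed (by x and by z) before being connected, so within one key x = z holds.
public class EventTimeJoin
        extends CoProcessFunction<Tuple2<String, String>, String, Tuple2<String, String>> {

    // Buffered (timestamp, x, y) elements, kept as keyed state so they survive a recovery.
    private transient ListState<Tuple3<Long, String, String>> pairBuffer;

    @Override
    public void open(Configuration parameters) {
        pairBuffer = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "pairBuffer",
                TypeInformation.of(new TypeHint<Tuple3<Long, String, String>>() {})));
    }

    @Override
    public void processElement1(Tuple2<String, String> pair, Context ctx,
                                Collector<Tuple2<String, String>> out) throws Exception {
        // Buffer (x, y) until the watermark tells us which z elements it may be joined with.
        pairBuffer.add(Tuple3.of(ctx.timestamp(), pair.f0, pair.f1));
    }

    @Override
    public void processElement2(String z, Context ctx,
                                Collector<Tuple2<String, String>> out) throws Exception {
        // Ask for a callback once the watermark has passed the timestamp of z; at that point
        // all (x, y) elements with earlier timestamps are guaranteed to have arrived.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
        // Emit every buffered (x, y) whose timestamp is not later than the z element's timestamp.
        // A complete implementation would also prune pairs that have fallen out of the 24 hour
        // window and handle several z elements sharing the same timestamp.
        for (Tuple3<Long, String, String> e : pairBuffer.get()) {
            if (e.f0 <= timestamp) {
                out.collect(Tuple2.of(e.f1, e.f2));
            }
        }
    }
}

You would key the (x, y) stream by x and the z stream by z, connect the two keyed streams and apply the function via process().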

I've sketched such an operator here [1]. I hope this helps you to get started.

[1] https://github.com/tillrohrmann/custom-join

Cheers,
Till

On Thu, Apr 14, 2016 at 5:12 PM, Elias Levy <[hidden email]> wrote:
Anyone from Data Artisans have some idea of how to go about this?

On Wed, Apr 13, 2016 at 5:32 PM, Maxim <[hidden email]> wrote:
You could simulate the Samza approach by having a RichFlatMapFunction over cogrouped streams that maintains the sliding window in its ListState. As I understand it, the drawback is that the list state is not maintained in managed memory.
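Something like this is what I have in mind (just a sketch with illustrative names, assuming both streams are keyed and then connected); note that it matches a z against whatever happens to be buffered when it arrives, with no event-time guarantee about which element is older:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch: buffer (x, y) pairs per key and look them up whenever a z arrives for the same key.
public class BufferedJoin
        extends RichCoFlatMapFunction<Tuple2<String, String>, String, Tuple2<String, String>> {

    private transient ListState<Tuple2<String, String>> pairs;

    @Override
    public void open(Configuration parameters) {
        pairs = getRuntimeContext().getListState(new ListStateDescriptor<>(
                "pairs", TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
    }

    @Override
    public void flatMap1(Tuple2<String, String> pair,
                         Collector<Tuple2<String, String>> out) throws Exception {
        // First input: remember the pair. Evicting pairs older than 24 hours is left out here.
        pairs.add(pair);
    }

    @Override
    public void flatMap2(String z, Collector<Tuple2<String, String>> out) throws Exception {
        // Second input: emit every buffered pair for this key (the keying already ensures x = z).
        for (Tuple2<String, String> pair : pairs.get()) {
            out.collect(pair);
        }
    }
}

It would be applied on the two keyed streams via connect(...).flatMap(new BufferedJoin()).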
I'm interested to hear about the right way to implement this.

On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy <[hidden email]> wrote:
I am wondering how you would implement the following function in Flink. The function takes as input two streams. One stream can be viewed as tuples with two values (x, y); the second stream is a stream of individual values z. The function keeps a time-based window on the first input (say, 24 hours). Whenever it receives an element from the second stream, it compares the value z against the x element of each tuple in the window, and for each match it emits (x, y). You are basically doing a join on x = z. Note that values from the second stream are not windowed and they are only matched to values from the first stream with an earlier timestamp.

This was relatively easy to implement in Samza. Consume from two topics, the first keyed by x and the second by z, in a single job. Messages with the same key would be consumed by the same task. The task could maintain a window of messages from the first stream in its local state. Whenever a message came in via the second stream, it could look up matching messages in the local state and, if it found any, send them to the output stream. Obviously, with Samza you don't have the luxury of the system handling event time for you, but this works well and is easy to implement.
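Roughly, the task looked like this (a sketch with made-up topic and store names, and with the 24-hour expiry and the per-key message list left out):

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

// Sketch of the Samza job: both topics are keyed so that messages with the same key land in
// the same task, which keeps the (x, y) messages in a local key-value store.
public class JoinTask implements StreamTask, InitableTask {

    private KeyValueStore<String, String> pairs;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        pairs = (KeyValueStore<String, String>) context.getStore("pairs");
    }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        String key = (String) envelope.getKey();
        String topic = envelope.getSystemStreamPartition().getStream();

        if (topic.equals("pairs-topic")) {
            // First stream: remember y under its key x. A real task would keep a list of
            // timestamped values per key and expire anything older than 24 hours.
            pairs.put(key, (String) envelope.getMessage());
        } else {
            // Second stream: look up the matching (x, y) for this z and emit it.
            String y = pairs.get(key);
            if (y != null) {
                collector.send(new OutgoingMessageEnvelope(
                        new SystemStream("kafka", "matches-topic"), key, y));
            }
        }
    }
}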

I am not clear how this would be implemented in Flink.

It is easy enough to partition either stream by key, and to window the first stream using a sliding window, but from there on things get complicated.

You can join the two streams by key, but you must do so using the same window for both streams. That means events from the first stream may be matched to older events from the second stream, which is not what we want. I suppose if both included a timestamp, you could later add a filter to remove such events from the merged stream. But you would also have to deal with duplicates, as the window is a sliding window and the same two elements may match in every window pane that contains them both. So you need to dedup as well.
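To make that concrete, this is roughly the join I'm describing (a sketch; pairs and zs stand for the two input streams, with event timestamps and watermarks already assigned upstream):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedJoinSketch {

    // pairs carries (x, y), zs carries z.
    public static DataStream<Tuple2<String, String>> join(
            DataStream<Tuple2<String, String>> pairs, DataStream<String> zs) {

        return pairs
            .join(zs)
            .where(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> pair) {
                    return pair.f0; // x
                }
            })
            .equalTo(new KeySelector<String, String>() {
                @Override
                public String getKey(String z) {
                    return z;
                }
            })
            .window(SlidingEventTimeWindows.of(Time.hours(24), Time.hours(1)))
            .apply(new JoinFunction<Tuple2<String, String>, String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> join(Tuple2<String, String> pair, String z) {
                    // Fires for every pane that contains both elements, regardless of which
                    // one is older, hence the extra timestamp filter and dedup steps.
                    return pair;
                }
            });
    }
}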

coGroup seems like it would suffer from the same issues.

Maybe the answer is connected streams, but there is scant documentation on the semantics of ConnectedStreams.  There isn't even an example that I could find that makes use of them.

Thoughts?