Re: How to perform this join operation?

Posted by Elias Levy on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-perform-this-join-operation-tp6088p7061.html

Till,

An issue with your suggestion is that the job state may grow without bound. You are expiring
data from the cache inside the operator, but the state is partitioned by the stream key.
That means that if we no longer observe a key, the state associated with that key will never be
removed.

In my data set, keys come and go, and many will never be observed again. That will lead to
continuous state growth over time.
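To make the growth concrete, here is a simplified model in plain Python (not Flink code). `KeyedCache` and its `retention` parameter are hypothetical stand-ins for the per-key cache and its expiration, under the assumption that eviction for a key only runs when an element with that key is processed.

```python
from collections import defaultdict


class KeyedCache:
    """Hypothetical model of keyed state whose expiration only runs when an
    element with the same key is processed."""

    def __init__(self, retention):
        self.retention = retention
        # key -> list of event timestamps; stands in for Flink keyed state
        self.state = defaultdict(list)

    def process(self, key, ts):
        entries = self.state[key]
        # Eviction can only touch the state of the key currently being processed.
        entries[:] = [t for t in entries if t > ts - self.retention]
        entries.append(ts)

    def stored_entries(self):
        return sum(len(entries) for entries in self.state.values())


# Every key is seen exactly once, so nothing is ever evicted.
churning = KeyedCache(retention=10)
for i in range(1000):
    churning.process(f"key-{i}", i)

# A single key that keeps reappearing stays bounded by the retention period.
recurring = KeyedCache(retention=10)
for i in range(1000):
    recurring.process("key", i)

print(churning.stored_entries())   # 1000
print(recurring.stored_entries())  # 10
```

With churning keys the stored entries grow linearly with the number of distinct keys, even though at timestamp 999 only the last 10 time units are still relevant.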


On Mon, May 2, 2016 at 6:06 PM, Elias Levy <[hidden email]> wrote:
Thanks for the suggestion.  I ended up implementing it a different way.

[...]

On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <[hidden email]> wrote:
Sorry for the late reply. You're right that with the windowed join you would have to deal with pairs where the timestamp of (x,y) is not necessarily earlier than the timestamp of z. Moreover, by using sliding windows you would receive duplicates, as you've described. Using tumbling windows would mean that you lose join matches if (x,y) falls into an earlier window than z. Thus, in order to solve your problem you would have to write a custom stream operator.
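The three failure modes can be reproduced with a toy model of a windowed join in plain Python. The window-assignment helpers and `window_join` below are hypothetical simplifications, not the Flink API: each element is assigned to windows by its timestamp, and every (x, y) and z with x == z that share a window are emitted once per shared window.

```python
from collections import Counter


def tumbling_windows(ts, size):
    # The single window [start, start + size) containing ts.
    return [ts - ts % size]


def sliding_windows(ts, size, slide):
    # Every window start s (a multiple of slide) with s <= ts < s + size.
    return list(range(ts - ts % slide, ts - size, -slide))


def window_join(xys, zs, assign):
    """Count (y, xy_ts, z_ts) once for every window shared by a matching pair."""
    out = Counter()
    for xy_ts, x, y in xys:
        for z_ts, z in zs:
            if x == z:
                shared = set(assign(xy_ts)) & set(assign(z_ts))
                out[(y, xy_ts, z_ts)] += len(shared)
    return +out  # drop pairs that shared no window


xys = [(8, "a", "y1"), (12, "a", "y2")]
zs = [(11, "a")]

print(window_join(xys, zs, lambda t: tumbling_windows(t, 10)))
# Counter({('y2', 12, 11): 1})
print(window_join(xys, zs, lambda t: sliding_windows(t, 10, 5)))
# Counter({('y2', 12, 11): 2, ('y1', 8, 11): 1})
```

With tumbling windows the y1 match is lost because its (x, y) falls into an earlier window than z, while y2 is matched even though its timestamp is later than z's. With sliding windows y1 is recovered, but y2 is emitted twice, once per overlapping window.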

The stream operator would do the following. It collects the inputs from (x,y) and z, which are already keyed, so we know that x = z holds for the elements being joined. Because we don't know in which order the elements arrive at the operator, we order them using a priority queue for each input. Whenever we receive a watermark indicating that no earlier events can arrive anymore, we can go through the two priority queues to join the elements. The queues are part of the operator's state so that we don't lose information in case of a recovery.
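A minimal sketch of that operator's logic in plain Python, assuming the join condition is that z matches every (x, y) with x == z and a timestamp no later than z's, and that no element arrives behind the watermark. `WatermarkJoin`, `on_xy`, `on_z`, and `on_watermark` are hypothetical names, and the dictionaries stand in for keyed operator state; this is neither the Flink operator API nor the sketch referenced as [1].

```python
import heapq
from collections import defaultdict


class WatermarkJoin:
    """Hypothetical keyed join: buffer both inputs in per-key priority queues
    ordered by event time, and join only once a watermark guarantees that no
    earlier element can still arrive."""

    def __init__(self):
        self.xy_queues = defaultdict(list)  # key -> heap of (ts, seq, y)
        self.z_queues = defaultdict(list)   # key -> heap of (ts, seq)
        self.history = defaultdict(list)    # key -> [(ts, y)] already released
        self.seq = 0                        # tie-breaker so the heap never compares payloads

    def on_xy(self, ts, x, y):
        heapq.heappush(self.xy_queues[x], (ts, self.seq, y))
        self.seq += 1

    def on_z(self, ts, z):
        heapq.heappush(self.z_queues[z], (ts, self.seq))
        self.seq += 1

    def on_watermark(self, wm):
        out = []
        for key in set(self.xy_queues) | set(self.z_queues):
            xys, zs = self.xy_queues[key], self.z_queues[key]
            # Merge both queues in timestamp order, up to the watermark.
            while True:
                xy_ts = xys[0][0] if xys and xys[0][0] <= wm else None
                z_ts = zs[0][0] if zs and zs[0][0] <= wm else None
                if xy_ts is None and z_ts is None:
                    break
                # On equal timestamps release (x, y) first so it can match z.
                if z_ts is None or (xy_ts is not None and xy_ts <= z_ts):
                    ts, _, y = heapq.heappop(xys)
                    self.history[key].append((ts, y))
                else:
                    ts, _ = heapq.heappop(zs)
                    out.extend((key, y, prev_ts, ts) for prev_ts, y in self.history[key])
        return sorted(out)


join = WatermarkJoin()
join.on_z(5, "a")          # z arrives before the (x, y) it should match
join.on_xy(3, "a", "y1")
join.on_xy(7, "a", "y2")   # later than z, so it must not match it
join.on_xy(4, "b", "y3")   # no z for key "b" (yet)
print(join.on_watermark(6))   # [('a', 'y1', 3, 5)]
join.on_z(8, "a")
print(join.on_watermark(10))  # [('a', 'y1', 3, 8), ('a', 'y2', 7, 8)]
```

This sketch keeps `history` forever. A practical version would expire old entries, and because the state is keyed, that expiration runs into the issue raised at the top of this message for keys that are never observed again.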

I've sketched such an operator here [1]. I hope this helps you to get started.