allowed lateness on windowed join?

Saiph Kappa
Hi all,

Is it possible to specify an allowed lateness for a window join like the following one:

val tweetsAndWarning = warningsPerStock
  .join(tweetsPerStock)
  .where(_.symbol)
  .equalTo(_.symbol)
  .window(SlidingEventTimeWindows.of(
    Time.of(windowDurationSec, TimeUnit.SECONDS),
    Time.of(windowDurationSec, TimeUnit.SECONDS)))
  .apply((c1, c2) => (c1.count, c2.count))

I think it is related to these:


Thanks!

Re: allowed lateness on windowed join?

Aljoscha Krettek
Hi,
I'm afraid that's not possible, but you can use regular streams and do the join yourself. What the code for JoinedStreams essentially does is take two streams, map them to a common data type, union them, and then perform a normal window operation.

The code for this is in CoGroupedStreams (as the general case of a join) and JoinedStreams.

Cheers,
Aljoscha
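
A minimal sketch of the approach described above, written against the Flink Scala DataStream API. It is an illustration under assumptions, not the code of JoinedStreams or CoGroupedStreams: the record types Warning and Tweet, the object name ManualWindowJoin, and the parameter latenessSec are hypothetical, and Either is just one possible common data type. The window assigner mirrors the one in the question.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical record types: the thread only shows that both sides
// expose `symbol` and `count`.
case class Warning(symbol: String, count: Long)
case class Tweet(symbol: String, count: Long)

object ManualWindowJoin {

  // Assumes timestamps and watermarks were already assigned on both inputs.
  def joinWithLateness(
      warningsPerStock: DataStream[Warning],
      tweetsPerStock: DataStream[Tweet],
      windowDurationSec: Long,
      latenessSec: Long): DataStream[(Long, Long)] = {

    // 1. Map both inputs to a common type; Left/Right records the origin.
    val left: DataStream[Either[Warning, Tweet]] =
      warningsPerStock.map((w: Warning) => Left(w): Either[Warning, Tweet])
    val right: DataStream[Either[Warning, Tweet]] =
      tweetsPerStock.map((t: Tweet) => Right(t): Either[Warning, Tweet])

    // 2. Union the streams, key by the join key, and apply a normal
    //    window operation, which does expose allowedLateness.
    left.union(right)
      .keyBy((e: Either[Warning, Tweet]) => e.fold(_.symbol, _.symbol))
      .window(SlidingEventTimeWindows.of(
        Time.seconds(windowDurationSec), Time.seconds(windowDurationSec)))
      .allowedLateness(Time.seconds(latenessSec))
      .apply { (key: String, window: TimeWindow,
                elems: Iterable[Either[Warning, Tweet]],
                out: Collector[(Long, Long)]) =>
        // 3. Split the window contents by origin and emit every pair.
        val ws = elems.collect { case Left(w) => w }
        val ts = elems.collect { case Right(t) => t }
        for (w <- ws; t <- ts) out.collect((w.count, t.count))
      }
  }
}
```

One thing to keep in mind with this sketch: as far as I understand the default event-time trigger, a late element that arrives within the allowed lateness makes the window fire again over its full contents, so pairs that were already emitted for that window would be emitted again.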

On Mon, 30 Jan 2017 at 17:38 Saiph Kappa <[hidden email]> wrote:
> Is it possible to specify allowed lateness for a window join like the following one: [...]

Re: allowed lateness on windowed join?

Saiph Kappa
So, you are saying that I can do the join on a regular stream by using the union transformation? For that, I would need to know which data belongs to which stream. I can add tags to the streamed data so that I know in which order to join the elements. Is this what you were proposing? The only drawback, I think, is that the tuples of both upstreams would have to be scanned twice: once to perform the union, and then again to perform the join in a custom function.

Thanks! 
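
To make the tagging idea concrete, here is a small plain-Scala sketch of the custom join function, independent of Flink. The types Warning and Tweet and the names TaggedJoin and joinWindow are hypothetical; Either serves as the tag (Left for warnings, Right for tweets). It assumes the function receives the buffered elements of one key and one window.

```scala
// Hypothetical record types: only `symbol` and `count` appear in the thread.
final case class Warning(symbol: String, count: Long)
final case class Tweet(symbol: String, count: Long)

object TaggedJoin {
  // The tag: Left marks elements of the warning stream, Right the tweets.
  type Tagged = Either[Warning, Tweet]

  /** Joins the buffered contents of one key's window. */
  def joinWindow(elems: Iterable[Tagged]): Seq[(Long, Long)] = {
    val warnings = Seq.newBuilder[Warning]
    val tweets = Seq.newBuilder[Tweet]

    // A single pass over the buffer splits the elements by origin.
    elems.foreach {
      case Left(w)  => warnings += w
      case Right(t) => tweets += t
    }

    // Pair every warning with every tweet of the same key and window.
    val ts = tweets.result()
    for (w <- warnings.result(); t <- ts) yield (w.count, t.count)
  }
}
```

The tag is attached when each element is mapped to the common type, so the window function only needs the one pass shown here to separate the two sides before pairing them.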

On Fri, Feb 3, 2017 at 2:48 PM, Aljoscha Krettek <[hidden email]> wrote:
> I'm afraid that's not possible but you can use a regular stream and do the join yourself. [...]

Re: allowed lateness on windowed join?

Fabian Hueske-2
Hi,

Union is a super cheap operator in Flink. It does not scan the records, but just merges the streams. So the effort is very low.
The built-in join operator works in the same way but does not expose allowed lateness.

Cheers, Fabian

Re: allowed lateness on windowed join?

Aljoscha Krettek
And to add to that: yes, this is what I was suggesting. :-)

On Mon, 6 Feb 2017 at 09:58 Fabian Hueske <[hidden email]> wrote:
> Union is a super cheap operator in Flink. [...]