PartitionedState and watermark of Window coGroup()


Hung
Hi Flink users,

I'm a bit confused about how partitioned state and watermarks work when writing a trigger for a windowed coGroup().

Stream1.assignTimestampsAndWatermarks(new EventWatermark())
                .coGroup(Stream2.assignTimestampsAndWatermarks(new EventWatermark()))
                .where(new JSONKey("key")).equalTo(new JSONKey("key"))
                .window(TumblingEventTimeWindows.of(Time.days(7)))
                .trigger(new CoGroupTrigger())
                .apply(new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
                    ...
                    ...
                });

The documentation
(https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala#L37)
says: "A streaming co-group operation is evaluated over elements in a window."

Does this mean that elements are not keyed by the join key?

Another question: does the watermark take the minimum timestamp of the two streams (or of the two keyed streams, if elements are keyed)?

Our trigger should fire the window once the watermark has passed the timestamps of both keyed streams, because one stream is much smaller and is consumed much faster. We have already implemented a similar trigger for a keyed stream, so if we cannot make coGroup() behave as expected, we can still union the two streams into one stream and use the same approach.
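One possible reading of that firing rule, sketched in plain Java without any Flink dependency: for a single key and window, remember the latest event timestamp seen on each input and allow the window to fire once the watermark has reached both. The class name BothStreamsPassedRule and the side numbering (1 and 2) are hypothetical. In an actual Flink Trigger this per-key, per-window bookkeeping would presumably live in partitioned state obtained through TriggerContext.getPartitionedState(...), with an event-time timer registered through TriggerContext.registerEventTimeTimer(...); the sketch only models the decision itself.

```java
/**
 * Hypothetical, Flink-free model of "fire once the watermark has passed the
 * timestamps of both streams" for ONE key and ONE window. In a real trigger
 * these fields would be partitioned (per-key, per-window) state.
 */
class BothStreamsPassedRule {
    private long latestFirst = Long.MIN_VALUE;  // latest timestamp seen from stream 1
    private long latestSecond = Long.MIN_VALUE; // latest timestamp seen from stream 2
    private boolean seenFirst;
    private boolean seenSecond;

    /** Records an element's timestamp; side must be 1 or 2. */
    void onElement(int side, long timestamp) {
        if (side == 1) {
            latestFirst = Math.max(latestFirst, timestamp);
            seenFirst = true;
        } else if (side == 2) {
            latestSecond = Math.max(latestSecond, timestamp);
            seenSecond = true;
        } else {
            throw new IllegalArgumentException("side must be 1 or 2, got " + side);
        }
    }

    /** Event time the watermark has to reach: the later of the two latest timestamps. */
    long fireTime() {
        return Math.max(latestFirst, latestSecond);
    }

    /** True once both sides have data and the watermark has passed both latest timestamps. */
    boolean shouldFire(long watermark) {
        return seenFirst && seenSecond && watermark >= fireTime();
    }

    public static void main(String[] args) {
        BothStreamsPassedRule rule = new BothStreamsPassedRule();
        rule.onElement(1, 100L);
        rule.onElement(2, 250L);
        System.out.println(rule.shouldFire(200L)); // false: stream 2's element at 250 not yet passed
        System.out.println(rule.shouldFire(250L)); // true
    }
}
```

A key that only ever appears in one stream never satisfies this rule, so a real trigger would likely keep the regular end-of-window timer as a fallback.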

Best,

Sendoh

Re: PartitionedState and watermark of Window coGroup()

rmetzger0
Hi,
Elements are co-grouped on the specified key, so only elements with the same key (and in the same window) in both streams end up in the same group.
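To make the grouping concrete, here is a plain-Java simulation (no Flink dependency) of which elements would be handed to the same co-group call: elements are bucketed by (key, tumbling-window start), and each bucket keeps the two streams apart. The names KeyedCoGroupSketch, Event and Group are hypothetical, and the sketch assumes non-negative millisecond timestamps and a window offset of zero, which is what TumblingEventTimeWindows.of(Time.days(7)) uses by default.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of keyed, tumbling-window co-grouping. */
class KeyedCoGroupSketch {

    /** A hypothetical element: join key plus event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    /** The two sides of one co-group call. */
    static final class Group {
        final List<Event> first = new ArrayList<>();  // elements from stream 1
        final List<Event> second = new ArrayList<>(); // elements from stream 2
    }

    /** Start of the tumbling window containing the timestamp (offset 0, timestamp >= 0). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    private static String groupId(Event e, long windowSize) {
        return e.key + "|" + windowStart(e.timestamp, windowSize);
    }

    /** Buckets both inputs by "key|windowStart", keeping stream 1 and stream 2 separate. */
    static Map<String, Group> coGroup(List<Event> s1, List<Event> s2, long windowSize) {
        Map<String, Group> groups = new TreeMap<>();
        for (Event e : s1) {
            groups.computeIfAbsent(groupId(e, windowSize), id -> new Group()).first.add(e);
        }
        for (Event e : s2) {
            groups.computeIfAbsent(groupId(e, windowSize), id -> new Group()).second.add(e);
        }
        return groups;
    }

    public static void main(String[] args) {
        long week = 7L * 24 * 60 * 60 * 1000; // 7 days in milliseconds
        List<Event> s1 = Arrays.asList(new Event("a", 0L), new Event("b", 10L));
        List<Event> s2 = Arrays.asList(new Event("a", 5L), new Event("a", week + 1));
        for (Map.Entry<String, Group> entry : coGroup(s1, s2, week).entrySet()) {
            Group g = entry.getValue();
            System.out.println(entry.getKey() + " first=" + g.first + " second=" + g.second);
        }
        // a|0 first=[a@0] second=[a@5]
        // a|604800000 first=[] second=[a@604800001]
        // b|0 first=[b@10] second=[]
    }
}
```

Elements with key "a" and "b" never meet, and the same key in a later week lands in a separate group; a key present on only one side still forms a group, with the other side empty.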

Yes, the watermark of the co-grouped stream is the minimum of the two input streams' watermarks.
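A minimal plain-Java model of that rule, assuming two inputs that each start at the earliest possible time (Long.MIN_VALUE) until their first watermark arrives. MinWatermarkTracker is a hypothetical name, not a Flink class. As far as I understand, the windowed coGroup tags and unions its two inputs before the keyed window, so this minimum is what the window's trigger ends up seeing.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: the combined watermark of a multi-input operator is the
 * minimum of the latest watermark received on each input, and it never moves
 * backwards.
 */
class MinWatermarkTracker {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        // No watermark received yet: treat every input as being at the earliest time.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the combined watermark. */
    long onWatermark(int input, long watermark) {
        // Keep per-input watermarks monotonic by ignoring regressions.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        combined = Math.max(combined, min);
        return combined;
    }

    long current() {
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.onWatermark(0, 1_000L)); // -9223372036854775808 (input 1 silent)
        System.out.println(tracker.onWatermark(1, 200L));   // 200
        System.out.println(tracker.onWatermark(0, 5_000L)); // 200: the fast input alone cannot advance it
        System.out.println(tracker.onWatermark(1, 4_000L)); // 4000
    }
}
```

For the case in the question, this means the smaller, faster stream cannot push event time ahead on its own; windows only close once the slower stream's watermark catches up.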




(In reply to Sendoh's message of Tue, Dec 13, 2016 at 7:02 PM.)
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionedState-and-watermark-of-Window-coGroup-tp10620.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.