Watermarks with repartition

Watermarks with repartition

Zach Cox
Hi - how are watermarks passed between parallel tasks when there is a repartition? For example, say I have a simple streaming job computing hourly counts per key, something like this:

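import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic.EventTime
import org.apache.flink.streaming.api.windowing.time.Time

val environment = StreamExecutionEnvironment.getExecutionEnvironment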
environment.setParallelism(2)
environment.setStreamTimeCharacteristic(EventTime)
environment.getConfig.enableTimestamps()
environment
  .addSource(...)
  .assignAscendingTimestamps(_.timestamp)
  .keyBy("someField")
  .timeWindow(Time.hours(1))
  .fold(0, (count, element) => count + 1)
  .addSink(...)
environment.execute("example")

Say the source has 2 parallel partitions (e.g. a Kafka topic with 2 partitions), the events from the source contain timestamps, and over time the 2 source tasks diverge in event time (maybe one Kafka partition has many more events than the other).

The job graph looks like this: http://imgur.com/hxEpF6b

From what I can tell, the execution graph with parallelism=2 would look like this: http://imgur.com/pSX8ov5. The keyBy causes hash partitioning to be used, so that events with the same key end up at the same window subtask, regardless of which source partition they came from.
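To make the keyBy routing concrete, here is a minimal Scala sketch. It assumes a simplified routing rule (the key's hashCode modulo the parallelism); Flink's actual partitioner uses its own hash function, and the names `KeyRouting` and `targetSubtask` are hypothetical. The only property it illustrates is that the target window subtask depends on the key alone, not on which source subtask emitted the record.

```scala
// Hypothetical, simplified model of keyBy routing; not Flink's partitioner.
object KeyRouting {
  // Assumed routing rule: non-negative hashCode modulo parallelism.
  def targetSubtask(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 2
    // (source subtask, key) pairs: the same keys arrive from both sources.
    val records = Seq((0, "a"), (0, "b"), (1, "a"), (1, "b"))
    for ((source, key) <- records) {
      val target = targetSubtask(key, parallelism)
      println(s"source $source, key $key -> window subtask $target")
    }
  }
}
```

Because each window subtask can receive records from every source subtask, it also has to account for the watermarks of every source subtask.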

Since the watermarks are skewed between the parallel pipelines, what happens when differing watermarks are sent to the window count operators? Is something tracking the min incoming watermark there? Could anyone point me to Flink code that implements this? I'd really like to learn more about how this works.

Thanks,
Zach


Re: Watermarks with repartition

Zach Cox
I think I found the information I was looking for:

RecordWriter broadcasts each emitted watermark to all outgoing channels [1].

StreamInputProcessor tracks the max watermark received on each incoming channel separately, and computes the task's watermark as the min of all incoming watermarks [2].
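The two mechanisms summarized above (broadcast on the sending side, per-channel max plus overall min on the receiving side) can be modelled in a few lines of Scala. This is a simplified sketch, not Flink's code: `WatermarkPropagation`, `InputWatermarkTracker`, `onWatermark` and `broadcast` are hypothetical names, and real watermarks travel through the network stack rather than as method calls.

```scala
// Simplified model of watermark propagation across a repartition.
// The names below are hypothetical; this is not Flink's implementation.
object WatermarkPropagation {

  /** Per-task bookkeeping: one watermark slot per input channel. */
  final class InputWatermarkTracker(numChannels: Int) {
    // Highest watermark seen so far on each input channel.
    private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
    // Last combined watermark this task forwarded downstream.
    private var currentWatermark = Long.MinValue

    /** Records a watermark arriving on `channel`. Returns Some(w) if the
      * task's watermark (the minimum over all channels) advanced to w. */
    def onWatermark(channel: Int, watermark: Long): Option[Long] = {
      // Keep the max per channel so a channel's watermark never moves backwards.
      if (watermark > channelWatermarks(channel)) channelWatermarks(channel) = watermark
      val combined = channelWatermarks.min
      if (combined > currentWatermark) {
        currentWatermark = combined
        Some(combined)
      } else None
    }
  }

  /** Sender side: a watermark goes to every outgoing channel (broadcast),
    * unlike records, which go to exactly one channel chosen by key. */
  def broadcast(watermark: Long, numOutputs: Int): Seq[(Int, Long)] =
    (0 until numOutputs).map(out => (out, watermark))

  def main(args: Array[String]): Unit = {
    // Two window subtasks, each with one input channel per source subtask.
    val windowTasks = Array.fill(2)(new InputWatermarkTracker(numChannels = 2))
    // (source subtask, watermark): source 0 runs ahead, source 1 lags.
    val emitted = Seq((0, 5000L), (1, 1000L), (0, 9000L), (1, 4000L))
    for ((source, wm) <- emitted; (target, w) <- broadcast(wm, windowTasks.length)) {
      windowTasks(target).onWatermark(channel = source, watermark = w).foreach { advanced =>
        println(s"window subtask $target: watermark advanced to $advanced")
      }
    }
  }
}
```

In this model the fast source's watermarks (5000, 9000) never advance a window subtask on their own; each subtask's watermark only moves when the lagging source catches up, which is the behaviour described in the summary.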

Is this an accurate summary of Flink's watermark propagation?

So in my previous example, each window count task builds up a count for each window based on the incoming events' timestamps, and the count is emitted once the watermarks on all input channels have progressed beyond the end of the window. So if one partition's watermark lags behind the other, it just means the window output is triggered by this lagging watermark.
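A minimal sketch of that window behaviour, assuming 1-hour tumbling windows aligned to the epoch and assuming a window fires once the task's watermark reaches the window's last timestamp (end - 1). `HourlyCountSketch` and `WindowCounter` are hypothetical names, and the combined watermark is computed inline with `math.min` rather than by Flink's runtime.

```scala
// Hypothetical sketch of a window-count subtask driven by its combined watermark.
object HourlyCountSketch {
  val HourMs: Long = 60L * 60L * 1000L

  final class WindowCounter {
    // Count per (key, window start) for 1-hour tumbling windows.
    private val counts = scala.collection.mutable.Map.empty[(String, Long), Int]

    def onElement(key: String, timestamp: Long): Unit = {
      // Windows aligned to the epoch; assumes non-negative timestamps.
      val windowStart = timestamp - (timestamp % HourMs)
      counts((key, windowStart)) = counts.getOrElse((key, windowStart), 0) + 1
    }

    /** Emits and drops every window whose last timestamp (end - 1) is <= watermark. */
    def onWatermark(watermark: Long): Seq[(String, Long, Int)] = {
      val ready = counts.keys.toList.filter { case (_, start) => start + HourMs - 1 <= watermark }
      val out = ready
        .sortBy { case (key, start) => (start, key) }
        .map { case k @ (key, start) => (key, start, counts(k)) }
      counts --= ready
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val task = new WindowCounter
    val minute = 60L * 1000L
    // Two elements for key "a" in the first hour, one from each source subtask.
    task.onElement("a", 10 * minute)
    task.onElement("a", 20 * minute)
    // Source 0's watermark is at 2h but source 1 lags at 30 min: the task's
    // watermark is the minimum, so the first-hour window does not fire yet.
    println(task.onWatermark(math.min(2 * HourMs, 30 * minute)))
    // Once source 1 reaches 1h, the minimum passes the window end and it fires.
    println(task.onWatermark(math.min(2 * HourMs, HourMs)))
  }
}
```

Nothing is lost while the slow partition lags; the counts simply stay buffered in the window state until the minimum watermark passes the window end.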

-Zach



On Thu, Feb 25, 2016 at 3:31 PM Zach Cox wrote:
> [...]

Re: Watermarks with repartition

Aljoscha Krettek
Hi,
yes, your description is spot on!

Cheers,
Aljoscha

> On 26 Feb 2016, at 00:19, Zach Cox wrote:
> [...]
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147

Re: Watermarks with repartition

Zach Cox
Thanks for the confirmation, Aljoscha! I wrote up the results of my little experiment: https://github.com/zcox/flink-repartition-watermark-example

-Zach


On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek wrote:
> [...]

Re: Watermarks with repartition

Aljoscha Krettek
Cool, that’s a nice write-up. Would you maybe be interested in integrating it into Flink as some sort of internal documentation, so that prospective contributors can get to know this stuff?

Cheers,
Aljoscha

> On 26 Feb 2016, at 18:32, Zach Cox wrote:
> [...]

Re: Watermarks with repartition

Zach Cox
Sure, want me to open a JIRA issue and then PR a new page into https://github.com/apache/flink/tree/master/docs/internals, following these instructions? http://flink.apache.org/contribute-documentation.html

-Zach


On Fri, Feb 26, 2016 at 1:13 PM Aljoscha Krettek wrote:
> [...]

Re: Watermarks with repartition

Aljoscha Krettek
Yes, that would be perfect. Thanks!

--
Aljoscha

> On 26 Feb 2016, at 20:53, Zach Cox wrote:
> [...]