At what point do watermarks get injected into the stream?

At what point do watermarks get injected into the stream?

Ray Ruvinskiy

I’m trying to build a mental model of how watermarks get injected into the stream. Suppose I have a stream with a parallel source, and I’m running a cluster with multiple task managers. Does each parallel source reader inject watermarks, which are then forwarded to downstream consumers and shuffled between task managers? Or are watermarks created after the shuffle, when the stream records reach their destined task manager and right before they’re processed by the operator?

 

Thanks,

 

Ray


Re: At what point do watermarks get injected into the stream?

Fabian Hueske-2
Hi Ray,

In principle, watermarks can be injected anywhere in a stream by calling DataStream.assignTimestampsAndWatermarks().
However, timestamps and watermarks are usually assigned as soon as possible after a stream is ingested (before the first shuffle). The reason is that watermarks depend on the order of events (and their timestamps) in the stream. While Flink guarantees the order of events within a partition, a shuffle interleaves events of different partitions in an unpredictable way, so it is no longer possible to reason about the order of timestamps afterwards.

The most common way to inject watermarks is directly inside of a SourceFunction or with a TimestampAssigner before the first shuffle.

Best, Fabian
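
As a rough illustration of why watermarks are best generated before the first shuffle, here is a small plain-Python simulation (not Flink code). It assumes a deliberately naive watermark rule, "the highest timestamp seen so far", and a hand-picked interleaving of two partitions; both are assumptions made for the sketch, not something taken from Flink.

```python
def max_seen_watermarks(timestamps):
    """Yield (timestamp, watermark_before_record) for a naive
    'highest timestamp seen so far' watermark generator."""
    watermark = float("-inf")
    for ts in timestamps:
        yield ts, watermark
        watermark = max(watermark, ts)


def count_late(timestamps):
    """Count records that arrive with a timestamp below the current watermark."""
    return sum(1 for ts, wm in max_seen_watermarks(timestamps) if ts < wm)


# Two partitions, each internally ordered by timestamp.
partition_a = [1, 2, 3]
partition_b = [10, 11, 12]

# Watermarks generated per partition (before the shuffle) see no late records.
late_before = count_late(partition_a) + count_late(partition_b)

# One possible interleaving after a shuffle (hand-picked for the example).
shuffled = [10, 1, 11, 2, 12, 3]
late_after = count_late(shuffled)

print(late_before, late_after)  # 0 3
```

With per-partition generation every record respects the watermark; on the interleaved stream the same rule declares three records late, because the order of timestamps is no longer predictable.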


Re: At what point do watermarks get injected into the stream?

Ray Ruvinskiy

Thanks for the explanation, Fabian.

 

Suppose I have a parallel source that does not inject watermarks, and the first operation on the DataStream is assignTimestampsAndWatermarks. Does each parallel task that makes up the source independently inject watermarks for the records that it has read? Suppose I then call keyBy and a shuffle ensues. Will the resulting partitions after the shuffle have interleaved watermarks from the various source tasks?

 

More concretely, suppose a source has a degree of parallelism of two. One of the source tasks injects the watermarks 2 and 5, while the other injects 3 and 10. There is then a shuffle, creating two different partitions. Will all the watermarks be broadcast to all the partitions? Or is it possible for, say, one partition to end up with watermarks 2 and 10 and another with 3 and 5? And after the shuffle, how do we ensure that the watermarks are processed in order by the operators receiving them?

 

Thanks,

 

Ray

 


Re: At what point do watermarks get injected into the stream?

Fabian Hueske-2
Each parallel instance of a TimestampAssigner independently assigns timestamps and emits watermarks for the records it sees.
After a shuffle, operators forward the minimum watermark across all input connections. For details, have a look at the watermarks documentation [1].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#watermarks-in-parallel-streams
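
To make the "minimum across all input connections" rule concrete for the numbers in the question (watermarks 2 and 5 from one source task, 3 and 10 from the other), the following plain-Python sketch enumerates every arrival order at a single downstream task. It is a model, not Flink code: the function names are hypothetical, and it assumes that each connection delivers its watermarks in order and that the task sees the watermarks of both source tasks.

```python
from itertools import combinations


def interleavings(a, b):
    """All merges of a and b that preserve the order within each list."""
    n = len(a) + len(b)
    for positions in combinations(range(n), len(a)):
        ia, ib = iter(a), iter(b)
        yield [next(ia) if i in positions else next(ib) for i in range(n)]


def forwarded(arrivals, num_inputs):
    """Watermarks a task forwards: the minimum over the latest
    watermark of each input, emitted only when that minimum advances."""
    latest = [float("-inf")] * num_inputs
    current = float("-inf")
    out = []
    for channel, watermark in arrivals:
        latest[channel] = max(latest[channel], watermark)
        candidate = min(latest)
        if candidate > current:
            current = candidate
            out.append(current)
    return out


# (input channel, watermark) pairs from the two source tasks.
source_1 = [(0, 2), (0, 5)]
source_2 = [(1, 3), (1, 10)]

results = [forwarded(order, 2) for order in interleavings(source_1, source_2)]
for r in results:
    print(r)
```

In this model, whatever the arrival order, the forwarded watermarks never decrease and the task always ends at 5, the minimum of the two sources' latest watermarks.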


Re: At what point do watermarks get injected into the stream?

Ray Ruvinskiy

Thanks!

 

I have a couple of follow-up questions about the example in the documentation. Suppose Source 1 sends a watermark of 33, and Source 2 sends a watermark of 17. If I understand correctly, map(1) will forward the watermark of 33 to window(1) and window(2), and map(2) will forward the watermark of 17 to the same window operators. I’m assuming there is nothing to prevent window(1) and window(2) from getting the watermark of 33 before the watermark of 17, right? In that case, how do window(1) and window(2) compute the minimum watermark to forward to the next operator downstream? Will it be a per-window watermark?

 

What would happen if, instead of a window operator in that position, we had something like the CEP operator, which in effect maintains state and does aggregations without windowing (or another similar operator)? How does it determine what the minimum watermark is at any given time, given that, in principle, it might receive a watermark from a parallel source that is smaller than anything it has seen before?

 

Ray

 


Re: At what point do watermarks get injected into the stream?

Fabian Hueske-2
Hi,

Each operator keeps track of the latest (and therefore maximum) watermark received from each of its inputs and sets its own internal time to the minimum of those per-input watermarks.
In the example, window(1) has two inputs (map(1) and map(2)). If window(1) first receives a watermark of 33 from map(1), it won't emit a watermark of 33 until it has received a watermark >= 33 from map(2).

All operators use this logic for watermark propagation. So it does not matter whether this is a window operator or a CEP operator.

Let me know if you have further questions,
Fabian
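
The bookkeeping described above can be sketched in a few lines of plain Python. This is only a model of the logic, not Flink's implementation; the class name WatermarkTracker and its method are hypothetical, and the third watermark value (35) is made up to show the operator time advancing.

```python
class WatermarkTracker:
    """Hypothetical model of an operator's event-time clock with several inputs."""

    def __init__(self, input_names):
        # Latest (maximum) watermark seen on each input so far.
        self.latest = {name: float("-inf") for name in input_names}
        # The operator's own time: minimum over all inputs.
        self.current = float("-inf")

    def on_watermark(self, input_name, watermark):
        """Record a watermark from one input.

        Returns the new operator time if it advanced, otherwise None."""
        self.latest[input_name] = max(self.latest[input_name], watermark)
        candidate = min(self.latest.values())
        if candidate > self.current:
            self.current = candidate
            return candidate
        return None


window_1 = WatermarkTracker(["Map(1)", "Map(2)"])
emitted = [
    window_1.on_watermark("Map(1)", 33),  # Map(2) has not reported yet
    window_1.on_watermark("Map(2)", 17),  # minimum over inputs is now 17
    window_1.on_watermark("Map(2)", 35),  # Map(2) passed 33, minimum is 33
]
print(emitted)  # [None, 17, 33]
```

Because the operator time is a minimum over per-input maxima, a watermark that arrives "late" from a slower input can never move the operator's time backwards; it can only unblock progress.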


Re: At what point do watermarks get injected into the stream?

Ray Ruvinskiy

One more question: continuing with the same example, but supposing that the Map operators in the diagram are followed by a KeyBy, which is then followed by another Map, let’s say with operators named MapB(1) and MapB(2). Let’s further say that, by luck of the draw, the keys of the records processed by Map(2) are such that all those records end up going to MapB(2) and none of them end up shuffled to MapB(1). Would MapB(1) still continue to get watermarks generated from Map(2), even though it’s not getting the records that caused those watermarks to be generated?

 

Thanks,

 

Ray

 


Re: At what point do watermarks get injected into the stream?

Fabian Hueske-2
Yes, I think so. @Aljoscha, please correct me if I describe something wrong here.

The first map tasks (Map(1) and Map(2)) broadcast their watermarks to all connected subsequent tasks (window(1) and window(2)).
The window tasks update their respective watermarks based on the watermarks received from their input tasks (map(1) and map(2)) and forward their watermarks to all connected subsequent tasks.
Since map does not require a shuffle, mapB(1) will receive only watermarks from window(1) and mapB(2) from window(2). However, since the watermarks of the windows depend on their inputs (map(1) and map(2)), the mapB tasks will get watermarks from all inputs.

Note that watermarks are not bound to specific records but define the "logical time" of an operator.
Because watermarks are always broadcast to all subsequent tasks, the whole application is "synced" to the same time after the first keyBy() (or any other kind of full shuffle).
So the progress of time is determined by the "slowest" input.

Best, Fabian
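
The difference between how records and watermarks travel across a keyBy can be modelled with a short plain-Python sketch. It is not Flink code: the event tuples, the run function, and the "key modulo parallelism" partitioner are all assumptions made for the example. Records go to exactly one downstream task, while every watermark is copied to all of them.

```python
def run(events, parallelism=2):
    """Deliver records by key and watermarks by broadcast to downstream tasks."""
    sources = sorted({source for _, source, _ in events})
    tasks = [
        {
            "records": {s: 0 for s in sources},
            "latest": {s: float("-inf") for s in sources},
        }
        for _ in range(parallelism)
    ]
    for kind, source, value in events:
        if kind == "record":
            # keyBy: the record goes to exactly one task (value is the key;
            # key % parallelism is a stand-in partitioner).
            tasks[value % parallelism]["records"][source] += 1
        else:
            # Watermark: every downstream task receives a copy.
            for task in tasks:
                task["latest"][source] = max(task["latest"][source], value)
    return tasks


events = [
    ("record", "Map(1)", 0),
    ("record", "Map(1)", 1),
    ("watermark", "Map(1)", 33),
    # All keys from Map(2) are odd, so its records only reach the second task.
    ("record", "Map(2)", 1),
    ("record", "Map(2)", 3),
    ("watermark", "Map(2)", 17),
]

mapb_1, mapb_2 = run(events)
event_time = [min(t["latest"].values()) for t in (mapb_1, mapb_2)]

print(mapb_1["records"])  # {'Map(1)': 1, 'Map(2)': 0}
print(event_time)         # [17, 17]
```

In this model MapB(1) receives no records from Map(2), yet its event time is still held at 17 by Map(2)'s watermark, which matches the point that the slowest input determines the progress of time.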


Re: At what point do watermarks get injected into the stream?

Ray Ruvinskiy

Thanks again for the detailed explanation. There is one point I want to make sure I fully understand:

 

> Since map does not require a shuffle, mapB(1) will receive only watermarks from window(1) and mapB(2) from window(2). However, since the watermarks of the windows depend on their inputs (map(1) and map(2)), the mapB tasks will get watermarks from all inputs.

 

Is it correct that, as long as operator chaining is happening, watermarks will not be broadcast and will effectively be local to the chain, but as soon as a shuffle occurs (e.g., on a keyBy), all watermarks will be broadcast to all downstream tasks?

 

Ray

 

From: Fabian Hueske <[hidden email]>
Date: Wednesday, June 14, 2017 at 8:44 AM
To: Ray Ruvinskiy <[hidden email]>
Cc: "[hidden email]" <[hidden email]>, Aljoscha Krettek <[hidden email]>
Subject: Re: At what point do watermarks get injected into the stream?

 

Yes, I think so. @Aljoscha, please correct me if I describe something wrong here.

The first map tasks (Map(1) and Map(2)) broadcast their watermarks to all connected subsequent tasks (window(1) and window(2)).

The window tasks update their respective watermarks based on the watermarks received from their input tasks (map(1) and map(2)) and forward their watermarks to all connected subsequent tasks.

Since map does not require a shuffle, mapB(1) will receive only watermarks from window(1) and mapB(2) from window(2). However, since the watermarks of the windows depend on their inputs (map(1) and map(2)), the mapB tasks will get watermarks from all inputs.

Note that watermarks are not bound to specified records but define the "logical time" of an operator.
Because watermarks are always broadcasted to all subsequent tasks, the whole application is "synced" to the same time after the first keyBy() (or anyother kind of full shuffle).
So the progress of time is determined by the "slowest" input.

Best, Fabian

 

2017-06-14 13:51 GMT+02:00 Ray Ruvinskiy <[hidden email]>:

One more question: continuing with the same example but supposing that the Map operators in the diagram are followed by a KeyBy, which is then followed by another Map, let’s say with operators name MapB(1) and MapB(2). Let’s further say that by luck of the draw, the keys of the records processed by Map(2) are such that all those records end up going to MapB(2) and none of them end up shuffled to MapB(1). Would MapB(1) still continue to get watermarks generated from Map(2), even though it’s not getting the records that caused those watermarks to be generated?

 

Thanks,

 

Ray

 

From: Fabian Hueske <[hidden email]>
Date: Monday, June 12, 2017 at 4:06 PM


To: Ray Ruvinskiy <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: At what point do watermarks get injected into the stream?

 

Hi,

each operator keeps track of the latest (and therefore maximum) watermark received from each of its inputs and sets its own internal time to the minimum of those per-input watermarks.
In the example, window(1) has two inputs (Map(1) and Map(2)). If window(1) first receives a WM 33 from Map(1), it won't emit a watermark with 33 until it has received a watermark with >= 33 from Map(2).

All operators use this logic for watermark propagation. So it does not matter whether this is a window operator or a CEP operator.
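
To make the rule above concrete, here is a minimal sketch in plain Java. It is not Flink's actual implementation; the class name InputWatermarkTracker and the method onWatermark are made up for illustration. It only shows the bookkeeping described above: keep the maximum watermark per input, and advance the operator's time to the minimum across inputs.

```java
import java.util.Arrays;

// Hypothetical sketch of per-input watermark tracking; not a Flink class.
class InputWatermarkTracker {
    // Latest (and therefore maximum) watermark seen on each input channel.
    private final long[] perInput;
    // Last watermark this operator emitted downstream.
    private long current = Long.MIN_VALUE;

    InputWatermarkTracker(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    // Records a watermark from one input. Returns the watermark to emit
    // downstream, or null if the operator's time did not advance.
    Long onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
        long min = Arrays.stream(perInput).min().getAsLong();
        if (min > current) {
            current = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        // window(1) with two inputs: index 0 = Map(1), index 1 = Map(2).
        InputWatermarkTracker window1 = new InputWatermarkTracker(2);
        System.out.println(window1.onWatermark(0, 33)); // null: nothing from Map(2) yet
        System.out.println(window1.onWatermark(1, 17)); // 17: minimum of 33 and 17
        System.out.println(window1.onWatermark(1, 40)); // 33: Map(2) has now passed 33
    }
}
```

With this bookkeeping it does not matter in which order the watermarks 33 and 17 arrive: the emitted watermark never exceeds the slowest input.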

Let me know if you have further questions,

Fabian

 

2017-06-12 21:55 GMT+02:00 Ray Ruvinskiy <[hidden email]>:

Thanks!

 

I had a couple of follow-up questions to the example in the documentation. Suppose Source 1 sends a watermark of 33, and Source 2 sends a watermark of 17. If I understand correctly, map(1) will forward the watermark of 33 to window(1) and window(2), and map(2) will forward the watermark of 17 to the same window operators. I’m assuming there is nothing to prevent window(1) and window(2) from getting the watermark of 33 before the watermark of 17, right? In that case, how do window(1) and window(2) compute the minimum watermark to forward to the next operator downstream? Will it be a per-window watermark?

 

What would happen if, instead of a window operator in that position, we had something like the CEP operator, which in effect maintains state and does aggregations without windowing (or another similar such operator)? How does it determine what the minimum watermark is at any given time, in light of the fact that, in principle, it might receive a watermark value smaller than anything it’s seen before from a parallel source?

 

Ray

 

From: Fabian Hueske <[hidden email]>
Date: Sunday, June 11, 2017 at 5:54 PM


To: Ray Ruvinskiy <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: At what point do watermarks get injected into the stream?

 

Each parallel instance of a TimestampAssigner independently assigns timestamps.

After a shuffle, operators forward the minimum watermark across all input connections. For details have a look at the watermarks documentation [1].
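
As a rough illustration of "each parallel instance assigns independently", here is a plain-Java sketch. It is not the Flink API; the class BoundedLatenessWatermarks and the fixed out-of-orderness bound are assumptions made for the example. Each instance only looks at the events it has read itself and derives its own watermark from them.

```java
// Hypothetical sketch of a per-instance watermark generator; not a Flink class.
// Assumes a "bounded out-of-orderness" strategy: watermark = max timestamp seen - bound.
class BoundedLatenessWatermarks {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLatenessWatermarks(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every event this parallel instance reads.
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Watermark this instance would currently emit.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet
        }
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        // Two parallel instances, each with its own state and its own events.
        BoundedLatenessWatermarks a = new BoundedLatenessWatermarks(2);
        BoundedLatenessWatermarks b = new BoundedLatenessWatermarks(2);

        a.onEvent(4);
        System.out.println("a: " + a.currentWatermark()); // a: 2
        a.onEvent(7);
        System.out.println("a: " + a.currentWatermark()); // a: 5

        b.onEvent(5);
        System.out.println("b: " + b.currentWatermark()); // b: 3
        b.onEvent(12);
        System.out.println("b: " + b.currentWatermark()); // b: 10
    }
}
```

The event timestamps are chosen so that the two instances produce the watermarks 2, 5 and 3, 10 from the question quoted further down; neither instance knows anything about the other.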

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#watermarks-in-parallel-streams

 

2017-06-11 17:22 GMT+02:00 Ray Ruvinskiy <[hidden email]>:

Thanks for the explanation, Fabian.

 

Suppose I have a parallel source that does not inject watermarks, and the first operation on the DataStream is assignTimestampsAndWatermarks. Does each parallel task that makes up the source independently inject watermarks for the records that it has read? Suppose I then call keyBy and a shuffle ensues. Will the resulting partitions after the shuffle have interleaved watermarks from the various source tasks?

 

More concretely, suppose a source has a degree of parallelism of two. One of the source tasks injects the watermarks 2 and 5, while the other injects 3 and 10. There is then a shuffle, creating two different partitions. Will all the watermarks be broadcast to all the partitions? Or is it possible for, say, one partition to end up with watermarks 2 and 10 and another with 3 and 5? And after the shuffle, how do we ensure that the watermarks are processed in order by the operators receiving them?

 

Thanks,

 

Ray

 

From: Fabian Hueske <[hidden email]>
Date: Saturday, June 10, 2017 at 3:56 PM
To: Ray Ruvinskiy <[hidden email]>
Cc: "[hidden email]" <[hidden email]>
Subject: Re: At what point do watermarks get injected into the stream?

 

Hi Ray,

in principle, watermarks can be injected anywhere in a stream by calling DataStream.assignTimestampsAndWatermarks().

However, timestamps are usually injected as soon as possible after a stream is ingested (before the first shuffle). The reason is that watermarks depend on the order of events (and their timestamps) in the stream. While Flink guarantees the order of events within a partition, a shuffle interleaves events of different partitions in an unpredictable way such that it is not possible to reason about the order of timestamps afterwards.

The most common way to inject watermarks is directly inside of a SourceFunction or with a TimestampAssigner before the first shuffle.

Best, Fabian

 

2017-06-09 0:46 GMT+02:00 Ray Ruvinskiy <[hidden email]>:

I’m trying to build a mental model of how watermarks get injected into the stream. Suppose I have a stream with a parallel source, and I’m running a cluster with multiple task managers. Does each parallel source reader inject watermarks, which are then forwarded to downstream consumers and shuffled between task managers? Or are watermarks created after the shuffle, when the stream records reach their destined task manager and right before they’re processed by the operator?

 

Thanks,

 

Ray

 

 

 

 


Re: At what point do watermarks get injected into the stream?

Fabian Hueske-2
Yes, that is correct. Watermarks are propagated to all outgoing channels of an operator even if no records flow over one or more channels (for example if data is already partitioned).
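
A small plain-Java sketch of the difference, in case it helps. This is not Flink code: the class ChannelOutput is invented for illustration, and the hash-modulo routing is a simplification of how a keyBy picks a target channel. Records go to exactly one channel chosen by key, while a watermark is written to every channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an operator's output side after a keyBy; not a Flink class.
class ChannelOutput {
    // One list per outgoing channel, holding what was sent on it, in order.
    final List<List<String>> channels = new ArrayList<>();

    ChannelOutput(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    // A record is routed to exactly one channel, chosen by its key.
    // (Simplified: plain hash modulo, an assumption for this sketch.)
    void emitRecord(String key, String value) {
        int target = Math.floorMod(key.hashCode(), channels.size());
        channels.get(target).add("record:" + value);
    }

    // A watermark is broadcast to every channel, whether or not
    // any record was ever sent on it.
    void emitWatermark(long watermark) {
        for (List<String> channel : channels) {
            channel.add("watermark:" + watermark);
        }
    }

    public static void main(String[] args) {
        ChannelOutput map2 = new ChannelOutput(2);
        // All records share one key, so they all land on the same channel.
        map2.emitRecord("a", "x");
        map2.emitRecord("a", "y");
        map2.emitWatermark(17);
        // The channel that received no records still received the watermark.
        for (int i = 0; i < map2.channels.size(); i++) {
            System.out.println("channel " + i + ": " + map2.channels.get(i));
        }
    }
}
```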

2017-06-14 14:51 GMT+02:00 Ray Ruvinskiy <[hidden email]>:

Thanks again for the detailed explanation. There is one point I want to make sure I fully understand:

 

> Since map does not require a shuffle, mapB(1) will receive only watermarks from window(1) and mapB(2) from window(2). However, since the watermarks of the windows depend on their inputs (map(1) and map(2)), the mapB tasks will get watermarks from all inputs.

 

As long as operator chaining is happening, watermarks will not be broadcast and will effectively be local to the chain, but as soon as a shuffle occurs (e.g., on a keyBy), all watermarks will be broadcast to all downstream tasks?

 

Ray

 
