Watermarks on map operator


Antonis Papaioannou
Hi,

Reading through the documentation on watermarks, it is not clear to me whether watermarks are also used by map/flatMap operators or only by window operators.

My application reads from a Kafka topic (with multiple partitions) and assigns a timestamp to each tuple based on some fields of the Kafka records. A subsequent keyBy operator partitions the stream and sends the tuples to the corresponding downstream map/flatMap operator. I have set the time characteristic to EventTime.

However, it seems that the flatMap operator does *not* guarantee that it will process elements in a deterministic time order.
Is this correct?

Antonis



Re: Watermarks on map operator

Kezhu Wang
> it is not clear to me if watermarks are also used by map/flatMap operators or just by window operators.

Watermarks are mostly used only by time-segmented aggregation operators, to trigger result materialization. In streaming, this "time segmentation" is usually called "windowing", so in this sense watermarks are used only by window operators. Note, though, that there are other types of windows, such as count windows, which are not driven by time.

> My application reads from a Kafka topic (with multiple partitions) and assigns a timestamp to each tuple based on some fields of the Kafka records.

Watermarks depend on timestamps, but the two are different things. Window operations use timestamps to assign elements to windows (segments, panes, buckets), while watermarks signal the progress of time to window operations, so that they can emit their buffered window results downstream.
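
To make the split between timestamps and watermarks concrete, here is a minimal sketch in plain Java. It does not use Flink's API: the class `TumblingWindowSketch` and its methods `onElement` and `onWatermark` are hypothetical names chosen for illustration. It assumes fixed-size tumbling windows and, as an assumption modeled on how event-time windows usually behave, fires a window once the watermark reaches the window's last timestamp (end minus one).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy tumbling event-time window. Illustration only, not Flink's API. */
class TumblingWindowSketch {
    private final long sizeMs;
    // window start -> timestamps buffered for that window, ordered by window start
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    TumblingWindowSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** The element's timestamp alone decides which window it is assigned to. */
    void onElement(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, sizeMs);
        buffers.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    /**
     * A watermark w signals that event time has progressed to w.
     * Every window whose last timestamp (end - 1) is <= w is emitted and dropped.
     */
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + sizeMs - 1 <= watermark) {
            Map.Entry<Long, List<Long>> e = buffers.pollFirstEntry();
            long start = e.getKey();
            fired.add("[" + start + "," + (start + sizeMs) + ") count=" + e.getValue().size());
        }
        return fired;
    }
}
```

Feeding elements with timestamps 12, 3, 7 into a 10 ms window produces no output by itself; results only appear when `onWatermark` is called with a value that passes the end of a window.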

> it seems that the flatMap operator does *not* guarantee that it will process elements in a deterministic time order.

Most operators simply pass timestamps and watermarks downstream. All operators, including window operators, process elements in arrival order. If you want elements ordered by event time, you need a window operation in an upstream operator.
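
As a rough illustration of what such an upstream ordering step has to do, here is a sketch in plain Java (not Flink's API; `EventTimeSorterSketch`, `onElement` and `onWatermark` are hypothetical names). It assumes each element is represented only by its timestamp, buffers elements as they arrive, and releases them in timestamp order once a watermark says no earlier elements are expected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Buffers out-of-order timestamps and releases them in event-time order. Illustration only. */
class EventTimeSorterSketch {
    // Min-heap: the smallest buffered timestamp is always at the head.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    /** Elements arrive in arbitrary order and are only buffered here. */
    void onElement(long timestamp) {
        buffer.add(timestamp);
    }

    /** Releases, in ascending order, every buffered timestamp that is <= the watermark. */
    List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            released.add(buffer.poll());
        }
        return released;
    }
}
```

The sketch does nothing special for late elements: a timestamp that arrives after a watermark has already passed it is simply released with the next watermark, so a real job would have to decide whether to drop or redirect such elements.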

Resources:
https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/

Hope this helps.

Best,
Kezhu Wang

On February 4, 2021 at 23:17:07, Antonis Papaioannou ([hidden email]) wrote:

> [...]



Re: Watermarks on map operator

David Anderson-3
Basically the only thing that Watermarks do is to trigger event time timers. Event time timers are used explicitly in KeyedProcessFunctions, but are also used internally by time windows, CEP (to sort the event stream), in various time-based join operations, and within the Table/SQL API.

If you want the event stream to be sorted before doing further processing on it, that's easily done with the Table/SQL API, or CEP. There's an example in [1].
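
For intuition about the "watermarks trigger event time timers" part, here is a minimal sketch in plain Java. It is not Flink code: `EventTimeTimerSketch`, `registerEventTimeTimer` and `advanceWatermark` are hypothetical names, loosely modeled on the timer service that a KeyedProcessFunction gets from its context, and the sketch assumes a single key and ignores state and checkpointing.

```java
import java.util.TreeSet;
import java.util.function.LongConsumer;

/** Toy event-time timer service for a single key. Illustration only, not Flink's API. */
class EventTimeTimerSketch {
    // Sorted and de-duplicated: registering the same timestamp twice yields one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final LongConsumer onTimer;
    private long currentWatermark = Long.MIN_VALUE;

    EventTimeTimerSketch(LongConsumer onTimer) {
        this.onTimer = onTimer;
    }

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    /** Fires, in timestamp order, every timer whose time is <= the new watermark. */
    void advanceWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // assumption of this sketch: watermarks that do not advance are ignored
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer.accept(timers.pollFirst());
        }
    }
}
```

Nothing happens when a timer is registered; the callback only runs when a watermark passes the timer's timestamp, which is why operators that never register timers (a plain map or flatMap) have no use for watermarks beyond forwarding them.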

Best,
David


On Fri, Feb 5, 2021 at 5:23 AM Kezhu Wang <[hidden email]> wrote:
> [...]