Far too few watermarks getting generated with Kafka source


Far too few watermarks getting generated with Kafka source

William Saar-2
Hi,
I have a job where we read data from either Kafka or a file (for testing), decode the entries and flat map them into events, and then add a timestamp and watermark assigner to the events in a later operation. This seems to generate periodic watermarks when running from a file, but when Kafka is the source we barely get any watermark updates. What could be causing this? (The environment has setAutoWatermarkInterval(1000).)

Do we need to do all the timestamp and watermark assignment in the Kafka source, or should it work to do it in later operations? The events do seem to get propagated through the pipeline; we're just not getting watermarks...

Thanks,
William

Re: Far too few watermarks getting generated with Kafka source

Gary Yao-2
Hi William,

How often does the Watermark get updated? Can you share your code that generates
the watermarks? Watermarks should be strictly ascending. If your code produces
watermarks that are not ascending, smaller ones will be discarded. Could it be
that the events in Kafka are more "out of order" with respect to event time than
in your file?
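As a simplified illustration of the discarding behavior (plain Scala, not Flink's actual runtime code; the object and method names here are made up for the sketch), the operator only forwards a watermark candidate if it is larger than the last one emitted:

```scala
// Simplified model of periodic watermark emission: the runtime polls the
// assigner at the auto-watermark interval and forwards a candidate only if
// it is strictly larger than the last emitted watermark; smaller or equal
// candidates are silently dropped.
object WatermarkModel {
  def emitted(candidates: Seq[Long]): Seq[Long] =
    candidates.foldLeft(List.empty[Long]) { (acc, wm) =>
      acc match {
        case last :: _ if wm <= last => acc      // non-ascending: discarded
        case _                       => wm :: acc // ascending: emitted
      }
    }.reverse

  def main(args: Array[String]): Unit = {
    // Out-of-order event times yield non-ascending candidates;
    // only the ascending subsequence is ever emitted downstream.
    println(emitted(Seq(100L, 90L, 120L, 120L, 130L))) // List(100, 120, 130)
  }
}
```

So if the Kafka data makes the assigner's candidate watermark jump high early on, later candidates may all be discarded.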

You can assign timestamps in the Kafka source or later. The Flink documentation
has a section on why it could be beneficial to assign Watermarks in the Kafka
source:


Best,
Gary



Re: Far too few watermarks getting generated with Kafka source

William Saar-2
Hi,
The watermark does not seem to get updated at all after the first one is emitted. We used to get out-of-order warnings, but we changed the job to use a bounded timestamp extractor, so we no longer get those warnings.

Our timestamp extractor looks like this:
class TsExtractor[T](time: Time) extends BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time) {
  override def extractTimestamp(element: Timestamped[T]): Long = element.timestamp
}

Our stream topology starts with a single stream. We then run two separate flat map and filtering operations on the initial stream to transform data batches into streams of two different event types, call assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20))) for each event type on both branches, and finally union the two branches into a single stream again (the reason for the split is that the data used to come from two different topics).

William





Re: Far too few watermarks getting generated with Kafka source

Fabian Hueske-2
Hi William,

The TsExtractor looks good.
This sounds like strange behavior and should not (or only indirectly) be related to the Kafka source, since the watermarks are generated by a separate extractor.

- Did you compare the first (and only) generated watermark to the timestamps of the records that are produced by the sources?
It might be far ahead of the timestamps in the records and won't be updated because the timestamps of the records are smaller.

- What is the parallelism of the file source / Kafka source and the following operators?
Watermarks can only advance if they advance in all parallel instances of the timestamp extractor.
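To illustrate (a simplified plain-Scala model, not Flink internals; the class names are made up for the sketch): a downstream task's event-time clock is the minimum of the watermarks received on its input channels, so a single instance that never advances pins the whole pipeline.

```scala
// Simplified model: a downstream task tracks the last watermark seen on
// each input channel; its own watermark is the minimum across channels.
class DownstreamWatermark(numChannels: Int) {
  private val perChannel = Array.fill(numChannels)(Long.MinValue)

  def onWatermark(channel: Int, wm: Long): Long = {
    perChannel(channel) = math.max(perChannel(channel), wm)
    perChannel.min // the task's current event-time clock
  }
}

object ParallelismDemo {
  def main(args: Array[String]): Unit = {
    val op = new DownstreamWatermark(2)
    // Channel 1 never emits a watermark, so the task's clock stays at
    // Long.MinValue no matter how far channel 0 advances.
    op.onWatermark(0, 100L)
    println(op.onWatermark(0, 200L)) // still Long.MinValue: held back
    println(op.onWatermark(1, 150L)) // now min(200, 150) = 150
  }
}
```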

Best, Fabian




Re: Far too few watermarks getting generated with Kafka source

Eron Wright
I think there's a misconception about `setAutoWatermarkInterval`. It establishes the rate at which your periodic watermark generator is polled for the current watermark. Like most generators, `BoundedOutOfOrdernessTimestampExtractor` produces a watermark based solely on observed elements. Therefore, `setAutoWatermarkInterval` does not compensate for idle sources (see FLINK-5479 and FLINK-5018).

Keep in mind that sources do not re-order emitted elements into event time order; depending on the source's internals, elements may be emitted in a highly unordered fashion with respect to event time. For example, the Kafka consumer processes elements across numerous partitions simultaneously, and the resultant ordering is highly variable. When you use the generic `assignTimestampsAndWatermarks` facility, the assigner is challenged to make sense of this multiplexed stream of elements. For this reason, I would strongly suggest you make use of the Kafka consumer's support for per-partition assigners, to be able to reason about the progression of time in each partition independently.
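Roughly, the per-partition approach works like this (a simplified plain-Scala model, not the actual Flink Kafka consumer code; the class and parameter names are made up for the sketch): each partition tracks its own maximum event timestamp, and the source-level watermark is derived from the slowest partition.

```scala
// Simplified model of per-partition watermarking: each partition keeps its
// own max event timestamp; the source watermark is the minimum per-partition
// timestamp minus the out-of-orderness bound. A slow partition therefore
// cannot be overtaken by fast partitions multiplexed into the same stream.
class PerPartitionWatermarks(partitions: Int, maxOutOfOrdernessMs: Long) {
  private val maxTs = Array.fill(partitions)(Long.MinValue)

  def onRecord(partition: Int, timestampMs: Long): Long = {
    maxTs(partition) = math.max(maxTs(partition), timestampMs)
    val m = maxTs.min
    // No watermark until every partition has seen at least one record.
    if (m == Long.MinValue) Long.MinValue else m - maxOutOfOrdernessMs
  }
}

object PerPartitionDemo {
  def main(args: Array[String]): Unit = {
    val wm = new PerPartitionWatermarks(partitions = 2, maxOutOfOrdernessMs = 20000L)
    wm.onRecord(0, 100000L)
    wm.onRecord(0, 160000L)
    // Partition 1 lags: the source watermark follows the slow partition.
    println(wm.onRecord(1, 110000L)) // 110000 - 20000 = 90000
  }
}
```

A per-element assigner on the multiplexed stream sees only the interleaving, which is why it struggles; the per-partition view recovers the ordering guarantee that exists within each partition.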

Here's a good diagram of the phenomenon that I'm describing. Observe how some elements seem to 'move upward' together, and imagine that they correspond to one partition.

Hope this helps!
Eron


