Flink, local development, finish processing a stream of Kafka data

Dan
Hi. 

For local development and tests, I want to flush the events in my system to make sure I'm processing everything. My watermark does not advance far enough to finish processing all of the data.

What's the best practice for local development or tests?

If I use idleness detection with a single Kafka partition, it appears broken. I'm guessing there is logic that keeps an idle partition from being ignored if it's the only partition. Is there a version of this I can enable for local development that supports a single partition?

I've seen this tech talk. Are there other talks to watch?

Do I need to write my own watermark generator?  Or change my test data to have a way of generating watermarks?

I've tried a few variants of the following source code.  The watermark doesn't progress in the operator right after creating the source.

SingleOutputStreamOperator<T> viewInput = env.addSource(...)
        .uid("source-view")
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withIdleness(Duration.ofMinutes(1)));
Dan

Does WatermarkStrategy.withIdleness work?

Dan
I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it broken?  None of my timers trigger when I'd expect idleness to take over.

(In reply to Dan Hill, Tue, Mar 2, 2021 at 11:15 PM.)

Re: Does WatermarkStrategy.withIdleness work?

David Anderson
WatermarkStrategy.withIdleness works by marking streams that have stopped receiving events as idle. Downstream operators then ignore those streams and let the watermark progress based only on the watermarks of the streams that are still active. As you suspected, this mechanism does not advance the watermark when all of the streams are idle.
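
As a rough illustration of that rule, here is a plain-Java model of how an operator with several inputs might combine their watermarks. It is not Flink's implementation, and the class `CombinedWatermark` and its methods are hypothetical. The model takes the minimum over active inputs and skips idle ones. When every input is idle, it falls back to the largest watermark any input has already reported, which, as far as I understand, is roughly what Flink's runtime does too. In no case does idleness push the watermark past a value that some input actually produced.

```java
import java.util.Arrays;

// Plain-Java model, not Flink code: an operator's watermark is the minimum over
// its active inputs; idle inputs are skipped. If every input is idle, it falls back
// to the largest watermark any input has reported, and never goes beyond that.
final class CombinedWatermark {
    private final long[] inputWatermarks;
    private final boolean[] idle;
    private long current = Long.MIN_VALUE;

    CombinedWatermark(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
        idle = new boolean[numInputs];
    }

    // A watermark arriving on an input also marks that input as active again.
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        idle[input] = false;
        recompute();
    }

    void markIdle(int input) {
        idle[input] = true;
        recompute();
    }

    long current() {
        return current;
    }

    private void recompute() {
        long minActive = Long.MAX_VALUE;
        long maxAll = Long.MIN_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            maxAll = Math.max(maxAll, inputWatermarks[i]);
            if (!idle[i]) {
                anyActive = true;
                minActive = Math.min(minActive, inputWatermarks[i]);
            }
        }
        long candidate = anyActive ? minActive : maxAll;
        if (candidate > current) {
            current = candidate; // watermarks only move forward
        }
    }
}
```

With a single Kafka partition there is only one input. Marking it idle can at most release the watermark that partition has already emitted, so timers and windows beyond that point still never fire.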

If your goal is to ensure that all of the events are processed and all event-time timers are fired (and all event-time windows are closed) before a job ends, Flink already includes a mechanism for this purpose. If you are using a bounded source, then when that source reaches the end of its input, a final watermark with the value Watermark.MAX_WATERMARK is emitted automatically. The --drain option, as in

./bin/flink stop --drain <job-id>

also has this effect [1].
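
A final Watermark.MAX_WATERMARK flushes everything because an event-time timer fires once the watermark reaches its timestamp, and MAX_WATERMARK carries Long.MAX_VALUE. Below is a plain-Java model of that rule. The class `EventTimeTimers` is hypothetical and stands in for Flink's timer service; it is not that service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model, not Flink's timer service: an event-time timer for time t fires
// once the watermark is >= t. Long.MAX_VALUE stands in for Watermark.MAX_WATERMARK.
final class EventTimeTimers {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE;

    void register(long timestamp) {
        pending.add(timestamp);
    }

    // Advances the watermark and returns the timers that fire, earliest first.
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            fired.add(pending.poll());
        }
        return fired;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A watermark that stalls partway (as in the original question) leaves the later timers pending. The end-of-input watermark then releases all of them at once.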

With a Kafka source, you can arrange for this to happen by having your Kafka deserializer return true from its isEndOfStream() method. Alternatively, you could use the new KafkaSource connector included in Flink 1.12 with its setBounded option.
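
Here is a minimal sketch of the isEndOfStream() approach, assuming the test data ends with a sentinel record. The marker value `__END__` and the class `BoundedTestSource` are hypothetical. This is a plain-Java model of the read loop, not the Flink Kafka consumer. In Flink, the check lives in the deserializer's `isEndOfStream(T nextElement)` method, and once the source finishes, the final Watermark.MAX_WATERMARK is emitted as described above.

```java
import java.util.Iterator;
import java.util.List;

// Plain-Java model of a source read loop, not the Flink Kafka consumer.
// END_MARKER is a hypothetical sentinel appended to the test data.
final class BoundedTestSource {
    static final String END_MARKER = "__END__";

    // Same shape as a deserializer's isEndOfStream(T nextElement).
    static boolean isEndOfStream(String nextElement) {
        return END_MARKER.equals(nextElement);
    }

    // Forwards records to out until the marker shows up. Returns true if the
    // source finished; at that point Flink would emit Watermark.MAX_WATERMARK.
    static boolean run(Iterator<String> records, List<String> out) {
        while (records.hasNext()) {
            String next = records.next();
            if (isEndOfStream(next)) {
                return true; // in this model the marker itself is not forwarded
            }
            out.add(next);
        }
        return false; // no marker yet: an unbounded Kafka source would keep polling
    }
}
```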

On the other hand, if you really did need to advance the watermark despite a (possibly temporary) total lack of events, you could implement a watermark strategy that artificially advances the watermark based on the passage of processing time. You'll find an example in [2], though it hasn't been updated to use the new watermark strategy interface.
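
Below is a minimal plain-Java sketch of that idea, for illustration only. It is not the example from [2] and does not implement Flink's WatermarkGenerator interface. The class name is hypothetical, and `onEvent`/`onPeriodicEmit` only borrow the names of that interface's callbacks. Processing time is passed in explicitly so the logic is easy to test. The sketch records the processing time of every event. Once nothing has arrived for `idleTimeoutMs`, it lets the watermark creep forward one millisecond per millisecond of processing time.

```java
// Plain-Java sketch, not the code from [2] and not Flink's WatermarkGenerator
// interface. Processing time is passed in explicitly so the logic is testable.
final class ProcessingTimeTrailingWatermarks {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxTimestamp = Long.MIN_VALUE; // largest event timestamp seen so far
    private long lastEventProcessingTime;       // processing time of the latest event
    private long lastEmitted = Long.MIN_VALUE;  // watermarks must never go backwards

    ProcessingTimeTrailingWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    void onEvent(long eventTimestamp, long nowMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = nowMs;
    }

    // Returns the watermark to emit at processing time nowMs.
    long onPeriodicEmit(long nowMs) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return lastEmitted; // no events yet, nothing to base a watermark on
        }
        // Ordinary bounded-out-of-orderness watermark.
        long candidate = maxTimestamp - maxOutOfOrdernessMs - 1;
        long idleFor = nowMs - lastEventProcessingTime;
        if (idleFor > idleTimeoutMs) {
            // Input has been quiet: let event time trail processing time forward.
            candidate += idleFor - idleTimeoutMs;
        }
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

The trade-off is that any event arriving after the watermark has been pushed forward this way is late. That is why this approach is only reasonable for local development and tests.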

Regards,
David


(In reply to Dan Hill, Fri, Mar 12, 2021 at 9:47 AM.)

Re: Does WatermarkStrategy.withIdleness work?

Dan
Thanks David!

(In reply to David Anderson, Fri, Mar 12, 2021 at 01:54.)

Re: Does WatermarkStrategy.withIdleness work?

Dan
JFYI in case other users find this in the future.

ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor has a small issue if it is adapted to the new watermark API and events can have the same timestamp. I changed my code to do this check in onPeriodicEmit. In our situation, we have a lot of events with the same timestamp. If the job is still processing events with the same timestamp, onPeriodicEmit concludes that we ran out of events (even though we've just processed a bunch of events) and returns a bad watermark. We modified our copy of this code to keep track of how many events have been emitted. Since we're only using this for local development, that's fine.
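
Here is a small plain-Java illustration of the pitfall described above. The class names are hypothetical, and this is not Flink code. Once per onPeriodicEmit call, both checks answer the question "did any event arrive since the previous call?". Inferring that from a change in the maximum timestamp gives the wrong answer when many events share one timestamp. Counting events, as in the modified copy, does not.

```java
// Plain-Java illustration with hypothetical names, not Flink code. Each check is
// meant to be called once per onPeriodicEmit and reports whether input arrived.
final class ActivityChecks {
    // Naive: infers activity from a change in the max timestamp.
    static final class ByMaxTimestamp {
        private long maxTimestamp = Long.MIN_VALUE;
        private long maxAtLastEmit = Long.MIN_VALUE;

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        boolean sawEventsSinceLastEmit() {
            boolean active = maxTimestamp != maxAtLastEmit;
            maxAtLastEmit = maxTimestamp;
            return active;
        }
    }

    // Fix: counts events, so repeated equal timestamps still count as activity.
    static final class ByEventCount {
        private long eventCount;
        private long countAtLastEmit;

        void onEvent(long timestamp) { // timestamp unused; kept for a parallel signature
            eventCount++;
        }

        boolean sawEventsSinceLastEmit() {
            boolean active = eventCount != countAtLastEmit;
            countAtLastEmit = eventCount;
            return active;
        }
    }
}
```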


(In reply to Dan Hill, Fri, Mar 12, 2021 at 1:55 AM.)