How to flush all window states after Kafka (0.10.x) topic was removed

Tony Wei
Hi,

I have a simple streaming job that consumes data from Kafka and uses a time window to aggregate it.
I am wondering whether there is a built-in way to emit a max watermark when the consumer finds that the topic is no longer available, so that the window function can flush all of its state to the sink function.

My Kafka version is 0.10.x. Currently, my workaround is to use a `TimestampAssigner` that checks for a specific record as a termination message and sets the watermark to Long.MAX_VALUE.
I will send this message to all partitions before I remove the topic.
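
A minimal sketch of the idea in plain Java, without any Flink classes. The `Event` type, its `termination` flag, and the class name are hypothetical; in a real job this logic would sit inside a Flink watermark assigner such as `AssignerWithPunctuatedWatermarks`. The sketch assumes the combined watermark is the minimum over all partitions, which is why the termination message has to reach every partition.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: tracks one watermark per partition and reports the minimum.
class TerminationWatermarkSketch {

    // Hypothetical record type: source partition, event time, termination flag.
    static final class Event {
        final int partition;
        final long timestamp;
        final boolean termination;

        Event(int partition, long timestamp, boolean termination) {
            this.partition = partition;
            this.timestamp = timestamp;
            this.termination = termination;
        }
    }

    private final Map<Integer, Long> perPartition = new HashMap<>();

    TerminationWatermarkSketch(int partitions) {
        // No partition has produced a watermark yet.
        for (int p = 0; p < partitions; p++) {
            perPartition.put(p, Long.MIN_VALUE);
        }
    }

    // Returns the combined watermark after seeing this event.
    long onEvent(Event e) {
        // A termination message pushes its partition's watermark to the maximum.
        long candidate = e.termination ? Long.MAX_VALUE : e.timestamp;
        perPartition.merge(e.partition, candidate, Long::max);

        // The combined watermark is held back by the slowest partition.
        long min = Long.MAX_VALUE;
        for (long wm : perPartition.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

With two partitions, the combined watermark only reaches Long.MAX_VALUE once both have received the termination message; a single missed partition keeps the windows from firing.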

I would appreciate any suggestions. Thank you.

Best Regards,
Tony Wei

Re: How to flush all window states after Kafka (0.10.x) topic was removed

Tzu-Li (Gordon) Tai
Hi Tony,

Currently, the functionality that you described does not exist in the consumer. When a topic is deleted, as far as I know, the consumer simply considers the partitions unreachable and keeps trying to fetch records from them until they are up again.
I'm not entirely sure whether, in the Kafka API, a removed topic is distinguishable from a partition that is temporarily out of service because Kafka brokers are down; I may need to take a look.

As for the "workaround" that you are using at the moment, you can actually use `KeyedDeserializationSchema#isEndOfStream` for that. When that returns true and the source subtask closes, the Long.MAX_VALUE watermark will be emitted.
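
A rough sketch of what that could look like. To keep it self-contained, `EndAwareSchema` below is a local stand-in that mirrors only the `deserialize` and `isEndOfStream` methods of `KeyedDeserializationSchema`; the marker value and class names are hypothetical. In an actual job you would implement Flink's interface directly (including `getProducedType()`) and pass the schema to the Kafka consumer.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for the relevant part of Flink's KeyedDeserializationSchema.
interface EndAwareSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);

    boolean isEndOfStream(T nextElement);
}

// Treats one agreed-upon record value as the termination message.
class TerminationAwareStringSchema implements EndAwareSchema<String> {

    // Hypothetical marker; producer and job only need to agree on it.
    static final String TERMINATION_MARKER = "__END_OF_STREAM__";

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) {
        // Only the value is decoded here; key and position are ignored.
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true signals that the source should stop consuming.
        return TERMINATION_MARKER.equals(nextElement);
    }
}
```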

Cheers,
Gordon

In reply to Tony Wei's message of Tue, Sep 5, 2017 at 2:50 PM.