How to check is Kafka partition "idle" in emitRecordsWithTimestamps


Alexey Trenikhun
Hello,
I'm thinking about implementing a custom Kafka connector which provides event alignment (similar to FLINK-10921, which seems abandoned). How can I determine whether a partition is idle from an override of AbstractFetcher.emitRecordsWithTimestamps()? Does KafkaTopicPartitionState have this information?

Thanks,
Alexey

Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

Till Rohrmann
Hi Alexey,

Looking at KafkaTopicPartitionState, it does not seem to contain this information. In a nutshell, what you probably have to do is aggregate the watermarks across all partitions and then pause the consumption of a partition if its watermark advances too far ahead of the minimum watermark. However, this will stall the whole reading process if there is a partition which has no more data. Hence, you will probably also need a mechanism to advance the watermark if the partition becomes idle.
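
A minimal sketch of the bookkeeping described above, in plain Java with no Flink dependency. The class PartitionAlignmentSketch, its methods, and the maxDriftMillis threshold are hypothetical names introduced for illustration; they are not part of the Flink Kafka connector. It tracks one watermark per partition, takes the minimum over the non-idle partitions, and reports which partitions have run too far ahead and could be paused.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not a Flink class): decides which partitions to pause. */
final class PartitionAlignmentSketch {
    // Assumption: watermarks are non-negative epoch milliseconds.
    private final long maxDriftMillis;
    private final Map<Integer, Long> watermarks = new HashMap<>();
    private final Set<Integer> idle = new TreeSet<>();

    PartitionAlignmentSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the latest watermark of a partition; new data makes it active again. */
    void updateWatermark(int partition, long watermarkMillis) {
        watermarks.merge(partition, watermarkMillis, Math::max);
        idle.remove(partition);
    }

    /** Excludes a partition from the minimum so it cannot stall the others. */
    void markIdle(int partition) {
        idle.add(partition);
    }

    /** Minimum watermark over active partitions; Long.MAX_VALUE if none is active. */
    long minActiveWatermark() {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (!idle.contains(e.getKey())) {
                min = Math.min(min, e.getValue());
            }
        }
        return min;
    }

    /** Active partitions whose watermark is more than maxDriftMillis ahead of the minimum. */
    Set<Integer> partitionsToPause() {
        Set<Integer> result = new TreeSet<>();
        long min = minActiveWatermark();
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (!idle.contains(e.getKey()) && e.getValue() - min > maxDriftMillis) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

With a drift of 1000 ms and watermarks 5000, 5500 and 9000 for partitions 0, 1 and 2, partitionsToPause() returns only partition 2. Once partitions 0 and 1 are marked idle, the minimum becomes 9000 and nothing is paused any more.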

Note that the community is currently working on a new KafkaConnector based on Flink's new source API (FLIP-27). If I am not mistaken, then these new interfaces should eventually also support event time alignment.

Cheers,
Till

On Fri, May 28, 2021 at 7:17 PM Alexey Trenikhun <[hidden email]> wrote:
Hello,
I'm thinking about implementing a custom Kafka connector which provides event alignment (similar to FLINK-10921, which seems abandoned). How can I determine whether a partition is idle from an override of AbstractFetcher.emitRecordsWithTimestamps()? Does KafkaTopicPartitionState have this information?

Thanks,
Alexey

Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

Alexey Trenikhun
Hi Till,

>However, this will stall the whole reading process if there is a partition which has no more data. Hence, you will probably also need a mechanism to advance the watermark if the partition becomes idle.
This is why I need to find out whether a partition is idle. It looks like the Flink Kafka connector definitely has this information: the derived class KafkaTopicPartitionStateWithWatermarkGenerator has the fields immediateOutput and deferredOutput, each of which has a field state that carries an idle flag.

Thank you for the information about the new KafkaConnector. I assume that you are referring to [1], but it also seems stalled. Or are you talking about a different task?

[1] FLINK-18450


Thanks,
Alexey

From: Till Rohrmann <[hidden email]>
Sent: Tuesday, June 1, 2021 6:24 AM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps
 
Hi Alexey,

Looking at KafkaTopicPartitionState, it does not seem to contain this information. In a nutshell, what you probably have to do is aggregate the watermarks across all partitions and then pause the consumption of a partition if its watermark advances too far ahead of the minimum watermark. However, this will stall the whole reading process if there is a partition which has no more data. Hence, you will probably also need a mechanism to advance the watermark if the partition becomes idle.

Note that the community is currently working on a new KafkaConnector based on Flink's new source API (FLIP-27). If I am not mistaken, then these new interfaces should eventually also support event time alignment.

Cheers,
Till


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

Till Rohrmann
Hi Alexey,

I think the current idleness detection works based on timeouts. You need a special watermark generator that periodically emits watermarks. If no event has been emitted for a partition for longer than a configured timeout, that partition is marked as idle.
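
A minimal sketch of timeout-based idleness tracking, in plain Java with no Flink dependency. The class IdlenessTrackerSketch and its methods are hypothetical names for illustration, not the connector's actual implementation. The current time is passed in explicitly so that the check can be driven from a periodic watermark tick. In Flink itself, a comparable timeout can be configured with WatermarkStrategy.withIdleness(Duration).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink class): per-partition idleness based on a timeout. */
final class IdlenessTrackerSketch {
    private final long idleTimeoutMillis;
    private final Map<Integer, Long> lastEventSeenAt = new HashMap<>();

    IdlenessTrackerSketch(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Call whenever a record is emitted for the partition. */
    void onEvent(int partition, long nowMillis) {
        lastEventSeenAt.put(partition, nowMillis);
    }

    /** Call from the periodic tick; true if no record arrived within the timeout. */
    boolean isIdle(int partition, long nowMillis) {
        Long last = lastEventSeenAt.get(partition);
        // Design choice of this sketch: a partition that never produced a record counts as idle.
        return last == null || nowMillis - last >= idleTimeoutMillis;
    }
}
```

A partition that is idle according to this check would be excluded from the minimum watermark, so that the remaining partitions can keep advancing. A partition that was paused deliberately for alignment should not be reported as idle, so a real implementation would need to distinguish the two cases.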

Yes, I was referring to FLINK-18450. At the moment nobody is actively working on it, but it is on the roadmap for improvements for the new source APIs (FLIP-27).

Cheers,
Till

On Tue, Jun 1, 2021 at 8:55 PM Alexey Trenikhun <[hidden email]> wrote:
Hi Till,

>However, this will stall the whole reading process if there is a partition which has no more data. Hence, you will probably also need a mechanism to advance the watermark if the partition becomes idle.
This is why I need to find out whether a partition is idle. It looks like the Flink Kafka connector definitely has this information: the derived class KafkaTopicPartitionStateWithWatermarkGenerator has the fields immediateOutput and deferredOutput, each of which has a field state that carries an idle flag.

Thank you for the information about the new KafkaConnector. I assume that you are referring to [1], but it also seems stalled. Or are you talking about a different task?

[1] FLINK-18450

Thanks,
Alexey
