Re: Kafka partition alignment for event time
Posted by agaoglu on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
Hi,
On a related and more exaggerated setup: our Kafka producer (Flume) seems to send data to a single partition at a time and switches partitions every few minutes. So when I run my Flink DataStream program for the first time, it starts at the *largest* offsets and logs something like this:
. Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}]
. Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}]
. Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}]
. Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}]
. Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}]
. Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}]
. Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}]
. Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}]
. Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}]
At that moment Flume happens to be sending data to partition-6 only, so the other consumers sit idle. With the default parallelism of 4, only one of the 4 threads is able to source data, and the checkpointing logs reflect that:
Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, 15114275927, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER
This also means the checkpoint only contains the offset for partition-6. So if the program is stopped and restarted at a later time, it restores the offset for partition-6 only, and the other partitions start at the largest offset. It is therefore able to process unseen data in partition-6 but not in the others. Say Flume produces data to partition-3 while the Flink program is stopped: that data is lost, while the data in partition-6 is not. This generally causes multiple (late) windows to be fired after restart, because we now generate watermarks from partition-3, and those watermarks say that the windows of the unseen data in partition-6 are already complete.
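To make the restart behaviour concrete, here is a toy model in Python. It is not Flink's code: `resume_offsets` is a hypothetical helper, and the amounts written while the job was down (1000 records to partition-3, 500 to partition-6) are made-up numbers. Only the sentinel value and the offsets come from the logs above, and I am assuming the sentinel means "no offset recorded".

```python
# Sentinel seen in the "Committing offsets" log lines.
# Assumption: it means "no offset recorded for this partition".
NOT_SET = -915623761776


def resume_offsets(checkpointed, latest):
    """Pick the offset each partition resumes from after a restart.

    A partition with a checkpointed offset resumes from it. A partition
    without one falls back to the latest offset, so anything written to
    it while the job was down is skipped.
    """
    return [late if saved == NOT_SET else saved
            for saved, late in zip(checkpointed, latest)]


# Positions of partitions 0..8 when the job stopped: the start offsets
# from the log, except partition-6, which had advanced to 15114275927.
positions = [15393999709, 15203613236, 15366811664, 15182701991,
             15186569356, 15482889767, 15114275927, 15118832832,
             15319475583]

# Only partition-6 made it into the checkpoint.
checkpointed = [NOT_SET] * 9
checkpointed[6] = positions[6]

# Hypothetical writes while the job was stopped.
latest = list(positions)
latest[3] += 1000
latest[6] += 500

resumed = resume_offsets(checkpointed, latest)
skipped = [r - p for r, p in zip(resumed, positions)]
print(skipped)
```

In this model partition-6 resumes where it left off and its 500 new records are read, while the 1000 records on partition-3 are skipped.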
This also has the side effect that windows do not trigger unless some rebalancing is done beforehand. Since only 1 of the 4 threads sources data and generates watermarks, the window triggers get no watermarks from the other 3 sources and keep waiting long after the watermarks from the single active source have passed the end of the window.
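The stalled windows follow from how an operator with several inputs tracks event time: it advances only to the minimum watermark across its inputs. A simplified sketch of that rule in Python, not Flink's implementation; `operator_watermark`, `window_fires`, and the timestamps are illustrative assumptions, and the exact firing condition is glossed over.

```python
# Stand-in for "this source has not emitted any watermark yet".
NO_WATERMARK = -(2 ** 63)


def operator_watermark(source_watermarks):
    """Event-time clock of a downstream operator: the minimum over its inputs."""
    return min(source_watermarks)


def window_fires(window_end, source_watermarks):
    """Simplified rule: a window fires once the operator's watermark reaches its end."""
    return operator_watermark(source_watermarks) >= window_end


# Four parallel sources, only one of them reading a partition with data.
one_active = [NO_WATERMARK, NO_WATERMARK, 120_000, NO_WATERMARK]
print(window_fires(60_000, one_active))   # the idle sources hold the window back

# After rebalancing, every watermark generator sees records from the
# active partition, so all four advance together.
rebalanced = [120_000, 120_000, 120_000, 120_000]
print(window_fires(60_000, rebalanced))
```

With a single active source the minimum stays at the "no watermark" value, so the window never fires no matter how far that one source advances.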
I know producers shouldn't work like that, but consumers shouldn't care. I think this may also create some edge cases even when things are not as extreme as in our setup. If checkpoints contained the offsets of all partitions regardless of their contents, probably by storing the start offsets on the first run, I guess that would solve the problems around restarting.
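Roughly what I have in mind, as a Python sketch rather than a patch: when a checkpoint is taken, any partition that has not produced a record yet is stored with the start offset fetched on the first run. `snapshot_offsets` and the small offsets are hypothetical, and the sketch ignores whether a stored offset means "last read" or "next to read".

```python
# Sentinel from the "Committing offsets" log lines.
# Assumption: it means "no offset recorded for this partition".
NOT_SET = -915623761776


def snapshot_offsets(start_offsets, consumed_offsets):
    """Offsets to store in a checkpoint.

    Use the consumed offset where the partition has produced data,
    otherwise fall back to the start offset fetched on the first run,
    so every partition has an entry.
    """
    return [start if consumed == NOT_SET else consumed
            for start, consumed in zip(start_offsets, consumed_offsets)]


# Three partitions for brevity; only the middle one has seen data.
start = [100, 200, 300]
consumed = [NOT_SET, 250, NOT_SET]

snapshot = snapshot_offsets(start, consumed)
print(snapshot)
```

On restore, every partition would then resume from a known position instead of jumping to the largest offset.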