You are right, the checkpoints should contain all offsets.I created a Ticket for this: https://issues.apache.org/jira/browse/FLINK-3440On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <[hidden email]> wrote:Hi,
On a related and a more exaggerated setup, our kafka-producer (flume) seems
to send data to a single partition at a time and switches it every few
minutes. So when i run my flink datastream program for the first time, it
starts on the *largest* offsets and shows something like this:
. Fetched the following start offsets [FetchPartition {partition=7,
offset=15118832832}]
. Fetched the following start offsets [FetchPartition {partition=1,
offset=15203613236}]
. Fetched the following start offsets [FetchPartition {partition=2,
offset=15366811664}]
. Fetched the following start offsets [FetchPartition {partition=0,
offset=15393999709}]
. Fetched the following start offsets [FetchPartition {partition=8,
offset=15319475583}]
. Fetched the following start offsets [FetchPartition {partition=5,
offset=15482889767}]
. Fetched the following start offsets [FetchPartition {partition=6,
offset=15113885928}]
. Fetched the following start offsets [FetchPartition {partition=3,
offset=15182701991}]
. Fetched the following start offsets [FetchPartition {partition=4,
offset=15186569356}]
For that instance flume happens to be sending data to partition-6 only, so
other consumers sit idly. Working with default paralellism 4, only one of
the 4 threads is able to source data and checkpointing logs reflect that:
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
Committing offsets [-915623761776, -915623761776, -915623761776,
-915623761776, -915623761776, -915623761776, -915623761776, -915623761776,
-915623761776] to offset store: FLINK_ZOOKEEPER
This also means checkpoint will only contain the offset for partition-6. So
if program is stopped and restarted at a later time, it restores the offset
for partition-6 only and other partitions are started at the largest offset.
So it's able to process unseen data in partition-6 but not others. Say if
flume produces data to partition-3 when flink program is stopped, they're
lost, while the data in partition-6 is not. This generally causes multiple
(late-)windows to be fired after restart, because we now generate watermarks
off partition-3 which says the windows of the unseen data in partition-6 are
already complete.
This also has a side effect of windows not triggering unless some
rebalancing is done beforehand. Since only 1 of the 4 threads will source
data and generate watermarks, window triggers won't get watermarks from
other 3 sources and wait long past the watermarks generated from the single
source.
I know producers shouldn't work like that, but consumers shouldn't care. I
think it may also create some edge cases even if things were not as extreme
as ours. If checkpoints could contain offsets of all of the partitions
regardless of their contents, probably storing start offsets in first run, i
guess that would solve the problems around restarting.
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Free forum by Nabble | Edit this page |