Watermarks and Kafka


Watermarks and Kafka

Juan Gentile

Hello!

 

We have a job that reads from Kafka and uses punctuated watermarks based on the messages we read. We keep track of the watermark for each partition and emit a consensus watermark: the smallest across all partitions.

We ran into an issue because we were not storing this map of partition -> watermark in state: when one of the partitions got delayed and the job restarted, we lost track of that partition and emitted a watermark anyway.

Our idea for a solution is to save this map of partition -> watermark into state, but we would like to know how Flink behaves when we decrease the parallelism, so we can be sure that the instance that will read from Kafka also gets the state for that particular partition.

 

To give an example:

 

Operator 1: (Reads Partition1)

Partition 1: Watermark1 (Map / State)

 

Operator 2: (Reads Partition2)

Partition 2: Watermark2 (Map / State)

 

Operator 3: (Reads Partition3)

Partition 3: Watermark3 (Map / State)

 

 

After shrinking:

 

Operator 1: (Reads Partition1)

Partition 1: Watermark1 (Map / State)

 

Operator 2: (Reads Partition2, Partition3)

Partition 2: Watermark2 (Map / State)

Partition 3: Watermark3 (Map / State)

 

Or

 

Operator 1: (Reads Partition1, Partition3) => HERE we would have a problem, as Partition3's state could be restored onto the other operator.

Partition 1: Watermark1 (Map / State)

 

Operator 2: (Reads Partition2)

Partition 2: Watermark2 (Map / State)

Partition 3: Watermark3 (Map / State)

 

For this we are using operator state (https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state) with “Even-split redistribution”.
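To make the concern concrete, here is a small plain-Python simulation (not Flink code; the round-robin split rule and partition names are illustrative only) of how even-split versus union redistribution interacts with a new partition assignment after shrinking:

```python
# Hypothetical simulation of Flink's two operator-state redistribution modes,
# showing why even-split can strand a partition's watermark entry on an
# operator that no longer reads that partition after a rescale.

def even_split(entries, num_operators):
    """Each operator receives a disjoint slice of the checkpointed entries."""
    return [entries[i::num_operators] for i in range(num_operators)]

def union_redistribution(entries, num_operators):
    """Every operator receives the full list and filters locally."""
    return [list(entries) for _ in range(num_operators)]

# State checkpointed at parallelism 3: one (partition, watermark) entry each.
checkpointed = [("partition1", 100), ("partition2", 90), ("partition3", 80)]

# Kafka partition assignment after shrinking to parallelism 2. The assignment
# is decided independently of how operator state is redistributed.
assignment = [["partition1"], ["partition2", "partition3"]]

results = {}
for name, redistribute in [("even-split", even_split),
                           ("union", union_redistribution)]:
    restored = redistribute(checkpointed, 2)
    per_operator = []
    for partitions, entries in zip(assignment, restored):
        known = {p for p, _ in entries}
        per_operator.append({
            "kept": [(p, w) for p, w in entries if p in partitions],
            "missing": [p for p in partitions if p not in known],
        })
    results[name] = per_operator

print(results)
```

With even-split, the operator now reading Partition3 never receives Partition3's watermark entry; with union redistribution, every operator sees every entry and can filter.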

 

Could you please give us a hand understanding how Flink behaves in such a scenario?

 

Thank you,

Juan G.


Re: Watermarks and Kafka

Konstantin Knauf
Hi Juan,

I just replied to your other question, but I think I now better understand where you are coming from.

Are you aware of per-partition watermarking [1]? With it you don't need to manage this map yourself. But it does not solve the problem that this map is not stored in managed state; watermarks are generally not part of Flink's state. It seems like that is what you are looking for?

To also answer your question: you could go for List<Entry<Partition, Watermark>> state with union redistribution. In that case every operator gets all entries during recovery, and you can filter out the ones relevant to the current operator by checking which partitions it is subscribed to after recovery.
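As a sketch of that recovery pattern (plain Python rather than Flink's CheckpointedFunction API; all names are hypothetical), the snapshot/restore path would look roughly like this:

```python
# Sketch of the union-redistribution pattern described above: snapshot the full
# (partition, watermark) list, and on restore keep only the entries for the
# partitions this operator instance is currently subscribed to.

class WatermarkTracker:
    def __init__(self, subscribed_partitions):
        self.subscribed = set(subscribed_partitions)
        self.watermarks = {}  # partition -> latest watermark seen

    def snapshot(self):
        # This instance's contribution to the union list state at checkpoint.
        return list(self.watermarks.items())

    def restore(self, union_entries):
        # With union redistribution, union_entries is every instance's
        # snapshot concatenated; filter to our current subscription.
        self.watermarks = {p: w for p, w in union_entries
                           if p in self.subscribed}

    def on_watermark(self, partition, watermark):
        # Watermarks only move forward within a partition.
        prev = self.watermarks.get(partition, watermark)
        self.watermarks[partition] = max(prev, watermark)

    def consensus(self):
        # Emit only once every subscribed partition has reported; otherwise a
        # lagging partition could later violate the emitted watermark.
        if self.subscribed and self.subscribed == set(self.watermarks):
            return min(self.watermarks.values())
        return None

# After shrinking, the instance now reading partition2 and partition3 restores
# from the union of all old snapshots and resumes with the correct minimum.
tracker = WatermarkTracker(["partition2", "partition3"])
tracker.restore([("partition1", 100), ("partition2", 90), ("partition3", 80)])
print(tracker.consensus())  # 80
```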

Hope this helps.

Cheers,

Konstantin



On Wed, Jul 3, 2019 at 6:04 PM Juan Gentile <[hidden email]> wrote:




--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 10.08.2019 - 31.08.2019, 05.09.2019 - 06.09.2019


--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Watermarks and Kafka

Juan Gentile

Hello Konstantin,

 

Thank you for your answer. Let me clarify our problem a bit, as we now have a clearer understanding of it 😊.

We have two Kafka topics from two different datacenters, each with its own watermarks: a watermark message is injected into each of them.

We replicate these into a single Kafka topic from which our Flink job consumes. In the Flink job we consume per partition, but the watermark messages within one partition may come from either datacenter, so their values can differ. That's why we need to keep a map in memory keyed by partition and DC, and then emit the minimum of both.

Our current workaround is similar to what you mentioned: we have an operator that assigns the watermarks and keeps a map that is stored in state. Since there is no guarantee that each of these operators will receive messages from all partitions, we drop the partition from the map and key it by DC only. We then use union redistribution and, on restore, always take the minimum across all DCs. This works as long as we keep the same parallelism for the source and the watermark assigner, keep them as close as possible in the flow, and use operator chaining. But if we were to split them or use different parallelisms, the watermark assigner would stop working properly because a partition might be missing from the map. That is why we were looking for a solution where the watermarks are already handled in the source operator.
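The per-DC variant of the workaround could be sketched like this (plain Python, not Flink code; the class and DC names are hypothetical):

```python
# Sketch of the workaround above: watermark messages from two datacenters are
# interleaved within each partition, so the assigner keys its map by
# datacenter instead of partition and emits the minimum across DCs.

class DcWatermarkAssigner:
    def __init__(self, expected_dcs):
        self.expected = set(expected_dcs)
        self.dc_watermarks = {}  # datacenter -> latest watermark from it

    def on_watermark_message(self, dc, ts):
        # Per-DC watermarks only move forward.
        self.dc_watermarks[dc] = max(self.dc_watermarks.get(dc, ts), ts)
        return self.current()

    def current(self):
        # The job-wide watermark must not outrun the slower datacenter, so
        # emit only once every expected DC has reported.
        if self.expected == set(self.dc_watermarks):
            return min(self.dc_watermarks.values())
        return None

    def restore(self, union_entries):
        # With union redistribution every instance sees all (dc, ts) entries;
        # keeping the minimum per DC means no instance restarts ahead of
        # another, regardless of which partitions it now reads.
        for dc, ts in union_entries:
            prev = self.dc_watermarks.get(dc)
            self.dc_watermarks[dc] = ts if prev is None else min(prev, ts)

assigner = DcWatermarkAssigner(["dc1", "dc2"])
print(assigner.on_watermark_message("dc1", 100))  # None: dc2 not seen yet
print(assigner.on_watermark_message("dc2", 90))   # 90: min across both DCs
```

Because the map is keyed by DC rather than partition, every assigner instance can maintain it regardless of which partitions it happens to receive, which is what makes the union-restore safe here.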

Please let us know your opinion.

 

Thank you,

Juan G.

 

From: Konstantin Knauf <[hidden email]>
Date: Sunday, July 7, 2019 at 10:14 PM
To: Juan Gentile <[hidden email]>
Cc: "[hidden email]" <[hidden email]>, Olivier Solliec <[hidden email]>, Oleksandr Nitavskyi <[hidden email]>
Subject: Re: Watermarks and Kafka

 
