Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled


Daniel Peled
Hello,

We have two Flink jobs that communicate with each other through a Kafka topic.
Both jobs use checkpoints with EXACTLY_ONCE semantics.

We have observed the following behaviour and want to ask whether it is expected or possibly a bug.

When the first job produces a message to Kafka, the message is consumed by the second job with a latency that depends on the first job's checkpoint interval.

We are able to read the message immediately with a Kafka tool or with another Kafka consumer, but NOT with a Flink Kafka consumer, which again depends on the checkpoint interval of the first job.

Why does the consumer in the second job depend on the producer's transaction commit time in the first job?

BR,
Danny

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Chesnay Schepler
I don't know our Kafka connector particularly well, but it sounds like a matter of whether a given consumer does dirty reads.
Flink does not, whereas the other tools you're using do.


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Arvid Heise-3
Hi Daniel,

Flink commits transactions on checkpoints, while Kafka Streams/Connect usually commits per record. This is the typical trade-off between throughput and latency. By decreasing the checkpoint interval in Flink, you can reach latency comparable to Kafka Streams.

If you have two exactly-once jobs, the second job may only read data that has been committed (no dirty reads, as Chesnay said). If the second job consumed uncommitted data, it would see duplicates whenever the first job fails and rolls back.

You can configure the read behaviour with isolation.level. If you want exactly-once behaviour, you also need to set that level in your other Kafka consumers [1]. Also compare what Kafka Streams sets if you want to go exactly-once [2].

If you really want low latency, please also double-check whether you really need exactly-once.
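The visibility effect described above can be sketched with a toy model of a Kafka partition: a read_committed consumer only sees records up to the last stable offset, which advances when the producer's transaction commits (in Flink, on checkpoint completion). This is an illustrative model, not the actual Kafka client API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a read_committed consumer only sees records below the
// last stable offset (LSO); the LSO advances when a transaction commits.
class PartitionModel {
    final List<String> records = new ArrayList<>();
    int lastStableOffset = 0; // every record below this offset is committed

    void produceInTransaction(String record) {
        records.add(record); // written to the log, but not yet committed
    }

    void commitTransaction() { // in Flink: happens when a checkpoint completes
        lastStableOffset = records.size();
    }

    List<String> poll(boolean readCommitted) {
        int end = readCommitted ? lastStableOffset : records.size();
        return records.subList(0, end);
    }
}

public class IsolationDemo {
    public static void main(String[] args) {
        PartitionModel p = new PartitionModel();
        p.produceInTransaction("a");
        p.produceInTransaction("b");

        // Before the checkpoint/commit: dirty readers (e.g. a default CLI
        // consumer) see the records, read_committed readers do not.
        System.out.println(p.poll(false).size()); // 2
        System.out.println(p.poll(true).size());  // 0

        p.commitTransaction(); // checkpoint completes, transaction commits
        System.out.println(p.poll(true).size());  // 2
    }
}
```

This is why the latency of the second job tracks the first job's checkpoint interval: the records exist in the topic the whole time, but only become visible to a read_committed consumer at commit time.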


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

yidan zhao
I do not have this problem, so I guess it is related to the configuration of your Kafka producer and consumer, and maybe also to the Kafka topic or broker properties.


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

yidan zhao
I think what you need is http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .

The isolation.level setting's default value is read_uncommitted, so perhaps you are not using the default setting?
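For a plain Kafka consumer, the property in question can be set like this (a minimal sketch; the bootstrap server and group id are placeholders, and the Properties object would be handed to the actual consumer or to Flink's Kafka source):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "second-job");              // placeholder

        // read_committed: only records from committed transactions are visible.
        // The Kafka consumer default is read_uncommitted.
        props.setProperty("isolation.level", "read_committed");

        // With read_committed, the consumer waits for the producer's
        // transaction commit, i.e. the first job's checkpoint.
        System.out.println(props.getProperty("isolation.level"));
    }
}
```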


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

Daniel Peled
Thank you for your answers.
We ended up changing the isolation level to read_uncommitted, which solved the latency problem between the two jobs, with the understanding that we may get duplicates in the second job if the first job fails and rolls back.
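The fix described above amounts to something like the following (a sketch; the property name is the standard Kafka consumer setting, the surrounding code is illustrative):

```java
import java.util.Properties;

public class ReadUncommittedSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Trade correctness for latency: records become visible immediately
        // after they are written, without waiting for the producing job's
        // checkpoint, but an aborted transaction in the first job can
        // surface duplicate records in the second job.
        props.setProperty("isolation.level", "read_uncommitted");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```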
