Long latency when consuming a message from KAFKA and checkpoint is enabled

nick toker-2
Hello

We noticed the following behavior:
When we enable Flink checkpoints, there is a delay between the time we write a message to the Kafka topic and the time the Flink Kafka connector consumes it.
The delay is closely related to checkpointInterval and/or minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a message from Kafka equals one of these parameters.

If we disable checkpoints, the message is consumed immediately.
We work with EXACTLY_ONCE semantics.
Please note that we inject only one message.

Could you please advise how we can remove/control this delay?

Please see the attached code of AbstractFetcher and KafkaFetcher (as a PNG file).
(For example, emitRecordsWithTimestamps() takes a lock on checkpointLock.)
Could this explain the behaviour?


BR

Attachment: flink-kafka.png (199K)
Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

nick toker-2
Hi

Any idea?
Is it a bug?

Regards,
nick

(Follow-up to nick toker's message of Wed, 23 Dec 2020, 11:10.)
Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

Danny Chan-2
Hi, Nick ~
The behavior is as expected, because the Kafka source/sink relies on checkpoints to implement exactly-once write semantics. A checkpoint snapshots the state at a point in time, and that snapshot is used for recovery. The Kafka sink currently writes to Kafka but only commits the data when a checkpoint completes.

For your needs, I guess you want near-real-time writes while still keeping exactly-once semantics. I'm sorry to say that there is no infrastructure other than checkpoints that we can use for exactly-once semantics.

(In reply to nick toker's message of Sun, 27 Dec 2020, 3:12 PM.)
Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

Arvid Heise-3
Hi Nick,

I'm not entirely sure that I understand your setup correctly.

Basically, when enabling exactly once and checkpointing, Flink will only consume messages that have been committed.
If you chain two Flink jobs with an intermediate Kafka topic, then the first Flink job will only commit messages on checkpoints and thus the second Flink job will only read these messages with a delay up to the checkpoint interval.
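
To illustrate the chained-jobs case, here is a minimal sketch of that delay. It assumes, as a simplification, that checkpoints complete exactly every intervalMs and that the upstream transaction is committed at that instant. It is plain Java with no Flink or Kafka dependency, and the class and method names are made up for illustration.

```java
public class CommitOnCheckpointDelay {

    /**
     * Time of the first checkpoint completion at or after writeTimeMs,
     * assuming completions happen at 0, intervalMs, 2 * intervalMs, ...
     */
    static long firstCommitAtOrAfter(long writeTimeMs, long intervalMs) {
        long k = (writeTimeMs + intervalMs - 1) / intervalMs; // ceiling division
        return k * intervalMs;
    }

    /** Delay until a reader that only sees committed data can see the record. */
    static long visibilityDelay(long writeTimeMs, long intervalMs) {
        return firstCommitAtOrAfter(writeTimeMs, intervalMs) - writeTimeMs;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000; // hypothetical checkpoint interval
        for (long writeTimeMs : new long[] {1, 2_500, 9_999, 10_000}) {
            System.out.printf("written at %d ms, visible after %d ms%n",
                    writeTimeMs, visibilityDelay(writeTimeMs, intervalMs));
        }
    }
}
```

In this model a record written just after a checkpoint waits almost the full interval, while one written just before the next checkpoint is visible almost immediately, so the delay is bounded by the interval rather than equal to it.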

Now if your input record is created with a different tool, make sure that you commit it immediately. Then, Flink should immediately also process that record. However, note that Flink again writes the record in a transaction. Thus, if your tests involve you checking for the output, you would need to configure your reader to read uncommitted data [1].
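
As a rough sketch of what such a reader configuration could look like, the snippet below builds the consumer properties with java.util.Properties only. The property key isolation.level and its values read_uncommitted and read_committed are standard Kafka consumer settings; the broker address, group id, class name, and method name are placeholders chosen for illustration.

```java
import java.util.Properties;

public class TestReaderConfig {

    /** Builds properties for a verification reader; all argument values are placeholders. */
    static Properties uncommittedReaderProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // "read_uncommitted" also returns records from transactions that are still open;
        // "read_committed" returns transactional records only after the commit.
        props.setProperty("isolation.level", "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        Properties props = uncommittedReaderProps("localhost:9092", "latency-test-reader");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A plain Kafka consumer already defaults to read_uncommitted, so this mainly matters when the reader was explicitly set to read_committed. Such a reader may also see records from transactions that are later aborted, which is usually acceptable for a latency test but not for production consumers.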

You can decrease the latency by decreasing the checkpointing interval. If you have a need for very low latency, you might also check if you really need exactly once (that's typically not necessary).
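
When tuning the interval, it may help to keep in mind how checkpointInterval and minPauseBetweenCheckpoints interact. The sketch below is a simplified model and not Flink's actual scheduler code: it assumes the next checkpoint starts no earlier than one interval after the previous start and no earlier than the minimum pause after the previous checkpoint ended. The class and method names are hypothetical.

```java
public class CheckpointSpacing {

    /**
     * Earliest start of the next checkpoint in the simplified model:
     * bounded below by the periodic tick and by the minimum pause
     * after the previous checkpoint completed.
     */
    static long nextStart(long prevStartMs, long prevEndMs, long intervalMs, long minPauseMs) {
        return Math.max(prevStartMs + intervalMs, prevEndMs + minPauseMs);
    }

    public static void main(String[] args) {
        // Previous checkpoint started at 0 ms and completed at 400 ms.
        System.out.println(nextStart(0, 400, 1_000, 0));     // interval dominates: 1000
        System.out.println(nextStart(0, 400, 1_000, 5_000)); // min pause dominates: 5400
    }
}
```

Under this model, lowering the interval alone does not reduce the delay if the minimum pause is the larger of the two constraints, which would match the observation that the delay tracks one parameter or the other.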


(In reply to Danny Chan's message of Mon, Dec 28, 2020, 3:07 AM.)


--

Arvid Heise | Senior Java Developer

