Between Checkpoints in Kafka 0.11

Harshvardhan Agrawal
Hi,

I was going through the blog post on how the TwoPhaseCommitSinkFunction works with Kafka 0.11. One of the things I don't understand is: what is the behavior of the Kafka 0.11 producer between two checkpoints? Say the interval between two checkpoints is set to 15 minutes. Will Flink buffer all records in memory in that case and only start writing to Kafka when the next checkpoint starts?

Thanks!
--
Regards,
Harshvardhan

Re: Between Checkpoints in Kafka 0.11

Harshvardhan Agrawal
Hi,

Can someone please help me understand how the exactly-once semantics work with Kafka 0.11 in Flink?

Thanks,
Harsh

--
Regards,
Harshvardhan Agrawal
267.991.6618 | LinkedIn

Re: Between Checkpoints in Kafka 0.11

vino yang
Hi Harshvardhan,

In fact, Flink does not cache data between two checkpoints. It only calls different operations at different points in time; these operations are provided by the Kafka client, so it helps to have a deeper understanding of how Kafka producer transactions work.

In general (a rough code sketch follows this list):

1) TwoPhaseCommitSinkFunction#snapshotState pre-commits the current transaction and begins a new one
2) TwoPhaseCommitSinkFunction#notifyCheckpointComplete commits the pending transaction
3) TwoPhaseCommitSinkFunction#close aborts the current transaction
4) TwoPhaseCommitSinkFunction#initializeState may recoverAndCommit or recoverAndAbort transactions left over from a previous run, and then begins a new transaction
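
To make that mapping concrete, here is a rough sketch of what those hooks boil down to, written against the plain Kafka 0.11 producer API rather than Flink's actual FlinkKafkaProducer011 code; the broker address, topic name and transactional.id below are made up:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionLifecycleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("transactional.id", "sketch-txn-id");     // required for transactions
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();

        // initializeState / snapshotState: a new transaction is begun
        producer.beginTransaction();

        // invoke(): each record is handed to the Kafka client as it arrives,
        // i.e. it is not buffered by Flink until the next checkpoint
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));

        // snapshotState -> preCommit: flush whatever the Kafka client still
        // buffers locally, so all data of this checkpoint is on the broker
        producer.flush();

        // notifyCheckpointComplete -> commit: make the data readable for
        // read_committed consumers
        producer.commitTransaction();

        // close / failure -> abort would discard the open transaction instead:
        // producer.abortTransaction();

        producer.close();
    }
}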

Looking at the source code of TwoPhaseCommitSinkFunction and FlinkKafkaProducer011 will give you a better understanding of the whole process.

Note that preCommit triggers the Kafka transactional producer's flush(), which sends out any records still sitting in the Kafka client's local buffer (that buffer belongs to the Kafka client and has nothing to do with Flink). So the producer transaction does not cache a checkpoint's data locally: it is not an all-or-nothing send where either no record or every record goes out at commit time; atomicity is not achieved in that form.

In Kafka's implementation of producer transactions, the data is sent to the Kafka broker right away, and the commit operation is what makes it readable to consumers (those configured with read_committed isolation).
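
A minimal consumer setup for that (group id and topic name are made up here) would look like the sketch below; without isolation.level=read_committed, the default read_uncommitted would return records of still-open transactions too:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("group.id", "sketch-group");               // made-up group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only return records from committed transactions
        props.put("isolation.level", "read_committed");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));
        // consumer.poll(...) as usual from here on
    }
}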

Thanks, vino.


Re: Between Checkpoints in Kafka 0.11

Dawid Wysakowicz

Hi Harshvardhan,

Flink won't buffer all the events between checkpoints. Flink uses Kafka's transactions, which are committed only on checkpoints, so the data is persisted on Kafka's side, but only becomes readable once the transaction is committed.
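
For completeness: that transactional behaviour is only used when the sink is created with the exactly-once semantic, roughly like below. I'm writing this from memory, so please double-check the constructor overloads and imports against the javadocs of your Flink version; the broker address and topic name are made up.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkSketch {
    public static void attachSink(DataStream<String> stream) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker

        // Assumed overload: (topic, keyed serialization schema, config, semantic)
        stream.addSink(new FlinkKafkaProducer011<>(
                "my-topic",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
    }
}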

I've cc'ed Piotr, who implemented the Kafka 0.11 connector, in case he wants to correct me or add something to the answer.

Best,

Dawid



Re: Between Checkpoints in Kafka 0.11

Piotr Nowojski
Hi,

I have nothing more to add. You (Dawid) and Vino explained it correctly :)

Piotrek
