Hello,
I have a topic in Kafka that Flink reads from. I parse the messages in this topic and write them to BigQuery via streaming inserts, in batches of 500 messages collected by a CountWindow in Flink.

Problem: I want to commit offsets manually, and only after a batch has been written successfully to BigQuery.

Reason: I saw that Flink's KafkaConsumer does not commit offsets to Kafka but uses its own checkpointing. I don't know how Flink's checkpointing works, and I'm worried that it does not handle the following situation:

- Let's say I have a Flink job running and processing a batch of 500 messages at Kafka offsets 1000-1500.
- I stop this job before it saves to BigQuery and make some modifications to the program. Savepoints did not work when I tried them, because they require that the operator code does not change.

What I want is that, when I start the modified app, it starts again from offsets 1000-1500 in Kafka, because these messages have not been written to BigQuery. Is there any way to achieve this in Flink?

Thanks,
SM
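For reference, a rough sketch of my pipeline; the topic, group id, and the window function body are placeholders, not my actual code:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class KafkaToBigQueryJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are tracked in Flink's checkpoints, not via Kafka's commit mechanism.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "bq-writer");

        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
            .countWindowAll(500) // collect batches of 500 messages
            .apply(new AllWindowFunction<String, Integer, GlobalWindow>() {
                @Override
                public void apply(GlobalWindow window, Iterable<String> values, Collector<Integer> out) {
                    // Placeholder: parse the messages and write the batch to
                    // BigQuery with a streaming insert, then emit the batch size.
                    int count = 0;
                    for (String ignored : values) {
                        count++;
                    }
                    out.collect(count);
                }
            })
            .print();

        env.execute("kafka-to-bigquery");
    }
}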
Hi Son,

yes, this is possible, but your sink needs to play its part in Flink's checkpointing mechanism. Depending on the implementation of the sink, you should either:

* implement CheckpointedFunction and flush all buffered records to BigQuery in snapshotState. This way, in case of a failure/restart of the job, all records up to the last successful checkpoint will have been written to BigQuery, and all other records will be replayed.

* use managed operator state to store all pending records in the sink, so that they are persisted in snapshotState. This way, in case of a failure/restart of the job, all records up to the last successful checkpoint that have not yet been written to BigQuery will be restored in the sink, and all other records will be replayed.

In both cases, you might write the same record to BigQuery twice. If in doubt whether your sink fulfills the criteria above, feel free to share it.
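For illustration, a rough, untested sketch of the second approach, buffering the pending records in managed operator state; insertAllIntoBigQuery is just a placeholder for your actual client call:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingBigQuerySink implements SinkFunction<String>, CheckpointedFunction {

    private static final int BATCH_SIZE = 500;

    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> pendingRecords;

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            insertAllIntoBigQuery(buffer); // may run again for the same records after a restart
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist all records that have not been written to BigQuery yet,
        // so they survive a failure/restart and are eventually written.
        pendingRecords.clear();
        for (String record : buffer) {
            pendingRecords.add(record);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingRecords = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-records", String.class));
        if (context.isRestored()) {
            for (String record : pendingRecords.get()) {
                buffer.add(record);
            }
        }
    }

    private void insertAllIntoBigQuery(List<String> batch) {
        // Placeholder for the actual streaming-insert call.
    }
}

Since the Kafka source's offsets and the sink's pending records are part of the same checkpoint, a restore brings both back to a consistent point: the buffered records are restored into the sink, and everything after the checkpoint is re-read from Kafka.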
Cheers,

Konstantin
Hi Konstantin,

Thanks for the response. What still concerns me is:
Hi Son,

I think it might be because you did not assign operator ids to your Filter and Map functions; you can refer to [1] for how to assign ids in your application. Moreover, if you have removed some operators, please consider adding the --allowNonRestoredState option [2]. A sketch of both follows below.
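As an example, a minimal sketch, assuming source is an existing DataStream<String>; the ids are hypothetical names:

import org.apache.flink.streaming.api.datastream.DataStream;

// Assign a stable, unique id to each stateful operator so that the state
// stored in a savepoint can be matched to the operators of the modified job.
DataStream<String> parsed = source
        .filter(value -> value != null).uid("filter-nulls")
        .map(String::trim).uid("normalize-message");

If you removed operators whose state is still present in the savepoint, you can then resume with something like:

flink run -s <savepointPath> --allowNonRestoredState <jobJar>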
Best
Yun Tang
Maybe this page can help you.

Best,
Shengjk1