Reserving Kafka offset in Flink after modifying app

son
Hello,

I have a topic in Kafka that Flink reads from. I parse the messages in this topic and write them to BigQuery using streaming inserts, in batches of 500 messages collected with a CountWindow in Flink.
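Roughly, the job is shaped like the sketch below (topic name, Kafka properties, and the final print() are only placeholders for my real parsing and BigQuery sink code):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaToBigQueryJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka offsets are tracked in Flink checkpoints, not committed per batch
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "bq-loader");

        env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
           // (the real job also parses each message before batching)
           .countWindowAll(500) // collect 500 messages per batch
           .apply(new AllWindowFunction<String, List<String>, GlobalWindow>() {
               @Override
               public void apply(GlobalWindow window, Iterable<String> values, Collector<List<String>> out) {
                   List<String> batch = new ArrayList<>();
                   values.forEach(batch::add);
                   out.collect(batch);
               }
           })
           .print(); // placeholder for the BigQuery streaming-insert sink

        env.execute("kafka-to-bigquery");
    }
}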

Problem: I want to commit offsets manually, and only after a batch has been written successfully to BigQuery.

Reason:
I saw that Flink's KafkaConsumer does not rely on committing offsets back to Kafka but uses its own checkpointing. I don't know in detail how Flink's checkpointing works, and I'm worried that it does not cover the following situation:
- let's say I have a Flink job running and processing a batch of 500 messages at Kafka offsets 1000-1500.
- I stop this job before it saves to BigQuery and make some modifications to the program. Savepoints did not work when I tried them, because they seem to require that the operator code does not change.

What I want is that when I start the modified app, it always resumes from offsets 1000-1500 in Kafka, because these messages have not been written to BigQuery yet.

Is there any way to achieve this in Flink? 

Thanks,
SM

Re: Reserving Kafka offset in Flink after modifying app

Konstantin Knauf-2
Hi Son,

yes, this is possible, but your sink needs to play its part in Flink's checkpointing mechanism. Depending on the implementation of the sink, you should either:

* implement CheckpointedFunction and flush all pending records to BigQuery in snapshotState. This way, in case of a failure/restart of the job, all records up to the last successful checkpoint will have been written to BigQuery, and all other records will be replayed.
* use managed operator state to store all pending records in the sink, so that they are persisted in snapshotState. This way, in case of a failure/restart of the job, all records up to the last successful checkpoint that have not yet been written to BigQuery are restored in the sink, and all other records are replayed.

In both cases, you might write the same record to BigQuery twice.
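A minimal sketch of such a sink could look roughly like this (the BigQuery call itself is left as a placeholder, and the class and state names are only examples):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingBigQuerySink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final List<String> pendingRecords = new ArrayList<>();
    private transient ListState<String> checkpointedState;

    @Override
    public void invoke(String record, Context context) {
        pendingRecords.add(record);
        if (pendingRecords.size() >= 500) {
            flushToBigQuery(); // normal batch flush
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Variant 1: flush everything that is pending before the checkpoint completes ...
        flushToBigQuery();
        // ... and/or Variant 2: persist whatever is still pending as managed operator state.
        checkpointedState.clear();
        checkpointedState.addAll(pendingRecords);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-records", String.class));
        if (context.isRestored()) {
            // After a failure/restart, records that were not yet written come back here,
            // and everything after the last checkpoint is replayed from Kafka.
            for (String record : checkpointedState.get()) {
                pendingRecords.add(record);
            }
        }
    }

    private void flushToBigQuery() {
        if (pendingRecords.isEmpty()) {
            return;
        }
        // placeholder: perform the BigQuery streaming insert here and
        // only clear the buffer once the insert has succeeded
        pendingRecords.clear();
    }
}

Note that this gives you at-least-once delivery into BigQuery, not exactly-once, which is why duplicates are possible.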

If in doubt whether your sink fulfills the criteria above, feel free to share it.

Cheers,

Konstantin



Re: Reserving Kafka offset in Flink after modifying app

son
Hi Konstantin, 

Thanks for the response. What still concerns me is:
  1. Am I able to recover from checkpoints even if I change my program (for example, changing Filter and Map functions, data objects, ...)? I was not able to recover from savepoints when I changed my program.


Re: Reserving Kafka offset in Flink after modifying app

Yun Tang
Hi Son

I think it might be because you have not assigned operator ids to your Filter and Map functions; you could refer to [1] for how to assign ids in your application. Moreover, if you have removed some operators, please consider adding the --allowNonRestoredState option [2].
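For example, re-using the pipeline from the first message (the uid strings below are only examples, but they must stay stable across versions of your job; env and props are the same as in that sketch):

// Give every stateful operator an explicit, stable id so Flink can map the
// state in a savepoint/checkpoint back to the operators even after the code changes.
env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
        .uid("kafka-source")        // the Kafka offsets live in this operator's state
        .filter(message -> !message.isEmpty())
        .uid("filter-non-empty")
        .map(String::trim)          // stands in for your real Map/parse function
        .uid("parse-map")
        .addSink(new BufferingBigQuerySink())   // e.g. the buffering sink sketched earlier
        .uid("bigquery-sink");

When you then resume with flink run -s <savepointPath>, state is matched by uid, so changing the body of a Filter or Map function should no longer prevent the restore; only if you drop an operator entirely do you need --allowNonRestoredState.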


Best
Yun Tang


Re: Reserving Kafka offset in Flink after modifying app

shengjk1
In reply to this post by son