How are kafka consumer offsets handled if sink fails?


John Smith
Hi, I'm using Apache Flink 1.8.0.

I'm consuming events from Kafka using nothing fancy...

Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

I do some JSON transforms and then push to my SQL database using JDBC and a stored procedure. Let's assume the SQL sink fails.

We know that Kafka can either periodically commit offsets, or it can be done manually based on the consumer's logic.

- How are the source Kafka consumer offsets handled?
- Does the Flink Kafka consumer commit the offset per event/record?
- Will that single event that failed be retried?
- If we had 5 incoming events and, say, it failed on the 3rd one, will it continue from the 3rd, or will the job restart and retry all 5 events?



Re: How are kafka consumer offsets handled if sink fails?

Konstantin Knauf-2
Hi John,

in case of a failure (e.g. in the SQL sink), the Flink job will be restarted from the last checkpoint. This means the offsets of all Kafka partitions will be reset to that point in the stream, along with the state of all operators. To enable checkpointing, you need to call StreamExecutionEnvironment#enableCheckpointing(). If you are using the JDBCSinkFunction (which is an at-least-once sink), the output will be duplicated in the case of failures.
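For reference, a minimal sketch of enabling checkpointing; the 10-second interval and the mode below are illustrative, not prescriptive:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Snapshot the Kafka offsets and all operator state together every 10
// seconds; this snapshot is the point the job rewinds to after a failure.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);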

To answer your questions:

* For this, the FlinkKafkaConsumer handles the offsets itself (no auto-commit).
* No, the Flink Kafka consumer only commits offsets back to Kafka on a best-effort basis after every checkpoint. Internally, Flink "commits" the current Kafka offsets as part of its periodic checkpoints.
* Yes, along with all other events between the last checkpoint and the failure.
* It will continue from the last checkpoint.

Hope this helps.

Cheers,

Konstantin

--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019


--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: How are kafka consumer offsets handled if sink fails?

John Smith
So when we say a sink is at-least-once, it's because internally it does not track any kind of state and it sends what it has regardless, correct? I ask because I will build a sink that calls stored procedures.
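For what it's worth, a rough sketch of such a sink, assuming a JDBC driver on the classpath; the connection URL and the procedure name "upsert_event" are placeholders, and nothing is checkpointed by the sink itself, which is exactly what makes it at-least-once:

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class StoredProcSink extends RichSinkFunction<String> {

    private transient Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder coordinates; fill in your own database.
        conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // "upsert_event" is a hypothetical stored procedure; making it
        // idempotent is what tames at-least-once replays.
        try (CallableStatement stmt = conn.prepareCall("{call upsert_event(?)}")) {
            stmt.setString(1, value);
            stmt.execute();
        }
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}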



Re: How are kafka consumer offsets handled if sink fails?

Rong Rong
Hi John,

I think what Konstantin is trying to say is: Flink's Kafka consumer does not start consuming from the Kafka committed offset when the consumer starts; it actually starts from the offset that was last checkpointed to the external DFS. (I.e. the starting point of the consumer has no relevance to the Kafka committed offset whatsoever, if checkpointing is enabled.)

To quote:
"the Flink Kafka consumer only commits offsets back to Kafka on a best-effort basis after every checkpoint. Internally, Flink "commits" the current Kafka offsets as part of its periodic checkpoints."

However, if you do not enable checkpointing, I think your consumer will by default restart from the default Kafka start position (which I think is your committed group offset).
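The start position can also be set explicitly on the consumer. A sketch (these setters only apply when the job does not restore from a checkpoint or savepoint; a restored offset always wins):

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

// Default: start from the committed group offsets in Kafka, falling back
// to the auto.offset.reset property if no committed offset exists.
consumer.setStartFromGroupOffsets();

// Alternatives, if you'd rather ignore the committed offsets:
// consumer.setStartFromEarliest();
// consumer.setStartFromLatest();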

--
Rong




Re: How are kafka consumer offsets handled if sink fails?

John Smith
Ok, so just to be clear, let's say we started at day 0...

1- Producer inserted 10 records into Kafka.
2- Kafka Flink Consumer consumed 5 records.
3- Some transformations applied to those records.
4- 4 records were written to the sink, 1 failed.
5- Flink Job restarts because of above failure.

When does the checkpoint happen above?
And does it mean that in the above case it will start back at 0, or will it start at the 4th record and continue, or wherever the checkpoint happened (e.g. the 3rd record)?
My stored proc will be idempotent, and I understand what to do if messages get replayed.
I just want to understand when and where the checkpointing will happen.



Re: How are kafka consumer offsets handled if sink fails?

Konstantin Knauf-2
Hi John,

this depends on your checkpoint interval. When enabled, checkpoints are triggered periodically [1].

Cheers,

Konstantin






Re: How are kafka consumer offsets handled if sink fails?

John Smith
Ok, so when the sink fails on the 5th record, there's no chance that the checkpoint can be at the 6th event, right?



Re: How are kafka consumer offsets handled if sink fails?

Fabian Hueske-2
Hi John,

let's say Flink performed a checkpoint after the 2nd record (by injecting a checkpoint marker into the data flow) and the sink fails on the 5th record.
When Flink restarts the application, it resets the offset to just after the 2nd record (it will read the 3rd record first). Hence, the 3rd and 4th records will be emitted again.
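As a rough timeline of that scenario:

records:   1   2  |  3   4   5  ...
                  ^ checkpoint taken here (offset = after record 2)
sink fails on 5  ->  restart from checkpoint  ->  replay 3 and 4, then retry 5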

Best, Fabian




Re: How are kafka consumer offsets handled if sink fails?

John Smith
Ok cool, I will try to make my stored proc idempotent. So there's no chance that a checkpoint happens after the 5th record and the 5th record gets skipped?
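For the idempotency part, a minimal sketch of the write, assuming MySQL-style upsert syntax and a unique key on an event_id column (both are assumptions, not something from this thread); replaying the same event then updates the row instead of duplicating it:

// Assumes an existing java.sql.Connection "conn"; eventId and payload are
// illustrative variables for the record being written.
String sql = "INSERT INTO events (event_id, payload) VALUES (?, ?) "
           + "ON DUPLICATE KEY UPDATE payload = VALUES(payload)";
try (java.sql.PreparedStatement stmt = conn.prepareStatement(sql)) {
    stmt.setString(1, eventId);
    stmt.setString(2, payload);
    stmt.executeUpdate();
}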
