Flink Checkpoint for Stateless Operators


Flink Checkpoint for Stateless Operators

raghav280392
Hi Team

Assume we have a job (checkpointing enabled) with a Kafka source and a stateless operator that consumes events from the Kafka source.
We have a stream of records 1, 2, 3, 4, 5, ... Let's say event 1 reaches the flat map operator and is being processed, and then the Kafka source completes a successful checkpoint.
In this case, will the offset of event 1 be part of the checkpoint? Does Flink track the event from the source to all downstream operators? If so, and the processing of the event fails in the flat map (e.g. a third-party API/DB failure) after a successful checkpoint, do we need to re-process the event manually (retry via a queue or other business logic)?

Job:
Kafka Source -> Flat Map
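
A minimal sketch of the job, roughly as described above (topic, group id, and class names are illustrative):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class StatelessJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "stateless-job");

        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .flatMap((String value, Collector<String> out) -> {
               // the third-party API / DB write happens here
               out.collect(value);
           })
           .returns(Types.STRING);

        env.execute("Kafka Source -> Flat Map");
    }
}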

Thank you

--
Raghavendar T S


Re: Flink Checkpoint for Stateless Operators

Roman Khachatryan
Hi Raghavendar,

In Flink, checkpoints are global, meaning a checkpoint is successful only if all operators acknowledge it. So the offset will be stored in state, and then committed to Kafka [1], only after all tasks acknowledge that checkpoint. At that moment, the element must already have been emitted to the external system, stored in operator state (e.g. in a window), or stored in channel state (with unaligned checkpoints).

So you don't need to re-process it manually (provided the sink gives an exactly-once guarantee).
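
For illustration, a minimal configuration sketch of the knobs mentioned above (the interval is illustrative): exactly-once checkpointing and, optionally, unaligned checkpoints:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    // Configures checkpointing on an existing environment.
    static void configure(StreamExecutionEnvironment env) {
        // A checkpoint completes only once every task has acknowledged it;
        // the Kafka source's offsets are part of the checkpointed state and
        // are committed back to Kafka on checkpoint completion.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

        // Optionally include in-flight (channel) data in the checkpoint,
        // which is the "channel state" case mentioned above.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}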


Regards,
Roman



Re: Flink Checkpoint for Stateless Operators

raghav280392
Hi Roman

In general, how does Flink track events from the source to downstream operators? We usually emit existing events from an operator, or create a new instance of a class and emit that. How does Flink (or the Flink source) know which snapshot an event belongs to?

> So you don't need to re-process it manually (provided the sink gives an exactly-once guarantee).
In my example, I have only one source and one flat map. Do you mean that we need to use a sink instead of a flat map?

Thank you



Re: Flink Checkpoint for Stateless Operators

Roman Khachatryan
Flink uses checkpoint barriers that are sent along the same channels as the data. Events are included in the checkpoint if they precede the corresponding barrier (or, for sources, the RPC call). [1] describes the algorithm and [2] covers the integration with Kafka.
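
For intuition, a conceptual (non-Flink) sketch of barrier alignment on an operator with multiple input channels; all names are illustrative and this is not Flink's actual code:

import java.util.HashSet;
import java.util.Set;

class BarrierAlignmentSketch {
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final int numChannels;

    BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Called when a checkpoint barrier arrives on an input channel. */
    void onBarrier(int channel, long checkpointId) {
        // Records that arrived on this channel before the barrier have already
        // been processed, so their effects belong to this checkpoint.
        blockedChannels.add(channel); // pause this channel until alignment completes
        if (blockedChannels.size() == numChannels) {
            snapshotOperatorState(checkpointId);   // local snapshot of operator state
            forwardBarrierDownstream(checkpointId);
            blockedChannels.clear();               // resume all channels
        }
    }

    private void snapshotOperatorState(long checkpointId) { /* omitted */ }
    private void forwardBarrierDownstream(long checkpointId) { /* omitted */ }
}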

> In my example, I have only one source and one flat map. Do you mean that we need to use a sink instead of a flat map?
I'm not sure I understand the use case. What do you do with the results of the flat map?


Regards,
Roman



Re: Flink Checkpoint for Stateless Operators

raghav280392
Hi Roman

I am just doing write operations from the flat map. Does it matter whether I use a flat map or a sink for this purpose?

Thank you


Re: Flink Checkpoint for Stateless Operators

Roman Khachatryan
Hi Raghavendar,

It sounds like you don't actually have any flatMap logic, in which case you should use a sink instead of a flatMap. Preferably one of the existing sinks, as some of them already provide an exactly-once guarantee [1].
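
As a rough illustration, a flatMap that only performs database writes could be replaced with the JDBC sink from flink-connector-jdbc (the table name, SQL, and connection details below are hypothetical). Note that JdbcSink.sink() provides at-least-once delivery, so the statement should be idempotent; connectors such as the Kafka producer in EXACTLY_ONCE mode provide exactly-once instead:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class EventJdbcSink {
    public static SinkFunction<String> build() {
        return JdbcSink.sink(
                // idempotent insert so replays after a restart are harmless
                "INSERT INTO events (id, payload) VALUES (?, ?) ON CONFLICT (id) DO NOTHING",
                (statement, event) -> {
                    statement.setString(1, event); // illustrative: the event doubles as the key
                    statement.setString(2, event);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:5432/mydb")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("user")
                        .withPassword("secret")
                        .build());
    }
}

It would then be attached as stream.addSink(EventJdbcSink.build()) in place of the flatMap.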

Regards,
Roman

