Question about the checkpoint mechanism in Flink.

Question about the checkpoint mechanism in Flink.

Li Wang-2
Hi all,

I have a question regarding the state checkpoint mechanism in Flink. I found the statement "Once the last stream has received barrier n, the operator emits all pending outgoing records, and then emits snapshot n barriers itself" in the document https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#exactly-once-vs-at-least-once.

Does this mean that, to achieve exactly-once semantics, the operator does not send tuples downstream immediately but instead buffers its outgoing tuples in a pending queue until the current snapshot is committed? If so, will this introduce significant processing delay?

Thanks,
Li

Re: Question about the checkpoint mechanism in Flink.

Till Rohrmann
Hi Li,

The statement refers to operators with multiple inputs (two in this case). With the current implementation, you will indeed block one of the inputs after receiving checkpoint barrier n on it, until you've received the corresponding checkpoint barrier n on the other input as well. This is what we call checkpoint barrier alignment. If the processing time on both input paths is similar, and thus there is no back pressure on either input, the alignment should not take long. In cases where one of the inputs is considerably slower than the other, you should see an additional delay.
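
As a rough illustration of the alignment described above, here is a minimal single-threaded sketch in Python; it is not Flink's actual (Java) runtime. The `Barrier` marker, the `TwoInputAligner` class and the running-sum state are hypothetical, and the sketch assumes barriers arrive in checkpoint-id order with at most one alignment in progress. Note that the held-back elements are input records from the blocked side, not output records.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    """Hypothetical checkpoint barrier marker carrying checkpoint id n."""
    checkpoint_id: int


class TwoInputAligner:
    """Sketch of barrier alignment for an operator with inputs 1 and 2.

    Assumes barriers arrive in checkpoint-id order and that at most one
    alignment is in progress at a time. State is a running sum of records.
    """

    def __init__(self):
        self.state = 0          # operator state: running sum of processed records
        self.blocked = None     # input that already delivered barrier n, if any
        self.buffer = deque()   # elements held back from the blocked input
        self.snapshots = {}     # checkpoint id -> copy of the state
        self.output = []        # records and barriers emitted downstream

    def on_event(self, input_id, event):
        if input_id == self.blocked:
            # The input is blocked: hold the element back instead of processing it.
            self.buffer.append(event)
        elif isinstance(event, Barrier) and self.blocked is None:
            # First barrier n: block this input until the other input catches up.
            self.blocked = input_id
        elif isinstance(event, Barrier):
            # Barrier n on the other input: alignment is complete.
            self.output.append(event)                          # forward barrier downstream
            self.snapshots[event.checkpoint_id] = self.state   # take the snapshot
            drained_input, self.blocked = self.blocked, None
            pending = list(self.buffer)
            self.buffer.clear()
            for held in pending:                               # buffered elements go first
                self.on_event(drained_input, held)
        else:
            self.state += event
            self.output.append(event)


op = TwoInputAligner()
op.on_event(1, 1)
op.on_event(1, Barrier(7))   # input 1 is now blocked
op.on_event(1, 10)           # held back: arrived after barrier 7 on input 1
op.on_event(2, 2)            # input 2 keeps flowing
op.on_event(2, Barrier(7))   # aligned: forward barrier, snapshot, drain buffer
print(op.snapshots)          # {7: 3}
print(op.output)             # [1, 2, Barrier(checkpoint_id=7), 10]
```

The snapshot for checkpoint 7 reflects exactly the records that preceded barrier 7 on both inputs; record 10 is processed only after the snapshot, so it is reflected in a later checkpoint.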

For single input operators, you don't have to align the checkpoint barriers.

The checkpoint barrier alignment is not strictly necessary, but it means we don't have to store all in-flight records from the second input that arrive between the checkpoint barrier on the first input and the corresponding barrier on the second input. We might change this implementation in the future, though.
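
The alternative mentioned here (no alignment, but storing the in-flight records instead) can be sketched in the style of the Chandy-Lamport snapshot algorithm. That algorithm is named here as a swapped-in illustration; it is not what the current implementation described in this thread does. In this hypothetical Python sketch, the operator snapshots its state as soon as the first barrier n arrives and keeps processing both inputs, but logs every record that arrives on the other input before that input's barrier n as part of checkpoint n. `UnalignedTwoInput`, `restored_state` and the running-sum state are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    """Hypothetical checkpoint barrier marker carrying checkpoint id n."""
    checkpoint_id: int


class UnalignedTwoInput:
    """Sketch: snapshot at the first barrier n and log in-flight records instead of blocking.

    Assumes inputs are numbered 1 and 2, barriers arrive in checkpoint-id order,
    and at most one checkpoint is in progress. State is a running sum of records.
    """

    def __init__(self):
        self.state = 0
        self.pending = None     # (checkpoint id, input still awaiting its barrier n)
        self.snapshots = {}     # checkpoint id -> {"state": ..., "in_flight": [...]}
        self.output = []

    def on_event(self, input_id, event):
        if isinstance(event, Barrier):
            if self.pending is None:
                # First barrier n: snapshot now, forward the barrier, and start logging
                # the other input, whose pre-barrier records are not in the snapshot yet.
                self.snapshots[event.checkpoint_id] = {"state": self.state, "in_flight": []}
                self.output.append(event)
                other_input = 2 if input_id == 1 else 1
                self.pending = (event.checkpoint_id, other_input)
            else:
                # Barrier n on the other input: the in-flight log for checkpoint n is complete.
                self.pending = None
            return
        if self.pending is not None and input_id == self.pending[1]:
            # Record precedes barrier n on its input: store it with checkpoint n.
            self.snapshots[self.pending[0]]["in_flight"].append(event)
        # Either way, the record is processed immediately; nothing is blocked.
        self.state += event
        self.output.append(event)

    def restored_state(self, checkpoint_id):
        """State after recovery: snapshot state plus re-processing of the logged records."""
        snap = self.snapshots[checkpoint_id]
        return snap["state"] + sum(snap["in_flight"])


op = UnalignedTwoInput()
op.on_event(1, 1)
op.on_event(1, Barrier(7))   # snapshot taken immediately, nothing blocked
op.on_event(1, 10)           # after barrier 7 on input 1: not part of checkpoint 7
op.on_event(2, 2)            # before barrier 7 on input 2: logged as in-flight
op.on_event(2, Barrier(7))
print(op.snapshots[7])       # {'state': 1, 'in_flight': [2]}
print(op.restored_state(7))  # 3
```

Restoring checkpoint 7 yields an effective state of 3, i.e. exactly the records that preceded barrier 7 on both inputs, without ever blocking input 1. The cost is that the checkpoint has to store the logged in-flight records, which is what alignment avoids.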

Cheers,
Till

Re: Question about the checkpoint mechanism in Flink.

Li Wang-2

Hi Till,

Thanks for your prompt reply. I understand that the input streams need to be aligned so that a consistent state snapshot can be generated. However, to me that statement suggests that an operator buffers its output tuples until the snapshot is committed. Is my understanding of that statement right? If so, why should an operator hold back its output tuples? Is it so that they can be replayed during the state recovery of a downstream operator?

Thanks and regards,
Li


Re: Question about the checkpoint mechanism in Flink.

Renjie Liu
In reply to this post by Li Wang-2

Essentially, you are right, but the snapshot commit process is asynchronous. That's the price you pay for exactly-once semantics.


--
Liu, Renjie
Software Engineer, MVAD

Re: Question about the checkpoint mechanism in Flink.

Renjie Liu
Sorry, that was an incorrect reply; please ignore it.

Re: Question about the checkpoint mechanism in Flink.

Renjie Liu
In reply to this post by Till Rohrmann
Hi Till,
By "operators with multiple inputs", do you mean inputs from multiple subtasks?

Re: Question about the checkpoint mechanism in Flink.

Till Rohrmann
The tuples are not buffered until the snapshot is globally complete (a snapshot is globally complete iff all operators have successfully taken a snapshot). Only the records arriving on the first input after its checkpoint barrier are buffered, and only until the corresponding checkpoint barrier on the second input is received. Once that happens, the checkpoint barrier is sent directly to the downstream operators. Next, a snapshot is taken; depending on the state backend, this can happen synchronously or asynchronously. After this is done, the operator continues processing elements (for the first input, the buffered elements are consumed first).
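
To make the last steps more concrete, here is a hypothetical Python sketch of (a) an asynchronous snapshot, assumed here to consist of a synchronous copy of the state followed by a background write, and (b) the global-completeness rule quoted above: checkpoint n is complete iff every operator task has acknowledged it. `take_snapshot`, `GlobalCheckpointTracker` and the in-memory `storage` dict are illustrative stand-ins, not Flink's state backends or its checkpoint coordination code.

```python
import copy
import threading
from concurrent.futures import ThreadPoolExecutor


class GlobalCheckpointTracker:
    """Hypothetical tracker: checkpoint n is complete iff every task acknowledged it."""

    def __init__(self, task_ids):
        self.task_ids = frozenset(task_ids)
        self.acks = {}                 # checkpoint id -> tasks that acknowledged it
        self.completed = []            # checkpoint ids in order of completion
        self._lock = threading.Lock()  # acks may arrive from several threads

    def acknowledge(self, checkpoint_id, task_id):
        with self._lock:
            acked = self.acks.setdefault(checkpoint_id, set())
            acked.add(task_id)
            if acked == self.task_ids and checkpoint_id not in self.completed:
                self.completed.append(checkpoint_id)


def take_snapshot(task_id, live_state, checkpoint_id, storage, tracker, executor):
    """Copy the state synchronously, then persist it and acknowledge in the background."""
    frozen = copy.deepcopy(live_state)   # synchronous part: the task may resume right after

    def persist():
        # Asynchronous part: stand-in for writing to durable storage.
        storage[(task_id, checkpoint_id)] = frozen
        tracker.acknowledge(checkpoint_id, task_id)

    return executor.submit(persist)


tracker = GlobalCheckpointTracker(["map-0", "map-1"])
storage = {}
with ThreadPoolExecutor(max_workers=2) as pool:
    state0 = {"count": 5}
    f0 = take_snapshot("map-0", state0, 1, storage, tracker, pool)
    state0["count"] += 1             # processing continues; the copy is unaffected
    f0.result()
    print(tracker.completed)         # [] (map-1 has not acknowledged checkpoint 1 yet)
    take_snapshot("map-1", {"count": 2}, 1, storage, tracker, pool).result()
print(tracker.completed)             # [1]
print(storage[("map-0", 1)])         # {'count': 5}
```

Because only the copy step runs on the processing path, the task can continue with its (buffered, then new) elements while the write is still in progress; the checkpoint becomes globally complete only after the last task acknowledges.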

By multiple inputs, I was referring to a coFlatMap operator or a join operator, both of which have two inputs.

Cheers,
Till

Re: Question about the checkpoint mechanism in Flink.

Renjie Liu

Hi Till,
I think "multiple inputs" should also cover the more general case where data is redistributed between subtasks, right? In that case, we also need to align the checkpoint barriers.


Re: Question about the checkpoint mechanism in Flink.

Till Rohrmann
Yes, you're right. Alignment is needed whenever an operator has multiple input channels, which is also the case if you repartition the data between two mappers.
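
The same idea generalizes to any number of input channels. Below is a hypothetical Python sketch in which a downstream subtask after a repartition receives one channel per upstream subtask (three here, an assumed parallelism) and snapshots only once barrier n has arrived on every channel. With a single channel, the barrier completes alignment immediately, which matches the remark that single-input operators don't need to align barriers. `ChannelAligner` and the channel numbering are illustrative assumptions, and barriers are assumed to arrive in checkpoint-id order.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Barrier:
    """Hypothetical checkpoint barrier marker carrying checkpoint id n."""
    checkpoint_id: int


class ChannelAligner:
    """Sketch: align barrier n across an arbitrary set of input channels."""

    def __init__(self, channels):
        self.channels = frozenset(channels)
        self.blocked = set()                            # channels that delivered barrier n
        self.buffers = {c: [] for c in self.channels}   # held-back elements per channel
        self.processed = []                             # (channel, record) in processing order
        self.snapshots = []                             # checkpoint ids, in snapshot order

    def on_event(self, channel, event):
        if channel in self.blocked:
            self.buffers[channel].append(event)
            return
        if not isinstance(event, Barrier):
            self.processed.append((channel, event))
            return
        self.blocked.add(channel)
        if self.blocked == self.channels:
            # Barrier n has arrived on every channel: snapshot, unblock, drain buffers.
            self.snapshots.append(event.checkpoint_id)
            self.blocked.clear()
            pending, self.buffers = self.buffers, {c: [] for c in self.channels}
            for c in sorted(pending):
                for held in pending[c]:
                    self.on_event(c, held)


# A mapper subtask fed by three upstream subtasks through a repartition.
agg = ChannelAligner(channels=[0, 1, 2])
agg.on_event(0, "a")
agg.on_event(0, Barrier(1))
agg.on_event(0, "late-0")    # held back: channel 0 is blocked
agg.on_event(1, Barrier(1))
agg.on_event(2, "b")
print(agg.snapshots)         # [] (channel 2 has not delivered barrier 1 yet)
agg.on_event(2, Barrier(1))
print(agg.snapshots)         # [1]
print(agg.processed)         # [(0, 'a'), (2, 'b'), (0, 'late-0')]

single = ChannelAligner(channels=[0])
single.on_event(0, Barrier(1))
print(single.snapshots)      # [1] (one channel: nothing to wait for)
```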

Re: Question about the checkpoint mechanism in Flink.

Renjie Liu
Thanks for the reply.
