Unaligned Checkpoint and Exactly Once

Unaligned Checkpoint and Exactly Once

Lu Weizheng
Hi there,

The new Unaligned Checkpoint feature in Flink 1.11 means that an operator subtask does not need to wait for all checkpoint barriers and will not block any channels. Since the checkpoint barrier is the key mechanism behind the exactly-once guarantee, I am not sure whether Unaligned Checkpoint can still achieve exactly-once, or only at-least-once?

From FLIP-76 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints):
Unaligned checkpoints will initially be an optional feature. After collecting experience and implementing all necessary extensions, unaligned checkpoint will probably be enabled by default for exactly once.

What's more, regarding the following two configs:

Config 1
env.getCheckpointConfig().enableUnalignedCheckpoints();

Config 2
checkpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

Does Config 2 use an even simpler checkpointing mechanism than Unaligned Checkpoint?

Looking forward to your replies!

Weizheng

Re: Unaligned Checkpoint and Exactly Once

Zhijiang(wangzhijiang999)
Hi Weizheng,

In Flink 1.11, unaligned checkpoints (UC) are supported only in exactly-once mode, and not for savepoints. Savepoints are typically used for job rescaling, and we plan to support them in a future release. And yes, UC does provide exactly-once semantics as promised.
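Why does skipping alignment not weaken the guarantee? The core idea of FLIP-76 is that a barrier may overtake in-flight data, and that overtaken data is stored as part of the checkpoint and restored on recovery. The toy model below illustrates this for a single checkpoint on an operator that sums integers from several input channels. The class and method names are hypothetical (this is not Flink's `CheckpointBarrierUnaligner`), and the model is simplified: it only tracks input-side in-flight records, not output buffers.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Flink's CheckpointBarrierUnaligner) of ONE unaligned
 * checkpoint on an operator that sums integer records from several input channels.
 */
class UnalignedCheckpointSketch {
    private final boolean[] barrierSeen;
    private long sum = 0;                                     // live operator state
    private Long snapshotSum = null;                          // operator state in the checkpoint
    private final List<Integer> inFlight = new ArrayList<>(); // persisted in-flight records

    UnalignedCheckpointSketch(int numChannels) {
        barrierSeen = new boolean[numChannels];
    }

    /** Records are never blocked: every record is processed as soon as it arrives. */
    void onRecord(int channel, int value) {
        sum += value;
        // After the snapshot, a record on a channel whose barrier has not arrived yet
        // still belongs BEFORE the barrier, so it is persisted with the checkpoint.
        if (snapshotSum != null && !barrierSeen[channel]) {
            inFlight.add(value);
        }
    }

    /** The first barrier triggers the snapshot immediately; there is no alignment. */
    void onBarrier(int channel) {
        if (snapshotSum == null) {
            snapshotSum = sum;
        }
        barrierSeen[channel] = true;
    }

    boolean isComplete() {
        for (boolean seen : barrierSeen) {
            if (!seen) {
                return false;
            }
        }
        return true;
    }

    /** State after recovery: restored operator state plus replayed in-flight records. */
    long recoveredSum() {
        if (!isComplete()) {
            throw new IllegalStateException("checkpoint not complete yet");
        }
        long total = snapshotSum;
        for (int value : inFlight) {
            total += value;
        }
        return total;
    }

    long currentSum() {
        return sum;
    }
}
```

For example, feeding record 1 on channel 0, 10 on channel 1, 2 on channel 0, channel 0's barrier, 100 on channel 0, 20 on channel 1, channel 1's barrier and finally 1000 on channel 1 gives `recoveredSum() == 33`, which is exactly 1 + 2 + 10 + 20, the records that precede each channel's barrier. Each of them is counted once, either in the snapshot (13) or in the persisted in-flight data (20).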

Regarding the config question, I am not sure I get your point. The first config controls whether the current checkpointing mode (in practice, only exactly-once) uses UC, and the second config selects the mode itself (exactly-once or at-least-once). I guess you mean merging them into the first config form. But they are really two different dimensions of checkpoint configuration: one is the data processing guarantee (the semantics), and the other is which of two mechanisms is used to realize one of those semantics (exactly-once).
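A minimal model of these two dimensions: the guarantee and the UC flag are independent inputs, and the flag only chooses how exactly-once is achieved. The enum and method names are hypothetical, not Flink API; the mapping mirrors the `createCheckpointBarrierHandler` switch from `InputProcessorUtil` that is quoted later in this thread.

```java
/** Hypothetical model of the two independent checkpoint configuration dimensions. */
class CheckpointSettingsSketch {
    /** Dimension 1: the processing guarantee. */
    enum Guarantee { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Resulting barrier handling on the input side. */
    enum Mechanism { ALIGNED, UNALIGNED, NO_ALIGNMENT }

    /** Dimension 2 (the UC flag) only selects HOW exactly-once is achieved. */
    static Mechanism effectiveMechanism(Guarantee guarantee, boolean unalignedEnabled) {
        switch (guarantee) {
            case EXACTLY_ONCE:
                return unalignedEnabled ? Mechanism.UNALIGNED : Mechanism.ALIGNED;
            case AT_LEAST_ONCE:
                // No alignment phase exists here, so the UC flag has nothing to act on.
                return Mechanism.NO_ALIGNMENT;
            default:
                throw new IllegalArgumentException("Unknown guarantee: " + guarantee);
        }
    }
}
```

Keeping the flag separate from the mode means the exactly-once mechanism can change (or later become the default, as FLIP-76 anticipates) without changing what users select as their guarantee.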
 

Best,
Zhijiang

------------------------------------------------------------------
From: Lu Weizheng <[hidden email]>
Send Time: Monday, June 22, 2020 07:20
Subject: Unaligned Checkpoint and Exactly Once

Re: Unaligned Checkpoint and Exactly Once

Lu Weizheng
Thank you, Zhijiang! I only asked the second question about config because I found the following method in InputProcessorUtil. I guess AT_LEAST_ONCE mode is a simpler way to handle checkpoint barriers?

private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        InputGate[] inputGates,
        SubtaskCheckpointCoordinator checkpointCoordinator,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (config.getCheckpointMode()) {
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                // Exactly-once with UC enabled: wrap both an aligned and an unaligned handler
                return new AlternatingCheckpointBarrierHandler(
                    new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
                    new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
                    toNotifyOnCheckpoint);
            }
            // Exactly-once without UC: classic barrier alignment
            return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
        case AT_LEAST_ONCE:
            // At-least-once: only count barriers across all input channels
            int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
            return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}
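The method above only wraps both handlers in `AlternatingCheckpointBarrierHandler`; which one handles a given barrier is decided inside that class, which the thread does not show. Given Zhijiang's remark that UC does not cover savepoints in 1.11, a plausible reading is that regular checkpoint barriers take the unaligned path and savepoint barriers the aligned one. The sketch below models only that routing decision; the class, the enums and the rule of ignoring barriers from older checkpoints are all assumptions, not Flink code.

```java
/**
 * Hypothetical sketch of per-barrier routing when unaligned checkpoints are enabled.
 * Not Flink's AlternatingCheckpointBarrierHandler; names and rules are assumptions.
 */
class AlternatingRoutingSketch {
    enum BarrierKind { CHECKPOINT, SAVEPOINT }

    enum Path { ALIGNED, UNALIGNED }

    private long lastSeenId = Long.MIN_VALUE;

    /**
     * Returns the path that handles the barrier, or null if the barrier belongs to a
     * checkpoint older than one already seen (assumed to be ignored).
     */
    Path route(long checkpointId, BarrierKind kind) {
        if (checkpointId < lastSeenId) {
            return null;
        }
        lastSeenId = checkpointId;
        // Assumption: UC covers regular checkpoints only; savepoints stay aligned.
        return kind == BarrierKind.CHECKPOINT ? Path.UNALIGNED : Path.ALIGNED;
    }
}
```

Under this reading, enabling UC does not remove the aligned handler; it remains in place for the barrier types that UC does not support yet.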


From: Zhijiang <[hidden email]>
Sent: June 22, 2020 10:41
To: Lu Weizheng <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Unaligned Checkpoint and Exactly Once
 
Re: Unaligned Checkpoint and Exactly Once

Zhijiang(wangzhijiang999)
From an implementation and logic-complexity perspective, AT_LEAST_ONCE is simpler than EXACTLY_ONCE without unaligned checkpoints (i.e. aligned EXACTLY_ONCE), since it can always process data without blocking any channel.
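The difference can be illustrated with a toy simulation of one checkpoint on several input channels: with alignment, a channel that has already delivered its barrier is blocked until the barriers of all other channels arrive; with tracking (AT_LEAST_ONCE), records keep flowing and barriers are only counted. The class and method names are hypothetical (not Flink's `CheckpointBarrierAligner` or `CheckpointBarrierTracker`), and the model ignores network buffers, timeouts and concurrent checkpoints.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical toy model of how one checkpoint's barriers are handled. */
class BarrierHandlingSketch {
    /** Marker placed in the output where the checkpoint is triggered. */
    static final int CHECKPOINT = -1;

    /** An input event: a record (non-negative value) or a barrier. */
    static final class Event {
        final int channel;
        final int value;
        final boolean isBarrier;

        private Event(int channel, int value, boolean isBarrier) {
            this.channel = channel;
            this.value = value;
            this.isBarrier = isBarrier;
        }

        static Event record(int channel, int value) { return new Event(channel, value, false); }

        static Event barrier(int channel) { return new Event(channel, 0, true); }
    }

    /**
     * Returns the processing order of records, with CHECKPOINT marking when the snapshot
     * is taken. align=true blocks channels (aligned exactly-once); align=false only
     * counts barriers (at-least-once tracking).
     */
    static List<Integer> run(int numChannels, List<Event> input, boolean align) {
        List<Integer> processed = new ArrayList<>();
        boolean[] barrierSeen = new boolean[numChannels];
        Deque<Integer> heldBack = new ArrayDeque<>();
        int barriers = 0;
        for (Event e : input) {
            if (e.isBarrier) {
                if (!barrierSeen[e.channel]) { // ignore duplicate barriers
                    barrierSeen[e.channel] = true;
                    barriers++;
                    if (barriers == numChannels) {
                        processed.add(CHECKPOINT);
                        // Alignment done: unblock and process the held-back records.
                        while (!heldBack.isEmpty()) {
                            processed.add(heldBack.poll());
                        }
                    }
                }
            } else if (align && barrierSeen[e.channel] && barriers < numChannels) {
                heldBack.add(e.value); // channel is blocked until all barriers arrive
            } else {
                processed.add(e.value);
            }
        }
        return processed;
    }
}
```

For example, if the input is record 1 on channel 0, channel 0's barrier, record 2 on channel 0, record 3 on channel 1, channel 1's barrier, then record 4 on channel 1, alignment yields `[1, 3, -1, 2, 4]` while tracking yields `[1, 2, 3, -1, 4]`. In the tracking run, record 2 comes after channel 0's barrier yet is processed before the snapshot, so after a failure its effect is in the restored state and the source replays it again: that duplicate is the at-least-once price for never blocking a channel.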

------------------------------------------------------------------
From: Lu Weizheng <[hidden email]>
Send Time: Monday, June 22, 2020 10:53
Subject: Re: Unaligned Checkpoint and Exactly Once

Re: Unaligned Checkpoint and Exactly Once

Arvid Heise-3
Hi Lu,

Thank you for your interest in unaligned checkpoints!

I just published some PRs that will warn you if you set both unaligned checkpoints and AT_LEAST_ONCE. It is indeed neither possible nor meaningful to use them together. AT_LEAST_ONCE has no alignment phase, so it is faster than both EXACTLY_ONCE options (aligned and unaligned).
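The PRs themselves are not shown in this thread, so the following is only a hypothetical sketch of such a configuration check, not their actual code. It flags the one combination that makes no sense: the UC flag together with AT_LEAST_ONCE, where there is no alignment phase to skip (consistent with the `createCheckpointBarrierHandler` switch, which ignores the UC flag in AT_LEAST_ONCE mode).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical configuration check; not the code from the PRs mentioned above. */
class CheckpointSettingsValidator {
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    static List<String> warnings(Mode mode, boolean unalignedEnabled) {
        List<String> warnings = new ArrayList<>();
        if (mode == Mode.AT_LEAST_ONCE && unalignedEnabled) {
            // AT_LEAST_ONCE never aligns barriers, so "unaligned" has nothing to change.
            warnings.add("Unaligned checkpoints are ignored in AT_LEAST_ONCE mode;"
                    + " they only apply to EXACTLY_ONCE.");
        }
        return warnings;
    }
}
```

Returning warnings rather than throwing keeps such a job runnable: the combination is not harmful, just pointless.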



On Mon, Jun 22, 2020 at 5:20 AM Zhijiang <[hidden email]> wrote:

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng