[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:

  1. Flink's exactly-once sinks rely on checkpoints to ensure that data is committed to external systems only once it can no longer be replayed. If sources are bounded and no checkpoints can be taken after some tasks have finished, the data sent after the last checkpoint can never be committed. This issue has already been reported several times on the user ML [2][3][4] and was further brought up while working on FLIP-143: Unified Sink API [5].
  2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover, because no periodic checkpoints are taken after the bounded sources finish.

Therefore, we propose to also support checkpoints after some tasks have finished. You can find more details in FLIP-147 [6].

Best,

Yun


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

[2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E

[3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E

[4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E

[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi, devs & users

Very sorry for the spoiled formatting; I am resending the discussion as follows.

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:
        1. Flink's exactly-once sinks rely on checkpoints to ensure that data is committed to external systems only once it can no longer be replayed. If sources are bounded and no checkpoints can be taken after some tasks have finished, the data sent after the last checkpoint can never be committed. This issue has already been reported several times on the user ML [2][3][4] and was further brought up while working on FLIP-143: Unified Sink API [5].
        2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover, because no periodic checkpoints are taken after the bounded sources finish.

Therefore, we propose to also support checkpoints after some tasks have finished. You can find more details in FLIP-147 [6].

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise
Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.
2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B to see whether they have any upstream task left. If not, B becomes a new root. That would only require touching a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator (a rough sketch of this follows after the list).
2b) We also need to be careful about out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but by the time it arrives, the subtask may already be finished.
3) An implied change is that checkpoints are no longer aborted at EndOfPartition, which is good, but should be stated explicitly.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.
5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.
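
To make 2) more concrete, here is a minimal self-contained sketch of the iterative root tracking; all class and method names are made up for illustration and are not the actual CheckpointCoordinator or JobManager interfaces:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Whenever a root finishes, only its direct downstream subtasks are inspected; a subtask
// with no unfinished upstream left becomes a new root, i.e. a new barrier-injection point.
public class RootTracking {

    private final Map<String, List<String>> downstream;    // task -> direct downstream tasks
    private final Map<String, Integer> unfinishedUpstream;  // task -> number of unfinished inputs
    private final Set<String> roots = new HashSet<>();      // current barrier-injection points

    RootTracking(Map<String, List<String>> downstream, Map<String, Integer> upstreamCounts) {
        this.downstream = downstream;
        this.unfinishedUpstream = new HashMap<>(upstreamCounts);
        // Initially every task without upstream tasks (a source) is a root.
        upstreamCounts.forEach((task, count) -> { if (count == 0) roots.add(task); });
    }

    // Callback (e.g. from the JobManager) when a task reports FINISHED.
    // Returns the subtasks that just became new roots.
    Set<String> onTaskFinished(String finishedTask) {
        roots.remove(finishedTask);
        Set<String> newRoots = new HashSet<>();
        for (String next : downstream.getOrDefault(finishedTask, Collections.emptyList())) {
            if (unfinishedUpstream.merge(next, -1, Integer::sum) == 0) {
                roots.add(next);    // all inputs of 'next' have finished
                newRoots.add(next);
            }
        }
        return newRoots;
    }

    Set<String> currentRoots() {
        return Collections.unmodifiableSet(roots);
    }

    public static void main(String[] args) {
        // source -> map -> sink
        Map<String, List<String>> downstream = new HashMap<>();
        downstream.put("source", Collections.singletonList("map"));
        downstream.put("map", Collections.singletonList("sink"));
        Map<String, Integer> upstreamCounts = new HashMap<>();
        upstreamCounts.put("source", 0);
        upstreamCounts.put("map", 1);
        upstreamCounts.put("sink", 1);

        RootTracking tracking = new RootTracking(downstream, upstreamCounts);
        System.out.println(tracking.currentRoots());            // [source]
        System.out.println(tracking.onTaskFinished("source"));  // [map] becomes the new root
    }
}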


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao

Hi Arvid,

Thanks a lot for the insightful comments! I have added my responses below the respective quotes:

>> 1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.

>> 2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.


I think "leaf nodes" was a poor choice of name on my part; in fact we have the same idea in mind: we start from the source nodes and find all the nodes whose preceding nodes have all finished. It would be much better to call these nodes (the ones we would trigger) "root nodes". I'll modify the FLIP to use "root nodes".

>> 2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.

Exactly. When a checkpoint tries to trigger a task but finds that the task is no longer there, it can further check whether the task has finished; if so, it should re-check the task's descendants to see whether there are new "root nodes" to trigger (sketched below).
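
A minimal sketch of that trigger-time fallback (hypothetical names only, not actual Flink code) might look like:

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// If a task turns out to be FINISHED when the trigger arrives, descend to its downstream
// tasks and trigger those whose inputs have all finished instead.
public class TriggerWithFallback {

    static void triggerRoots(List<String> plannedRoots,
                             Predicate<String> isFinished,
                             Map<String, List<String>> downstream,
                             Map<String, List<String>> upstream) {
        Deque<String> pending = new ArrayDeque<>(plannedRoots);
        Set<String> visited = new HashSet<>(plannedRoots);
        while (!pending.isEmpty()) {
            String task = pending.poll();
            if (!isFinished.test(task)) {
                System.out.println("inject barrier at " + task); // stand-in for the trigger RPC
                continue;
            }
            // The task finished in the meantime: re-check its descendants for new roots.
            for (String next : downstream.getOrDefault(task, Collections.emptyList())) {
                boolean allInputsFinished = upstream
                        .getOrDefault(next, Collections.emptyList())
                        .stream()
                        .allMatch(isFinished);
                if (allInputsFinished && visited.add(next)) {
                    pending.add(next);
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> downstream = Map.of(
                "source", List.of("map"),
                "map", List.of("sink"));
        Map<String, List<String>> upstream = Map.of(
                "map", List.of("source"),
                "sink", List.of("map"));
        Set<String> finished = Set.of("source");
        // "source" finished between planning and triggering, so the barrier lands on "map".
        triggerRoots(List.of("source"), finished::contains, downstream, upstream);
    }
}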

>> 3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.

Yes, currently barrier alignment fails the ongoing checkpoint on EndOfPartition, and we would modify that behavior.

>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.


Thanks a lot for the suggestions on this issue; in fact I was stuck on it for some time. One implementation detail that concerned me is that EndOfPartition does not seem able to overtake the previous buffers as easily as CheckpointBarrier does, since otherwise the task might start destroying its input channels once all EndOfPartitions are received.

Therefore, although we could also persist the channels that receive EndOfPartition:

1. Start persisting the channels when the CheckpointUnaligner receives the barrier (if not all preceding tasks have finished) or receives the trigger (if all preceding tasks have finished).

2. The persisting actually stops when onBuffer receives EndOfPartition.

Even so, after the last channel stops persisting, the CheckpointUnaligner still needs to wait until all the previous buffers are processed before completing the allBarriersReceivedFuture. Therefore it would not be able to accelerate the checkpoint in this case.

After some rethinking today, I currently think we might insert additional virtual events into the received buffers when EndOfPartition arrives and allow these virtual events to overtake the previous buffers (see the sketch below). I'll try to double check whether this is feasible, and please let me know if there are other solutions to this issue :).
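
As a rough illustration of the virtual-event idea (all names made up; this is not the actual network stack): when EndOfPartition arrives, a virtual marker is put at the head of the channel's receive queue so it overtakes the buffered data, while the real EndOfPartition keeps its position behind the data so channel teardown still happens at the right moment.

import java.util.ArrayDeque;
import java.util.Deque;

public class ChannelReceiveQueue {

    interface Element {}
    static final class DataBuffer implements Element {
        final int sizeInBytes;
        DataBuffer(int sizeInBytes) { this.sizeInBytes = sizeInBytes; }
    }
    static final class EndOfPartition implements Element {}
    static final class VirtualEndOfPartitionMarker implements Element {}

    private final Deque<Element> received = new ArrayDeque<>();

    void onBuffer(DataBuffer buffer) {
        received.addLast(buffer);
    }

    void onEndOfPartition() {
        received.addLast(new EndOfPartition());               // real event stays behind the data
        received.addFirst(new VirtualEndOfPartitionMarker()); // virtual event overtakes the data
    }

    Element poll() {
        return received.pollFirst();
    }

    public static void main(String[] args) {
        ChannelReceiveQueue queue = new ChannelReceiveQueue();
        queue.onBuffer(new DataBuffer(100));
        queue.onBuffer(new DataBuffer(200));
        queue.onEndOfPartition();
        // Prints: VirtualEndOfPartitionMarker, DataBuffer, DataBuffer, EndOfPartition
        for (Element e; (e = queue.poll()) != null; ) {
            System.out.println(e.getClass().getSimpleName());
        }
    }
}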

> 5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.

I also agree that, eventually, we should not "restart" the finished tasks in any form; not starting them in the first place would be better. We should be able to bookkeep additional information in the checkpoint metadata about which operators are fully finished, and the scheduler could restore the status of the tasks when restoring from previous checkpoints (a rough sketch of such metadata follows below). It would also require some modification on the task side to support input channels that are already finished on startup.

But in the first version, I think we might simplify this by still restarting all the tasks and letting the finished sources exit directly: the new Source API would terminate immediately since there are no pending splits, and the legacy sources would be handled specially by skipping execution if the source operator was fully finished before. We could then move towards the final solution gradually in the next steps.
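
A rough sketch of what such checkpoint metadata and the restore decision could look like (made-up types, only to illustrate the bookkeeping idea, not the real checkpoint metadata format):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// The checkpoint records which operators were fully finished; on restore the scheduler
// consults that set to decide whether a task is skipped / finishes immediately (as in the
// first version) or restores its state and runs normally.
public class RestoreWithFinishedOperators {

    static final class CheckpointMetadata {
        final long checkpointId;
        final Set<String> fullyFinishedOperators;
        CheckpointMetadata(long checkpointId, Set<String> fullyFinishedOperators) {
            this.checkpointId = checkpointId;
            this.fullyFinishedOperators = fullyFinishedOperators;
        }
    }

    enum RestoreAction { SKIP_OR_FINISH_IMMEDIATELY, RESTORE_STATE_AND_RUN }

    static RestoreAction planFor(String operatorId, CheckpointMetadata metadata) {
        return metadata.fullyFinishedOperators.contains(operatorId)
                ? RestoreAction.SKIP_OR_FINISH_IMMEDIATELY
                : RestoreAction.RESTORE_STATE_AND_RUN;
    }

    public static void main(String[] args) {
        CheckpointMetadata metadata =
                new CheckpointMetadata(42L, new HashSet<>(Arrays.asList("bounded-source")));
        for (String op : Arrays.asList("bounded-source", "mapper", "sink")) {
            System.out.println(op + " -> " + planFor(op, metadata));
        }
    }
}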


Best,

Yun



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise
Hi Yun,

4) Yes, the interaction is not trivial and I also have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint-related events in unaligned checkpoints. So just keep in mind that we might converge anyhow at this point.

In general, what helps in this case is to remember that no unaligned checkpoint barrier is ever going to overtake EndOfPartition. So, we can completely ignore the problem of how to store and restore output buffers of a completed task (also important for the next point).

5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be to not even start the processing loop.

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao

Hi Arvid,

Thanks a lot for the comments!

>>> 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.

I also agree that it would be better to keep the unaligned checkpoint behavior on EndOfPartition; I will double check this issue again.

>>> In general, what is helping in this case is to remember that there no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point).

Exactly, we should not need to persist the output buffers for the completed tasks, and that would simplify the implementation a lot.


>>> 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be even to not even start the procession loop. 

I also agree with this part. We would keep optimizing the implementation after the first version. 


Best,

Yun   





Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Till Rohrmann
Thanks for starting this discussion, Yun Gao,

I have three comments/questions:

1) When restarting all tasks independently of their status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective it would be easier if we already knew that certain subtasks don't need to be rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned that you want to record in the CompletedCheckpoint which operators are fully finished. How will this information be used for constructing a recovered ExecutionGraph? Why wouldn't the same principle work at the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1 (fine grained recovery)?

Cheers,
Till

On Tue, Oct 13, 2020 at 9:30 AM Yun Gao <[hidden email]> wrote:

Hi Arvid,

Very thanks for the comments!

>>> 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.

I also agree with that it would be better to keep the unaligned checkpoints behavior on EndOfPartition, I will then double check on this issue again. 

>>> In general, what is helping in this case is to remember that there no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point).

Exactly, we should not need to persist the output buffers for the completed tasks, and that would simply the implementation a lot.


>>> 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be even to not even start the procession loop. 

I also agree with this part. We would keep optimizing the implementation after the first version. 


Best,

Yun   




------------------------------------------------------------------
From:Arvid Heise <[hidden email]>
Send Time:2020 Oct. 13 (Tue.) 03:39
To:Yun Gao <[hidden email]>
Cc:Flink Dev <[hidden email]>; User-Flink <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.

In general, what is helping in this case is to remember that there no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point).

5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be even to not even start the procession loop.

On Mon, Oct 12, 2020 at 6:18 PM Yun Gao <[hidden email]> wrote:

Hi Arvid,

Very thanks for the insightful comments! I added the responses for this issue under the quota: 

>> 1) You call the tasks that get the barriers injected leaf nodes, which would make the > sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.

>> 2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.


I think I should have used a bad name of "leaf nodes", in fact I think we should have the same thoughts that we start with the source nodes to find all the nodes whose precedent nodes are all finished. It would be much better to call these nodes (which we would trigger) as "root nodes". I'll modify the FLIP to change the names to "root nodes".

>> 2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.

Exactly. When the checkpoint triggers a task but found the task is not there, it may then further check if the task has been finished, if so, it should then re-check its descendants to see if there are new "root nodes" to trigger.

>> 3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.

Yes, currently barrier alignment would fail the current checkpoint on EndOfPartition, and we would modify the behavior.

>> 4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.


Very thanks for the suggestions on this issue and in fact I did stuck on it for some time. Previously for me one implementation detail issue is that EndOfPartition seems not be able to overtake the previous buffers easily as CheckpointBarrier does, otherwise it might start destroying the input channels if all EndOfPartitions are received.

Therefore, although we could also persistent the channels with EndOfPartition:

1. Start persisting the channels when CheckpointUnaligner received barrier (if not all precendant tasks are finished) or received triggering (if all precendant tasks are finished).

2. The persisting actually stops when onBuffer received EndOfPartition.

After the last channel stopped persisting, CheckpointUnaligner still need to wait till all the previous buffers are processed before complete the allBarriersReceivedFuture. Therefore it would not be able to accelerate the checkpoint in this case.

After some rethinking today, I currently think we might insert some additional virtual events into receivedBuffer when EndOfPartition is received and allow these virtual events to overtake the earlier buffers. I'll double-check whether this is feasible, and please let me know if there are other possible solutions for this issue :). 
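As a purely illustrative sketch (assuming a simple per-channel queue; this is not the actual receivedBuffer data structure), letting a virtual event overtake already-queued buffers could look like this:

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: data buffers keep FIFO order, a "virtual" event jumps ahead of them.
class ChannelQueue<T> {
    private final Deque<T> queue = new ArrayDeque<>();

    void addBuffer(T buffer)       { queue.addLast(buffer); }
    void addVirtualEvent(T event)  { queue.addFirst(event); }
    T poll()                       { return queue.pollFirst(); }
}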

> 5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.

I also agree that eventually we should not "restart" the finished tasks in any form; not starting them in the first place would be better. We should be able to bookkeep additional information in the checkpoint metadata about which operators are fully finished, and the scheduler could restore the status of tasks accordingly when restoring from previous checkpoints. It would also require some modification on the task side to support input channels that are already finished on startup.

But in the first version I think we might simplify this by still restarting all the tasks, but letting the finished sources exit directly. Sources using the new Source API would terminate immediately since there are no pending splits, and legacy sources would be handled specially by skipping execution if the source operator was fully finished before. We could then move to the final solution gradually in the next steps. 
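As a rough sketch of what "exit directly" could mean for a restored source (all names here are hypothetical, and the flag is assumed to come from the restored checkpoint metadata):

import java.util.function.Consumer;

// Hypothetical sketch: a source that was recorded as fully finished emits nothing and returns.
class SkippingSourceRunner {
    interface BoundedSource<T> {
        boolean restoredAsFinished();                 // assumed to be recovered from the checkpoint
        void run(Consumer<T> out) throws Exception;
    }

    static <T> void runSource(BoundedSource<T> source, Consumer<T> out) throws Exception {
        if (source.restoredAsFinished()) {
            return; // skip execution entirely; the task can emit EndOfPartition right away
        }
        source.run(out);
    }
}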


Best,

Yun


------------------------------------------------------------------
From:Arvid Heise <[hidden email]>
Send Time:2020 Oct. 12 (Mon.) 15:38
To:Yun Gao <[hidden email]>
Cc:Flink Dev <[hidden email]>; User-Flink <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I got the feeling that in Flink, we rather iterate from sources to sink, making the sources root nodes and the sinks the leaf nodes. However, I have no clue how it's done in similar cases, so please take that hint cautiously.
2) I'd make the algorithm to find the subtasks iterative and react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). So in the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B if they have any upstream task left. If not B becomes a new root. That would require to only touch a part of the job graph, but would require some callback from JobManager to CheckpointCoordinator.
2b) We also need to be careful for out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but at the time it arrives, the subtask is finished already.
3) An implied change is that checkpoints are not aborted anymore at EndOfPartition, which is good, but might be explicitly added.
4) The interaction between unaligned checkpoint and EndOfPartition is a bit ambiguous: What happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like, we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.
5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point, the subtasks either need to be removed again or maybe we could even avoid them being created in the first place.




--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   




Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao

Hi Till,

Many thanks for the feedback!

> 1) When restarting all tasks independent of the status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective it would be easier if we already know that certain subtasks don't need to be rescheduled. I believe this can be an optimization, though.

> 2) In the section Compatibility, Deprecation and Migration Plan you mentioned that you want to record operators in the CompletedCheckpoint which are fully finished. How will this information be used for constructing a recovered ExecutionGraph? Why wouldn't the same principle work for the task level?


I think the first two issues are related. The main reason is that, with externalized checkpoints, a checkpoint might be taken from one job and used in another job, but we do not have a unique ID to match tasks across jobs. Furthermore, users may also change the parallelism of a JobVertex, or even modify the graph structure by adding/removing operators or changing the chaining relationship between operators. 

On the other hand, Flink already provides custom UIDs for operators, which makes the operator a stable unit for recovery. The current checkpoints are also organized per operator to support rescaling and job upgrades. 

When restarting from a checkpoint with finished operators, we could start only the tasks containing operators that are not fully finished (namely, operators for which some subtasks were still running when the checkpoint was taken). Then, during the execution of a single task, we would only initialize/open/run/close the operators that are not fully finished. The scheduler should be able to compute whether a task contains not-fully-finished operators from the current JobGraph and the operator finish states restored from the checkpoint.
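A minimal sketch of that check, assuming the set of fully finished operator UIDs has been restored from the checkpoint (the names are illustrative, not the actual scheduler API):

import java.util.List;
import java.util.Set;

// Hypothetical sketch: a task only needs to be started if it chains at least one
// operator that is not marked as fully finished in the restored checkpoint.
class RestartDecision {
    static boolean needsStart(List<String> operatorUidsInTask, Set<String> fullyFinishedOperatorUids) {
        return operatorUidsInTask.stream()
                .anyMatch(uid -> !fullyFinishedOperatorUids.contains(uid));
    }
}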

 

> 3) How will checkpointing work together with fully bounded jobs and FLIP-1 (fine grained recovery)?

Currently I think it should be compatible with fully bounded jobs and FLIP-1, since this proposal can be viewed as completing the current checkpoint mechanism. Concretely:

1. Batch jobs (with blocking execution mode) should not be affected, since checkpoints are not enabled in this case.

2. Bounded jobs running in pipelined mode would additionally be able to take checkpoints while they are finishing, with this modification. As discussed in the FLIP, for almost all jobs it should not affect the behavior after restore.

3. Region failover and more fine-grained recovery should also not be affected: similar to the previous behavior, after a failover the failover policy (full/region/fine-grained) decides which tasks to restart, and the checkpoint only decides what state is restored for these tasks. The only difference with this modification is that these tasks might now be restored from a checkpoint taken after some tasks finished. Since the previously finished tasks would always be skipped, either by not being started or by running an empty execution, and the behavior of the previously running tasks stays unchanged, the overall behavior should not be affected.


Best,
Yun

------------------------------------------------------------------
From:Till Rohrmann <[hidden email]>
Send Time:2020 Oct. 13 (Tue.) 17:25
To:Yun Gao <[hidden email]>
Cc:Arvid Heise <[hidden email]>; Flink Dev <[hidden email]>; User-Flink <[hidden email]>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time (finished, running, scheduled), we might allocate more resources than we actually need to run the remaining job. From a scheduling perspective it would be easier if we already know that certain subtasks don't need to be rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned that you want to record operators in the CompletedCheckpoint which are fully finished. How will this information be used for constructing a recovered ExecutionGraph? Why wouldn't the same principle work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1 (fine grained recovery)?

Cheers,
Till

On Tue, Oct 13, 2020 at 9:30 AM Yun Gao <[hidden email]> wrote:

Hi Arvid,

Many thanks for the comments!

>>> 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this point.

I also agree that it would be better to keep the unaligned checkpoint behavior on EndOfPartition; I will double-check this issue again. 

>>> In general, what is helping in this case is to remember that there is no unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we can completely ignore the problem on how to store and restore output buffers of a completed task (also important for the next point).

Exactly, we should not need to persist the output buffers of the completed tasks, and that would simplify the implementation a lot.


>>> 5) I think we are on the same page and I completely agree that for the MVP/first version, it's completely fine to start and immediately stop. A tad better would be to not even start the processing loop. 

I also agree with this part. We would keep optimizing the implementation after the first version. 


Best,

Yun   


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao

Hi all,

    I would like to resume this discussion on supporting checkpoints after tasks finish :) Based on the previous discussion, we have now implemented a PoC [1] to try out the idea. During the PoC we also ran into some possible issues:

    1. To include EndOfPartition in the barrier alignment on the TM side, we now tend to decouple the EndOfPartition logic from the normal alignment behavior to avoid complex interference (which seems hard to keep track of). We could do so by inserting suitable barriers for input channels that have received, but not yet processed, EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs did not receive barrier 2 before EndOfPartition because their predecessor tasks finished, we could insert barrier 2 for the last two channels so that checkpoint 2 can still complete (a small sketch of this idea follows after this list).
 
    2. As we have discussed, if a task finishes while we are triggering the tasks, it would cause a checkpoint failure and we should re-trigger its descendants. But if possible we think we might skip this issue in the first version to reduce the implementation complexity, since it should not affect correctness. We could consider supporting it in following versions.

    3. We would have to add a field isFinished to OperatorState so that we do not re-run finished sources after failover. However, this requires a new version of the checkpoint metadata. Currently Flink has an abstract MetaV2V3SerializerBase, with V2 and V3 extending it to share some implementation. To add a V4 that differs from V3 in only one field, the current PoC introduces a new MetaV3V4SerializerBase extending MetaV2V3SerializerBase to share the implementation between V3 and V4. This looks a little complex, and we might need a general mechanism to extend the checkpoint metadata format. 

    4. With this change, StreamTask would have two types of subclasses according to how triggerCheckpoint is implemented: source tasks that perform checkpoints immediately, and non-source tasks that notify the CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), this leads to cases where multiple subclasses share the same implementation and causes code repetition. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask yet needs to perform checkpoints like a source task.
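Below is a small, self-contained sketch of the alignment idea from point 1 (the class is hypothetical and only illustrates the bookkeeping, not the actual CheckpointBarrierHandler): a channel that delivers EndOfPartition while checkpoint N is being aligned is treated as if it had delivered barrier N.

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the "insert an artificial barrier on EndOfPartition" idea.
class AlignmentTracker {
    private final Set<Integer> pendingChannels = new HashSet<>();
    private long alignedCheckpointId = -1;

    void startAlignment(long checkpointId, Set<Integer> allChannels) {
        alignedCheckpointId = checkpointId;
        pendingChannels.clear();
        pendingChannels.addAll(allChannels);
    }

    boolean onBarrier(long checkpointId, int channel) {
        if (checkpointId != alignedCheckpointId) {
            return false; // not the checkpoint currently being aligned
        }
        pendingChannels.remove(channel);
        return pendingChannels.isEmpty(); // true once every channel has delivered the barrier
    }

    boolean onEndOfPartition(int channel) {
        // Treat EndOfPartition as an artificial barrier for the pending checkpoint.
        return alignedCheckpointId >= 0 && onBarrier(alignedCheckpointId, channel);
    }
}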

Glad to hear your opinions!

Best,
 Yun

[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starting from commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek
Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>      1. To include EndOfPartition into consideration for barrier alignment at the TM side, we now tend to decouple the logic for EndOfPartition with the normal alignment behaviors to avoid the complex interference (which seems to be a bit not trackable). We could do so by inserting suitable barriers for input channels received but not processed EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs do not received barrier 2  before EndOfPartition due to the precedent tasks are finished, we could then insert barrier 2 for the last two channels so that we could still finish the checkpoint 2.

You mean we would insert "artificial" barriers for barrier 2 in case we
receive  EndOfPartition while other inputs have already received barrier
2? I think that makes sense, yes.

>      2. As we have discussed, if a tasks finished during we triggering the tasks, it would cause checkpoint failure and we should re-trigger its descendants. But if possible we think we might skip this issue at the first version to reduce the implementation complexity since it should not affect the correctness. We could considering support it in the following versions.

I think this should be completely fine.

>      3. We would have to add a field isFinished  to OperatorState so that we could not re-run finished sources after failover. However, this would require a new version of checkpoint meta. Currently Flink have an abstract MetaV2V3SerializerBase and have V2 and V3 extends it to share some implementation. To add V4 which is only different from V3 for one field, the current PoC want to introduce a new MetaV3V4SerializerBase extends MetaV2V3SerializerBase to share implementation between V3 and V4. This might looks a little complex and we might need a general mechanism to extend checkpoint meta format.

This indeed seems complex. Maybe we could switch to using composition
instead of inheritance to make this more extensible?

>      4. With the change StreamTask would have two types of subclasses according to how to implement triggerCheckpoint, one is source tasks that perform checkpoints immediately and another is the non-source tasks that would notify CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), it would cause the cases that multiple subclasses share the same implementation and  cause code repetition. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask but it need to perform checkpoint as source tasks.

Don't we currently have the same problem? Even right now source tasks
and non-source tasks behave differently when it comes to checkpoints.
Are you saying we should fix that or would the new work introduce even
more duplicate code?

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
In reply to this post by Yun Gao
     Hi Aljoscha,

        Many thanks for the feedback! For the remaining issues:

      > 1. You mean we would insert "artificial" barriers for barrier 2 in case we receive  EndOfPartition while other inputs have already received barrier 2? I think that makes sense, yes.
     
      Yes, exactly. I would like to insert "artificial" barriers in case we receive EndOfPartition while other inputs have already received barrier 2, and also for the similar cases where some input channels receive EndOfPartition while checkpoint 2 is ongoing, or where the task receives the checkpoint trigger directly because all the predecessor tasks are finished but their EndOfPartition has not arrived yet.

     > 3. This indeed seems complex. Maybe we could switch to using composition instead of inheritance to make this more extensible?

    I re-checked the code, and I now think composition would indeed be better for avoiding a complex inheritance hierarchy, by exposing the changed part `(de)serializeOperatorState` as a separate component; I'll update the PoC accordingly. Many thanks for the suggestion!
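As a sketch of how the composition could look (all names here are made up; only the idea of swapping the operator-state part per metadata version comes from the discussion):

import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: the version-specific operator-state serialization is a pluggable
// component instead of another level in the inheritance hierarchy.
class MetaSerializerSketch {
    interface OperatorStatePart {
        void write(DataOutputStream out, String operatorUid, boolean isFinished) throws IOException;
    }

    static final OperatorStatePart V3_PART = (out, uid, finished) -> out.writeUTF(uid);
    static final OperatorStatePart V4_PART = (out, uid, finished) -> {
        out.writeUTF(uid);
        out.writeBoolean(finished); // the only difference between V3 and V4 in this sketch
    };

    private final int version;
    private final OperatorStatePart operatorPart;

    MetaSerializerSketch(int version, OperatorStatePart operatorPart) {
        this.version = version;
        this.operatorPart = operatorPart;
    }

    void write(DataOutputStream out, String operatorUid, boolean isFinished) throws IOException {
        out.writeInt(version);
        operatorPart.write(out, operatorUid, isFinished); // only this component differs per version
    }
}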

   > 4. Don't we currently have the same problem? Even right now source tasks and non-source tasks behave differently when it comes to checkpoints. Are you saying we should fix that or would the new work introduce even 
more duplicate code?

  Currently, since we never trigger non-source tasks, the triggerCheckpoint logic is implemented in the base StreamTask class and only used by the source tasks. However, after the change the non-source tasks would also be triggered, with a different behavior, so we might not be able to keep using this pattern.
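To illustrate the two triggering behaviors (this is only a sketch of the idea; the actual task class hierarchy in Flink is not modeled here):

// Hypothetical sketch of the two ways a task could react to a checkpoint trigger.
interface TriggerBehavior {
    void triggerCheckpoint(long checkpointId) throws Exception;
}

// Source-style tasks: snapshot immediately when triggered.
class SourceStyleTrigger implements TriggerBehavior {
    @Override public void triggerCheckpoint(long checkpointId) {
        System.out.println("snapshotting state directly for checkpoint " + checkpointId);
    }
}

// Non-source tasks: hand the trigger to the barrier handling, which completes the checkpoint
// once the remaining (not yet finished) input channels are aligned.
class NonSourceStyleTrigger implements TriggerBehavior {
    private final java.util.function.LongConsumer notifyBarrierHandler;
    NonSourceStyleTrigger(java.util.function.LongConsumer notifyBarrierHandler) {
        this.notifyBarrierHandler = notifyBarrierHandler;
    }
    @Override public void triggerCheckpoint(long checkpointId) {
        notifyBarrierHandler.accept(checkpointId);
    }
}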

Best,
Yun
  
      
    


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
   Hi all,

         I tested the previous PoC against the current tests and found some new issues that might cause divergence; sorry that there might also be some reversals of earlier conclusions:

 
     1. Which operators should wait for one more checkpoint before closing?

        One motivation for this FLIP is to ensure that 2PC sinks commit the last part of data before being closed, which means the sink operator needs to wait for one more checkpoint, i.e. onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close() (a small sketch of this lifecycle follows after this list). This leads to the question of which operators should wait for that checkpoint. Possible options are:
                 a. Make all operators (or UDFs) that implement the notifyCheckpointComplete method wait for one more checkpoint. One exception is that, since we can only snapshot one or all tasks of a legacy source operator to avoid data repetition[1], we cannot support legacy sources and their chained operators waiting for checkpoints, because there would be a deadlock if only part of the tasks have finished; this would finally be resolved once the legacy sources are deprecated. The PoC uses this option for now.
                b. Make only the operators (or UDFs) that implement a special marker interface wait for one more checkpoint.  


   2. Do we need to handle the case where tasks finish before being triggered?

      Previously I thought we could postpone it; however, during testing I found it might cause problems, since by default a checkpoint failure causes a job failover, and the job would also have to wait another interval before the next checkpoint is triggered. To pass the tests I updated the PoC to include this part, but we may want to think twice about whether to include it or use some other option.

3. How do we extend the checkpoint metadata format?

    Sorry, previously I gave a wrong estimation: after extracting a sub-component for (de)serializing the operator state, the problem just moves into the new OperatorStateSerializer. The issue is that v2, v3 and v4 have different fields and thus use different processes when (de)serializing, which is a bit different from the case where we have fixed steps and each step has different logic. Thus we might either
     a. use a base class for each pair of versions, or
     b. have a unified framework that contains all the possible fields across all versions, and use an empty field serializer to skip some fields in each version.
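As mentioned under point 1, here is a small sketch of the intended sink lifecycle (a hypothetical class, only illustrating the onEndOfInput() -> waitForCheckpoint() -> notifyCheckpointComplete() -> close() ordering, not the actual sink API):

import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: after the end of input, the sink waits for one more completed
// checkpoint so that the last part of data is committed before close().
class TwoPhaseCommitSinkSketch {
    private final CompletableFuture<Long> finalCheckpointCompleted = new CompletableFuture<>();

    void onEndOfInput() {
        // stop accepting records; the pending data is committed by the next checkpoint
    }

    void notifyCheckpointComplete(long checkpointId) {
        // commit the transaction belonging to this checkpoint, then unblock close()
        finalCheckpointCompleted.complete(checkpointId);
    }

    void close() throws Exception {
        finalCheckpointCompleted.get(); // wait for the final checkpoint before shutting down
    }
}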

Best,
Yun




Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately. My gut feeling says that this is something we should only address for new sinks where we decouple the semantics of commits and checkpoints anyways. [hidden email] any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source partition that is finished before the first checkpoint. Then, we would need to store the finished state of the subtask somehow. So I'm assuming, we still need to trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we can store the flag inside managed or raw state without changing the format?







Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
     Hi Arvid, 

         Many thanks for the feedback!

         Regarding the second issue, sorry, I think I did not make it very clear. I was initially thinking of the case where, for example, for a job with graph A -> B -> C, when we compute which tasks to trigger, A is still running, so we trigger A to start the checkpoint. However, before the trigger message reaches A, A finishes and the trigger fails because the task is not found. If we do not handle this case, the checkpoint would fail due to timeout; and by default a failed checkpoint causes a job failure, plus we would have to wait one checkpoint interval for the next checkpoint. One solution would be, when the JM is notified that A has finished, to check all pending checkpoints and trigger B instead.

       For the third issue, it should work if we store a special value in some field of OperatorState or OperatorSubtaskState; for example, we might store a special subtaskState map inside the OperatorState to mark it as finished, since a finished operator should always have empty state. Many thanks for the advice, I'll try this approach! 
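A rough sketch of that idea (the marker value and structure are purely illustrative; the real OperatorState/OperatorSubtaskState classes are not modeled here):

import java.util.Map;

// Hypothetical sketch: a fully finished operator is represented by a reserved, empty
// subtask-state entry instead of a new field in the metadata format.
class OperatorStateSketch {
    static final int FINISHED_MARKER_SUBTASK = -1; // reserved key, assumed otherwise unused
    private final Map<Integer, byte[]> subtaskStates;

    OperatorStateSketch(Map<Integer, byte[]> subtaskStates) {
        this.subtaskStates = subtaskStates;
    }

    boolean isFullyFinished() {
        // a finished operator carries no real state, only the reserved marker entry
        return subtaskStates.size() == 1 && subtaskStates.containsKey(FINISHED_MARKER_SUBTASK);
    }
}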

Best,
 Yun







Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Aljoscha Krettek
In reply to this post by Yun Gao
This is somewhat unrelated to the discussion about how to actually do
the triggering when sources shut down, I'll write on that separately. I
just wanted to get this quick thought out.

For letting operators decide whether they actually want to wait for a
final checkpoint, which is relevant at least for Async I/O and
potentially for sinks.

We could introduce an interface, sth like `RequiresFinalization` or
`FinalizationListener` (all bad names). The operator itself knows when
it is ready to completely shut down, Async I/O would wait for all
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the
framework can call after each checkpoint (and potentially at other
points)

This way we would decouple that logic from things that don't actually
need it. What do you think?
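A minimal sketch of what such an interface could look like (using the placeholder names from above; the async-I/O example is only an assumption about how an operator might implement it):

// Sketch of the proposed interface; the framework would poll it after each completed checkpoint.
interface FinalizationListener {
    boolean isFinalized();
}

// Example: an async-I/O-style operator is finalized once all in-flight requests have returned.
class AsyncOperatorSketch implements FinalizationListener {
    private int pendingRequests;

    void requestIssued()    { pendingRequests++; }
    void requestCompleted() { pendingRequests--; }

    @Override
    public boolean isFinalized() {
        return pendingRequests == 0;
    }
}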

Best,
Aljoscha
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Arvid Heise-3
In reply to this post by Yun Gao
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down, Async I/O would wait for all
> requests, sink would potentially wait for a given number of checkpoints. 
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points)

I think we are mixing two different things here that may require different solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only finish after having finished operations that do not depend on data flow (async I/O, but I could also think of some timer actions in process functions).

Your proposal would help most for the first case. The second case can be solved entirely with current methods without being especially complicated:
- EOP is only emitted once Async I/O is done with all background tasks
- All timers are fired in a process function (I think we rather want to fire immediately on EOP but that's a different discussion)
The advantage of this approach over your idea is that you don't need to wait for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I don't see a use case outside sinks, we could simply add this method to the new sink interface.
- We implicitly assume that a sink is done after having a successful checkpoint at the end. Then we just need a tag interface `RequiresFinalization`. It also feels like we should add the property `final` to checkpoint options to help the sink detect that this is the last checkpoint to be taken. We could also try to always have the final checkpoint without tag interface on new sinks...
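A rough sketch of the second alternative above (all names are placeholders, not existing Flink classes): a tag interface plus a "final" flag on the checkpoint options, so the sink can recognize the last checkpoint it has to wait for.

// Hypothetical sketch of the "tag interface + final checkpoint flag" alternative.
interface RequiresFinalization { } // marker only, no methods

class CheckpointOptionsSketch {
    final long checkpointId;
    final boolean isFinal; // true for the last checkpoint taken before shutdown

    CheckpointOptionsSketch(long checkpointId, boolean isFinal) {
        this.checkpointId = checkpointId;
        this.isFinal = isFinal;
    }
}

class FinalizingSinkSketch implements RequiresFinalization {
    private boolean finalCommitDone;

    void notifyCheckpointComplete(CheckpointOptionsSketch options) {
        if (options.isFinal) {
            finalCommitDone = true; // the last commit has happened, the sink may now close
        }
    }

    boolean readyToClose() { return finalCommitDone; }
}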



Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

r_khachatryan
Thanks for starting this discussion (and sorry for probably duplicated questions, I couldn't find them answered in the FLIP or in this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds complexity (new event).
However, the resources would be wasted for a relatively short time until the job finishes completely.
And compared to other options, the complexity seems much lower. Or are differences in task completion times so huge and so common?

2. I think it would be helpful to describe how rescaling is handled in Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by unaligned checkpoints)? I think it still has to be sent downstream, which invalidates this option.

Regards,
Roman



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Roman,

Many thanks for the feedback! I'll try to answer the questions inline:

> 1. Option 1 is said to be not preferable because it wastes resources and adds complexity (new event). 
> However, the resources would be wasted for a relatively short time until the job finishes completely. 
> And compared to other options, complexity seems much lower. Or are differences in task completion times so huge and so common?

There might be mixed jobs with both bounded and unbounded sources; in this case, the resources for the finished
part of the job could not be released.

Option 1 also complicates the semantics of EndOfPartition: if we keep the finished tasks alive but still need to
notify the following tasks that all records have been sent, we would have to introduce some kind of pre-EndOfPartition message,
which is similar to the current EndOfPartition but does not cause the channels to be released.

> 2. I think it would be helpful to describe how is rescaling handled in Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage the state at the granularity of operators, so the rescaling process would be the same as for a normal checkpoint.
For example, suppose one operator resides in a task with parallelism 4 and 2 of the subtasks are finished; the state of the operator is then composed
of the state of the 2 remaining subtask instances. If we rescale to 5 after failover, the state of those 2 remaining subtasks would be redistributed
across the 5 new subtasks after failover.

If all 4 subtasks were finished before failover, the operator would be marked as finished; after failover the operator would still be marked as finished,
all subtask instances of this operator would skip methods like open(), endOfInput() and close(), and the operator would be excluded when taking checkpoints
after failover.


> 3. Option 3 assumes that the state of a finished task is not used. That's true for operator state, but what about channel state (captured by unaligned checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if in one checkpoint a subtask is marked as finished, then its descendant tasks would wait until all records have been received
from the finished tasks before taking the checkpoint; thus in this case we would not have result partition state, but only channel state for the
downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one checkpoint A has reported FINISHED. The CheckpointCoordinator would
then choose B as the new "source" and trigger its checkpoint via RPC. When task B receives the checkpoint trigger, it knows that all its preceding tasks
are finished, so it would wait until all the InputChannels have received EndOfPartition from the network (namely, inputChannel.onBuffer() is called with
EndOfPartition) and then take a snapshot of the input channels, as a normal unaligned checkpoint does on the InputChannel side. In this way
we would be able to ensure that the finished tasks always have an empty state.
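To make the described steps a bit more concrete, here is a small sketch (all types and method names are invented for illustration and are not Flink's real internals) of what task B would do when it is triggered as the new "source":

    import java.util.List;

    // Sketch of the behavior described above for a task whose upstream
    // tasks have all FINISHED before the checkpoint is triggered.
    class FinishedUpstreamCheckpointSketch {

        interface InputChannel {
            /** Blocks until EndOfPartition has been received from the network. */
            void awaitEndOfPartition() throws InterruptedException;

            /** Snapshots any buffered in-flight data of this channel. */
            void snapshotState(long checkpointId);
        }

        private final List<InputChannel> inputChannels;

        FinishedUpstreamCheckpointSketch(List<InputChannel> inputChannels) {
            this.inputChannels = inputChannels;
        }

        void triggerCheckpointAsNewSource(long checkpointId) throws InterruptedException {
            // Being triggered via RPC implies all preceding tasks reported FINISHED,
            // so first drain the network: wait for EndOfPartition on every channel.
            for (InputChannel channel : inputChannels) {
                channel.awaitEndOfPartition();
            }
            // Then snapshot the input channel state, as a normal unaligned
            // checkpoint does on the InputChannel side; the finished upstream
            // tasks themselves end up with an empty state.
            for (InputChannel channel : inputChannels) {
                channel.snapshotState(checkpointId);
            }
        }
    }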

I'll also update the FLIP to make this clearer.

Best,
 Yun



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

r_khachatryan
Thanks a lot for your answers Yun,

> In detail, support we have a job with the graph A -> B -> C, support in one checkpoint A has reported FINISHED, CheckpointCoordinator would
> choose B as the new "source" to trigger checkpoint via RPC. For task B, if it received checkpoint trigger, it would know that all its precedant tasks
> are finished, then it would wait till all the InputChannel received EndOfPartition from the network (namely inputChannel.onBuffer() is called with
> EndOfPartition) and then taking snapshot for the input channels, as the normal unaligned checkpoints does for the InputChannel side. Then
> we would be able to ensure the finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone the JM notification from the source until its EoP is consumed?

Regards,
Roman



Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
Hi Roman,

Many thanks for the feedback!

> Probably it would be simpler to just decline the RPC-triggered checkpoint
> if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).

> But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint
> by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints.
> Maybe a better option would be to postpone the JM notification from the source until its EoP is consumed?

I agree that there would indeed be cases where the checkpoint gets slower because it cannot skip
the data in the result partition of the finished upstream task:
    a) For aligned checkpoints this would not happen, since the downstream tasks always need to
       process the buffers in order anyway.
    b) With unaligned checkpoints enabled, the slower cases might happen if the downstream task processes the
       remaining data very slowly.

But since only the result partition of the finished upstream task needs to be processed before the checkpoint, the rest of
the execution graph could still perform the unaligned checkpoint normally. I think the average delay caused would
be much lower than with completely aligned checkpoints, but there could still be extremely bad cases where
the delay is long.

Declining the RPC-triggered checkpoint would indeed simplify the implementation, but since by default a
failed checkpoint currently causes a job failover, we have some concerns about directly declining the checkpoint.
As for postponing the JM notification: the JM currently cannot know whether a task has
received all the EndOfPartition events from its upstream tasks, so we would need to introduce a new RPC to report that
state, and since the triggering is not atomic, we might also run into synchronization issues between JM and TM,
which would introduce some complexity.

Thus another possible option might be to let the upstream task wait until all the pending buffers in its result partition have
been flushed before it finishes. We could do this waiting only for pipelined result partitions so it would not affect batch
jobs. With this waiting, the unaligned checkpoint could continue to trigger the upstream task and skip the buffers in
the result partition. Since the result partition state would be kept within the operator state of the last operator, after failover
we would find that the last operator has non-empty state, and we would restart the tasks containing this operator to
resend the snapshotted buffers. Of course this would also introduce some complexity, and since the probability of a long delay
would be lower than in the completely aligned case, do you think it would be OK for us to view it as an optimization and
postpone it to future versions?

Best,
Yun

