[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

r_khachatryan
Hi Yun,

> b)  With unaligned checkpoint enabled, the slower cases might happen if the downstream task processes very slowly.
I think UC will be the common case with multiple sources each with DoP > 1.
IIUC, waiting for EoP will be needed on each subtask each time one of its source subtasks finishes.

> But since only the result partition part of the finished upstream need wait to be processed, the other part of
> the execution graph could  still perform the unaligned checkpoint normally
Yes, but checkpoint completion notification will not be sent until all the EOPs are processed.

> Declining the RPC-trigger checkpoint would indeed simplify the implementation, but since currently by default the
> failed checkpoint would cause job failover, thus we might have some concerns in directly decline the checkpoint.
Not all declines cause job failure, particularly CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
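
To illustrate the distinction, here is a minimal sketch of failure accounting in which one decline reason is ignored. Only the reason name CHECKPOINT_DECLINED_TASK_NOT_READY comes from this thread; the class, the OTHER_DECLINE placeholder and the tolerance counter are assumptions made for illustration, not Flink's actual failure handling.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: counts checkpoint declines towards job failover, except ignored reasons.
final class DeclineToleranceSketch {
    // Only CHECKPOINT_DECLINED_TASK_NOT_READY is named in the thread; OTHER_DECLINE is a placeholder.
    enum Reason { CHECKPOINT_DECLINED_TASK_NOT_READY, OTHER_DECLINE }

    private static final Set<Reason> IGNORED =
            EnumSet.of(Reason.CHECKPOINT_DECLINED_TASK_NOT_READY);

    private final int tolerableFailures; // assumed knob; 0 means the first counted decline fails the job
    private int countedFailures;

    DeclineToleranceSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true if the job should fail over after this decline. */
    boolean onDecline(Reason reason) {
        if (IGNORED.contains(reason)) {
            return false; // the checkpoint is aborted, but the job keeps running
        }
        countedFailures++;
        return countedFailures > tolerableFailures;
    }

    /** A completed checkpoint resets the counter. */
    void onCheckpointCompleted() {
        countedFailures = 0;
    }
}
```

With a tolerance of 0, a not-ready decline leaves the job running while any other decline triggers failover.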

> Thus another possible option might be let the upstream task to wait till all the pending buffers in the result partition has been flushed before get to finish.
This is what I meant by "postpone JM notification from source". Just blocking the task thread wouldn't add much complexity, though I'm not sure if it would cause any problems.

> do you think it would be ok for us to view it as an optimization and postpone it to future versions ?
I think that's a good idea.

Regards,
Roman


On Mon, Jan 11, 2021 at 11:03 AM Yun Gao <[hidden email]> wrote:
      Hi Roman,

          Thanks a lot for the feedback!
         
          
        > Probably it would be simpler to just decline the RPC-triggered checkpoint 
        > if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).

        > But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint 
        > by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints.
        > Maybe a better option would be to postpone JM notification from source until it's EoP is consumed?

       I agree that there are cases where the checkpoint gets slower, since it cannot skip
       the data in the result partition of the finished upstream task:
            a) With aligned checkpoints this does not happen, since the downstream tasks always need to
                process the buffers in order.
            b) With unaligned checkpoints enabled, the slowdown might happen if the downstream task
                processes very slowly.

       But since only the result partition of the finished upstream task needs to wait to be processed, the rest of
       the execution graph can still perform the unaligned checkpoint normally. I think the average delay would
       be much lower than with a completely aligned checkpoint, but there would still be extremely bad cases where
       the delay is long.

       Declining the RPC-triggered checkpoint would indeed simplify the implementation, but since by default a
       failed checkpoint currently causes job failover, we might have some concerns about directly declining the checkpoint.
       As for postponing the JM notification: since the JM currently cannot know whether the task has
       received all the EndOfPartition events from the upstream tasks, we might need to introduce a new RPC for notifying the
       state, and since the triggering is not atomic, we may also run into synchronization issues between JM and TM,
       which would introduce some complexity.
 
      Thus another possible option might be to let the upstream task wait until all the pending buffers in the result partition
      have been flushed before it finishes. We could do the wait only for the PipelineResultPartition so it won't affect batch
      jobs. With the waiting, the unaligned checkpoint could continue to trigger the upstream task and skip the buffers in
      the result partition. Since the result partition state would be kept within the operator state of the last operator, after failover
      we would find that the last operator has a non-empty state and we would restart the tasks containing this operator to
      resend the snapshotted buffers. Of course this would also introduce some complexity, and since the probability of a long delay
      would be lower than in the completely aligned case, do you think it would be OK for us to view it as an optimization and
      postpone it to future versions?
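
      The "wait until flushed before finishing" idea can be sketched roughly as below. This is only an illustration under assumed names (FlushBeforeFinishSketch, emitBuffer, onBufferConsumed, awaitFlushed are all hypothetical), not the actual result partition code: the task thread blocks until the number of pending buffers drops to zero or a timeout expires.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a pipelined result partition that tracks unconsumed buffers.
final class FlushBeforeFinishSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flushed = lock.newCondition();
    private int pendingBuffers;

    /** Called by the producing task for every buffer it emits. */
    void emitBuffer() {
        lock.lock();
        try {
            pendingBuffers++;
        } finally {
            lock.unlock();
        }
    }

    /** Called when the downstream side has consumed one buffer. */
    void onBufferConsumed() {
        lock.lock();
        try {
            if (pendingBuffers == 0) {
                throw new IllegalStateException("no pending buffer to consume");
            }
            if (--pendingBuffers == 0) {
                flushed.signalAll(); // wake up a task waiting to finish
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks the finishing task until all buffers are consumed; returns false on timeout or interrupt. */
    boolean awaitFlushed(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (pendingBuffers > 0) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = flushed.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

      In this sketch the task would call awaitFlushed(...) before reporting FINISHED, so a checkpoint triggered in the meantime can still snapshot the remaining buffers on the upstream side.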

     Best,
     Yun
      
     

------------------------------------------------------------------
From:Khachatryan Roman <[hidden email]>
Send Time:2021 Jan. 11 (Mon.) 05:46
To:Yun Gao <[hidden email]>
Cc:Arvid Heise <[hidden email]>; dev <[hidden email]>; user <[hidden email]>
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks a lot for your answers Yun,

> In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one checkpoint A has reported FINISHED. The CheckpointCoordinator would
> choose B as the new "source" to trigger the checkpoint via RPC. For task B, if it receives the checkpoint trigger, it knows that all its preceding tasks
> are finished. It would then wait until all the InputChannels have received EndOfPartition from the network (namely inputChannel.onBuffer() is called with
> EndOfPartition) and then take the snapshot for the input channels, as normal unaligned checkpoints do on the InputChannel side. Then
> we would be able to ensure that finished tasks always have an empty state.

Probably it would be simpler to just decline the RPC-triggered checkpoint if not all inputs of this task are finished (with CHECKPOINT_DECLINED_TASK_NOT_READY).

But I wonder how significantly this waiting for EoP from every input will delay performing the first checkpoint by B after becoming a new source. This may in turn impact exactly-once sinks and incremental checkpoints.
Maybe a better option would be to postpone JM notification from source until its EoP is consumed?
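
For reference, the trigger rule quoted at the top of this mail (a running task becomes a new "source" once all of its predecessors are finished) can be sketched as follows. The class and method names are hypothetical and the job graph is reduced to a map from task name to predecessor names; this is not the CheckpointCoordinator code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of choosing which tasks to trigger via RPC.
final class TriggerTargetsSketch {
    /**
     * Returns the running tasks whose predecessors have all finished.
     * Original sources (no predecessors) qualify trivially while they are still running.
     */
    static Set<String> tasksToTrigger(Map<String, List<String>> predecessors, Set<String> finished) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, List<String>> e : predecessors.entrySet()) {
            String task = e.getKey();
            if (finished.contains(task)) {
                continue; // finished tasks are never triggered
            }
            if (finished.containsAll(e.getValue())) {
                result.add(task);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("A", List.of());
        graph.put("B", List.of("A"));
        graph.put("C", List.of("B"));
        System.out.println(tasksToTrigger(graph, Set.of()));    // prints [A]
        System.out.println(tasksToTrigger(graph, Set.of("A"))); // prints [B]
    }
}
```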

Regards,
Roman

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
    Hi Roman, 

        Thanks a lot for the feedback and suggestions!

        > I think UC will be the common case with multiple sources each with DoP > 1.
        > IIUC, waiting for EoP will be needed on each subtask each time one of it's source subtask finishes.
       
        Yes, waiting for EoP would be required for each input channel if we do not specially block the
finished upstream task.

       > Yes, but checkpoint completion notification will not be sent until all the EOPs are processed.
     
      The downstream task that gets triggered must indeed wait until it has received EoPs from all the input channels.
I initially compared it with the completely aligned case: here the remaining execution graph after the
triggered task can still take normal unaligned checkpoints (e.g. for A -> B -> C -> D, if A finishes and B gets
triggered, then B -> C -> D can still take a normal unaligned checkpoint). But this still does not bound the
maximum possible delay.

    > Not all declines cause job failure, particularly CHECKPOINT_DECLINED_TASK_NOT_READY doesn't.
    
    Sorry for getting the logic wrong here; CHECKPOINT_DECLINED_TASK_NOT_READY indeed does not cause failure.
But since after a failed checkpoint we would have to wait for the checkpoint interval before the next checkpoint, I also
agree that the following option, in which we try to complete each checkpoint, would be the better one.

>> Thus another possible option might be let the upstream task to wait till all the pending buffers in the result partition has been flushed before get to finish.
> This is what I meant by "postpone JM notification from source". Just blocking the task thread wouldn't add much complexity, though I'm not sure if it would cause any problems.

>> do you think it would be ok for us to view it as an optimization and postpone it to future versions ? 
> I think that's a good idea.

 Also, sorry that I misunderstood the proposal here. Currently
I do not see explicit problems with waiting for the pipelined result partition to be flushed either.
Glad that we have the same view on this issue. :)

 Best,
  Yun


------------------------------------------------------------------
From:Khachatryan Roman <[hidden email]>
Send Time:2021 Jan. 11 (Mon.) 19:14
To:Yun Gao <[hidden email]>
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
  Hi all,
       
I updated the FLIP[1] to reflect the major points discussed in the ML thread:

 1) For the case where a "new" root task finishes before it receives the trigger message, we previously
proposed to let the JM re-compute and re-trigger the descendant tasks, but after the discussion we realized that
this might put overhead on the JobMaster with cascading finishes and large parallelism. Another option
might be to let the StreamTask do one synchronization with the CheckpointCoordinator before it finishes,
so that it becomes aware of the missed pending checkpoints; since EndOfPartition has not been emitted at that point,
it could still broadcast barriers to its descendant tasks. I updated the details in this section[2] in the
FLIP.

2) For the barrier alignment, we now change to inserting fake barriers in the input channels to avoid
interfering with the checkpoint alignment algorithms. One remaining issue is that in unaligned checkpoint
mode we cannot snapshot an upstream task's result partition if the task has finished. One option
to address this is to make the upstream tasks wait for their buffers to be flushed before exiting, and
we would include this in a future version. I updated this part in this section[3] in the FLIP.

3) Some operators, like the Sink Committer, need to wait for one complete checkpoint before exiting. To support
operators that need to wait for some finalization condition, like the Sink Committer and Async I/O, we
could introduce a new interface to mark this kind of operator and let the runtime wait until the operator
reaches its condition. I updated this part in this section[4] in the FLIP.
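
As a rough illustration of point 2) above, the sketch below treats an input channel that has delivered EndOfPartition as if it had delivered a barrier for every later checkpoint, so an aligned checkpoint can complete without real barriers from finished upstream tasks. All names are hypothetical, checkpoint ids are assumed to be non-negative and increasing, and the real alignment logic described in the FLIP is more involved.

```java
import java.util.BitSet;

// Hypothetical sketch of barrier alignment with "fake" barriers for finished channels.
final class FakeBarrierAlignmentSketch {
    private final int numChannels;
    private final BitSet finished = new BitSet();    // channels that delivered EndOfPartition
    private final BitSet barrierSeen = new BitSet(); // real or fake barriers for the pending checkpoint
    private long pendingCheckpoint = -1L;
    private long lastAligned = -1L;

    FakeBarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Handles a real barrier; returns true when the checkpoint is aligned on all channels. */
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastAligned) {
            return false; // stale barrier
        }
        if (checkpointId != pendingCheckpoint) {
            pendingCheckpoint = checkpointId;
            barrierSeen.clear();
            barrierSeen.or(finished); // insert fake barriers for already finished channels
        }
        barrierSeen.set(channel);
        return completeIfAligned();
    }

    /** Handles EndOfPartition; a pending checkpoint gets a fake barrier on this channel. */
    boolean onEndOfPartition(int channel) {
        finished.set(channel);
        if (pendingCheckpoint < 0) {
            return false;
        }
        barrierSeen.set(channel);
        return completeIfAligned();
    }

    private boolean completeIfAligned() {
        if (barrierSeen.cardinality() < numChannels) {
            return false;
        }
        lastAligned = pendingCheckpoint;
        pendingCheckpoint = -1L;
        return true;
    }
}
```

With three channels where channel 0 has already finished, barriers on channels 1 and 2 are enough to align a checkpoint in this sketch.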

Could you have another look at the FLIP and the pending issues? Any feedback is warmly welcomed
and appreciated. Thanks a lot!

Best,
 Yun


------------------------------------------------------------------
From:Yun Gao <[hidden email]>
Send Time:2021 Jan. 12 (Tue.) 10:30
To:Khachatryan Roman <[hidden email]>
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Yun Gao
  Hi all,

We had an offline discussion together with @Arvid, @Roman and @Aljoscha, and I'd
like to post some of the points we discussed:

1) For the problem that the "new" root task coincidentally finishes before it is triggered
successfully, we have listed two options in FLIP-147[1]. For the first version, we now tend
to go with the first option, in which the JM re-computes and re-triggers new sources when it realizes
that some tasks were not triggered successfully. This option avoids the complexity of adding
a new RPC and duplicating task states, and in the average case it would not cause too much
overhead.

2) How to support operators like the Sink Committer waiting for one complete checkpoint
before exiting is more an issue of how to use checkpoints after tasks finish than
of how to achieve them, so we would like to leave this part out
of the current discussion. We would discuss and solve this issue separately after FLIP-147 is done.
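
The first option in point 1) can be pictured with the sketch below: when a trigger fails because the task finished just before the RPC arrived, the JM marks it as finished, re-computes the tasks whose predecessors are all finished, and triggers those instead. The names (RetriggerSketch, triggerWithRetry, tryTrigger) and the map-based graph are assumptions for illustration, not the FLIP's implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical sketch of re-computing and re-triggering new "sources" after a failed trigger.
final class RetriggerSketch {
    /**
     * tryTrigger returns false when the task had already finished before the trigger arrived.
     * Returns the set of tasks that were triggered successfully.
     */
    static Set<String> triggerWithRetry(
            Map<String, List<String>> predecessors,
            Set<String> knownFinished,
            Predicate<String> tryTrigger) {
        Set<String> finished = new HashSet<>(knownFinished);
        Set<String> triggered = new TreeSet<>();
        boolean retry = true;
        while (retry) {
            retry = false;
            for (Map.Entry<String, List<String>> e : predecessors.entrySet()) {
                String task = e.getKey();
                if (finished.contains(task) || triggered.contains(task)) {
                    continue;
                }
                if (!finished.containsAll(e.getValue())) {
                    continue; // still has a running predecessor; it will receive barriers instead
                }
                if (tryTrigger.test(task)) {
                    triggered.add(task);
                } else {
                    finished.add(task); // finished in the meantime: re-compute in the next pass
                    retry = true;
                }
            }
        }
        return triggered;
    }
}
```

Each failed trigger adds one task to the finished set, so the loop ends after at most one pass per task; this extra work is the overhead on cascading finishes mentioned earlier in the thread.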

Best,
 Yun


------------------------------------------------------------------
From:Yun Gao <[hidden email]>
Send Time:2021 Jan. 13 (Wed.) 16:09
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
