Which test cluster to use for checkpointing tests?

Ken Krugler
Hi all,

For testing checkpointing, is it possible to use LocalFlinkMiniCluster?

Asking because I’m not seeing checkpoint calls being made to my custom function (implements ListCheckpointed) when I’m running with LocalFlinkMiniCluster.

Though I do see entries like this logged:

18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in heap memory / checkpoints to JobManager).
18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.

But when I browse the Flink source, tests for checkpointing seem to use TestCluster, e.g. ResumeCheckpointManuallyITCase.
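For reference, the ListCheckpointed contract comes down to two callbacks. Flink calls snapshotState(checkpointId, timestamp) when a checkpoint reaches the operator, and it calls restoreState(list) when restoring from one. The sketch below is a plain-Java, Flink-free model of that round trip, useful for unit-testing snapshot/restore logic in isolation. ListCheckpointedLike, CountingFunction and ListCheckpointedDemo are hypothetical names; the interface only mirrors the shape of Flink's ListCheckpointed<T>. The sketch says nothing about when Flink decides to call snapshotState, which is the actual question here.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in that mirrors the shape of Flink's ListCheckpointed<T>.
interface ListCheckpointedLike<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// Hypothetical function that counts every record it processes.
class CountingFunction implements ListCheckpointedLike<Long> {
    private long count = 0;
    private int snapshotCalls = 0;

    void process(String record) {
        count++;
    }

    long getCount() { return count; }
    int getSnapshotCalls() { return snapshotCalls; }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        snapshotCalls++;
        // The whole operator state is a single-element list here.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, one instance may receive several elements, so sum them.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}

class ListCheckpointedDemo {
    public static void main(String[] args) {
        CountingFunction original = new CountingFunction();
        original.process("a");
        original.process("b");
        // Simulate a checkpoint, then a restarted instance restoring from it.
        List<Long> snapshot = original.snapshotState(1L, System.currentTimeMillis());
        CountingFunction restored = new CountingFunction();
        restored.restoreState(snapshot);
        System.out.println("restored count = " + restored.getCount());
    }
}
```

Summing in restoreState suits a counter: when the job is rescaled, list elements from several old instances can end up in one new instance.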

Thanks,

— Ken

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378

Re: Which test cluster to use for checkpointing tests?

Nico Kruber
Hi Ken,
LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
was even attempting to create one but could not finish. Maybe your
program was not fully running yet?
Can you tell us a little more about your setup and how you
configured the LocalFlinkMiniCluster?


Nico

On 23/02/18 21:42, Ken Krugler wrote:

> [...]

Re: Which test cluster to use for checkpointing tests?

Ken Krugler
Hi Nico,

On Feb 26, 2018, at 9:41 AM, Nico Kruber <[hidden email]> wrote:

> Hi Ken,
> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
> was attempting to even create one but could not finish. Maybe your
> program was not fully running yet?

In the logs I see:

18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed urls source (1/2).
18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.

Maybe the checkpoint here is happening too soon after the “Initializing Source” message.

After that the source is done (it only triggers the iteration with a single starting tuple), so I wouldn't expect checkpointing to actually do anything. I was only using these messages as a sign that I had configured my workflow correctly for checkpointing.
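Ken's timing hypothesis can be checked with simple arithmetic. In the Flink versions discussed here, the periodic checkpoint trigger is sent to the source tasks. Only triggers that arrive while a source is RUNNING can start a checkpoint. The sketch below counts how many triggers of a fixed-interval schedule land inside a source's RUNNING window. It assumes instantaneous triggers and takes the scheduler's first delay as a parameter, because that delay is an assumption here and not something the thread states. CheckpointTimeline is a hypothetical name.

```java
// Hypothetical back-of-the-envelope model: how many periodic checkpoint triggers
// land while the source task is RUNNING? All times are in milliseconds.
class CheckpointTimeline {

    // Counts trigger times t = firstDelayMs + k * intervalMs (k >= 0) that fall in
    // [runningFromMs, runningUntilMs), the window in which the source is RUNNING.
    static int triggersWhileRunning(long firstDelayMs, long intervalMs,
                                    long runningFromMs, long runningUntilMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        int count = 0;
        for (long t = firstDelayMs; t < runningUntilMs; t += intervalMs) {
            // Triggers before the source is RUNNING are aborted, so they are not counted.
            if (t >= runningFromMs) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Assumed: a seed source RUNNING for ~5 ms, first trigger after one 100 ms interval.
        System.out.println("short-lived source: " + triggersWhileRunning(100, 100, 0, 5));
        // A source that keeps running for 1 s sees a trigger every 100 ms.
        System.out.println("long-running source: " + triggersWhileRunning(100, 100, 0, 1000));
    }
}
```

Assume a seed source that runs for only a few milliseconds, which is not measured in the thread. With a 100 ms interval the count is zero unless a trigger happens to fall into that short window.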

> Can you tell us a little bit more about your set up and how you
> configured the LocalFlinkMiniCluster?

Potential issue #1 - I’ve got a workflow with multiple iterations.

For that reason I had to force checkpointing via:

// The third argument (force = true) is what allows checkpointing on a job with iterations.
env.setStateBackend(new MemoryStateBackend());
env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);


Potential issue #2 - because of the fun with tracking iteration progress, I subclass LocalStreamEnvironment to add this async execution method:

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.graph.StreamGraph;

// Method of the LocalStreamEnvironment subclass; _conf and _exec are fields of that subclass.
public JobSubmissionResult executeAsync(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());

    configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
    configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    // add (and override) the settings with what the user defined
    configuration.addAll(_conf);

    _exec = new LocalFlinkMiniCluster(configuration, true);
    _exec.start(true);

    // The above code is all basically the same as Flink's LocalStreamEnvironment.
    // The change is that here we call submitJobDetached vs. submitJobAndWait.
    // We assume that eventually someone calls stop(job id), which then terminates
    // the LocalFlinkMiniCluster.
    return _exec.submitJobDetached(jobGraph);
}

However, I don't think that would affect checkpointing.

Anything else I should do to debug whether checkpointing is operating as expected? In the logs, at DEBUG level, I don’t see any errors or warnings related to this.
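Here is one debugging approach that does not depend on log lines, offered as a sketch under assumptions. Count snapshotState() calls directly and let the test wait for them with a timeout. With LocalFlinkMiniCluster the tasks run in the test's JVM. Each task works on a deserialized copy of the function, though, so a static field is the usual way for the test to observe calls. CheckpointProbe and its methods are hypothetical; the real function would call CheckpointProbe.onSnapshot() from inside its snapshotState(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: a static latch counted down from snapshotState().
class CheckpointProbe {
    private static volatile CountDownLatch snapshotsTaken = new CountDownLatch(1);

    // Call before submitting the job: expect this many snapshotState() calls.
    static void reset(int expectedSnapshots) {
        snapshotsTaken = new CountDownLatch(expectedSnapshots);
    }

    // Call from snapshotState(...) in the function under test.
    static void onSnapshot() {
        snapshotsTaken.countDown();
    }

    // Returns true if the expected snapshots happened before the timeout expired.
    static boolean awaitSnapshots(long timeout, TimeUnit unit) {
        try {
            return snapshotsTaken.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class CheckpointProbeDemo {
    public static void main(String[] args) {
        CheckpointProbe.reset(2);
        // Stand-in for a task thread that takes two checkpoints.
        Thread fakeTask = new Thread(() -> {
            CheckpointProbe.onSnapshot();
            CheckpointProbe.onSnapshot();
        });
        fakeTask.start();
        boolean observed = CheckpointProbe.awaitSnapshots(5, TimeUnit.SECONDS);
        System.out.println("two checkpoints observed: " + observed);
    }
}
```

If the await times out, no checkpoint reached the function in time, which turns a silent absence of checkpoints into a failing test.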

Thanks,

— Ken



Re: Which test cluster to use for checkpointing tests?

Nico Kruber
I was a bit confused when you said that the "source is done", which
is when I realized you must be using the batch (DataSet) API, for which
checkpointing is not available or needed. Let me quote from [1], which
IMHO has not changed:

DataSet:

Fault tolerance for the DataSet API works by restarting the job and
redoing all of the work. [...] The periodic in-flight checkpoints are
not used here.

DataStream:

This one would start immediately inserting data (as it is a streaming
job), and draw periodic checkpoints that make sure replay-on-failure
only has to redo a bit, not everything.
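The difference quoted above can be expressed as the amount of work redone after a failure. The following hypothetical model measures it in records. A full restart redoes everything processed so far. Recovery from periodic checkpoints redoes only the records since the last completed checkpoint. The model assumes checkpoints complete instantly every checkpointEvery records, which real (time-based, asynchronous) checkpoints do not. RecoveryCost is an illustrative name.

```java
// Hypothetical model of recovery cost, in records reprocessed after a failure.
// Assumes a checkpoint completes instantly every `checkpointEvery` records.
class RecoveryCost {

    // DataSet-style recovery: restart the job and redo all of the work.
    static long redoFullRestart(long processedBeforeFailure) {
        return processedBeforeFailure;
    }

    // DataStream-style recovery: resume from the last completed checkpoint.
    static long redoFromCheckpoint(long processedBeforeFailure, long checkpointEvery) {
        if (checkpointEvery <= 0) {
            throw new IllegalArgumentException("checkpointEvery must be positive");
        }
        long lastCheckpoint = (processedBeforeFailure / checkpointEvery) * checkpointEvery;
        return processedBeforeFailure - lastCheckpoint;
    }

    public static void main(String[] args) {
        long processed = 1_234_567L;
        System.out.println("full restart redoes:     " + redoFullRestart(processed));
        System.out.println("checkpointed job redoes: " + redoFromCheckpoint(processed, 100_000L));
    }
}
```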


Nico

[1]
https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E

On 26/02/18 22:55, Ken Krugler wrote:

> [...]

Re: Which test cluster to use for checkpointing tests?

Stephan Ewen
@Nico This has nothing to do with the DataSet API. The DataStream API supports finite programs as well.

@Ken The issue you are running into is that checkpointing currently works only until the job reaches the point where the pipeline starts to drain out, meaning when the sources are done. In your case, the source is done immediately, after sending out only one tuple.

Running checkpoints with closed sources is something that's on the feature list and will come soon...
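This explanation matches the log line from the first message. Before triggering, the checkpoint coordinator checks that every task it needs to trigger (the sources) is currently running, and aborts otherwise. A source that is still deploying or has already finished therefore produces "... is not being executed at the moment. Aborting checkpoint." The sketch below is a simplified, hypothetical model of that check, not Flink's CheckpointCoordinator code. TaskState only reuses a few state names from Flink's ExecutionState.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Subset of task states, reusing names from Flink's ExecutionState.
enum TaskState { CREATED, DEPLOYING, RUNNING, FINISHED }

// Hypothetical, simplified pre-trigger check: barriers are injected at the sources,
// so every source task must be RUNNING for a checkpoint to start.
class TriggerCheck {

    // Returns the abort message for the first non-running source, or empty if all run.
    static Optional<String> abortReason(Map<String, TaskState> sourceTasks) {
        for (Map.Entry<String, TaskState> task : sourceTasks.entrySet()) {
            if (task.getValue() != TaskState.RUNNING) {
                return Optional.of("Checkpoint triggering task " + task.getKey()
                        + " is not being executed at the moment. Aborting checkpoint.");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, TaskState> sources = new LinkedHashMap<>();
        // A seed source that already emitted its single tuple and finished.
        sources.put("Source: Seed urls source (1/2)", TaskState.FINISHED);
        sources.put("Source: Seed urls source (2/2)", TaskState.RUNNING);
        System.out.println(abortReason(sources).orElse("all sources running: trigger checkpoint"));
    }
}
```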


On Wed, Feb 28, 2018 at 4:02 PM, Nico Kruber <[hidden email]> wrote:
> [...]

Re: Which test cluster to use for checkpointing tests?

Ken Krugler
Hi Stephan,

Thanks for the update.

So is support for “running checkpoints with closed sources” part of FLIP-15, or something separate?

Regards,

— Ken

On Mar 1, 2018, at 9:07 AM, Stephan Ewen <[hidden email]> wrote:

> @Ken The issue you are running into is that Checkpointing works currently only until the job reaches the point where the pipeline starts to drain out, meaning when the sources are done. In your case, the source is done immediately, sending out only one tuple.
>
> Running checkpoints with closed sources is something that's on the feature list and will come soon…

Re: Which test cluster to use for checkpointing tests?

Nico Kruber
Hi Ken,
sorry, I was misled by the fact that you are using iterations, which
were only documented for the DataSet API.

Running checkpoints with closed sources sounds like a more general topic
than the iterations rework in FLIP-15. I couldn't dig up anything in
JIRA regarding this improvement either.

@Stephan: is this documented somewhere?


Nico

On 02/03/18 23:55, Ken Krugler wrote:

> [...]

Re: Which test cluster to use for checkpointing tests?

Paris Carbone
Hey,

Indeed, checkpointing iterations and dealing with closed sources are orthogonal issues, which is why the latter is not part of FLIP-15. That said, you kinda need both to have meaningful checkpoints for jobs with iterations.
One is about correctness (checkpointing strongly connected components in the execution graph) and the other about termination (terminating the checkpointing protocol when certain tasks ‘finish’).
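The correctness side can be illustrated as follows. An iteration turns part of the job graph into a cycle: the iteration head, the loop body and the iteration tail, with a feedback edge back to the head. That cycle is a strongly connected component. Records in flight on the feedback edge are not captured by a regular checkpoint, which is why Flink's documentation of this era warns that they can be lost on failure. As an illustration only, the sketch below uses Tarjan's algorithm to find such components in a small adjacency-map graph. The vertex names are hypothetical, and the code is not taken from FLIP-15 or from Flink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Tarjan's strongly-connected-components algorithm over a job graph given as an
// adjacency map (vertex name -> downstream vertex names).
class SccFinder {
    private final Map<String, List<String>> graph;
    private final Map<String, Integer> index = new HashMap<>();
    private final Map<String, Integer> lowLink = new HashMap<>();
    private final Deque<String> stack = new ArrayDeque<>();
    private final Set<String> onStack = new HashSet<>();
    private final List<Set<String>> components = new ArrayList<>();
    private int nextIndex = 0;

    SccFinder(Map<String, List<String>> graph) {
        this.graph = graph;
    }

    List<Set<String>> find() {
        for (String v : graph.keySet()) {
            if (!index.containsKey(v)) {
                strongConnect(v);
            }
        }
        return components;
    }

    private void strongConnect(String v) {
        index.put(v, nextIndex);
        lowLink.put(v, nextIndex);
        nextIndex++;
        stack.push(v);
        onStack.add(v);
        for (String w : graph.getOrDefault(v, Collections.emptyList())) {
            if (!index.containsKey(w)) {
                strongConnect(w); // tree edge: recurse, then propagate the low-link
                lowLink.put(v, Math.min(lowLink.get(v), lowLink.get(w)));
            } else if (onStack.contains(w)) {
                // edge back into the component currently on the stack
                lowLink.put(v, Math.min(lowLink.get(v), index.get(w)));
            }
        }
        // v is the root of a component: pop all of its members off the stack.
        if (lowLink.get(v).equals(index.get(v))) {
            Set<String> component = new TreeSet<>();
            String w;
            do {
                w = stack.pop();
                onStack.remove(w);
                component.add(w);
            } while (!w.equals(v));
            components.add(component);
        }
    }

    // Components with more than one vertex contain a cycle, e.g. an iteration loop.
    static List<Set<String>> cycles(Map<String, List<String>> graph) {
        List<Set<String>> result = new ArrayList<>();
        for (Set<String> component : new SccFinder(graph).find()) {
            if (component.size() > 1) {
                result.add(component);
            }
        }
        return result;
    }
}

class IterationGraphDemo {
    public static void main(String[] args) {
        // Hypothetical vertices: source -> head -> body -> tail, feedback edge tail -> head,
        // and body -> sink for the iteration's output.
        Map<String, List<String>> jobGraph = new LinkedHashMap<>();
        jobGraph.put("source", Arrays.asList("head"));
        jobGraph.put("head", Arrays.asList("body"));
        jobGraph.put("body", Arrays.asList("tail", "sink"));
        jobGraph.put("tail", Arrays.asList("head"));
        jobGraph.put("sink", Collections.emptyList());
        System.out.println("cycles: " + SccFinder.cycles(jobGraph));
    }
}
```

The demo prints `cycles: [[body, head, tail]]`. The source and sink are not part of any cycle.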

I am willing to help resolve the first issue, though I'd prefer to wait for the ongoing changes to the network model and FLIP-6 to be finalised, so that this change can be applied properly (are they?).

Paris

> On 6 Mar 2018, at 10:51, Nico Kruber <[hidden email]> wrote:
> [...]

Re: Which test cluster to use for checkpointing tests?

Nico Kruber
There are still some upcoming changes for the network stack, but most of
the heavy stuff is already through; you can track this under
https://issues.apache.org/jira/browse/FLINK-8581

FLIP-6 is in a somewhat similar state: it is currently only undergoing
stability improvements and bug fixes. The architectural changes have been merged.


Nico

On 06/03/18 11:24, Paris Carbone wrote:

> [...]

Re: Which test cluster to use for checkpointing tests?

Aljoscha Krettek
FYI, this is the JIRA issue tracking this: https://issues.apache.org/jira/browse/FLINK-2491

Aljoscha

On 6. Mar 2018, at 02:32, Nico Kruber <[hidden email]> wrote:

> [...]