Hi all,
For testing checkpointing, is it possible to use LocalFlinkMiniCluster?

Asking because I’m not seeing checkpoint calls being made to my custom function (implements ListCheckpointed) when I’m running with LocalFlinkMiniCluster.

Though I do see entries like this logged:

18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in heap memory / checkpoints to JobManager).
18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.

But when I browse the Flink source, tests for checkpointing seem to be using TestCluster, e.g. in ResumeCheckpointManuallyITCase

Thanks,

-- Ken

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378
Hi Ken,
LocalFlinkMiniCluster should run checkpoints just fine. It looks like it even attempted to create one but could not finish. Maybe your program was not fully running yet?

Can you tell us a little bit more about your setup and how you configured the LocalFlinkMiniCluster?

Nico
Hi Nico,
In the logs I see:

18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed urls source (1/2).
18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.

Maybe the checkpoint here is happening too soon after the “Initializing Source” message.

After that the source is done (it only triggers the iteration with a single starting tuple), so I wouldn’t expect checkpointing to actually do anything. I was just using these messages as indications that I had configured my workflow properly to actually do checkpointing.
Potential issue #1 - I’ve got a workflow with multiple iterations.

For that reason I had to force checkpointing via:

    env.setStateBackend(new MemoryStateBackend());
    env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);

Potential issue #2 - because of the fun with tracking iteration progress, I subclass LocalStreamEnvironment to add this async execution method:

    public JobSubmissionResult executeAsync(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());

        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        // add (and override) the settings with what the user defined
        configuration.addAll(_conf);

        _exec = new LocalFlinkMiniCluster(configuration, true);
        _exec.start(true);

        // The above code is all basically the same as Flink's LocalStreamEnvironment.
        // The change is that here we call submitJobDetached vs. submitJobAndWait.
        // We assume that eventually someone calls stop(job id), which then terminates
        // the LocalFlinkMiniCluster.
        return _exec.submitJobDetached(jobGraph);
    }

However I don’t think that would impact checkpointing.

Anything else I should do to debug whether checkpointing is operating as expected? In the logs, at DEBUG level, I don’t see any errors or warnings related to this.

Thanks,

-- Ken
I was a bit confused when you said that the "source is done", which is when I realized you must be using the batch API, for which checkpointing is not available / needed. Let me quote from [1], which imho has not changed:

    DataSet: Fault tolerance for the DataSet API works by restarting the job and redoing all of the work. [...] The periodic in-flight checkpoints are not used here.

    DataStream: This one would start immediately inserting data (as it is a streaming job), and draw periodic checkpoints that make sure replay-on-failure only has to redo only a bit, not everything.

Nico

[1] https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E
@Nico This has nothing to do with the DataSet API. The DataStream API supports finite programs as well.
@Ken The issue you are running into is that checkpointing currently works only until the job reaches the point where the pipeline starts to drain out, meaning when the sources are done. In your case, the source is done immediately, sending out only one tuple.

Running checkpoints with closed sources is something that's on the feature list and will come soon...
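
To make the behaviour described above concrete, here is a toy model in plain Java. It is only a sketch and not Flink code: ToyCheckpointModel, Coordinator and TaskState are hypothetical names invented for this illustration, and the rule it encodes (a checkpoint is attempted only while every source subtask is RUNNING, otherwise it is aborted) is an assumption derived from the log message and the explanation in this thread.

```java
import java.util.Arrays;

/** Toy model only; none of these names are Flink classes. */
final class ToyCheckpointModel {

    /** Simplified lifecycle of a source subtask (hypothetical, for illustration). */
    enum TaskState { DEPLOYING, RUNNING, FINISHED }

    /** Counts completed and aborted checkpoint attempts for a set of source subtasks. */
    static final class Coordinator {
        private final TaskState[] sources;
        private int completed;
        private int aborted;

        Coordinator(int sourceSubtasks) {
            sources = new TaskState[sourceSubtasks];
            Arrays.fill(sources, TaskState.DEPLOYING);
        }

        void setState(int subtask, TaskState state) {
            sources[subtask] = state;
        }

        /** Attempts one checkpoint; it completes only if every source is RUNNING. */
        boolean triggerCheckpoint() {
            for (TaskState state : sources) {
                if (state != TaskState.RUNNING) {
                    aborted++;   // models "... is not being executed at the moment"
                    return false;
                }
            }
            completed++;
            return true;
        }

        int completed() { return completed; }

        int aborted() { return aborted; }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator(2);
        coordinator.triggerCheckpoint();              // aborted: sources still DEPLOYING
        coordinator.setState(0, TaskState.RUNNING);
        coordinator.setState(1, TaskState.RUNNING);
        coordinator.triggerCheckpoint();              // completes: every source is RUNNING
        coordinator.setState(0, TaskState.FINISHED);  // source emitted its one tuple and is done
        coordinator.triggerCheckpoint();              // aborted: a source has finished
        System.out.println("completed=" + coordinator.completed()
                + " aborted=" + coordinator.aborted());
    }
}
```

In this model, a source that finishes right after emitting a single tuple leaves almost no window in which a checkpoint attempt can succeed, which matches the symptom Ken reports: snapshot calls on the custom function are never observed even though checkpointing is configured.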
Hi Stephan,
Thanks for the update.

So is support for “running checkpoints with closed sources” part of FLIP-15 <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132>, or something separate?

Regards,

-- Ken
Hi Ken,
sorry, I was misled by the fact that you are using iterations and those were only documented for the DataSet API.

Running checkpoints with closed sources sounds like a more general thing than being part of the iterations rework of FLIP-15. I couldn't dig up anything on Jira regarding this improvement either.

@Stephan: is this documented somewhere?

Nico
Hey,
Indeed, checkpointing iterations and dealing with closed sources are orthogonal issues; that is why the latter is not part of FLIP-15. Though, you kinda need both to have meaningful checkpoints for jobs with iterations. One has to do with correctness (checkpointing strongly connected components in the execution graph) and the other with termination (terminating the checkpointing protocol when certain tasks ‘finish’).

I am willing to help out resolving the first issue, though I prefer to wait for ongoing changes in the network model and FLIP-6 to be finalised to apply this change properly (are they?).

Paris
There are still some upcoming changes for the network stack, but most of the heavy stuff is already through. You may track this under https://issues.apache.org/jira/browse/FLINK-8581

FLIP-6 is somewhat similar and currently only undergoes some stability improvements/bug fixing. The architectural changes are merged now.

Nico
FYI, this is the Jira issue tracking checkpoints with closed sources: https://issues.apache.org/jira/browse/FLINK-2491
Aljoscha