Hi,
I'm working on a streaming application using Flink. Several steps in the processing are state-full (I use custom Windows and state-full operators ). Now if during a normal run an worker fails the checkpointing system will be used to recover. But what if the entire application is stopped (deliberately) or stops/fails because of a problem? At this moment I have three main reasons/causes for doing this: 1) The application just dies because of a bug on my side or a problem like for example this (which I'm actually confronted with): Failed to Update HDFS Delegation Token for long running application in HA mode https://issues.apache.org/jira/browse/HDFS-9276 2) I need to rebalance my application (i.e. stop, change parallelism, start) 3) I need a new version of my software to be deployed. (i.e. I fixed a bug, changed the topology and need to continue) I assume the solution will be in some part be specific for my application. The question is what features exist in Flink to support such a clean "continue where I left of" scenario? Best regards / Met vriendelijke groeten,
Niels Basjes |
Hello,
You are probably looking for this feature: https://issues.apache.org/jira/browse/FLINK-2976 Best, Gábor 2016-01-14 11:05 GMT+01:00 Niels Basjes <[hidden email]>: > Hi, > > I'm working on a streaming application using Flink. > Several steps in the processing are state-full (I use custom Windows and > state-full operators ). > > Now if during a normal run an worker fails the checkpointing system will be > used to recover. > > But what if the entire application is stopped (deliberately) or stops/fails > because of a problem? > > At this moment I have three main reasons/causes for doing this: > 1) The application just dies because of a bug on my side or a problem like > for example this (which I'm actually confronted with): Failed to Update > HDFS Delegation Token for long running application in HA mode > https://issues.apache.org/jira/browse/HDFS-9276 > 2) I need to rebalance my application (i.e. stop, change parallelism, start) > 3) I need a new version of my software to be deployed. (i.e. I fixed a bug, > changed the topology and need to continue) > > I assume the solution will be in some part be specific for my application. > The question is what features exist in Flink to support such a clean > "continue where I left of" scenario? > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes |
Hey Niels,
as Gabor wrote, this feature has been merged to the master branch recently. The docs are online here: https://ci.apache.org/projects/flink/flink-docs-master/apis/savepoints.html Feel free to report back your experience with it if you give it a try. – Ufuk > On 14 Jan 2016, at 11:09, Gábor Gévay <[hidden email]> wrote: > > Hello, > > You are probably looking for this feature: > https://issues.apache.org/jira/browse/FLINK-2976 > > Best, > Gábor > > > > > 2016-01-14 11:05 GMT+01:00 Niels Basjes <[hidden email]>: >> Hi, >> >> I'm working on a streaming application using Flink. >> Several steps in the processing are state-full (I use custom Windows and >> state-full operators ). >> >> Now if during a normal run an worker fails the checkpointing system will be >> used to recover. >> >> But what if the entire application is stopped (deliberately) or stops/fails >> because of a problem? >> >> At this moment I have three main reasons/causes for doing this: >> 1) The application just dies because of a bug on my side or a problem like >> for example this (which I'm actually confronted with): Failed to Update >> HDFS Delegation Token for long running application in HA mode >> https://issues.apache.org/jira/browse/HDFS-9276 >> 2) I need to rebalance my application (i.e. stop, change parallelism, start) >> 3) I need a new version of my software to be deployed. (i.e. I fixed a bug, >> changed the topology and need to continue) >> >> I assume the solution will be in some part be specific for my application. >> The question is what features exist in Flink to support such a clean >> "continue where I left of" scenario? >> >> -- >> Best regards / Met vriendelijke groeten, >> >> Niels Basjes |
Yes, that is exactly the type of solution I was looking for. I'll dive into this. Thanks guys! Niels On Thu, Jan 14, 2016 at 11:55 AM, Ufuk Celebi <[hidden email]> wrote: Hey Niels, Best regards / Met vriendelijke groeten,
Niels Basjes |
The documentation layout changed in the master. Then new URL:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html On Thu, Jan 14, 2016 at 2:21 PM, Niels Basjes <[hidden email]> wrote: > Yes, that is exactly the type of solution I was looking for. > > I'll dive into this. > Thanks guys! > > Niels > > On Thu, Jan 14, 2016 at 11:55 AM, Ufuk Celebi <[hidden email]> wrote: >> >> Hey Niels, >> >> as Gabor wrote, this feature has been merged to the master branch >> recently. >> >> The docs are online here: >> https://ci.apache.org/projects/flink/flink-docs-master/apis/savepoints.html >> >> Feel free to report back your experience with it if you give it a try. >> >> – Ufuk >> >> > On 14 Jan 2016, at 11:09, Gábor Gévay <[hidden email]> wrote: >> > >> > Hello, >> > >> > You are probably looking for this feature: >> > https://issues.apache.org/jira/browse/FLINK-2976 >> > >> > Best, >> > Gábor >> > >> > >> > >> > >> > 2016-01-14 11:05 GMT+01:00 Niels Basjes <[hidden email]>: >> >> Hi, >> >> >> >> I'm working on a streaming application using Flink. >> >> Several steps in the processing are state-full (I use custom Windows >> >> and >> >> state-full operators ). >> >> >> >> Now if during a normal run an worker fails the checkpointing system >> >> will be >> >> used to recover. >> >> >> >> But what if the entire application is stopped (deliberately) or >> >> stops/fails >> >> because of a problem? >> >> >> >> At this moment I have three main reasons/causes for doing this: >> >> 1) The application just dies because of a bug on my side or a problem >> >> like >> >> for example this (which I'm actually confronted with): Failed to >> >> Update >> >> HDFS Delegation Token for long running application in HA mode >> >> https://issues.apache.org/jira/browse/HDFS-9276 >> >> 2) I need to rebalance my application (i.e. stop, change parallelism, >> >> start) >> >> 3) I need a new version of my software to be deployed. (i.e. I fixed a >> >> bug, >> >> changed the topology and need to continue) >> >> >> >> I assume the solution will be in some part be specific for my >> >> application. >> >> The question is what features exist in Flink to support such a clean >> >> "continue where I left of" scenario? >> >> >> >> -- >> >> Best regards / Met vriendelijke groeten, >> >> >> >> Niels Basjes >> > > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes |
Hi, I did a bit of reading about the savepoints and that in fact they are written as "Allow to trigger checkpoints manually". Let me sketch what I think I need: 1) I need recovery of the topology in case of partial failure (i.e. a single node dies). 2) I need recovery of the topology in case of full topology failure (i.e. Hadoop security tokens cause the entire thing to die, or I need to deploy a fixed version of my software). Now what I understand is that the checkpoints are managed by Flink and as such allow me to run the topology without any manual actions. These are cleaned automatically when no longer needed. These savepoints however appear to need external 'intervention'; they are intended as 'manual'. So in addition to my topology I need something extra that periodically (i.e. every minute) fires a command to persist a checkpoint into a savepoint and to cleanup the 'old' ones. What I want is something that works roughly as follows: 1) I configure everything (i.e. assign Ids configure the checkpoint directory, etc.) 2) The framework saves and cleans the checkpoints automatically when the topology is running. 3) I simply start the topology without any special options. My idea is essentially that at the startup of a topology the system looks at the configured checkpoint persistance and recovers the most recent one. Apparently there is a mismatch between what I think is useful and what has been implemented so far. Am I missing something or should I submit this as a Jira ticket for a later version? Niels Basjes On Mon, Jan 18, 2016 at 12:13 PM, Maximilian Michels <[hidden email]> wrote: The documentation layout changed in the master. Then new URL: Best regards / Met vriendelijke groeten,
Niels Basjes |
Hi Niels! There is a slight mismatch between your thoughts and the current design, but not much. What you describe (at the start of the job, the latest checkpoint is automatically loaded) is basically what the high-availability setup does if the master dies. The new master loads all jobs and continues them from the latest checkpoint. If you run an HA setup, and you stop/restart your jobs not by stopping the jobs, but by killing the cluster, you should get that behavior. Once a job is properly stopped, and you start a new job, there is no way for Flink to tell that this is in fact the same job and it should resume from where the recently stopped. Also, "same" should be a fuzzy "same", to allow for slight changes in the job (bug fixes). Safepoints let you put the persistent part of the job somewhere, to tell a new job where to pick up from. - Makes it work in non-HA setups - Allows you to keep multiple savepoint (like "versions", say one per day or so) to roll back to - Can have multiple versions of the same jobs resuming from one savepoint (what-if or A/B tests, or seamless version upgrades) There is something on the roadmap that would make your use case very easy: "StopWithSavepoint" There is an open pull request to cleanly stop() a streaming program. The next enhancement is to stop it and let it draw a savepoint as part of that. Then you can simply script a stop/start like that: # stop with savepoint bin/flink stop -s <random-directory> jobid # resume bin/flink run -s <random-directory> job Hope that helps, Stephan On Fri, Jan 22, 2016 at 3:06 PM, Niels Basjes <[hidden email]> wrote:
|
Hey Niels!
Stephan gave a very good summary of the current state of things. What do you think of the outlined stop with savepoint method? Regarding the broken links: I’ve fixed various broken links in the master docs yesterday. If you encounter something again, feel free to post it to the ML or open a JIRA for it. – Ufuk > On 25 Jan 2016, at 16:21, Stephan Ewen <[hidden email]> wrote: > > Hi Niels! > > There is a slight mismatch between your thoughts and the current design, but not much. > > What you describe (at the start of the job, the latest checkpoint is automatically loaded) is basically what the high-availability setup does if the master dies. The new master loads all jobs and continues them from the latest checkpoint. > If you run an HA setup, and you stop/restart your jobs not by stopping the jobs, but by killing the cluster, you should get that behavior. > > Once a job is properly stopped, and you start a new job, there is no way for Flink to tell that this is in fact the same job and it should resume from where the recently stopped. Also, "same" should be a fuzzy "same", to allow for slight changes in the job (bug fixes). Safepoints let you put the persistent part of the job somewhere, to tell a new job where to pick up from. > - Makes it work in non-HA setups > - Allows you to keep multiple savepoint (like "versions", say one per day or so) to roll back to > - Can have multiple versions of the same jobs resuming from one savepoint (what-if or A/B tests, or seamless version upgrades) > > > There is something on the roadmap that would make your use case very easy: "StopWithSavepoint" > > There is an open pull request to cleanly stop() a streaming program. The next enhancement is to stop it and let it draw a savepoint as part of that. Then you can simply script a stop/start like that: > > # stop with savepoint > bin/flink stop -s <random-directory> jobid > > # resume > bin/flink run -s <random-directory> job > > > Hope that helps, > Stephan > > > On Fri, Jan 22, 2016 at 3:06 PM, Niels Basjes <[hidden email]> wrote: > Hi, > > @Max: Thanks for the new URL. I noticed that a lot (in fact almost all) of links in the new manuals lead to 404 errors. Maybe you should run an automated test to find them all. > > I did a bit of reading about the savepoints and that in fact they are written as "Allow to trigger checkpoints manually". > > Let me sketch what I think I need: > 1) I need recovery of the topology in case of partial failure (i.e. a single node dies). > 2) I need recovery of the topology in case of full topology failure (i.e. Hadoop security tokens cause the entire thing to die, or I need to deploy a fixed version of my software). > > Now what I understand is that the checkpoints are managed by Flink and as such allow me to run the topology without any manual actions. These are cleaned automatically when no longer needed. > These savepoints however appear to need external 'intervention'; they are intended as 'manual'. So in addition to my topology I need something extra that periodically (i.e. every minute) fires a command to persist a checkpoint into a savepoint and to cleanup the 'old' ones. > > What I want is something that works roughly as follows: > 1) I configure everything (i.e. assign Ids configure the checkpoint directory, etc.) > 2) The framework saves and cleans the checkpoints automatically when the topology is running. > 3) I simply start the topology without any special options. > > My idea is essentially that at the startup of a topology the system looks at the configured checkpoint persistance and recovers the most recent one. > > Apparently there is a mismatch between what I think is useful and what has been implemented so far. > Am I missing something or should I submit this as a Jira ticket for a later version? > > Niels Basjes > > > > > > > On Mon, Jan 18, 2016 at 12:13 PM, Maximilian Michels <[hidden email]> wrote: > The documentation layout changed in the master. Then new URL: > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html > > On Thu, Jan 14, 2016 at 2:21 PM, Niels Basjes <[hidden email]> wrote: > > Yes, that is exactly the type of solution I was looking for. > > > > I'll dive into this. > > Thanks guys! > > > > Niels > > > > On Thu, Jan 14, 2016 at 11:55 AM, Ufuk Celebi <[hidden email]> wrote: > >> > >> Hey Niels, > >> > >> as Gabor wrote, this feature has been merged to the master branch > >> recently. > >> > >> The docs are online here: > >> https://ci.apache.org/projects/flink/flink-docs-master/apis/savepoints.html > >> > >> Feel free to report back your experience with it if you give it a try. > >> > >> – Ufuk > >> > >> > On 14 Jan 2016, at 11:09, Gábor Gévay <[hidden email]> wrote: > >> > > >> > Hello, > >> > > >> > You are probably looking for this feature: > >> > https://issues.apache.org/jira/browse/FLINK-2976 > >> > > >> > Best, > >> > Gábor > >> > > >> > > >> > > >> > > >> > 2016-01-14 11:05 GMT+01:00 Niels Basjes <[hidden email]>: > >> >> Hi, > >> >> > >> >> I'm working on a streaming application using Flink. > >> >> Several steps in the processing are state-full (I use custom Windows > >> >> and > >> >> state-full operators ). > >> >> > >> >> Now if during a normal run an worker fails the checkpointing system > >> >> will be > >> >> used to recover. > >> >> > >> >> But what if the entire application is stopped (deliberately) or > >> >> stops/fails > >> >> because of a problem? > >> >> > >> >> At this moment I have three main reasons/causes for doing this: > >> >> 1) The application just dies because of a bug on my side or a problem > >> >> like > >> >> for example this (which I'm actually confronted with): Failed to > >> >> Update > >> >> HDFS Delegation Token for long running application in HA mode > >> >> https://issues.apache.org/jira/browse/HDFS-9276 > >> >> 2) I need to rebalance my application (i.e. stop, change parallelism, > >> >> start) > >> >> 3) I need a new version of my software to be deployed. (i.e. I fixed a > >> >> bug, > >> >> changed the topology and need to continue) > >> >> > >> >> I assume the solution will be in some part be specific for my > >> >> application. > >> >> The question is what features exist in Flink to support such a clean > >> >> "continue where I left of" scenario? > >> >> > >> >> -- > >> >> Best regards / Met vriendelijke groeten, > >> >> > >> >> Niels Basjes > >> > > > > > > > > -- > > Best regards / Met vriendelijke groeten, > > > > Niels Basjes > > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes > |
Hi, In reference with this topic, there is any feature for automatically restart job after a task exception? Like --supervise command line option in apache spark Thanks in advance! El mar., 26 ene. 2016 a las 11:07, Ufuk Celebi (<[hidden email]>) escribió: Hey Niels! |
> On 01 Feb 2016, at 17:14, Don Frascuchon <[hidden email]> wrote: > > Hi, > > In reference with this topic, there is any feature for automatically restart job after a task exception? Like --supervise command line option in apache spark If you are referring to job manager/task manager instances: No. Currently, the recommended way is to run with YARN in order to get this behaviour. If you are referring to tasks: you can enable this via the ExecutionConfig (https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#execution-configuration) via setNumberOfExecutionRetries() and setExecutionRetryDelay(). Does this help? – Ufuk |
@Don In Flink, the client needs not keep running, it can detach itself once the job is submitted, or stay connected and receive status messages. Since the client is not necessary for the program execution, I think you don't need "supervise" or anything like that... On Mon, Feb 1, 2016 at 7:23 PM, Ufuk Celebi <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |