Hello everyone,
last week I ran some tests with Apache ZooKeeper to get a grip on Flink's HA features. So far my tests have gone badly and I can't work out why.

My latest tests involved Flink 0.10.2, run as a standalone cluster with 3 masters and 4 slaves. The 3 masters also form the ZooKeeper (3.4.6) ensemble. I started ZooKeeper on each machine, tested its availability and then started the Flink cluster. Since there's no reliable distributed file system on the cluster, I had to use the local file system as the state backend.

I then submitted a very simple streaming job that writes the timestamp to a text file on the local file system each second, and then killed the process running the job manager to verify that another job manager takes over. However, the job just stopped. I still have to perform some checks on the handover to the new job manager, but before digging deeper I wanted to ask whether my expectation that the job keeps going despite the job manager failure is unreasonable.

Thanks in advance.

BR,
Stefano Baghino
Using the local file system as the state backend only works if all job managers run on the same machine. Is that the case?

Have you specified all job managers in the masters file? With the local file system state backend, only something like

host-X
host-X
host-X

will be a valid masters configuration.

Can you please share the job manager logs of all started job managers?

– Ufuk
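To make the masters-file constraint concrete, here is a minimal Python sketch of the check described above. It assumes the masters file lists one job manager per line as `host` or `host:port`; the function name `masters_valid_for_local_backend` is hypothetical and not part of Flink.

```python
def masters_valid_for_local_backend(masters_text):
    """Return True if every job manager entry names the same host.

    Assumption: one entry per line, written as `host` or `host:port`;
    blank lines and lines starting with `#` are ignored.
    """
    hosts = set()
    for line in masters_text.splitlines():
        entry = line.strip()
        if not entry or entry.startswith("#"):
            continue
        # Keep only the host part; the port (if any) is irrelevant here.
        hosts.add(entry.split(":", 1)[0])
    # A local directory is visible to all job managers only if they share one host.
    return len(hosts) == 1
```

Three masters on three different machines, as in the original setup, would fail this check, which is a hint that the recovery data needs to live somewhere all of them can reach.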
In reply to this post by stefanobaghino
Hi Stefano,
The job should stop temporarily but then be resumed by the new JobManager. Have you increased the number of execution retries? AFAIK, it is set to 0 by default, which means the job will not be re-run, even in HA mode. You can enable it on the StreamExecutionEnvironment.

Otherwise, you have probably already found the documentation:

https://ci.apache.org/projects/flink/flink-docs-master/setup/jobmanager_high_availability.html#configuration

Cheers,
Max
In reply to this post by Ufuk Celebi
Hi Ufuk, thanks for replying.

Regarding the masters file: yes, I've specified all the masters and checked that they were actually running after start-cluster.sh. I'll gladly share the logs as soon as I get to see them.

Regarding the state backend: how does having non-distributed storage as the state backend influence the HA features? I thought it would mean that the job state couldn't be restored, but that the job itself could still be started once the backup job manager took over. Does not having a reliable distributed storage service as the state backend mean that the HA features don't work?

Again, thank you very much.

BR,
Stefano Baghino
In reply to this post by Maximilian Michels
Hi Maximilian,

thank you for the reply. I checked the documentation before running my tests (I'm not expert enough to not read the docs ;)) but it doesn't mention any specific requirement regarding the execution retries. I'll check it out, thanks!

BR,
Stefano Baghino
Hi Stefano,
That is true. The documentation doesn't mention that. I just wanted to point you to the documentation in case anything else needs to be configured. We will update it.

Instead of setting the number of execution retries on the StreamExecutionEnvironment, you may also set "execution-retries.default" in flink-conf.yaml. Let us know if that fixes your setup.

Cheers,
Max
In reply to this post by stefanobaghino
> On 15 Feb 2016, at 13:40, Stefano Baghino wrote:
>
> Regarding the state backend: how does having a non-distributed storage as the state backend influence the HA features? I thought it would have meant that the job state couldn't be restored but the job itself could've been started after the backup job manager started. Does not having a reliable distributed storage service as the state backend mean that the HA features don't work?

No, the submitted job is also stored in the state backend and is recovered from there. ZooKeeper holds a pointer to the state handle in the configured backend. Since all job managers run on the same host it should work as you expected. The requirement is that all job managers need to be able to access the state backend.

Recovery from a job manager failure is actually independent of the execution retries right now.

I think as soon as we have a look at the logs, we will figure it out. ;)

– Ufuk
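The pointer-plus-backend arrangement described above can be illustrated with a small Python simulation. This is only a sketch under stated assumptions, not Flink's implementation: ZooKeeper and the file systems are plain dictionaries, and the names `Host`, `store_job`, `recover_job` and the `/recovery/...` path layout are all hypothetical.

```python
class Host:
    """A machine with its own private local file system (path -> bytes)."""

    def __init__(self, name):
        self.name = name
        self.local_fs = {}


def store_job(zookeeper, storage, job_id, job_graph):
    """Write the job to `storage` and keep only a pointer (the path) in ZooKeeper."""
    path = "/recovery/" + job_id
    storage[path] = job_graph
    zookeeper[job_id] = path


def recover_job(zookeeper, storage, job_id):
    """Follow the pointer from ZooKeeper; fail if this storage lacks the data."""
    path = zookeeper[job_id]
    if path not in storage:
        raise FileNotFoundError(path)
    return storage[path]


zookeeper = {}  # stand-in for the ZooKeeper ensemble, visible to every master
master1, master2 = Host("master1"), Host("master2")

# Local file system backend: the leading job manager writes to its own disk.
store_job(zookeeper, master1.local_fs, "job-1", b"serialized job")
try:
    recover_job(zookeeper, master2.local_fs, "job-1")
    recovered_locally = True
except FileNotFoundError:
    recovered_locally = False  # master2 cannot see master1's disk

# Shared backend: every job manager reads and writes the same storage.
shared_fs = {}
store_job(zookeeper, shared_fs, "job-1", b"serialized job")
recovered_shared = recover_job(zookeeper, shared_fs, "job-1")
```

In the simulation the standby on `master2` finds the pointer in ZooKeeper but not the data behind it, whereas recovery succeeds once both job managers use the same storage.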
In reply to this post by Maximilian Michels
Hi Stefano,
A correction from my side: you don't need to set the execution retries for HA, because a new JobManager will automatically take over and resubmit all jobs recovered from the storage directory you set up. The number of execution retries applies only to jobs that are restarted due to a TaskManager failure.

It would be great if you could supply some logs.

Cheers,
Max
You can find the log of the recovering job manager here: https://gist.github.com/stefanobaghino/ae28f00efb6bdd907b42

Basically, what Ufuk said happened: the job manager tried to fill in for the lost one but couldn't find the actual data, because it looked for it locally whereas, due to my configuration, it was actually stored on another machine.

Thanks for the help, it's been really valuable!

BR,
Stefano Baghino
Ok, simply bringing up HDFS on the cluster and using it as the state backend fixed the issue. Thank you both for the help!

BR,
Stefano Baghino
Hi!

As a bit of background: ZooKeeper only allows you to store very small amounts of data. We hence persist only the changing checkpoint metadata in ZooKeeper.

To recover a job, some constant data is also needed: the JobGraph and the JAR files. These cannot go to ZooKeeper, but need to go to reliable storage (such as HDFS, S3, or a mounted file system).

Greetings,
Stephan
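The split between small changing metadata and large constant data can be sketched in Python as follows. This is an illustration under stated assumptions rather than Flink's actual layout: the dictionaries stand in for ZooKeeper and the reliable storage, the paths and function names are hypothetical, and the 1 MB figure is ZooKeeper's default per-znode limit.

```python
import json

ZNODE_LIMIT_BYTES = 1024 * 1024  # ZooKeeper's default per-znode limit is roughly 1 MB


def znode_size(value):
    """Size in bytes of a value once serialized for ZooKeeper (JSON here)."""
    return len(json.dumps(value).encode("utf-8"))


def persist_constant_data(zookeeper, storage, job_id, job_graph, jar_files):
    """Write the JobGraph and jars to reliable storage; ZooKeeper gets paths only."""
    graph_path = f"/recovery/{job_id}/jobgraph"
    storage[graph_path] = job_graph
    jar_paths = []
    for name, content in sorted(jar_files.items()):
        jar_path = f"/recovery/{job_id}/jars/{name}"
        storage[jar_path] = content
        jar_paths.append(jar_path)
    zookeeper[f"/jobs/{job_id}"] = {"graph": graph_path, "jars": jar_paths}


def record_checkpoint(zookeeper, job_id, checkpoint_id, state_path):
    """Overwrite the small, changing checkpoint metadata on each checkpoint."""
    zookeeper[f"/checkpoints/{job_id}"] = {"id": checkpoint_id, "state": state_path}


zookeeper, storage = {}, {}
persist_constant_data(
    zookeeper,
    storage,
    "job-1",
    job_graph=b"g" * 200_000,
    jar_files={"job.jar": b"j" * 5_000_000},  # far too large for a single znode
)
record_checkpoint(zookeeper, "job-1", 1, "/checkpoints/job-1/chk-1")
record_checkpoint(zookeeper, "job-1", 2, "/checkpoints/job-1/chk-2")

largest_znode = max(znode_size(value) for value in zookeeper.values())
```

Only short path strings and a checkpoint counter ever reach the ZooKeeper stand-in, so `largest_znode` stays far below the limit even though the jar alone is several megabytes.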