Hello everyone,
I have a question concerning stopping Flink streaming processes that run in a detached YARN session.

Here's what we do: we start a YARN session via

  yarn-session.sh -n 8 -d -jm 4096 -tm 10000 -s 10 -qu flink_queue

Then we start our Flink streaming application via

  flink run -p 65 -c SomeClass some.jar > /dev/null 2>&1 &

The problem occurs when we stop the application. If we cancel the Flink job with

  flink cancel <JOB_ID>

and then kill the YARN application with

  yarn application -kill <APPLICATION_ID>

everything is fine. But what we did not expect is that when we only kill the YARN application, without specifically canceling the Flink job first, the Flink job stays lingering on the machine and keeps using resources until it is killed manually via its process id.

One thing we tried was to stop using ephemeral ports for the application master, i.e. we set yarn.application-master.port to a fixed port number, but the problem remains: killing the YARN application does not kill the corresponding Flink job.

Does anyone have an idea about this? Any help is greatly appreciated :-)
By the way, our application reads data from a Kafka queue and writes it into HDFS, maybe this is also important to know.

Thank you and best regards

Konstantin
--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
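P.S. For completeness, the clean shutdown sequence that works for us is roughly the following (the IDs are placeholders and can be looked up beforehand):

  flink list                                 # shows the <JOB_ID> of the running job
  flink cancel <JOB_ID>                      # cancel the Flink job first
  yarn application -list                     # shows the <APPLICATION_ID> of the session
  yarn application -kill <APPLICATION_ID>    # then tear down the YARN session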
Are you running in HA mode? If yes, that's the expected behaviour at the moment, because the ZooKeeper data is only cleaned up on a terminal state (FINISHED, FAILED, CANCELLED). You have to specify separate ZooKeeper root paths via "recovery.zookeeper.path.root". There is an issue which should be fixed for 1.2 to make this configurable in an easy way.
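For illustration, separate root paths per application would look roughly like this in flink-conf.yaml (just a sketch: the quorum address and path names are placeholders, and depending on your setup further HA/recovery options are needed):

  recovery.mode: zookeeper
  recovery.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181
  # use a different root per application, so sessions don't pick up each other's jobs
  recovery.zookeeper.path.root: /flink/my-streaming-app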
I think there is some confusion between how Flink thinks about HA and the job life cycle, and how many users think about it.

Flink treats the killing of the YARN session as a failure of the job, so as soon as new YARN resources become available, it tries to recover the job. Most users, however, think that killing a YARN session is equivalent to canceling the job.

I am unsure whether we should start to interpret the killing of a YARN session as a cancellation. Do YARN sessions never get killed accidentally, or as the result of a YARN-related failure?

With Flink-job-at-a-time-on-yarn, cancelling the Flink job also shuts down the YARN session and hence shuts down everything properly.

Hope that train of thought helps.
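For reference, a job-at-a-time deployment is started directly with "flink run -m yarn-cluster", without a separate YARN session; roughly like this (just a sketch, the resource flags mirror the numbers from the session example above and are placeholders):

  flink run -m yarn-cluster \
      -yn 8 -ys 10 -yjm 4096 -ytm 10000 \
      -p 65 -c SomeClass some.jar

A "flink cancel <JOB_ID>" on such a job then also shuts down its YARN application.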
This is being addressed here: https://github.com/apache/flink/pull/2249
In reply to this post by Stephan Ewen
Hi Stephan,
thank you for this clarification. I have a slightly related follow-up question. I keep reading that the preferred way to run Flink on YARN is "Flink-job-at-a-time-on-yarn". Can you explain this a little further? Of course, with separate YARN sessions the jobs are more decoupled, but on the other hand it seems counter-intuitive to start a new Flink cluster for each job.

Best regards,

Konstantin
--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
Hi Konstantin,
If you come from traditional on-premise installations it may seem counter-intuitive to start a Flink cluster for each job. However, in today's cluster world it is not a problem to request containers on demand and spawn a new Flink cluster for each job.

Per-job clusters are convenient because they can be tailored to the job: you only need to request as many resources as the job needs. The typical on-premise Flink cluster, which you get when you start a YARN session, has a static resource consumption even when no job is running. The per-job cluster, on the other hand, releases all resources when the job has finished.

Best,
Max
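P.S. To make the contrast concrete, roughly (just a sketch, all numbers are placeholders):

  # session mode: the containers below stay allocated as long as the session lives,
  # even when no job is running
  yarn-session.sh -n 8 -d -jm 4096 -tm 10000 -s 10 -qu flink_queue
  flink run -p 65 -c SomeClass some.jar

  # per-job mode: containers are requested for this job only and released when it finishes
  flink run -m yarn-cluster -yn 8 -ys 10 -yjm 4096 -ytm 10000 -p 65 -c SomeClass some.jar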
In reply to this post by Stephan Ewen
Hi Stephan, hi Ufuk,
thank you very much for your insights, and sorry for the late reply, there was a lot going on recently.

We finally figured out what the problem was: as you pointed out, the Flink job simply waited for new YARN resources. But when a new YARN session started, the Flink job did not come back up. The reason was state from very old jobs that was still hanging around in ZooKeeper and in the state backend, probably due to some ungraceful shutdowns during our experiments in the past. So Flink tried to recover all those old jobs about which it found information in ZooKeeper, and this just failed, since those old jobs needed classes that didn't even exist anymore. Cleaning up the state backend and the ZooKeeper links to the job graphs in the state backend did the trick, and everything works as expected now.

Thanks again for your input and best regards

Konstantin Gregor
--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
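P.S. In case it helps anyone else, the cleanup amounted to roughly the following (just a sketch: the exact ZooKeeper sub-paths and the storage directory depend on your recovery.zookeeper.path.root and HA storage settings, so all paths below are placeholders):

  # drop the stale job graph entries under the Flink root path in ZooKeeper
  zkCli.sh -server <zk-host>:2181 rmr <zookeeper-root-path>/jobgraphs/<OLD_JOB_ID>

  # remove the recovery data those entries pointed to from the state backend (HDFS in our case)
  hdfs dfs -rm -r <recovery-storage-dir>/<OLD_JOB_DATA>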