Killing Yarn Session Leaves Lingering Flink Jobs


Konstantin Gregor
Hello everyone,

I have a question concerning stopping Flink streaming processes that run
in a detached Yarn session.

Here's what we do: We start a Yarn session via
yarn-session.sh -n 8 -d -jm 4096 -tm 10000 -s 10 -qu flink_queue

Then, we start our Flink streaming application via
flink run -p 65 -c SomeClass some.jar > /dev/null 2>&1  &

The problem occurs when we stop the application.
If we stop the Flink application with
flink cancel <JOB_ID>
and then kill the yarn application with
yarn application -kill <APPLICATION_ID>
everything is fine.
But what we did not expect was that when we only kill the Yarn
application without specifically canceling the Flink job first, the
Flink job stays lingering on the machine and uses resources until it is
killed manually via its process ID.

One thing we tried was to stop using ephemeral ports for the
application master, i.e. we set yarn.application-master.port to a fixed
port number, but the problem remains: killing the Yarn application does
not kill the corresponding Flink job.
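
For completeness, this amounts to a single flink-conf.yaml entry; the
value below is only an example:

yarn.application-master.port: 50100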

Does anyone have an idea about this? Any help is greatly appreciated :-)
By the way, our application reads data from a Kafka queue and writes it
into HDFS; maybe this is also important to know.

Thank you and best regards

Konstantin
--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Killing Yarn Session Leaves Lingering Flink Jobs

Ufuk Celebi
Are you running in HA mode? If yes, that's the expected behaviour at
the moment, because the ZooKeeper data is only cleaned up on a
terminal state (FINISHED, FAILED, CANCELLED). You have to specify
separate ZooKeeper root paths via "recovery.zookeeper.path.root".
There is an issue that should be fixed for 1.2 to make this easier to
configure.
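
For example, you could give every session cluster its own root path; the
path names below are only an illustration:

# flink-conf.yaml of the first session cluster
recovery.zookeeper.path.root: /flink/cluster-one

# flink-conf.yaml of a second session cluster
recovery.zookeeper.path.root: /flink/cluster-two

If your Flink version supports dynamic properties for the YARN session,
the same value can presumably be passed on startup, e.g.
yarn-session.sh ... -Drecovery.zookeeper.path.root=/flink/cluster-one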


Re: Killing Yarn Session Leaves Lingering Flink Jobs

Stephan Ewen
I think there is some confusion between how Flink thinks about HA and the job life cycle, and how many users think about it.

Flink treats the killing of the YARN session as a failure of the job, so as soon as new YARN resources become available, it tries to recover the job.
Most users, however, think that killing a YARN session is equivalent to canceling the job.

I am unsure whether we should start to interpret the killing of a YARN session as a cancellation. Do YARN sessions never get killed accidentally, or as the result of a YARN-related failure?

With Flink-job-at-a-time-on-YARN (one Flink job per YARN application), cancelling the Flink job also shuts down the YARN session and hence shuts everything down properly.
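
For reference, a per-job submission looks roughly like this (the memory,
slot and parallelism numbers are made up; check the y-prefixed options
against your Flink version's CLI):

flink run -m yarn-cluster -yn 8 -yjm 4096 -ytm 10000 -ys 10 -p 65 -c SomeClass some.jar

This starts a dedicated YARN application for the job and tears everything
down once the job finishes or is cancelled.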

Hope that train of thought helps.




Re: Killing Yarn Session Leaves Lingering Flink Jobs

Ufuk Celebi
This is being addressed here: https://github.com/apache/flink/pull/2249


Re: Killing Yarn Session Leaves Lingering Flink Jobs

snntr
In reply to this post by Stephan Ewen
Hi Stephan,

thank you for this clarification. I have a slightly related follow-up
question. I keep reading that the preferred way to run Flink on YARN is
with "Flink-job-at-a-time-on-YARN". Can you explain this a little
further? Of course, with a separate YARN session per job the jobs are
more decoupled, but on the other hand it seems counter-intuitive to
start a new Flink cluster for each job.

Best Regards,

Konstantin

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Killing Yarn Session Leaves Lingering Flink Jobs

Maximilian Michels
Hi Konstantin,

If you come from traditional on-premise installations, it may seem
counter-intuitive to start a Flink cluster for each job. However, in
today's cluster world it is not a problem to request containers on
demand and spawn a new Flink cluster per job. Per-job clusters are
convenient because they can be tailored to the job; you only need to
request as many resources as the job needs. The long-running Flink
cluster you get when you start a YARN session, on the other hand, has a
static resource consumption even when no job is running, whereas the
per-job cluster releases all its resources as soon as the job has
finished.
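
You can also see the difference directly in YARN (standard YARN CLI,
nothing Flink-specific):

yarn application -list -appStates RUNNING

A session cluster shows up as a RUNNING application and keeps its
containers even while idle, whereas a per-job cluster's application
leaves the list and frees its containers as soon as the job ends.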

Best,
Max


Re: Killing Yarn Session Leaves Lingering Flink Jobs

Konstantin Gregor
In reply to this post by Stephan Ewen
Hi Stephan, hi Ufuk,

thank you very much for your insights, and sorry for the late reply;
there was a lot going on recently.
We finally figured out what the problem was: as you pointed out, the
Flink job simply waited for new YARN resources. But when a new YARN
session started, the Flink job did not come back up. The reason was
state from very old jobs that was still hanging around in ZooKeeper and
in the state backend, probably due to some ungraceful shutdowns during
our past experiments.
So Flink tried to recover all the old jobs it found information about in
ZooKeeper, and this simply failed, since those old jobs needed classes
that no longer existed.
Cleaning up the state backend and the ZooKeeper entries pointing to the
job graphs in the state backend did the trick, and everything works as
expected now.
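
In case someone else runs into this, our cleanup boiled down to roughly
the following; the paths are only illustrative and depend on
recovery.zookeeper.path.root and the configured recovery directory, so
double-check before deleting anything:

# open the ZooKeeper CLI (host is a placeholder)
zkCli.sh -server <zk-host>:2181
# inside the CLI: list the job graph entries Flink would try to recover,
# then remove the stale ones
ls /flink/jobgraphs
rmr /flink/jobgraphs/<old-job-id>

# remove the matching stale recovery data from the state backend (HDFS)
hdfs dfs -rm -r /path/to/recovery/<stale-entries>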

Thanks again for your input and best regards

Konstantin Gregor


--
Konstantin Gregor * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082