Failed job restart - flink on yarn


Failed job restart - flink on yarn

vprabhu@gmail.com
Hi,

I have a Flink streaming job that reads from Kafka and performs an aggregation in a window. It ran fine for a while, but when the number of events in a window crossed a certain limit, the YARN containers failed with OutOfMemory errors. The job was running with 10G containers.

We have about 64G of memory on the machine, and now I want to restart the job with a 20G container (we ran some tests, and 20G should be enough to accommodate all the elements from the window).

Is there a way to restart the job from the last checkpoint?

When I resubmit the job, it starts from the last committed offsets, but the events that were held in the window at the time of checkpointing seem to get lost. Is there a way to recover the events that were buffered within the window and checkpointed before the failure?

Thanks,
Prabhu

Re: Failed job restart - flink on yarn

Jamie Grier
Hi Prabhu,

Have you taken a look at Flink's savepoints feature? It allows you to make snapshots of your job's state on demand and then restart your job from that point at any time: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html
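
For example, roughly (the job ID, savepoint path, and jar/entry class below are placeholders; check the linked page for the exact CLI syntax of your Flink version):

    # trigger a savepoint for the running job
    bin/flink savepoint <jobID>

    # later, resubmit the job starting from that savepoint
    bin/flink run -s <savepointPath> -c com.example.MyJob my-job.jar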

Also, you can use Flink's disk-backed (RocksDB) state backend if your job state is larger than what fits in memory. See https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state_backends.html#the-rocksdbstatebackend
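
If you go that route, here is a minimal sketch of wiring it up in the job (class name and HDFS path are placeholders, and this assumes the flink-statebackend-rocksdb dependency is on your classpath):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WindowJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // checkpoint every 60s so window contents survive restarts
            env.enableCheckpointing(60_000);
            // keep operator/window state in RocksDB on local disk,
            // snapshotting it to HDFS on each checkpoint
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            // ... add the Kafka source, windowed aggregation, and sinks here ...
            env.execute("window-job");
        }
    }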


-Jamie


Re: Failed job restart - flink on yarn

vprabhu@gmail.com
Hi Jamie,

Thanks for the reply.

Yes, I looked at savepoints. I want to start my job only from the last checkpoint, which means I would have to keep track of when the checkpoint was taken and then trigger a savepoint; I am not sure that is the way to go. My state backend is HDFS, and I can see that the checkpoint path contains the data that was buffered in the window.

I want to start the job in a way that it reads the data checkpointed before the failure and continues processing.

I realize that the checkpoints are used whenever a container fails and a new container is obtained. In my case the job failed because containers failed the maximum allowed number of times.

Thanks,
Prabhu

Re: Failed job restart - flink on yarn

Ufuk Celebi
If you just re-submit the job without a savepoint, the Kafka consumer will by default start processing from the latest offset and the operators will be in an empty state. It should be possible to add a feature to Flink that allows turning the latest checkpoint into a savepoint, from which you could then resume the job after increasing the container memory. I'm afraid that won't make it into the next release, though; I will open an issue for it.

A workaround (more of a hack) would be to run in HA mode (https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html) and just shut down the YARN containers without cancelling the job. The latest checkpoint metadata should be stored in ZooKeeper and resumed when you restart the cluster. It's really more of a hack/abuse of HA, though.
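
Concretely, that would mean something like the following in flink-conf.yaml before starting the YARN session (these key names are from the 1.0 docs as I remember them, and the hosts/paths are placeholders, so please double-check against the page above):

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181
    recovery.zookeeper.storageDir: hdfs:///flink/recovery
    yarn.application-attempts: 10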

– Ufuk


Re: Failed job restart - flink on yarn

vprabhu@gmail.com
Thanks for the reply. It would be great to have a feature to restart a failed job from the last checkpoint.


Is there a way to pass an initial set of partition offsets to the Kafka client? In that case I could maintain a list of the last processed offsets from within my window operation (possibly storing the offsets in some database) and use that to bootstrap the Kafka client upon restart.

I realize that I can probably reset the offsets for the consumer group to the last fully processed offsets from some external program and then restart the job; I just want to confirm whether there is already such a feature in the Kafka client.

Thanks,
Prabhu

Re: Failed job restart - flink on yarn

Jamie Grier
The Kafka client can be configured to commit offsets to ZooKeeper periodically, even though those offsets are not used in the normal fault-tolerance case. Normally, the Kafka offsets are part of Flink's own checkpointed state. However, in the absence of that state, the FlinkKafkaConsumer will actually retrieve the last committed offsets from ZooKeeper, so you may not need to do anything special in your case, unless I've misunderstood you.
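
For reference, a rough sketch of that setup with the 0.8 connector (topic name, group id, class name, and connection strings are placeholders; this assumes the flink-connector-kafka-0.8 dependency is available):

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092");
            props.setProperty("zookeeper.connect", "zk1:2181");
            // the consumer group whose ZooKeeper-committed offsets are picked up
            // when the job starts without restored Flink state
            props.setProperty("group.id", "my-flink-job");
            // fallback if no committed offsets exist yet (0.8 uses "smallest"/"largest")
            props.setProperty("auto.offset.reset", "largest");

            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer08<String>("my-topic", new SimpleStringSchema(), props));

            stream.print();  // replace with the windowed aggregation
            env.execute("kafka-source-sketch");
        }
    }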


--

Jamie Grier
data Artisans, Director of Applications Engineering