Flink Checkpoint on yarn


Flink Checkpoint on yarn

Simone Robutti
Hello,

I'm testing the checkpointing functionality with hdfs as a backend.

From what I can see, it uses different checkpoint files and resumes the computation from different points rather than from the latest one available. To me this is unexpected behaviour.

I log every second, for every worker, a counter that is increased by 1 at each step. 

For example, on node-1 the count goes up to 5, then I kill a job manager or task manager and it resumes from 5 or 4, which is fine. The next time I kill a job manager the count is at 15 and it resumes at 14 or 15. Sometimes, though, after a third kill the work resumes at 4 or 5, as if the checkpoint used for the second recovery did not exist.

Once I even saw it jump forward: the first kill was at 10 and it resumed at 9, the second kill was at 70 and it resumed at 9, and the third kill was at 15 but it resumed at 69, as if it had restored the checkpoint from the second kill.

This is clearly inconsistent.

Also, the logs show that it sometimes uses a checkpoint file different from the one used in the previous, consistent recovery.

What am I doing wrong? Is it a known bug? 

Re: Flink Checkpoint on yarn

Ufuk Celebi
Can you please have a look into the JobManager log file and report
which checkpoints are restored? You should see messages from
ZooKeeperCompletedCheckpointStore like:
- Found X checkpoints in ZooKeeper
- Initialized with X. Removing all older checkpoints

You can share the complete job manager log file as well if you like.
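One way to pull those messages out of a large JobManager log is a small filter like the sketch below. It is only an illustration: it assumes the class name appears verbatim in each relevant line, and the sample lines are made up (real log lines carry timestamps and other fields).

```python
from typing import Iterable, List

# The class name comes from the message above; everything else is an assumption.
STORE_CLASS = "ZooKeeperCompletedCheckpointStore"


def checkpoint_store_lines(lines: Iterable[str]) -> List[str]:
    """Keep only the log lines that mention the completed-checkpoint store."""
    return [line.rstrip("\n") for line in lines if STORE_CLASS in line]


if __name__ == "__main__":
    # Made-up sample lines; real JobManager logs carry timestamps and more fields.
    sample = [
        "INFO ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper\n",
        "INFO SomeOtherClass - unrelated message\n",
    ]
    for line in checkpoint_store_lines(sample):
        print(line)
```

To run it on a real file, pass an open file handle instead of the sample list; the function accepts any iterable of lines.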

– Ufuk


Re: Flink Checkpoint on yarn

Simone Robutti
This is the log filtered to check messages from ZooKeeperCompletedCheckpointStore.

https://gist.github.com/chobeat/0222b31b87df3fa46a23

It looks like it finds only one checkpoint, but I'm not sure whether the different hashes and IDs of the checkpoints are meaningful.




Re: Flink Checkpoint on yarn

Ufuk Celebi
Hey Simone,

from the logs it looks like multiple jobs have been submitted to the
cluster, not just one. The different files correspond to different
jobs recovering. The filtered logs show three jobs running/recovering
(with IDs 10d8ccae6e87ac56bf763caf4bc4742f,
124f29322f9026ac1b35435d5de9f625, 7f280b38065eaa6335f5c3de4fc82547).
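A quick way to check how many distinct jobs show up in a log is to collect every token that looks like a job ID. The sketch below assumes job IDs are printed as 32 lowercase hex characters, as in the IDs listed above; any other 32-character hex token in the log would be counted too, so treat the result as a hint rather than proof. The surrounding text of the sample lines is invented.

```python
import re
from typing import Iterable, List

# Assumption: a job ID is a standalone run of exactly 32 lowercase hex characters.
JOB_ID = re.compile(r"\b[0-9a-f]{32}\b")


def distinct_job_ids(lines: Iterable[str]) -> List[str]:
    """Return job-ID-like tokens in order of first appearance, without repeats."""
    seen: List[str] = []
    for line in lines:
        for job_id in JOB_ID.findall(line):
            if job_id not in seen:
                seen.append(job_id)
    return seen


if __name__ == "__main__":
    # The IDs are the ones quoted in this thread; the rest of each line is made up.
    sample = [
        "recovering job 10d8ccae6e87ac56bf763caf4bc4742f",
        "recovering job 124f29322f9026ac1b35435d5de9f625",
        "recovering job 10d8ccae6e87ac56bf763caf4bc4742f",
        "recovering job 7f280b38065eaa6335f5c3de4fc82547",
    ]
    for job_id in distinct_job_ids(sample):
        print(job_id)
```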

Did you manually re-submit the job after killing a job manager?

Regarding the counts, they can be rolled back to an earlier
consistent state if the latest checkpoint had not yet completed when
the failure happened (completion includes the write to ZooKeeper).
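The rollback rule can be pictured with a toy model, sketched below. This is not Flink code: the `Checkpoint` class, its fields, and `restore_point` are hypothetical names used only to show that recovery picks the newest checkpoint that fully completed and ignores one that was still in flight.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Checkpoint:
    checkpoint_id: int
    counter: int      # value of the test counter captured in the snapshot
    completed: bool   # True only once the checkpoint has been fully registered


def restore_point(checkpoints: List[Checkpoint]) -> Optional[Checkpoint]:
    """Pick the newest completed checkpoint, or None if there is none."""
    completed = [c for c in checkpoints if c.completed]
    return max(completed, key=lambda c: c.checkpoint_id, default=None)


if __name__ == "__main__":
    history = [
        Checkpoint(1, 5, True),
        Checkpoint(2, 10, True),
        Checkpoint(3, 15, False),  # still in flight when the failure hit
    ]
    chosen = restore_point(history)
    print(chosen.counter if chosen else "no completed checkpoint")
```

In this model a kill at count 15 resumes from 10, which is a rollback but never a jump forward.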

Can you please share the complete job manager logs of your program?
The most helpful thing will be to have a log for each started job
manager container. I don't know if that is easily possible.

– Ufuk


Re: Flink Checkpoint on yarn

Simone Robutti
I didn't resubmit the job. Also, the jobs are submitted one by one with -m yarn-cluster, not to a long-running YARN session, so I don't really know how they could get mixed up.

I will repeat the test from a clean state, because we saw that killing the job with yarn application -kill left the "flink run" process alive, so that may be the problem. We noticed this just a few minutes ago.

If the problem persists, I will come back with a full log.

Thanks for now,

Simone


Re: Flink Checkpoint on yarn

Ufuk Celebi
OK, so you are submitting multiple jobs, but you submit them with -m
yarn-cluster and therefore expect them to start separate YARN
clusters. Makes sense and I would expect the same.

I think you can check in the client logs printed to stdout which
cluster the job is submitted to.

PS: The logs you have shared are out-of-order, how did you gather
them? Do you have an idea why they are out of order? Maybe something
is mixed up in the way we gather the logs and we only think that
something is wrong because of this.
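If the gathered lines are merely shuffled, they can be put back in order before reading them. The sketch below assumes every log entry starts with a timestamp of the form `2016-03-16 15:33:01,123`; that format is an assumption, not something confirmed in this thread. Lines without a timestamp (for example stack-trace lines) are kept attached to the entry before them.

```python
import re
from typing import Iterable, List

# Assumed entry prefix: "YYYY-MM-DD HH:MM:SS,mmm" (23 characters).
TIMESTAMP = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}")


def sort_log_entries(lines: Iterable[str]) -> List[str]:
    """Sort log entries by timestamp, keeping continuation lines with their entry."""
    entries: List[List[str]] = []
    for line in lines:
        if TIMESTAMP.match(line) or not entries:
            entries.append([line])
        else:
            entries[-1].append(line)  # continuation line, e.g. part of a stack trace
    # This timestamp format sorts correctly as plain text; list.sort() is stable,
    # so entries with equal timestamps keep their original relative order.
    entries.sort(key=lambda entry: entry[0][:23])
    return [line for entry in entries for line in entry]


if __name__ == "__main__":
    # Made-up lines that only illustrate the reordering.
    shuffled = [
        "2016-03-16 15:33:02,000 second entry",
        "  continuation of second entry",
        "2016-03-16 15:33:01,000 first entry",
    ]
    for line in sort_log_entries(shuffled):
        print(line)
```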



Re: Flink Checkpoint on yarn

Simone Robutti
Actually the test was intended for a single job. The fact that there are more jobs is unexpected and it will be the first thing to verify. Considering these problems we will go for deeper tests with multiple jobs.

The logs are collected with "yarn logs" but log aggregation is not properly configured so I wouldn't rely too much on that. Before doing the tests tomorrow I will clear all the existing logs just to be sure.


Re: Flink Checkpoint on yarn

Simone Robutti
OK, I ran another test.

I launched two identical jobs, one after the other, on YARN (without the long-running session). I then killed a job manager; both jobs ran into problems and resumed their work after a few seconds. The problem is that the first job restored the state of the second job and vice versa.

Here are the logs: https://gist.github.com/chobeat/38f8eee753aeaca51acc

At line 141 of the first job and at line 131 of the second job I killed the job manager. As you can see, the first stopped at 48 and resumed at 39, while the second stopped at 38 and resumed at 48. I hope there's something wrong with my configuration, because otherwise this really looks like a bug.
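The numbers above can be checked mechanically. A job that restores its own checkpoint should never resume ahead of where it was killed, so a simple sanity check, sketched below, flags the second job. The function name and the one-step `slack` allowance for off-by-one logging effects are illustrative choices, not anything taken from Flink.

```python
def resume_is_plausible(counter_at_kill: int, counter_after_resume: int, slack: int = 1) -> bool:
    """A job restoring its own state cannot resume ahead of the point where it was killed."""
    return counter_after_resume <= counter_at_kill + slack


if __name__ == "__main__":
    # (counter at kill, counter after resume), as reported in the message above.
    observations = {"first job": (48, 39), "second job": (38, 48)}
    for name, (killed_at, resumed_at) in observations.items():
        print(name, resume_is_plausible(killed_at, resumed_at))
```

The first job's rollback from 48 to 39 looks plausible on its own; it is the second job's jump from 38 to 48 that cannot come from its own checkpoint and that matches the first job's last value.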

Thanks in advance,

Simone


Re: Flink Checkpoint on yarn

Ufuk Celebi
Hey Simone! Did you set different recovery.zookeeper.path.root keys?
The default is /flink and if you don't change it for the 2nd cluster,
it will try to recover the jobs of the first one. Can you gather the
job manager logs as well please?
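Why a shared root matters can be seen in a deliberately simplified model, sketched below. `ToyZooKeeper` is a plain dictionary and not Flink's actual ZooKeeper layout; the job names and the `/flink-a` and `/flink-b` roots are hypothetical. It only shows that two clusters writing under the same root see each other's jobs at recovery time, while distinct roots keep them apart.

```python
from collections import defaultdict
from typing import Dict, List


class ToyZooKeeper:
    """A dict standing in for ZooKeeper: root path -> {job id -> checkpointed counter}."""

    def __init__(self) -> None:
        self._nodes: Dict[str, Dict[str, int]] = defaultdict(dict)

    def register(self, root: str, job_id: str, counter: int) -> None:
        """Record the latest checkpointed counter of a job under the given root."""
        self._nodes[root][job_id] = counter

    def jobs_to_recover(self, root: str) -> List[str]:
        """List every job a recovering cluster would find under its root."""
        return sorted(self._nodes[root])


if __name__ == "__main__":
    zk = ToyZooKeeper()
    # Both clusters keep the default root, so their entries share one namespace.
    zk.register("/flink", "job-a", 48)
    zk.register("/flink", "job-b", 38)
    print(zk.jobs_to_recover("/flink"))    # both jobs are visible to either cluster

    # Hypothetical per-cluster roots keep the entries apart.
    zk.register("/flink-a", "job-a", 48)
    zk.register("/flink-b", "job-b", 38)
    print(zk.jobs_to_recover("/flink-a"))  # only the cluster's own job
```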

– Ufuk

On Thu, Mar 17, 2016 at 11:31 AM, Simone Robutti
<[hidden email]> wrote:

> Ok, i run another test.
>
> I launched two identical jobs, one after the other, on yarn (without the
> long running session). I then killed a job manager and both the jobs got
> problems and then resumed their work after a few seconds. The problem is the
> first job restored the state of the second job and vice versa.
>
> Here are the logs: https://gist.github.com/chobeat/38f8eee753aeaca51acc
>
> At line 141 of the first job and at line 131 of the second job I killed the
> job manager. As you can see, the first stopped at 48 and resumed at 39 while
> the second stopped at 38 and resumed at 48. I hope there's something wrong
> with my configuration because otherwise this really looks like a bug.
>
> Thanks in advance,
>
> Simone
>
> 2016-03-16 18:55 GMT+01:00 Simone Robutti <[hidden email]>:
>>
>> Actually the test was intended for a single job. The fact that there are
>> more jobs is unexpected and it will be the first thing to verify.
>> Considering these problems we will go for deeper tests with multiple jobs.
>>
>> The logs are collected with "yarn logs" but log aggregation is not
>> properly configured so I wouldn't rely too much on that. Before doing the
>> tests tomorrow I will clear all the existing logs just to be sure.
>>
>> 2016-03-16 18:19 GMT+01:00 Ufuk Celebi <[hidden email]>:
>>>
>>> OK, so you are submitting multiple jobs, but you submit them with -m
>>> yarn-cluster and therefore expect them to start separate YARN
>>> clusters. Makes sense and I would expect the same.
>>>
>>> I think that you can check in the client logs printed to stdout to
>>> which cluster the job is submitted.
>>>
>>> PS: The logs you have shared are out-of-order, how did you gather
>>> them? Do you have an idea why they are out of order? Maybe something
>>> is mixed up in the way we gather the logs and we only think that
>>> something is wrong because of this.
>>>
>>>
>>> On Wed, Mar 16, 2016 at 6:11 PM, Simone Robutti
>>> <[hidden email]> wrote:
>>> > I didn't resubmitted the job. Also the jobs are submitted one by one
>>> > with -m
>>> > yarn-master, not with a long running yarn session so I don't really
>>> > know if
>>> > they could mix up.
>>> >
>>> > I will repeat the test with a cleaned state because we saw that killing
>>> > the
>>> > job with yarn application -kill left the "flink run" process alive so
>>> > that
>>> > may be the problem. We just noticed a few minutes ago.
>>> >
>>> > If the problem persists, I will eventually come back with a full log.
>>> >
>>> > Thanks for now,
>>> >
>>> > Simone
>>> >
>>> > 2016-03-16 18:04 GMT+01:00 Ufuk Celebi <[hidden email]>:
>>> >>
>>> >> Hey Simone,
>>> >>
>>> >> from the logs it looks like multiple jobs have been submitted to the

Re: Flink Checkpoint on yarn

stefanobaghino
Hi Ufuk,

Does the recovery.zookeeper.path.root property need to be set independently for each job that is run? Doesn't Flink take care of assigning some sort of identifier to each job and storing their checkpoints independently?

On Thu, Mar 17, 2016 at 11:43 AM, Ufuk Celebi <[hidden email]> wrote:
Hey Simone! Did you set different recovery.zookeeper.path.root keys?
The default is /flink and if you don't change it for the 2nd cluster,
it will try to recover the jobs of the first one. Can you gather the
job manager logs as well please?

– Ufuk




--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Flink Checkpoint on yarn

stefanobaghino
Hi Ufuk,

I've read the documentation and it's exactly as you say; thanks for the clarification.

Assuming one wants to run several jobs in parallel as different users on a secure cluster in HA mode, do you think setting recovery.zookeeper.path.root from the startup script would be good practice?
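
For illustration, here is a minimal sketch of what such a startup script might compute. It only builds a distinct root path per user and per cluster and renders the matching configuration line. The /flink/<user>/<id> layout and the helper names zk_root_for and config_line are assumptions made for this example, not something Flink prescribes, and how the value is then handed to Flink is left open.

```python
import uuid


def zk_root_for(user, cluster_id, base="/flink"):
    """Build a ZooKeeper root path unique to one per-job YARN cluster.

    The <base>/<user>/<cluster_id> layout is a hypothetical convention.
    """
    return "{}/{}/{}".format(base.rstrip("/"), user, cluster_id)


def config_line(root):
    """Render the setting as a flink-conf.yaml style key/value line."""
    return "recovery.zookeeper.path.root: " + root


# Example: a fresh random id per submission keeps the roots distinct.
# "alice" is a placeholder user name.
root = zk_root_for("alice", uuid.uuid4().hex)
print(config_line(root))
```

With a fresh id on every submission, two clusters started by the same user would still get different roots, which is the property the test with different root paths relies on.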

On Thu, Mar 17, 2016 at 11:51 AM, Stefano Baghino <[hidden email]> wrote:
Hi Ufuk,

does the recovery.zookeeper.path.root property need to be set independently for each job that is run? Doesn't Flink take care of assigning some sort of identification to each job and storing their checkpoints independently?




--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Flink Checkpoint on yarn

Ufuk Celebi
In reply to this post by stefanobaghino
Yes, the jobs have their own UUID.

Although you expect there to be two independent clusters (which makes
sense since you started via yarn-cluster), both clusters act as a
single one because of the shared ZooKeeper root.

What happens in your case is the following (this is also the reason
why we see multiple jobs in the first job manager log you shared):
- Start YARN cluster A with ZK root /flink
- JobManager A becomes leader and all TaskManagers of A connect to JobManager A
- Start another YARN cluster B with ZK root /flink
- JobManager of B takes part in the leader election and most likely
does not become leader
- TaskManagers of B check ZK in /flink and see that JobManager A is
leader and connect to JobManager A
- Your job submissions go to JobManager A

When a job is recovered it is then possible that a task manager first
runs a task of A and later recovers a task of B (which would result in
the swapped counts you see in the log).
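
The sequence above can be illustrated with a toy model. This is only a sketch: a plain dictionary stands in for ZooKeeper, the class ToyZooKeeper and the function start_clusters are made up for the example, and "first registered wins" is a simplification of the real leader election, not Flink's implementation.

```python
from collections import defaultdict


class ToyZooKeeper:
    """Stand-in for ZooKeeper: maps a root path to registered JobManagers."""

    def __init__(self):
        self.job_managers = defaultdict(list)

    def register(self, root, job_manager):
        self.job_managers[root].append(job_manager)

    def leader(self, root):
        # Simplifying assumption: the first JobManager registered under a
        # root stays leader; real leader election is more involved.
        return self.job_managers[root][0]


def start_clusters(roots):
    """Start each cluster in order and return, per cluster, the JobManager
    its TaskManagers end up connecting to."""
    zk = ToyZooKeeper()
    connected_to = {}
    for cluster, root in roots.items():
        zk.register(root, "JobManager-" + cluster)
        # TaskManagers look up the leader under their configured root.
        connected_to[cluster] = zk.leader(root)
    return connected_to


shared = start_clusters({"A": "/flink", "B": "/flink"})
separate = start_clusters({"A": "/flink/a", "B": "/flink/b"})
print(shared)    # {'A': 'JobManager-A', 'B': 'JobManager-A'}
print(separate)  # {'A': 'JobManager-A', 'B': 'JobManager-B'}
```

With the shared root, the TaskManagers of B end up attached to JobManager A, which matches the mixed-up recovery described above. With distinct roots each cluster stays on its own JobManager.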

Do you have time to repeat your experiment with different ZooKeeper root paths?

– Ufuk


On Thu, Mar 17, 2016 at 11:51 AM, Stefano Baghino
<[hidden email]> wrote:

> Hi Ufuk,
>
> does the recovery.zookeeper.path.root property need to be set independently
> for each job that is run? Doesn't Flink take care of assigning some sort of
> identification to each job and storing their checkpoints independently?

Re: Flink Checkpoint on yarn

Ufuk Celebi
In reply to this post by stefanobaghino
On Thu, Mar 17, 2016 at 11:51 AM, Stefano Baghino
<[hidden email]> wrote:
> does the recovery.zookeeper.path.root property need to be set independently
> for each job that is run?

No, just per cluster.

Re: Flink Checkpoint on yarn

stefanobaghino
Yes, but each job runs its own cluster, right? We have to run them on a secure cluster on a per-user basis, so we can't use a YARN session and have to run each job independently.

On Thu, Mar 17, 2016 at 12:09 PM, Ufuk Celebi <[hidden email]> wrote:
On Thu, Mar 17, 2016 at 11:51 AM, Stefano Baghino
<[hidden email]> wrote:
> does the recovery.zookeeper.path.root property need to be set independently
> for each job that is run?

No, just per cluster.



--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Flink Checkpoint on yarn

stefanobaghino
In reply to this post by Ufuk Celebi
> Do you have time to repeat your experiment with different ZooKeeper root paths?

We reached the same conclusion and we're running this test right now, thanks.

On Thu, Mar 17, 2016 at 12:08 PM, Ufuk Celebi <[hidden email]> wrote:
Yes, the jobs have their own UUID.

Although you expect there to be two independent clusters (which makes
sense since you started via yarn-cluster), both clusters act as a
single one because of the shared ZooKeeper root.

What happens in your case is the following (this is also the reason
why we see multiple jobs in the first job manager log you shared):
- Start YARN cluster A with ZK root /flink
- JobManager A becomes leader and all TaskManagers of A connect to JobManager A
- Start another YARN cluster B with ZK root /flink
- JobManager of B takes part in the leader election and most likely
does not become leader
- TaskManagers of B check ZK in /flink and see that JobManager A is
leader and connect to JobManager A
- Your job submissions go to JobManager A

When a job is recovered it is then possible that a task manager first
runs a task of A and later recovers a task of B (which would result in
the swapped counts you see in the log).

Do you have time to repeat your experiment with different ZooKeeper root paths?

– Ufuk


On Thu, Mar 17, 2016 at 11:51 AM, Stefano Baghino
<[hidden email]> wrote:
> Hi Ufuk,
>
> does the recovery.zookeeper.path.root property need to be set independently
> for each job that is run? Doesn't Flink take care of assigning some sort of
> identification to each job and storing their checkpoints independently?
>
> On Thu, Mar 17, 2016 at 11:43 AM, Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Simone! Did you set different recovery.zookeeper.path.root keys?
>> The default is /flink and if you don't change it for the 2nd cluster,
>> it will try to recover the jobs of the first one. Can you gather the
>> job manager logs as well please?
>>
>> – Ufuk
>>
>> On Thu, Mar 17, 2016 at 11:31 AM, Simone Robutti
>> <[hidden email]> wrote:
>> > Ok, i run another test.
>> >
>> > I launched two identical jobs, one after the other, on yarn (without the
>> > long running session). I then killed a job manager and both the jobs got
>> > problems and then resumed their work after a few seconds. The problem is
>> > the
>> > first job restored the state of the second job and vice versa.
>> >
>> > Here are the logs: https://gist.github.com/chobeat/38f8eee753aeaca51acc
>> >
>> > At line 141 of the first job and at line 131 of the second job I killed
>> > the
>> > job manager. As you can see, the first stopped at 48 and resumed at 39
>> > while
>> > the second stopped at 38 and resumed at 48. I hope there's something
>> > wrong
>> > with my configuration because otherwise this really looks like a bug.
>> >
>> > Thanks in advance,
>> >
>> > Simone
>> >
>> > 2016-03-16 18:55 GMT+01:00 Simone Robutti
>> > <[hidden email]>:
>> >>
>> >> Actually the test was intended for a single job. The fact that there
>> >> are
>> >> more jobs is unexpected and it will be the first thing to verify.
>> >> Considering these problems we will go for deeper tests with multiple
>> >> jobs.
>> >>
>> >> The logs are collected with "yarn logs" but log aggregation is not
>> >> properly configured so I wouldn't rely too much on that. Before doing
>> >> the
>> >> tests tomorrow I will clear all the existing logs just to be sure.
>> >>
>> >> 2016-03-16 18:19 GMT+01:00 Ufuk Celebi <[hidden email]>:
>> >>>
>> >>> OK, so you are submitting multiple jobs, but you submit them with -m
>> >>> yarn-cluster and therefore expect them to start separate YARN
>> >>> clusters. Makes sense and I would expect the same.
>> >>>
>> >>> I think that you can check in the client logs printed to stdout to
>> >>> which cluster the job is submitted.
>> >>>
>> >>> PS: The logs you have shared are out-of-order, how did you gather
>> >>> them? Do you have an idea why they are out of order? Maybe something
>> >>> is mixed up in the way we gather the logs and we only think that
>> >>> something is wrong because of this.
>> >>>
>> >>>
>> >>> On Wed, Mar 16, 2016 at 6:11 PM, Simone Robutti
>> >>> <[hidden email]> wrote:
>> >>> > I didn't resubmitted the job. Also the jobs are submitted one by one
>> >>> > with -m
>> >>> > yarn-master, not with a long running yarn session so I don't really
>> >>> > know if
>> >>> > they could mix up.
>> >>> >
>> >>> > I will repeat the test with a cleaned state because we saw that
>> >>> > killing
>> >>> > the
>> >>> > job with yarn application -kill left the "flink run" process alive
>> >>> > so
>> >>> > that
>> >>> > may be the problem. We just noticed a few minutes ago.
>> >>> >
>> >>> > If the problem persists, I will eventually come back with a full
>> >>> > log.
>> >>> >
>> >>> > Thanks for now,
>> >>> >
>> >>> > Simone
>> >>> >
>> >>> > 2016-03-16 18:04 GMT+01:00 Ufuk Celebi <[hidden email]>:
>> >>> >>
>> >>> >> Hey Simone,
>> >>> >>
>> >>> >> from the logs it looks like multiple jobs have been submitted to
>> >>> >> the
>> >>> >> cluster, not just one. The different files correspond to different
>> >>> >> jobs recovering. The filtered logs show three jobs
>> >>> >> running/recovering
>> >>> >> (with IDs 10d8ccae6e87ac56bf763caf4bc4742f,
>> >>> >> 124f29322f9026ac1b35435d5de9f625,
>> >>> >> 7f280b38065eaa6335f5c3de4fc82547).
>> >>> >>
>> >>> >> Did you manually re-submit the job after killing a job manager?
>> >>> >>
>> >>> >> Regarding the counts, they can be rolled back to an earlier
>> >>> >> consistent state if the latest checkpoint had not completed yet
>> >>> >> (completion includes the write to ZooKeeper).
>> >>> >>
>> >>> >> Can you please share the complete job manager logs of your program?
>> >>> >> The most helpful thing will be to have a log for each started job
>> >>> >> manager container. I don't know if that is easily possible.
>> >>> >>
>> >>> >> – Ufuk
>> >>> >>
>> >>> >> On Wed, Mar 16, 2016 at 4:12 PM, Simone Robutti
>> >>> >> <[hidden email]> wrote:
>> >>> >> > This is the log filtered to check messages from
>> >>> >> > ZooKeeperCompletedCheckpointStore.
>> >>> >> >
>> >>> >> > https://gist.github.com/chobeat/0222b31b87df3fa46a23
>> >>> >> >
>> >>> >> > It looks like it finds only one checkpoint, but I'm not sure
>> >>> >> > whether the different hashes and IDs of the checkpoints are
>> >>> >> > meaningful.
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> > 2016-03-16 15:33 GMT+01:00 Ufuk Celebi <[hidden email]>:
>> >>> >> >>
>> >>> >> >> Can you please have a look into the JobManager log file and
>> >>> >> >> report which checkpoints are restored? You should see messages
>> >>> >> >> from ZooKeeperCompletedCheckpointStore like:
>> >>> >> >> - Found X checkpoints in ZooKeeper
>> >>> >> >> - Initialized with X. Removing all older checkpoints
>> >>> >> >>
>> >>> >> >> You can share the complete job manager log file as well if you like.
>> >>> >> >>
>> >>> >> >> – Ufuk
>> >>> >> >>
>> >>> >> >> On Wed, Mar 16, 2016 at 2:50 PM, Simone Robutti
>> >>> >> >> <[hidden email]> wrote:
>> >>> >> >> > Hello,
>> >>> >> >> >
>> >>> >> >> > I'm testing the checkpointing functionality with HDFS as a
>> >>> >> >> > backend.
>> >>> >> >> >
>> >>> >> >> > From what I can see, it uses different checkpoint files and
>> >>> >> >> > resumes the computation from different points, not from the
>> >>> >> >> > latest available one. To me this is unexpected behaviour.
>> >>> >> >> >
>> >>> >> >> > Every second, each worker logs a counter that is increased by 1
>> >>> >> >> > at each step.
>> >>> >> >> >
>> >>> >> >> > So, for example, on node-1 the count goes up to 5, then I kill a
>> >>> >> >> > job manager or task manager and it resumes from 5 or 4, which
>> >>> >> >> > is fine. The next time I kill a job manager the count is at 15
>> >>> >> >> > and it resumes at 14 or 15. Sometimes, at a third kill, the work
>> >>> >> >> > resumes at 4 or 5, as if the checkpoint used for the second
>> >>> >> >> > resume wasn't there.
>> >>> >> >> >
>> >>> >> >> > Once I even saw it jump forward: the first kill is at 10 and it
>> >>> >> >> > resumes at 9, the second kill is at 70 and it resumes at 9, the
>> >>> >> >> > third kill is at 15 but it resumes at 69, as if it resumed from
>> >>> >> >> > the second kill's checkpoint.
>> >>> >> >> >
>> >>> >> >> > This is clearly inconsistent.
>> >>> >> >> >
>> >>> >> >> > Also, in the logs I can see that it sometimes uses a checkpoint
>> >>> >> >> > file different from the one used in the previous, consistent
>> >>> >> >> > resume.
>> >>> >> >> >
>> >>> >> >> > What am I doing wrong? Is it a known bug?
>> >>> >> >
>> >>> >> >
>> >>> >
>> >>> >
>> >>
>> >>
>> >
>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit



--
BR,
Stefano Baghino

Software Engineer @ Radicalbit