Stopping of a streaming job empties state store on HDFS

Stopping of a streaming job empties state store on HDFS

Peter Zende
Hi all,

We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable sources to be able to gracefully exit from the job with YARN state "finished/succeeded".
This works fine, however after creating a savepoint, stopping the job (stop event), and restarting it, we noticed that the RocksDB state hadn't been recovered. It looks like this is because the state directory on HDFS was emptied after issuing a stop event. This isn't the case when we cancel the job, but we'd like to distinguish between job failures and stop events. After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still not clear to us why this is the intended behavior.
Should we use cancel instead?
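
For context, a stoppable source in Flink 1.4 roughly looks like the sketch below (a minimal, hypothetical example; the class and variable names are placeholders, not our actual code):

import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Minimal sketch: a source that supports both "cancel" and the graceful "stop".
class StoppableCounterSource extends SourceFunction[Long] with StoppableFunction {

  @volatile private var running = true
  private var counter = 0L

  override def run(ctx: SourceContext[Long]): Unit = {
    while (running) {
      // Emit under the checkpoint lock so records and checkpointed state stay consistent.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(counter)
        counter += 1
      }
      Thread.sleep(100)
    }
  }

  // Called on "cancel": the job ends up CANCELED.
  override def cancel(): Unit = running = false

  // Called on "stop": the source drains and the job can finish with FINISHED/SUCCEEDED.
  override def stop(): Unit = running = false
}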

When we back up the local state directory, stop the job, copy the directory back, and start a new job from the savepoint, it works fine.
Another issue is that when we restart the job with different sources (1st job: HDFS and Kafka, 2nd job: Kafka only), each having uids set, the recovery from the savepoint doesn't fail, but the local state isn't restored. Is there any trick besides setting allowNonRestoredState?

Many thanks,
Peter

Re: Stopping of a streaming job empties state store on HDFS

Stefan Richter
Hi,

> On 08.06.2018 at 01:16, Peter Zende <[hidden email]> wrote:
>
> Hi all,
>
> We have a streaming pipeline (Flink 1.4.2) for which we implemented stoppable sources to be able to gracefully exit from the job with YARN state "finished/succeeded".
> This works fine, however after creating a savepoint, stopping the job (stop event), and restarting it, we noticed that the RocksDB state hadn't been recovered. It looks like this is because the state directory on HDFS was emptied after issuing a stop event. This isn't the case when we cancel the job, but we'd like to distinguish between job failures and stop events. After reading some related tickets (e.g. FLINK-4201, FLINK-5007) it's still not clear to us why this is the intended behavior.
> Should we use cancel instead?

Savepoints should _not_ be cleaned up in case of stop or cancellation; checkpoints should be. Where are you storing the created savepoints? They should not go into the checkpoint directory. Stop is intended to be a more "graceful" variant of cancel, but I think it is rarely used with Flink. I would prefer cancel unless you really require stoppable sources for some particular reason.
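
For illustration, the separation could look roughly like this (a minimal sketch; the HDFS paths are just placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpoints are owned by Flink and can be cleaned up on cancel/stop.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
env.enableCheckpointing(60000)

// Savepoints are owned by the user and are not deleted by Flink. Write them to a
// separate location, e.g. via "state.savepoints.dir" in flink-conf.yaml or as an
// explicit target directory when triggering the savepoint.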

> When we back up the local state directory, stop the job, copy the directory back, and start a new job from the savepoint, it works fine.
> Another issue is that when we restart the job with different sources (1st job: HDFS and Kafka, 2nd job: Kafka only), each having uids set, the recovery from the savepoint doesn't fail, but the local state isn't restored. Is there any trick besides setting allowNonRestoredState?


I need to clarify here: when you say "each having uids set", do you set the same uids for both types of sources? The uids must match, because Flink reassigns state on restore based on the uids, i.e. state x goes to the operator whose uid equals the uid of the operator that created the state in the previous job. The flag allowNonRestoredState has the purpose of tolerating that some state from a checkpoint/savepoint does not find a matching operator to which it should be assigned (no operator with a matching uid exists in the job graph). For example, you want this if you removed operators from the job.
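
A minimal sketch of what I mean by matching uids (a hypothetical example; the Kafka connector, topic, and uid strings are placeholders, not from your job):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "broker:9092") // placeholder broker

env
  .addSource(new FlinkKafkaConsumer011[String]("events", new SimpleStringSchema(), props))
  .uid("kafka-source")      // must stay identical across job versions
  .keyBy(x => x)
  .mapWithState[String, Long] { (value, count: Option[Long]) =>
    val n = count.getOrElse(0L) + 1
    (s"$value:$n", Some(n))
  }
  .uid("per-key-counter")   // the state created here is restored by this uid
  .print()

env.execute("uid-example")

If the second job keeps the same uid strings on the corresponding operators, their state is restored; state whose uid has no match in the new job graph is only tolerated with allowNonRestoredState.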

Best,
Stefan

Re: Stopping of a streaming job empties state store on HDFS

Peter Zende
Hi Stefan,

Thanks for the answer.
Fixing the uids solved the problem, that's not an issue anymore.
The savepoint directory is there, but the RocksDB state is not restored after restarting the application, because
that state directory was removed when I stopped the application. It looks like the savepoints themselves don't
contain the RocksDB state files. What we have is:
env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation, true)))
-> this location was emptied.
In the meantime we switched to cancelWithSavepoint, which doesn't have this behavior and works fine; however, the YARN application status
ends up as FAILED instead of SUCCEEDED, which we had in case of stopping.

What are the use cases of stopping? We implemented it because we wanted to ensure that the application shuts down correctly and
we don't end up in an inconsistent/broken state.

Thanks,
Peter


Re: Stopping of a streaming job empties state store on HDFS

Till Rohrmann
Hi Peter,

this sounds very strange. I just tried to reproduce the issue locally, but for me it worked without a problem. Could you maybe share the JobManager logs at DEBUG level with us?

As a side note, enabling the asynchronous checkpointing mode for the FsStateBackend does not have an effect on the RocksDBStateBackend. You should rather call `new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true)` if you want to enable asynchronous checkpointing.
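
In other words, something like this (a sketch; the path is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stateStoreLocation = "hdfs:///flink/checkpoints" // placeholder path

// The flag belongs on the RocksDBStateBackend itself; a flag passed to the wrapped
// FsStateBackend has no effect once RocksDB is the actual state backend.
env.setStateBackend(new RocksDBStateBackend(new FsStateBackend(stateStoreLocation), true))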

Cheers,
Till
