Permissions to delete Checkpoint on cancel

Ashish Pokharel
All,

We recently moved our checkpoint directory from HDFS to local SSDs mounted on the Data Nodes (we were starting to see performance impacts on checkpoints etc. as complex ML apps were spinning up more and more in YARN). This worked great, except that when jobs are canceled, or canceled with a savepoint, the local data is not being cleaned up. As far as I can remember, on HDFS the checkpoint directories were cleaned up on both cancel and cancel with savepoint. I am wondering if it is a permissions issue. The local disks have RWX permissions for both the yarn and flink headless users (the flink headless user submits the apps to YARN using our CI/CD pipeline).

Appreciate any pointers on this.

Thanks, Ashish

Re: Permissions to delete Checkpoint on cancel

Stefan Richter
Hi,

I am wondering how this can even work properly if you are using a local fs for checkpoints instead of a distributed fs. First, what happens under node failures: if the SSD becomes unavailable, if a task gets scheduled to a different machine and can no longer access the disk with the corresponding state data, or if you want to scale out? Second, you can observe the same problem with the job manager: how could the checkpoint coordinator, which runs on the JM, access a file on a local fs on a different node to clean up the checkpoint data? The purpose of using a distributed fs here is that all TMs and the JM can access the checkpoint files.

Best,
Stefan

(In reply to Ashish Pokharel's message of 22.07.2018, 19:03)

Re: Permissions to delete Checkpoint on cancel

Ashish Pokharel
Stefan,

I did have the first point in the back of my mind. I was under the impression, though, that checkpoint cleanup would be done by the TMs, since the checkpoints are taken by the TMs.

So for a standalone cluster with its own ZooKeeper for JM high availability, is a NAS a must-have? We were going to go with local checkpoints, with access to remote HDFS for savepoints. It sounds like that would be a bad idea, then. Unfortunately we can’t run on YARN, and a NAS is also a no-no in one of our datacenters: there is a mountain of security compliance to climb before we could be in production if we need to go that route.

Thanks, Ashish

(In reply to Stefan Richter's message of Monday, July 23, 2018, 5:10 AM)

Re: Permissions to delete Checkpoint on cancel

Ashish Pokharel
Sorry,

Just a follow-up: in the absence of a NAS, is the best option then to put both checkpoints and savepoints on HDFS, and have the state backend use the local SSDs?

We were trying not to hit HDFS at all, other than for savepoints.


- Ashish

(In reply to ashish pok's message of Monday, July 23, 2018, 7:45 AM)

Re: Permissions to delete Checkpoint on cancel

Stefan Richter
Hi,

OK, let me briefly explain the differences between the local working directory, the checkpoint directory, and the savepoint directory, and also outline their best practices/requirements/trade-offs. The first, easy point is that checkpoints and savepoints typically have similar requirements, and most users write them to the same fs. The working directory, i.e. the directory used for spilling or where RocksDB operates, is transient: it does not require replication because it is not part of the fault tolerance strategy. Here the main concern is speed, which is why it is ideally a local, physically attached disk on the TM machine.

In contrast to that, checkpoints and savepoints are part of the fault tolerance strategy, which is why they should typically be on fault-tolerant file systems. In database terms, think of checkpoints as a recovery mechanism and savepoints as backups. As we usually want to survive node failures, those file systems should be fault-tolerant/replicated and also accessible for read/write from all TMs and the JM. TMs obviously need to write the data and read it during recovery. Under node failures, a TM might have to read state that was written on a different machine, which is why TMs should be able to access the files written by other TMs. The JM is responsible for deleting checkpoints, because TMs might go down, and that is why the JM needs access as well.

Those requirements typically hold for most Flink users. However, you might get away with certain trade-offs. You can write checkpoints to local disk if:

- Everything runs on one machine, or

- (not sure anybody has ever done this, but it could work) all of the following hold:
1) You do the cleanup of old checkpoints manually (because the JM cannot reach them), e.g. with scripts, and
2) You never try to rescale from a checkpoint, and
3) Tasks never migrate to a different machine. You ignore node/disk/etc. failures and ensure that your job "owns" the cluster, with no other jobs running in parallel. This means accepting data loss in the cases above.
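
For the manual-cleanup route in the list above, a cleanup script might look roughly like the following minimal Java sketch. It assumes each host stores checkpoints as `chk-<n>` subdirectories of a per-job directory and keeps only the newest N of them, similar in spirit to what `state.checkpoints.num-retained` achieves when the JM can reach the files. The class and method names are hypothetical. The sketch deliberately leaves other entries such as `shared/` alone (with incremental RocksDB checkpoints, files there may still be referenced by retained checkpoints), and it should only run on each host while no checkpoint is in progress.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LocalCheckpointCleaner {

    // Assumed layout: <checkpoint-dir>/<job-id>/chk-<n>/...
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    /**
     * Deletes every chk-<n> directory under jobDir except the `retain` newest ones
     * (by numeric checkpoint id) and returns the deleted directories.
     * Other entries, e.g. shared/ or taskowned/, are left untouched.
     */
    public static List<Path> pruneOldCheckpoints(Path jobDir, int retain) throws IOException {
        if (retain < 1) {
            throw new IllegalArgumentException("retain must be at least 1");
        }
        List<Path> chkDirs;
        try (Stream<Path> children = Files.list(jobDir)) {
            chkDirs = children
                    .filter(Files::isDirectory)
                    .filter(p -> CHK_DIR.matcher(p.getFileName().toString()).matches())
                    .collect(Collectors.toCollection(ArrayList::new));
        }
        // Newest first; numeric ids avoid "chk-9" sorting after "chk-10".
        Comparator<Path> byId = Comparator.comparingLong(LocalCheckpointCleaner::checkpointId);
        chkDirs.sort(byId.reversed());

        List<Path> deleted = new ArrayList<>();
        for (Path dir : chkDirs.subList(Math.min(retain, chkDirs.size()), chkDirs.size())) {
            deleteRecursively(dir);
            deleted.add(dir);
        }
        return deleted;
    }

    private static long checkpointId(Path dir) {
        Matcher m = CHK_DIR.matcher(dir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + dir);
        }
        return Long.parseLong(m.group(1));
    }

    private static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order places every child before its parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: LocalCheckpointCleaner <job-checkpoint-dir> <retain>");
            System.exit(2);
        }
        for (Path p : pruneOldCheckpoints(Paths.get(args[0]), Integer.parseInt(args[1]))) {
            System.out.println("deleted " + p);
        }
    }
}
```

Sorting by the numeric id rather than by name matters once ids reach two digits; a lexicographic sort would keep `chk-9` and delete `chk-10`.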

Typically, it should be OK to use a dfs only for checkpoints and savepoints; the local working directories should not go to a dfs, or else things will slow down dramatically. If you are mainly worried about recovery times, you might want to take a look at the local recovery feature [1], which keeps a secondary copy of the state on local disk for faster restores but still ensures fault tolerance with a primary copy in the dfs.
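
The split described above (replicated storage for checkpoints and savepoints, local disks for the RocksDB working directory and the local-recovery copy) can be sanity-checked with a small Java sketch like the one below. The class name and the set of schemes treated as shared are assumptions, the option keys are assumed to be the flink-conf.yaml names used around Flink 1.5/1.6 (verify them against your version's configuration docs), and every path in `main` is a made-up example. A location with no scheme, or with `file:`, is treated as node-local, because each host resolves it against its own disk.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public class StateDirLayoutCheck {

    // Assumption: schemes treated as replicated storage reachable from every TM and the JM.
    private static final Set<String> SHARED_SCHEMES =
            new HashSet<>(Arrays.asList("hdfs", "viewfs", "s3", "s3a"));

    /** True if the location uses a shared scheme; no scheme or file: means node-local. */
    public static boolean isShared(String location) {
        String scheme = URI.create(location).getScheme();
        return scheme != null && SHARED_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    /** Returns warnings for directories that contradict the recommended split. */
    public static List<String> check(Map<String, String> conf) {
        List<String> warnings = new ArrayList<>();
        // Checkpoints and savepoints must be readable by any TM and deletable by the JM.
        for (String key : Arrays.asList("state.checkpoints.dir", "state.savepoints.dir")) {
            String dir = conf.get(key);
            if (dir != null && !isShared(dir)) {
                warnings.add(key + " is node-local: " + dir);
            }
        }
        // Transient working directories should be fast local disks, not a dfs.
        for (String key : Arrays.asList("state.backend.rocksdb.localdir",
                                        "taskmanager.state.local.root-dirs")) {
            String dir = conf.get(key);
            if (dir != null && isShared(dir)) {
                warnings.add(key + " should be a local disk, not " + dir);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Example values; every path here is hypothetical.
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("state.backend", "rocksdb");
        conf.put("state.backend.rocksdb.localdir", "/mnt/ssd1/flink/rocksdb");
        conf.put("state.checkpoints.dir", "hdfs:///flink/checkpoints");
        conf.put("state.savepoints.dir", "hdfs:///flink/savepoints");
        conf.put("state.backend.local-recovery", "true");
        conf.put("taskmanager.state.local.root-dirs", "/mnt/ssd1/flink/local-recovery");

        List<String> warnings = check(conf);
        System.out.println(warnings.isEmpty() ? "layout OK" : String.join("\n", warnings));
    }
}
```

Running `main` as written prints `layout OK`. Pointing `state.checkpoints.dir` at something like `file:///mnt/ssd1/checkpoints` instead produces a warning, which corresponds to the cleanup problem this thread started with.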

Best,
Stefan

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#task-local-recovery

(In reply to ashish pok's message of 23.07.2018, 14:18)

Re: Permissions to delete Checkpoint on cancel

Ashish Pokharel
Stefan, 

Can’t thank you enough for this write-up. This is an awesome explanation. I had misunderstood the concepts of the RocksDB working directory and the checkpoint FS. My main intent is to boost the performance of RocksDB with the locally available SSDs. Recovery time from HDFS is not much of a concern, but load on HDFS “may” become a concern in the future; we will see.

Going over the documentation again after reading your email, it looks like what I intended to do was to first change my RocksDB working directory (which I believe defaults to the Java IO tmp dir) to the local SSD using the state.backend.rocksdb.checkpointdir option, and then perform any tuning necessary to optimize for SSD.
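
Despite its name, the option above controls RocksDB's local working directory, not the checkpoint directory. As far as I know, Flink 1.5 renamed it to `state.backend.rocksdb.localdir` and kept `state.backend.rocksdb.checkpointdir` as a deprecated fallback key. The hypothetical Java helper below only mimics that lookup order over a plain key/value map; it is not Flink's `Configuration` class, and it simply reports an empty result when neither key is set rather than modeling Flink's default.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RocksDbLocalDirResolver {

    // Newer option name for RocksDB's local working directory (assumed Flink 1.5+).
    public static final String LOCAL_DIR_KEY = "state.backend.rocksdb.localdir";
    // Older name of the same option; despite the name, it is NOT the checkpoint directory.
    public static final String DEPRECATED_KEY = "state.backend.rocksdb.checkpointdir";

    /** Returns the configured working directory, preferring the newer key over the deprecated one. */
    public static Optional<String> resolveLocalDir(Map<String, String> conf) {
        String value = conf.get(LOCAL_DIR_KEY);
        if (value == null) {
            value = conf.get(DEPRECATED_KEY); // fall back to the old name
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(DEPRECATED_KEY, "/mnt/ssd1/flink/rocksdb"); // hypothetical SSD mount point
        System.out.println(resolveLocalDir(conf).orElse("<default temp directory>"));
    }
}
```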

Thanks,

- Ashish

(In reply to Stefan Richter's message of Monday, July 23, 2018, 10:39 AM)