Corrupted unaligned checkpoints in Flink 1.11.1


Corrupted unaligned checkpoints in Flink 1.11.1

Alexander Filipchik
Hi,

We are trying to figure out what happened with our Flink job. We use Flink 1.11.1 and run a job with unaligned checkpoints and the RocksDB state backend. The whole state is around 300 GB, judging by the size of savepoints.
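
For context, the checkpointing setup looks roughly like the following (a simplified sketch, not our actual code; the path and interval are placeholders):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // RocksDB backend writing incremental checkpoints to GCS (placeholder path; constructor may throw IOException)
    env.setStateBackend(new RocksDBStateBackend("gs://<bucket>/app/checkpoints", true));
    env.enableCheckpointing(60_000);                        // checkpoint interval, illustrative
    env.getCheckpointConfig().enableUnalignedCheckpoints(); // unaligned checkpoints (Flink 1.11+)
    // retain the checkpoint on cancellation so it can be used as an externalized restore point
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);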

The job ran fine. At some point we tried to deploy new code, but we couldn't take a savepoint because the attempts kept timing out. It looks like the timeouts were caused by disk throttling (we use Google Cloud regional disks).
The new code was deployed from an externalized checkpoint, but it didn't start, as the job kept failing with:

Caused by: java.io.FileNotFoundException: Item not found: 'gs://../app/checkpoints/2834fa1c81dcf7c9578a8be9a371b0d1/shared/3477b236-fb4b-4a0d-be73-cb6fac62c007'. Note, it is possible that the live version is still available but the requested generation is deleted.
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:45)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:653)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:277)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:78)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:620)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
    at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:120)
    at com.css.flink.fs.gcs.moved.HadoopFileSystem.open(HadoopFileSystem.java:37)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:126)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
    at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1654)
    at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1871)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:169)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:155)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 15 more

We tried rolling back the code and we tried different checkpoints, but all the attempts failed with the same error. The job ID in the error does not match the checkpoint path we restored from; it looks like the restore logic reaches back to previous jobs, since all the checkpoints taken after 2834fa1c81dcf7c9578a8be9a371b0d1 fail to restore with the same error.
We looked at different checkpoints and found that some of them are missing the _metadata file and can't be used for restoration.
We also use ZooKeeper for HA, and we cleaned up the state there between deployments to make sure the reference to the non-existent file is not coming from there.

We decided to drop the state, since we have a way to repopulate it, but it would be great to get to the bottom of this. Any help would be appreciated.

Alex

Re: Corrupted unaligned checkpoints in Flink 1.11.1

Chesnay Schepler
Is there anything in the Flink logs indicating issues with writing the checkpoint data?
When the savepoint could not be created, was anything logged from Flink? How did you shut down the cluster?


Re: Corrupted unaligned checkpoints in Flink 1.11.1

Alexander Filipchik
On the checkpoints: what kind of issues should I check for? I looked at the metrics, and they were reporting successful checkpoints. It looks like some files were removed from the shared folder, but I'm not sure how to find out what caused it.

Savepoints were failing due to the savepoint timeout. Based on the metrics, our attached disks were not fast enough (GCE regional disks are network-attached and were being throttled). The team cancelled the savepoint and just killed the Kubernetes cluster. I assume some checkpoints were interrupted, as the job triggers them one after another.
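
(For reference, the timeout we keep hitting is the checkpoint timeout; as far as I understand it also applies to savepoints, since they go through the same coordinator. A sketch, with env being our StreamExecutionEnvironment and the value purely illustrative:)

    // checkpoints/savepoints taking longer than this are cancelled as timed out
    env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L); // 30 minutes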

Is there a known issue with terminating the cluster while a checkpoint is running?

Btw, we use the Flink Kube operator from Lyft.

Alex


Re: Corrupted unaligned checkpoints in Flink 1.11.1

Alexander Filipchik
I looked through the logs and didn't see anything fishy indicating an exception during checkpointing.
To make it clearer, here is the timeline (we use unaligned checkpoints, and the state size is around 300 GB):
 
T1: Job1 was running
T2: Job1 was savepointed, brought down and replaced with Job2.
T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled, brought down and replaced by Job3, which was restored from an externalized checkpoint of Job2.
T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled, brought down and replaced by Job4, which was restored from an externalized checkpoint of Job3.
T4: We realized that the savepoints were timing out due to local disk throttling. We provisioned disks with more throughput and IOPS. Job4 was cancelled and deployed again, restored from an externalized checkpoint of Job3, but it failed because it couldn't find some files in the checkpoint folder of Job1.
T5: We tried to redeploy and restore from checkpoints of Job3 and Job2, but all the attempts failed while reading files from the checkpoint folder of Job1.

We checked the contents of the folder containing the checkpoints of Job1, and it does have files. We are not sure what is pointing to the missing files or what could have removed them.

Is there any way we can figure out what could have happened? Is there a tool that can read a checkpoint and check whether it is valid?

Alex


Re: Corrupted unaligned checkpoints in Flink 1.11.1

Alexander Filipchik
Small correction: in T4 and T5 I mean Job2, not Job1 (as Job1 was savepointed).

Thank you,
Alex


Re: Corrupted unaligned checkpoints in Flink 1.11.1

Piotr Nowojski-4
Hi Alex,

A quick question. Are you using incremental checkpoints? 

Best, Piotrek


Re: Corrupted unaligned checkpoints in Flink 1.11.1

Piotr Nowojski-4
Re-adding user mailing list

Hey Alex,

In that case I can see two scenarios that could lead to missing files. Keep in mind that incremental checkpoints reference previous checkpoints in order to minimise the size of each checkpoint (roughly speaking, only the changes since the previous checkpoint are persisted/uploaded). Checkpoint number 42 can reference an arbitrary number of previous checkpoints. I suspect that somehow some of those previously referenced checkpoints got deleted. Also keep in mind that savepoints (as of now) are never incremental; they are always full checkpoints. Externalised checkpoints, however, can be incremental. Back to the scenarios:
1. You might have accidentally removed some older checkpoints of Job2, maybe thinking they were no longer needed. Perhaps you kept only the single externalised checkpoint directory from step T3 or T4, overlooking that this externalised checkpoint might reference previous checkpoints of Job2?
2. As I mentioned, Flink automatically maintains reference counts of the used files and deletes them once they are no longer used/referenced. However, this works only within a single job/cluster. For example, if between steps T3 and T4 you restarted Job2 and let it run for a bit, it could take more checkpoints that would subsume files still referenced by the externalised checkpoint you had previously used to start Job3/Job4. Job2 would have no idea that Job3/Job4 exist, let alone that they reference some of its files, and those files could have been deleted as soon as Job2 was no longer using/referencing them.

Could one of those happen in your case?
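
To help narrow it down, you could cross-check which files a given externalised checkpoint actually references and whether they still exist. I'm not aware of an official validation tool, but a rough heuristic is to scan the checkpoint's _metadata file for the embedded paths. A sketch (it assumes the Hadoop FileSystem and the GCS connector are on the classpath and configured; the regex-over-bytes scan is a heuristic, not a proper parse of the metadata format):

    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckpointReferenceCheck {
        public static void main(String[] args) throws Exception {
            // args[0]: a checkpoint's _metadata file, e.g. gs://<bucket>/app/checkpoints/<job-id>/chk-42/_metadata
            Path metadata = new Path(args[0]);
            FileSystem fs = metadata.getFileSystem(new Configuration());

            byte[] raw;
            try (InputStream in = fs.open(metadata)) {
                raw = in.readAllBytes(); // Java 9+; metadata files are small enough to hold in memory
            }
            // Heuristic: referenced state files appear as plain gs:// path strings inside the serialized metadata.
            String blob = new String(raw, StandardCharsets.ISO_8859_1);
            Matcher m = Pattern.compile("gs://[\\x21-\\x7e]+").matcher(blob);
            while (m.find()) {
                Path referenced = new Path(m.group());
                System.out.println((fs.exists(referenced) ? "OK       " : "MISSING  ") + referenced);
            }
        }
    }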

Best, Piotrek

On Mon, 7 Jun 2021 at 20:01, Alexander Filipchik <[hidden email]> wrote:
Yes, we do use incremental checkpoints.

Alex


Re: Corrupted unaligned checkpoints in Flink 1.11.1

Alexander Filipchik
Did some more digging.
1) is not an option, as we are not doing any cleanups at the moment. We keep the last 4 checkpoints per job, plus all the savepoints.
2) I looked at the job deployments that happened in the week before the incident. We had 23 deployments in total, and each resulted in a unique job ID. I also looked at job-specific metrics and I don't see any evidence of overlapping checkpointing. There is exactly one job checkpointing per application, every time with a different job ID, and once a new job starts checkpointing there are no further checkpoints from the previous job ID.

A bit of a mystery. Is there a way to at least catch this in the future? Is there any additional telemetry (logs, metrics) we could collect to better understand what is happening?
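
One low-tech option might be to periodically dump a listing of each job's shared/ directory (path, size, modification time), so that when a file goes missing we can at least see when it disappeared. A rough sketch, with a placeholder path and assuming the Hadoop GCS connector is configured:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SharedDirInventory {
        public static void main(String[] args) throws Exception {
            // args[0]: e.g. gs://<bucket>/app/checkpoints/<job-id>/shared (placeholder path)
            Path shared = new Path(args[0]);
            FileSystem fs = shared.getFileSystem(new Configuration());
            for (FileStatus status : fs.listStatus(shared)) {
                // one line per state file: modification time, size in bytes, full path
                System.out.println(status.getModificationTime() + "\t" + status.getLen() + "\t" + status.getPath());
            }
        }
    }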

Alex
