Flink Taskmanager failure recovery and large state

Yaroslav Tkachenko-2
Hi everyone,

I'm wondering if people have experienced issues with Taskmanager failure recovery when dealing with a lot of state.

I'm using Flink 1.12.0 on Kubernetes, with the RocksDB state backend and GCS for savepoints and checkpoints, and ~150 task managers with 4 slots each.

When I run a pipeline without much state and kill one of the taskmanagers, it takes a few minutes to recover (I see a few restarts), but eventually, once a replacement taskmanager is registered with the jobmanager, things go back to healthy.

But when I run a pipeline with a lot of state (1TB+) and kill one of the taskmanagers, the pipeline never recovers, even after the replacement taskmanager has joined. It just enters an infinite loop of restarts and failures.

On the jobmanager, I see an endless loop of state transitions: RUNNING -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING. It stays in RUNNING for a few seconds, but then transitions into FAILED with a message like this:


22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.30.10.53/10.30.10.53:45789')
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer


Which, I guess, indicates a failed Taskmanager. And since there aren't enough task slots to run the job, it goes into this endless loop again. It's never the same Taskmanager that fails.



On the Taskmanager side, things look more interesting. I see a variety of exceptions:


org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.


also 


WARNING: Failed read retry #1/10 for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. Sleeping...
java.nio.channels.ClosedByInterruptException
at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
at java.base/java.io.DataInputStream.read(Unknown Source)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.base/java.io.InputStream.read(Unknown Source)
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
...


and


SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
20:52:46.894 [<REDACTED> (141/624)#7] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]


also


20:52:46.895 [<REDACTED> (141/624)#7] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
...


and a few of 


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download data for state handles.
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
...
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: no bytes written



Has anyone seen behaviour like this?


My current theory: because it needs to download a lot of state from GCS, the pipeline probably hits some kind of GCS back-off / rate-limiting issue (150 taskmanagers x 4 slots, plus state.backend.rocksdb.checkpoint.transfer.thread.num set to 4), i.e. too many read requests against the same GCS prefix. The downloads then don't finish in the expected time and fail seemingly at random. Is there some kind of timeout value I can tweak, so that downloading from GCS can take as long as it needs without failing prematurely?
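For context, the relevant knobs expressed as flink-conf.yaml settings look roughly like this (a sketch; we configure the state backend itself programmatically, and the values simply mirror the setup described above):

# Illustrative sketch of the settings described above
taskmanager.numberOfTaskSlots: 4
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
state.savepoints.dir: gs://<REDACTED>/flink-savepoints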

Any help is very much appreciated!




Re: Flink Taskmanager failure recovery and large state

Guowei Ma
Hi, Yaroslav

AFAIK there is no official GCS FileSystem support in Flink. Did you implement the GCS FileSystem yourself?
Would you like to share the whole JobManager log?

BTW: from the following log it looks like the implementation already has some retry mechanism.
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
 
Best,
Guowei



Re: Flink Taskmanager failure recovery and large state

Yaroslav Tkachenko-2
Hi Guowei,

I thought Flink could support any HDFS-compatible object store, like the majority of Big Data frameworks. So we just added the "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2" dependencies to the classpath; after that, using the "gs://" scheme seems to work:

state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
state.savepoints.dir: gs://<REDACTED>/flink-savepoints
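In practice that just means putting those two jars into Flink's lib/ directory in our Kubernetes image, something along these lines (the image tag and exact jar file names below are illustrative):

# Sketch of the image build; jar versions/names are illustrative
FROM flink:1.12.0-scala_2.12
COPY flink-shaded-hadoop-2-uber-*.jar /opt/flink/lib/
COPY gcs-connector-*.jar /opt/flink/lib/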

And yes, I noticed that retry logging too, but I'm not sure whether it's implemented on the Flink side or on the GCS connector side; I probably need to dive deeper into the source code. And if it's implemented on the GCS connector side, will Flink wait for all the retries? That's why I asked about a potential timeout on the Flink side.

The JM log doesn't have much besides what I already posted. It's hard for me to share the whole log, but the RocksDB initialization part may be relevant:

16:03:41.987 [cluster-io-thread-3] INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to configure application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}
16:03:41.988 [cluster-io-thread-3] INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: FLASH_SSD_OPTIMIZED.
16:03:41.988 [cluster-io-thread-3] INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4, state.backend.rocksdb.block.blocksize=16 kb, state.backend.rocksdb.block.cache-size=64 mb}}.
16:03:41.988 [cluster-io-thread-3] INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}

Thanks!


Re: Flink Taskmanager failure recovery and large state

Guowei Ma
Hi, Yaroslav

AFAIK Flink does not retry if downloading the checkpoint from storage fails. On the other hand, the FileSystem already has its own retry mechanism, so I think there is no need for Flink to retry on top of that.
I am not very sure, but from the log it seems that the GCS retry is interrupted for some reason. So I think we could get more insight if we could find the first failure cause.

Best,
Guowei



Re: Flink Taskmanager failure recovery and large state

rmetzger0
Hey Yaroslav,

GCS is a somewhat popular filesystem that should work fine with Flink.

It seems that the initial capacity of a bucket is 5000 read requests per second (https://cloud.google.com/storage/docs/request-rate), and your job is probably at roughly that rate: 150 task managers x 4 slots x 4 transfer threads is up to 2400 concurrent downloads, each issuing (and retrying) read requests, depending on how fast your job restarts in the restart loop.

You could try to tweak the GCS configuration parameters, such as increasing "fs.gs.http.read-timeout" and "fs.gs.http.max.retry" (see https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md for all available options).
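Since those are options of the Hadoop GCS connector, they would normally go into the Hadoop configuration the connector picks up (for example a core-site.xml on the classpath); a rough sketch, with purely illustrative values:

<!-- Illustrative values only; tune for your environment -->
<configuration>
  <property>
    <name>fs.gs.http.read-timeout</name>
    <value>60000</value>
  </property>
  <property>
    <name>fs.gs.http.max.retry</name>
    <value>20</value>
  </property>
</configuration>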


The "ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it should not cause the restarts.



Re: Flink Taskmanager failure recovery and large state

dhanesh arole
Hi Yaroslav, 

We faced similar issues in our large stateful stream processing job; I had asked a question about it on the user mailing list a few days back. Based on the reply to my question, we figured that this happens when a task manager has just come back online and is trying to rebuild / restore its state, but meanwhile another task manager gets restarted or killed. In that situation the job manager cancels the job, and as a result all task managers start cancelling the tasks they are currently running. As part of the cancellation flow, the channel buffer through which the Flink TM writes restored state to disk gets closed, while state rebuilding is still happening concurrently using that channel buffer. This causes the ClosedChannelException.

As a solution to this problem, we increased akka.ask.timeout to 10 minutes. This gives task managers enough room to wait for RPC responses from other task managers during restart. As a result, a TM becomes more lenient about marking another TM as failed and cancelling the job in the first place.
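For reference, this is a one-line change in flink-conf.yaml (the exact duration format below is illustrative):

# Give TMs more time to wait for RPC responses from peers during restore (illustrative value)
akka.ask.timeout: 10 min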

Dhanesh Arole




Re: Flink Taskmanager failure recovery and large state

Yaroslav Tkachenko-2
Hi Dhanesh,

Thanks for the recommendation! I'll try it out.
