RocksDB StateBuilder unexpected exception

RocksDB StateBuilder unexpected exception

dhanesh arole
Hello Hivemind, 

We are running a stateful streaming job. Each task manager instance hosts around 100 GB of data. While the task managers were restarting, we encountered the errors below, and as a result the job is not able to restart. Initially we thought the cause might be failing status checks on the attached EBS volumes or burst balance exhaustion, but the AWS console does not indicate any issue with the EBS volumes. Is there anything else we should look at that could potentially cause this exception? It is also quite unclear what exactly the cause of the exception is, so any help on that would be much appreciated.

Flink version: 1.12.2_scala_2.11
Environment: Kubernetes on AWS
Volume Type: EBS, gp2 300GiB

ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:830) [?:?]
2021-03-19 15:26:10,385 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) [flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) [flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.Thread.run(Thread.java:830) [?:?]
Caused by: java.nio.channels.ClosedChannelException
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]

Dhanesh Arole 


Re: RocksDB StateBuilder unexpected exception

Matthias
Hi Dhanesh,
Thanks for reaching out to the Flink community. Checking the code, it looks like the OutputStream is added to a CloseableRegistry before writing to it [1].

My suspicion, based on the exception cause, is that the CloseableRegistry was closed while the state was still being restored, which would close the OutputStream underneath the download. I tried to track down the source of the CloseableRegistry. It looks like it's handed down from the StreamTask [2].

The StreamTask closes the CloseableRegistry either when cancellation is triggered or in the class's finalize method. Have you checked the logs to see whether a task cancellation was logged before the exception?
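
To illustrate the suspected mechanism, here is a minimal sketch that uses only the JDK, not Flink classes. It assumes that the stream being written during restore is a file-backed stream like the one returned by Files.newOutputStream, and it closes the stream sequentially as a stand-in for a registry closing it on cancellation. The class and method names (ClosedChannelDemo, writeAfterExternalClose) are hypothetical and chosen only for this example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Path;

class ClosedChannelDemo {

    /**
     * Writes to a file-backed stream, closes it "from outside", then writes again.
     * Returns true if the second write fails with ClosedChannelException.
     */
    static boolean writeAfterExternalClose() {
        try {
            Path tmp = Files.createTempFile("restore-demo", ".tmp");
            try {
                // File-backed stream; on the default file system it writes through a FileChannel.
                OutputStream out = Files.newOutputStream(tmp);

                // The first write succeeds because the underlying channel is open.
                out.write(new byte[] {1, 2, 3});

                // Stand-in for something else (e.g. a registry reacting to cancellation)
                // closing the stream while the writer is still busy.
                out.close();

                try {
                    // The writer does not know about the close and keeps writing.
                    out.write(new byte[] {4, 5, 6});
                    return false;
                } catch (ClosedChannelException e) {
                    // Same exception type as in the reported stack trace.
                    return true;
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ClosedChannelException raised: " + writeAfterExternalClose());
    }
}
```

In the real job the close would happen concurrently on another thread, so the exception would be a symptom of the cancellation rather than its root cause.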

Best,
Matthias




Re: RocksDB StateBuilder unexpected exception

dhanesh arole
Hi Matthias, 

Thanks for taking the time to help us with this.

You are right, there were lots of task cancellations, as this exception causes the job to get restarted, which in turn triggers cancellations.


Dhanesh Arole 

