I've got a Flink (1.8.0, emr-5.26) streaming job running on YARN. It's configured to use RocksDB and to checkpoint once a minute to HDFS. The job runs fine for around 20 days and then begins failing with the exception below (it fails, restarts, and fails again, repeatedly):
2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
    ... 7 more

This application is configured to retain externalized checkpoints. When I attempt to restart from the last successful checkpoint, it fails with the same error on the first checkpoint that happens after the restart. I haven't been able to find out why this might be.
The source code doesn't seem particularly informative to my eyes: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68

Has anyone else seen anything like this?
Hi Stephen
This is not related to RocksDB but to the default on-heap operator state backend. Judging from your exception stack trace, you have created too many operator states (more than 32767).
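For reference, the constructor precondition at the line you linked looks roughly like the sketch below. I'm paraphrasing from memory, so treat the exact shape as approximate; the point is that the number of registered states is written as a short into the snapshot, so more than Short.MAX_VALUE (32767) registered state names makes the check fail with the bare IllegalArgumentException you saw:

// Paraphrased sketch of the OperatorBackendSerializationProxy constructor check
// (approximate; see the linked Flink 1.8.0 source for the real code).
public OperatorBackendSerializationProxy(
        List<StateMetaInfoSnapshot> operatorStateMetaInfoSnapshots,
        List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots) {

    this.operatorStateMetaInfoSnapshots = Preconditions.checkNotNull(operatorStateMetaInfoSnapshots);
    this.broadcastStateMetaInfoSnapshots = Preconditions.checkNotNull(broadcastStateMetaInfoSnapshots);

    // The state counts are serialized as shorts, so neither list may exceed 32767 entries.
    Preconditions.checkArgument(
            operatorStateMetaInfoSnapshots.size() <= Short.MAX_VALUE
                    && broadcastStateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
}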
How do you call context.getOperatorStateStore().getListState or context.getOperatorStateStore().getBroadcastState? Did you pass a different operator state descriptor each time?
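To illustrate what I mean, here is a minimal sketch (the class and state names are invented for the example; they are not from your job). Reusing one fixed descriptor registers a single operator state, while building a descriptor with a different name on each call registers a new state every time:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class OffsetTrackingFunction implements CheckpointedFunction {

    // Safe pattern: one fixed descriptor, so the same state name is registered on
    // every call and the backend only ever holds a single operator state.
    private static final ListStateDescriptor<Long> OFFSETS =
            new ListStateDescriptor<>("source-offsets", Long.class);

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsets = context.getOperatorStateStore().getListState(OFFSETS);

        // Problematic pattern: a name that varies per call (for example per checkpoint)
        // registers a brand-new operator state each time, and the registered names
        // accumulate in every later snapshot until the 32767 limit is reached:
        // context.getOperatorStateStore().getListState(
        //         new ListStateDescriptor<>("buffer-" + someCounter, Long.class));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // update the offsets list before the snapshot is written
    }
}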
Best
Yun Tang
From: Stephen Patel <[hidden email]>
Sent: Thursday, April 16, 2020 2:09
To: [hidden email] <[hidden email]>
Subject: Streaming Job eventually begins failing during checkpointing
I can't say that I ever call that directly. The Beam library that I'm using does call it in a couple of places: https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

But it seems to be the same descriptor every time. Is that limit per operator? That is, can each operator host up to 32767 operator/broadcast states? I assume that's by name?

On Wed, Apr 15, 2020 at 10:46 PM Yun Tang <[hidden email]> wrote:
Correction. I've actually found a place where it might be creating a new operator state per checkpoint:

https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149

This gives me something I can investigate locally, at least.

On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel <[hidden email]> wrote:
Hi Stephen
I think the state name [1], which changes every time, might be the root cause. I am not familiar with the Beam code; could it really be creating that many operator states? Did you configure some parameter incorrectly?
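For illustration, the sketch below shows the kind of pattern that would exhaust the limit. The names and types are invented for the example; this is not the actual Beam code:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;

// Hypothetical "rotating buffer": if the buffer index advances on every checkpoint,
// each call registers an additional operator state under a new name, and after roughly
// 32767 checkpoints the serialization proxy's precondition fails.
public final class RotatingBuffers {

    static ListState<byte[]> bufferForIndex(OperatorStateStore store, int bufferIndex) throws Exception {
        ListStateDescriptor<byte[]> descriptor = new ListStateDescriptor<>(
                "buffered-elements-" + bufferIndex,        // a new name whenever the index grows
                BytePrimitiveArraySerializer.INSTANCE);
        return store.getListState(descriptor);             // registers one more state per index
    }
}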
Best
Yun Tang
From: Stephen Patel <[hidden email]>
Sent: Thursday, April 16, 2020 22:30
To: Yun Tang <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Streaming Job eventually begins failing during checkpointing
I posted to the Beam mailing list: https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E

I think this is related to a Beam feature called RequiresStableInput (which my pipeline is using). It creates a new operator (or keyed) state per checkpoint. I'm not sure there are any parameters I can tweak to change its behavior, apart from increasing the checkpoint interval to let the pipeline run longer before building up that many states (see the rough numbers after this message).

Perhaps this is something that can be fixed, maybe by unregistering operator states in the RequiresStableInput code once they are no longer used. It seems to me that this isn't a Flink issue but rather a Beam issue. Thanks for pointing me in the right direction.

On Thu, Apr 16, 2020 at 11:29 AM Yun Tang <[hidden email]> wrote:
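(Back-of-the-envelope numbers, assuming RequiresStableInput adds roughly one operator state per checkpoint: at one checkpoint per minute that is 60 * 24 = 1,440 new states per day, so the 32,767 limit would be hit after about 32,767 / 1,440 ≈ 22.7 days. That lines up with this job running fine for around 20 days and then dying near checkpoint 32,702, with the small gap to 32,767 presumably covered by states that existed before the accumulation started.)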
If something requires Beam to register a new state each time, then this is tricky, because currently you cannot unregister states from Flink.

@Yu @Yun I remember chatting about this at some point (allowing states to be explicitly unregistered so they get dropped from successive checkpoints), but I could not find a JIRA ticket for it. Do you remember what the status of that discussion is?

On Thu, Apr 16, 2020 at 6:37 PM Stephen Patel <[hidden email]> wrote:
Hi All,

I think the Beam community fixed this issue: https://github.com/apache/beam/pull/11478

Thanks!
Eleanore

On Thu, Apr 23, 2020 at 4:24 AM Stephan Ewen <[hidden email]> wrote:
Sorry, just noticed this thread...

@Stephan I cannot remember the discussion, but I think it's an interesting topic; I will find some time to consider it (unregistering states).

@Eleanore Glad to know that the Beam community has fixed it, and thanks for the reference.

Best Regards,
Yu

On Sun, 26 Apr 2020 at 03:10, Eleanore Jin <[hidden email]> wrote: