Streaming Job eventually begins failing during checkpointing


Streaming Job eventually begins failing during checkpointing

Stephen Patel
I've got a Flink (1.8.0, EMR 5.26) streaming job running on YARN. It's configured to use RocksDB and to checkpoint once a minute to HDFS. The job runs fine for around 20 days, then begins failing with the exception below (it fails, restarts, and fails again, repeatedly):

2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
... 7 more

This application is configured to retain externalized checkpoints. When I attempt to restart from the last successful checkpoint, it fails with the same error on the first checkpoint taken after the restart.
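
For reference, the job setup is roughly the following (a simplified sketch, not the actual pipeline code; the HDFS path and class name are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend, writing checkpoints to HDFS (placeholder path).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // Checkpoint once a minute.
        env.enableCheckpointing(60_000L);

        // Retain externalized checkpoints so the job can be restarted from them.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... build and run the (Beam-translated) pipeline here ...
        env.execute("checkpoint-setup-sketch");
    }
}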

I haven't been able to find out why this might be. The source code doesn't seem particularly informative to my eyes: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68

Has anyone else seen anything like this?

Re: Streaming Job eventually begins failing during checkpointing

Yun Tang
Hi Stephen

This is not related to RocksDB but to the default on-heap operator state backend. From your exception stack trace, you have created too many operator states (more than 32767).
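
Roughly speaking, the check that fails there has this form (a sketch reconstructed from the 1.8.0 code, with approximate field names; the number of state entries is written into the snapshot as a Java short, which is where the 32767 = Short.MAX_VALUE ceiling comes from):

// Sketch of the guard in OperatorBackendSerializationProxy's constructor (Flink 1.8.x).
Preconditions.checkArgument(
        operatorStateMetaInfoSnapshots.size() <= Short.MAX_VALUE
                && broadcastStateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
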
How do you call context.getOperatorStateStore().getListState or context.getOperatorStateStore().getBroadcastState? Do you pass a different operator state descriptor each time?
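
For comparison, a typical registration reuses one fixed descriptor, so the backend only ever holds a single entry per state name. An illustrative sketch (not your code; the class and state names are made up):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class OffsetTrackingSource implements CheckpointedFunction {

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // One fixed name: re-registering the same descriptor on every restore
        // does not create additional operator states.
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("source-offsets", Long.class);
        offsets = context.getOperatorStateStore().getListState(descriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Write the current offsets into 'offsets' here.
    }
}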

Best
Yun Tang


Re: Streaming Job eventually begins failing during checkpointing

Stephen Patel
I can't say that I ever call that directly. The Beam library that I'm using does call it in a couple of places: https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

But it seems to be the same descriptor every time. Is that limit per operator? That is, can each operator host up to 32767 operator/broadcast states? I assume they're counted by name?
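
Just to check my understanding of the counting, here is a hypothetical fragment of a CheckpointedFunction.initializeState (names and count made up): registering tens of thousands of distinctly named states in a single operator like this is what would eventually trip that check, correct?

// Hypothetical illustration only: each distinct state name adds one entry to this
// operator's on-heap operator state backend, so the first checkpoint taken after
// this would fail the same way once the count exceeds 32767.
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    for (int i = 0; i < 40_000; i++) {
        context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-elements-" + i, byte[].class));
    }
}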


Re: Streaming Job eventually begins failing during checkpointing

Yun Tang
Hi Stephen

I think the state name [1], which changes every time, might be the root cause. I am not familiar with the Beam code; would it be possible for it to create so many operator states? Did you configure some parameters incorrectly?

Best
Yun Tang


Re: Streaming Job eventually begins failing during checkpointing

Stephen Patel
I posted to the beam mailing list: https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E

I think this is related to a Beam feature called RequiresStableInput (which my pipeline uses). It creates a new operator (or keyed) state per checkpoint. I'm not sure there are any parameters I can tweak to control its behavior (apart from increasing the checkpoint interval so the pipeline runs longer before building up that many states).
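
Hypothetically, the buffering amounts to something like the fragment below: a new state name derived from each checkpoint id, with nothing ever unregistered. At one checkpoint per minute that crosses 32767 distinct states after roughly 22-23 days, which lines up with what I'm seeing. This is a sketch of the pattern, not the actual Beam code.

// Sketch of the growth pattern only, not Beam's actual implementation.
// 'operatorStateStore' is assumed to be the OperatorStateStore kept from initializeState().
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    String stateName = "buffered-until-checkpoint-" + context.getCheckpointId();
    ListState<byte[]> buffer = operatorStateStore.getListState(
            new ListStateDescriptor<>(stateName, byte[].class));
    // Move the elements buffered since the previous checkpoint into 'buffer';
    // the state (and its name) is never unregistered afterwards.
}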

Perhaps this is something that can be fixed (maybe by unregistering operator states in the RequiresStableInput code once they are no longer needed). It seems to me that this isn't a Flink issue, but rather a Beam issue.

Thanks for pointing me in the right direction.  


Re: Streaming Job eventually begins failing during checkpointing

Stephan Ewen
If something requires Beam to register a new state each time, then this is tricky, because currently you cannot unregister states from Flink.

@Yu @Yun I remember chatting at some point about allowing states to be explicitly unregistered so they get dropped from successive checkpoints, but I could not find a JIRA ticket for it. Do you remember what the status of that discussion is?


Re: Streaming Job eventually begins failing during checkpointing

Eleanore Jin
Hi All, 

I think the Beam community fixed this issue: https://github.com/apache/beam/pull/11478

Thanks!
Eleanore


Re: Streaming Job eventually begins failing during checkpointing

Yu Li
Sorry, just noticed this thread...

@Stephan I cannot remember the discussion, but I think it's an interesting topic; I will find some time to consider it (unregistering states).

@Eleanore Glad to know that the Beam community has fixed it, and thanks for the reference.

Best Regards,
Yu

