Broadcast checkpoint serialization fail


Vasily Melnik
Hi all,
In Flink 1.8 we are seeing a strange exception that causes the job to fail:

2019-11-14 15:52:52,071 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - op4 (1/1) (797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator op4 (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator op4 (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
... 7 more

As the trace shows, the exception is thrown from org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109),
but what exactly is the cause?

The job is configured with the RocksDB state backend, using the local filesystem for storage.


Best regards,
Vasily Melnik

Re: Broadcast checkpoint serialization fail

Vasily Melnik
Hi all,
We found the solution.
The problem was the Comparator of the TreeSet that we use as the value of the broadcast state. Kryo is unable to serialize a Comparator defined as a lambda, so we replaced the lambda with a regular class, and everything works now.
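
For anyone hitting the same trace, here is a minimal sketch of that change. It is not our actual job code: the String element type, the ordering rule, and the names NamedComparatorSketch, ByLengthThenText, withLambda and withNamedClass are all made up for illustration. It only shows the difference between a TreeSet built with a lambda Comparator and one built with a named Comparator class, and it does not call Flink or Kryo.

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical sketch: the names and the String element type are assumptions.
final class NamedComparatorSketch {

    // A regular, named, stateless comparator class with a no-arg constructor.
    // Serializable is optional here; it is added as a common precaution.
    public static final class ByLengthThenText
            implements Comparator<String>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(String a, String b) {
            int byLength = Integer.compare(a.length(), b.length());
            return byLength != 0 ? byLength : a.compareTo(b);
        }
    }

    // The form that caused trouble: the comparator is a lambda, so its
    // runtime class is generated by the JVM rather than declared in code.
    public static TreeSet<String> withLambda() {
        return new TreeSet<>((a, b) -> {
            int byLength = Integer.compare(a.length(), b.length());
            return byLength != 0 ? byLength : a.compareTo(b);
        });
    }

    // The replacement: same ordering, but the comparator is a named class.
    public static TreeSet<String> withNamedClass() {
        return new TreeSet<>(new ByLengthThenText());
    }

    public static void main(String[] args) {
        TreeSet<String> values = withNamedClass();
        values.add("ccc");
        values.add("a");
        values.add("bb");
        System.out.println(values); // prints [a, bb, ccc]
        System.out.println(values.comparator().getClass().getName());
    }
}
```

The TreeSet is then used as the broadcast state value exactly as before; only the way the comparator is constructed changes.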


Best regards,
Vasily Melnik

GlowByte Consulting
