Hi all.

In Flink 1.8 we are hitting a strange exception that causes the job to fail:

2019-11-14 15:52:52,071 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - op4 (1/1) (797d4c2b85010dab6be5e1d06ff6493a) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator op4 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator op4 (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.lang.NullPointerException
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:608)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$TreeSetSerializer.write(DefaultSerializers.java:605)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
    at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
    ... 7 more

As the trace shows, the exception occurs in org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109), but what exactly is the cause? The job is configured with the RocksDB state backend and local filesystem storage.

Best regards,
Hi all,

We found the solution: the problem was the Comparator in the TreeSet we used as the value of the broadcast state. Kryo is unable to serialize a lambda used as the Comparator, so we replaced it with a regular class, and everything works fine now.

On Fri, 15 Nov 2019 at 14:29, Vasily Melnik <[hidden email]> wrote:
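To illustrate the fix described in the reply above, here is a minimal sketch that replaces a lambda comparator with a named class. It assumes the broadcast state value is a TreeSet<String>; the element type, the class names BroadcastComparatorSketch and LengthThenAlphabetical, and the ordering itself are hypothetical and not taken from the original job. The sketch uses only JDK classes and does not exercise Flink or Kryo, so it shows the shape of the change rather than proving the checkpoint succeeds.

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.TreeSet;

public class BroadcastComparatorSketch {

    /**
     * Hypothetical named comparator used instead of a lambda such as
     * (a, b) -> Integer.compare(a.length(), b.length()).
     * It is a regular top-level-style class with an implicit no-argument
     * constructor and no captured state. Implementing Serializable is a
     * precaution assumed here, not something the thread states is required.
     */
    public static final class LengthThenAlphabetical
            implements Comparator<String>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(String a, String b) {
            // Order by length first, then alphabetically to break ties.
            int byLength = Integer.compare(a.length(), b.length());
            return byLength != 0 ? byLength : a.compareTo(b);
        }
    }

    /** Builds the set that would be stored as the broadcast state value. */
    public static TreeSet<String> newRuleSet() {
        return new TreeSet<>(new LengthThenAlphabetical());
    }

    public static void main(String[] args) {
        TreeSet<String> rules = newRuleSet();
        rules.add("ccc");
        rules.add("a");
        rules.add("bb");
        System.out.println(rules); // prints [a, bb, ccc]
        // The comparator is a named class, not a synthetic lambda class.
        System.out.println(rules.comparator().getClass().isSynthetic()); // prints false
    }
}
```

The stack trace in the original post passes through TreeSetSerializer.write into Kryo.writeClassAndObject before the NullPointerException, which is consistent with the failure happening while the set's comparator is being written.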