Summary
I'm hitting an error when running a job. It has happened several times, and I don't know why.
Any help would be appreciated. Thanks!
Details
Flink version: 1.4.2-1700
java.lang.Exception: Could not complete snapshot 158 for operator asyncio_by_transform -> flatmap_by_action_list_flat -> order_source_kafka_sink-preprocessing (3/10).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:370)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1285)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1223)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:707)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:622)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:217)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:198)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:107)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:48)
    at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:453)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:465)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:355)
Hi Song
Flink 1.4.2 is quite old, and I think this error is caused by FLINK-8876 [1][2], which should be fixed in Flink 1.5 and later. Please consider upgrading your Flink version.
Best
Yun Tang
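
For readers unfamiliar with the root exception: the trace shows `HashMap$HashIterator.nextNode` throwing while `MapSerializer.copy` walks a map during the snapshot's deep copy, which is what happens when a `HashMap` is structurally modified while something else is iterating over it. Below is a minimal sketch of that failure mode in plain Java. It does not use Flink or Kryo and does not reproduce FLINK-8876 itself; the class and method names (`CmeSketch`, `copyWhileMutating`, `copyFromPrivateSnapshot`) are hypothetical, and the in-loop `put` only stands in for a second thread touching the same map.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class CmeSketch {

    // Copies a map entry by entry while the source map is structurally
    // modified partway through the iteration. Returns true if the
    // fail-fast iterator detected the modification.
    public static boolean copyWhileMutating() {
        Map<String, Integer> source = new HashMap<>();
        source.put("a", 1);
        source.put("b", 2);
        source.put("c", 3);

        Map<String, Integer> copy = new HashMap<>();
        try {
            for (Map.Entry<String, Integer> e : source.entrySet()) {
                copy.put(e.getKey(), e.getValue());
                // Stand-in for another thread adding a key to the same map.
                source.put("added-during-copy", 99);
            }
        } catch (ConcurrentModificationException ex) {
            return true;
        }
        return false;
    }

    // Takes a private snapshot before any later modification, so the
    // entry-by-entry copy iterates over a map nobody else changes.
    public static Map<String, Integer> copyFromPrivateSnapshot() {
        Map<String, Integer> source = new HashMap<>();
        source.put("a", 1);
        source.put("b", 2);
        source.put("c", 3);

        Map<String, Integer> snapshot = new HashMap<>(source);
        source.put("added-later", 99); // does not affect the snapshot

        Map<String, Integer> copy = new HashMap<>();
        for (Map.Entry<String, Integer> e : snapshot.entrySet()) {
            copy.put(e.getKey(), e.getValue());
        }
        return copy;
    }

    public static void main(String[] args) {
        System.out.println("CME while mutating: " + copyWhileMutating());
        System.out.println("Copied entries from snapshot: " + copyFromPrivateSnapshot().size());
    }
}
```

If upgrading is not immediately possible, it may be worth checking whether user code keeps modifying a map inside a record after handing it to the operator. That is an assumption about the job, not something the trace confirms.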
From: Song Wu <[hidden email]>
Sent: Saturday, October 10, 2020 11:03
To: user <[hidden email]>
Subject: checkpoint fail