Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
Hi Team,
We have recently enabled check pointing in our flink job using S3 as the state backend, but while submitting the Job, it fails with the below error.Can you please let us know what is going wrong here. Below is the code snippet for enabling check pointing. env.setStateBackend(new FsStateBackend("s3://bucket-name/job-name/",true)); env.enableCheckpointing(1000); CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); flink-conf.yaml state.backend: filesystem state.checkpoints.dir: s3://flinkcheckpointing/checkpoint-metadata/ Error Logs : 2021-05-10 13:57:20 java.io.IOException: Could not perform checkpoint 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0. at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:963) at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115) at org.apache.flink.streaming.runtime.io.CheckpointBarrierTracker.processBarrier(CheckpointBarrierTracker.java:126) at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:157) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:97) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.base/java.lang.Thread.run(Unknown Source) Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Co-Process-Broadcast -> Sink: SinkToOutputKafka (1/1)#0. Failure reason: Checkpoint was declined. at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:947) ... 14 more Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: rules (com.project.eventing.rule.AndCompositeAbstractRule) rule (com.project.eventing.model.producer.DataPair) ruleMap (com.project.eventing.model.producer.EventConfigState) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111) at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69) at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84) at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40) at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:100) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:234) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:213) ... 24 more Caused by: java.lang.UnsupportedOperationException at java.base/java.util.Collections$UnmodifiableCollection.add(Unknown Source) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ... 43 more Thanks, Sudhansu |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
Please have a look at
https://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
On 5/10/2021 10:48 AM, sudhansu jena
wrote:
... [show rest of quote]
|
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
Hi, Thanks for the prompt response. I have already visited that page but in the current flink version i.e 1.12.2, the method addDefaultKryoSerializer is not available in the config object. Thanks, Sudhansu On Mon, May 10, 2021 at 2:24 PM Chesnay Schepler <[hidden email]> wrote:
... [show rest of quote] |
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
From what I can tell this method does
exist in 1.12.2 .
On 5/10/2021 11:11 AM, sudhansu jena
wrote:
... [show rest of quote]
|
Loading... |
Reply to author |
Edit post |
Move post |
Delete this post |
Delete this post and replies |
Change post date |
Print post |
Permalink |
Raw mail |
Hi Chesnay, Thank you so much for the help. The fix is working now. Thanks, Sudhansu On Mon, May 10, 2021 at 2:48 PM Chesnay Schepler <[hidden email]> wrote:
... [show rest of quote] |
Free forum by Nabble | Edit this page |