http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Batch-expired-in-FlinkKafkaProducer09-tp22511.html
1. The error only and always occurs when checkpoint starts.
2. The error seems not related to flushOnCheckpoint
config, since it is detected before flush check.
3. There is checkErroneous in the beginning of
FlinkKafkaProducerBase.invoke and FlinkKafkaProducerBase.snapshotState, I don’t know why the invoke method works fine.
4. There is no problem when having the same code writing to another Kafka cluster. (We just got a new Kafka server to migrate:)
5. The Kafka server is actually of version 0.11, in this job we need to consume from 0.9, and write to 0.11, so we used 09 version.
java.lang.Exception: Error while triggering checkpoint 3 for Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2
-> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2
-> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 3 for operator Source: v2-kafkaRawUbt -> v2-flatMapUbtEnrich -> v2-filterUbt -> (v2-flatMapUbtAbs -> v2-mapAbsEvent -> v2-flatMapUbtAbsToAlgoEventV2
-> Filter -> Sink: v2-sinkUbtAlgoEventRealtimeV2, v2-filterUbtFeLogErr) (41/226).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
... 7 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:350)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
... 12 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired