Hi team,
I am facing this exception, org.apache.kafka.common.KafkaException: Received exception when fetching the next record from topic_log-3. If needed, please seek past the record to continue consumption. at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1076) at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size is less than the minimum record overhead (14) Also, when I consume message from ubuntu terminal consumer, I get same error. How can skip this corrupt record? |
Hi
As you said, consume from ubuntu terminal has the same error, maybe you could send a email to kafka user maillist.
Best, Congxian
On Apr 1, 2019, 05:26 +0800, Sushant Sawant <[hidden email]>, wrote:
|
Hi, Thanks for reply. But is there a way one could skip this corrupt record from Flink consumer? Flink job goes in a loop, it restarts and then fails again at same record. On Mon, 1 Apr 2019, 07:34 Congxian Qiu, <[hidden email]> wrote:
|
According to docs (here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema , last paragraph) that’s an expected behaviour. May be you should think about writing your own deserialisation schema to skip corrupted messages.
|
Hey, As far as I understand the error is not caused by the deserialization but really by the polling of the message, so custom deserialization schema won't really help in this case. There seems to be an error in the messages in Your topic. You can see here what is the data that should be associated with the message. One thing you could possibly do is simply find the offset of the corrupted message and start reading after the record. However, You should probably verify what is the reason for the message size being smaller than it should. One thing that can cause this exact behavior may be a mismatch between Kafka versions on broker and consumer. Best Regards, Dom. wt., 2 kwi 2019 o 09:36 Ilya Karpov <[hidden email]> napisał(a):
|
Hi, yes exactly, I am already using custom deserialization schema. Currently, doing same, checking for corrupt record offset and starting consumer from next offset. But then there is need of continous monitoring and find corrupt record manually. Any idea how could I build program for this. And could you please elaborate on version miss match of kafka broker and consumer? I never installed two kafka versions everything is from one package which was downloaded from apache kafka tar file. Thanks, Sushant Sawant On Tue, 2 Apr 2019, 19:46 Dominik Wosiński, <[hidden email]> wrote:
|
Hey, Sorry for such a delay, but I have missed this message. Basically, technically you could have Kafka broker installed in version say 1.0.0 and using FlinkKafkaConsumer08. This could technically create issues. I'm not sure if You can automate the process of skipping corrupted messages, as You would have to write the consumer that will allow skipping messages that are corrupted. This maybe a good idea to think about for Flink though. On the other hand, if You have many messages that are corrupted, this may mean that the problem lies elsewhere within You pipeline (kafka producers before Flink). |
Free forum by Nabble | Edit this page |