Hello,
When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema, if an invalid, non-parseable message is sent to the Kafka topic, the consumer expectedly fails with a JsonParseException. So far so good, but this leads to the following loop: the job switches to FAILED, attempts to restart, fails again, and so on. That is, the parsing error prevents the Kafka message from being committed, so it keeps being received. Since the JsonParseException can't be caught in application code, what would be the recommended way to handle possibly non-parseable Kafka messages? Is there a way to configure the Flink Kafka consumer to handle non-parseable messages by logging the parsing error and then committing the message, so that processing can continue? If there isn't, would such an enhancement make sense?
Unless there is a better solution, it looks like one must guarantee that FlinkKafkaConsumer011 only ever receives valid messages, which can be impractical. For reference, here's the stack of the JsonParseException in the log:

Source: Custom Source(1/1) switched to FAILED
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
 at [Source: UNKNOWN; line: 1, column: 3]
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
 at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
 at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
 at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
 at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Thread.java:745)

My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
Thanks,
Adrian
Hi Adrian,
Couldn't you solve this by providing your own DeserializationSchema [1], possibly extending JSONKeyValueDeserializationSchema and catching the error there?

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
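For reference, a minimal sketch of what such a schema could look like, assuming the Flink 1.4 KeyedDeserializationSchema signature shown in the stack trace above (the class name and the logging are illustrative, not part of Flink):

import java.io.IOException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

// Hypothetical subclass that swallows parse errors instead of failing the job.
public class SkipInvalidJSONKeyValueDeserializationSchema
        extends JSONKeyValueDeserializationSchema {

    public SkipInvalidJSONKeyValueDeserializationSchema(boolean includeMetadata) {
        super(includeMetadata);
    }

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) throws IOException {
        try {
            return super.deserialize(messageKey, message, topic, partition, offset);
        } catch (IOException e) {
            // JsonParseException extends IOException: log the bad record and
            // return null so the consumer skips it and moves on.
            System.err.printf("Skipping non-parseable record %s-%d@%d: %s%n",
                    topic, partition, offset, e.getMessage());
            return null;
        }
    }
}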
Hi Nico,
Thanks a lot. I did consider that, but I had missed the clarification of the contract in the piece of doc you pointed to: "returning null to allow the Flink Kafka consumer to silently skip the corrupted message". I suppose it could be an improvement for JSONKeyValueDeserializationSchema to provide this behaviour as an out-of-the-box option. But anyway, I do have a solution in hand.
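Concretely, the wiring would look something like this sketch (topic name, group id, and bootstrap servers are placeholders, and the schema is the one sketched above):

import java.util.Properties;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class TolerantJsonJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        // Null results from the schema are silently skipped by the consumer.
        DataStream<ObjectNode> stream = env.addSource(
                new FlinkKafkaConsumer011<>(
                        "my-topic",
                        new SkipInvalidJSONKeyValueDeserializationSchema(true),
                        props));

        stream.print();
        env.execute("tolerant-json-consumer");
    }
}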
Thanks again.
Adrian
Nice, I didn't even read that far myself :P
-> turns out the API was prepared for that after all.

I'm not sure about a default option for handling/skipping corrupted messages, since the handling of those is probably highly use-case specific. If you nonetheless feel that this should be in there, feel free to open an improvement request in our issue tracker at https://issues.apache.org/jira/browse/FLINK

Nico
What I had in mind was generic handling of the JsonParseException case. But you are right, the picture becomes fuzzier if we also consider messages that are parseable but invalid due to missing or invalid fields. We could imagine a deeper message-validation feature, but I think subclassing to implement custom handling is okay, along the lines of the sketch below.
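For illustration, a minimal sketch of such field-level validation inside the same subclass; it assumes the schema wraps the payload under a "value" node, and the required "id" field is a hypothetical domain rule:

@Override
public ObjectNode deserialize(byte[] messageKey, byte[] message,
        String topic, int partition, long offset) throws IOException {
    final ObjectNode node;
    try {
        node = super.deserialize(messageKey, message, topic, partition, offset);
    } catch (IOException e) {
        return null; // non-parseable: skip
    }
    // Parseable but invalid: skip records whose payload lacks a non-null
    // "id" field (hypothetical validation rule).
    if (node == null || node.get("value") == null
            || !node.get("value").hasNonNull("id")) {
        return null;
    }
    return node;
}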
Thanks
Adrian