Discard message on deserialization errors.

John Smith
Hi, I'm using Flink 1.8.0.

I am ingesting data from Kafka. Unfortunately, for the time being I have not looked into using the schema registry.

So for now I would like to write a simple deserialization schema that discards the data if deserialization fails.

The other option is to do it in a flatMap with markers and split the failures off to a dead letter queue, but I'm not too concerned about that for now.
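For comparison, the flatMap-with-markers alternative could look roughly like this in plain Java: each raw record is wrapped in a result that carries either the decoded value or the raw bytes plus the error, and a router sends failures to a dead-letter list. `DecodeResult`, `DeadLetterRouter`, and the integer payload format are hypothetical; in an actual Flink job the routing would be an operator feeding a separate sink, which is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical wrapper: holds either a decoded value or the raw bytes plus the error.
final class DecodeResult<T> {
    final T value;          // non-null on success
    final byte[] raw;       // original bytes, kept for the dead letter queue
    final Exception error;  // non-null on failure

    private DecodeResult(T value, byte[] raw, Exception error) {
        this.value = value;
        this.raw = raw;
        this.error = error;
    }

    static <T> DecodeResult<T> ok(T value, byte[] raw) {
        return new DecodeResult<>(value, raw, null);
    }

    static <T> DecodeResult<T> failed(byte[] raw, Exception error) {
        return new DecodeResult<>(null, raw, error);
    }

    boolean isOk() {
        return error == null;
    }
}

final class DeadLetterRouter {
    // Stand-in decoder: parses the UTF-8 payload as an int (hypothetical format).
    static DecodeResult<Integer> decode(byte[] raw) {
        try {
            String text = new String(raw, StandardCharsets.UTF_8).trim();
            return DecodeResult.ok(Integer.parseInt(text), raw);
        } catch (NumberFormatException ex) {
            return DecodeResult.failed(raw, ex);
        }
    }

    // Splits records into the main output and a dead letter list (the "marker" is isOk()).
    static void route(List<byte[]> input, List<Integer> good, List<byte[]> deadLetters) {
        for (byte[] raw : input) {
            DecodeResult<Integer> result = decode(raw);
            if (result.isOk()) {
                good.add(result.value);
            } else {
                deadLetters.add(result.raw);
            }
        }
    }
}
```

Keeping the raw bytes on failure is what makes a dead letter queue useful: the bad payload can be inspected or replayed later instead of only being logged.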

Is it ok to just return null if deserialization fails?

@Override
public MyObject deserialize(byte[] message) {
    try {
        return MyDecoder.decode(message);
    } catch (IOException ex) {
        // Log and drop the record; returning null means "could not be deserialized".
        logger.warn("Failed to decode message.", ex);
        return null;
    }
}
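A runnable sketch of the same null-returning approach, assuming minimal hypothetical versions of `MyObject` and `MyDecoder` (here the "format" is just `id:<text>`) and using `java.util.logging` in place of whatever logger the original code uses. The Flink `DeserializationSchema` interface is left out so the snippet compiles without Flink on the classpath.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the user's record type.
final class MyObject {
    final String id;

    MyObject(String id) {
        this.id = id;
    }
}

// Hypothetical decoder: expects "id:<non-empty text>" and throws IOException otherwise.
final class MyDecoder {
    static MyObject decode(byte[] message) throws IOException {
        String text = new String(message, StandardCharsets.UTF_8);
        if (!text.startsWith("id:") || text.length() == 3) {
            throw new IOException("Malformed message: " + text);
        }
        return new MyObject(text.substring(3));
    }
}

// Same deserialize(...) logic as in the question, without the Flink interface.
final class LenientMyObjectSchema {
    private static final Logger logger = Logger.getLogger(LenientMyObjectSchema.class.getName());

    public MyObject deserialize(byte[] message) {
        try {
            return MyDecoder.decode(message);
        } catch (IOException ex) {
            // Log the failure and return null so the caller can skip this record.
            logger.log(Level.WARNING, "Failed to decode message.", ex);
            return null;
        }
    }
}
```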

Re: Discard message on deserialization errors.

Zhu Zhu
Hi John,

It should work with a null return value.
The Javadoc of DeserializationSchema#deserialize says:
@return The deserialized message as an object (null if the message cannot be deserialized).

I also checked the Kafka fetcher in Flink, and it correctly handles a null deserialized record.

Just make sure that DeserializationSchema#isEndOfStream does not throw an error when it is given a null record.
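To make the point about `isEndOfStream` concrete, here is a simplified, hypothetical paraphrase of a consumer's per-record loop, assuming (as described above) that it calls `isEndOfStream` on every deserialized value, null included, before deciding whether to emit. `NullableSchema`, `SimplifiedFetchLoop`, and `LenientIntSchema` are illustrative stand-ins, not Flink classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two DeserializationSchema methods involved.
interface NullableSchema<T> {
    T deserialize(byte[] message);        // may return null for a bad record
    boolean isEndOfStream(T nextElement); // is called with null, too
}

final class SimplifiedFetchLoop {
    // Deserialize each record, stop on end-of-stream, and emit only non-null values.
    static <T> List<T> consume(List<byte[]> records, NullableSchema<T> schema) {
        List<T> emitted = new ArrayList<>();
        for (byte[] record : records) {
            T value = schema.deserialize(record);
            if (schema.isEndOfStream(value)) { // runs before the null check
                break;
            }
            if (value != null) {
                emitted.add(value);
            }
            // A null value is simply skipped here.
        }
        return emitted;
    }
}

// Example schema: decodes UTF-8 integers, returns null on bad input,
// and never ends the stream, so isEndOfStream is trivially null-safe.
final class LenientIntSchema implements NullableSchema<Integer> {
    @Override
    public Integer deserialize(byte[] message) {
        try {
            return Integer.valueOf(new String(message, StandardCharsets.UTF_8));
        } catch (NumberFormatException ex) {
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Integer nextElement) {
        return false;
    }
}
```

If `isEndOfStream` called a method on its argument, the first bad record would cause a NullPointerException in a loop like this one instead of being skipped.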

Thanks,
Zhu Zhu

On Sat, 12 Oct 2019 at 5:36 AM, John Smith <[hidden email]> wrote:
> Is it ok to just return null if deserialization fails?

Re: Discard message on deserialization errors.

John Smith
By "Kafka fetcher", do you mean the Flink JSON schemas? Don't they throw IOExceptions?

Also, what's the purpose of isEndOfStream? Most schemas I looked at don't do anything with it and just return false.

On Fri., Oct. 11, 2019, 11:44 p.m. Zhu Zhu, <[hidden email]> wrote:
> It should work with a null return value.

Re: Discard message on deserialization errors.

Zhu Zhu
I mean that the Kafka source provided by Flink correctly ignores null deserialized values.

isEndOfStream lets you control when the input stream ends.
For an unbounded (infinite) streaming job, you can simply return false.
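For a bounded job, here is a sketch of a schema that ends the stream on a sentinel record while staying null-safe for discarded records. The `END` marker, the blank-message rule, and the `BoundedSchema`/`BoundedReader` stand-ins are assumptions for illustration only; `BoundedReader` is a simplified paraphrase of a consumer loop, not Flink's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the DeserializationSchema methods used here.
interface BoundedSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
}

// Decodes UTF-8 text; blank messages are treated as undecodable (null),
// and the hypothetical sentinel "END" terminates the stream.
final class SentinelTerminatedSchema implements BoundedSchema<String> {
    static final String END_MARKER = "END";

    @Override
    public String deserialize(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? null : text;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Constant-first equals is null-safe: a discarded (null) record never ends the stream.
        return END_MARKER.equals(nextElement);
    }
}

final class BoundedReader {
    // Reads until the schema signals end of stream; null values are skipped.
    static <T> List<T> readUntilEnd(List<byte[]> records, BoundedSchema<T> schema) {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            T value = schema.deserialize(record);
            if (schema.isEndOfStream(value)) {
                break;
            }
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }
}
```

Records after the sentinel are never read, which is the behavior a bounded job would want; an unbounded job keeps `isEndOfStream` returning false.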

Thanks,
Zhu Zhu

On Sat, 12 Oct 2019 at 8:40 PM, John Smith <[hidden email]> wrote:
> Also, what's the purpose of isEndOfStream?

Re: Discard message on deserialization errors.

John Smith
Ah ok thanks!

On Sat, 12 Oct 2019 at 11:13, Zhu Zhu <[hidden email]> wrote:
> isEndOfStream allows you to control when to end the input stream.