Tracking deserialization errors

Tracking deserialization errors

Elias Levy
I was wondering how folks are tracking deserialization errors. The AbstractDeserializationSchema interface provides no mechanism for the deserializer to register a metric counter, and "deserialize" must return null instead of raising an exception if you want your job to keep running when a record fails to deserialize. But that means such errors are invisible.

Thoughts?
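For illustration, a minimal sketch of the workaround in question: a wrapper that returns null on failure so the consumer skips the record and the job keeps running. The wrapper class is hypothetical; only the DeserializationSchema interface and the null-on-error contract come from Flink.

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper: delegates to any schema and swallows failures by returning null.
public class SkipOnErrorSchema<T> implements DeserializationSchema<T> {

    private static final Logger LOG = LoggerFactory.getLogger(SkipOnErrorSchema.class);

    private final DeserializationSchema<T> wrapped;

    public SkipOnErrorSchema(DeserializationSchema<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return wrapped.deserialize(message);
        } catch (Exception e) {
            // Returning null makes the consumer skip this record, so the job keeps
            // running -- but the failure is only visible in this log line.
            LOG.warn("Skipping record that could not be deserialized", e);
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return wrapped.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return wrapped.getProducedType();
    }
}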

Re: Tracking deserialization errors

Alexander Smirnov
I have the same question. In the case of a Kafka source, it would be good to know the topic name and offset of the corrupted message for further investigation.
It looks like the only option is to write such messages to a log file.
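As a sketch of that logging approach: the Kafka connector's per-record schema (KeyedDeserializationSchema at the time of this thread) is handed the topic, partition, and offset of each record, so those can go into the log line. Class name and parsing below are illustrative only.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative schema producing plain strings; the point is logging the record's coordinates.
public class LoggingStringSchema implements KeyedDeserializationSchema<String> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingStringSchema.class);

    @Override
    public String deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) throws IOException {
        try {
            return new String(message, StandardCharsets.UTF_8); // stand-in for real parsing
        } catch (Exception e) {
            // Record where the corrupted message lives so it can be fetched and inspected later.
            LOG.warn("Corrupted record at topic={} partition={} offset={}", topic, partition, offset, e);
            return null; // skip and keep going
        }
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}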

Re: Tracking deserialization errors

Fabian Hueske-2
Thanks for starting the discussion, Elias.

I see two ways to address this issue.

1) Add an interface that a deserialization schema can implement to register metrics. Each source would need to check for the interface and call it to set up metrics. (A rough sketch of this follows below.)
2) Check for null returns in the source functions and increment a respective counter.

In both cases, we need to touch the source connectors.
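A rough sketch of what such an interface might look like. The interface itself is hypothetical, not an existing Flink API; only MetricGroup and Counter are real.

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical opt-in interface: a source that finds its schema implementing this
// would call registerMetrics() with its own metric group before consuming records.
public interface MetricRegisteringDeserializationSchema {
    void registerMetrics(MetricGroup metricGroup);
}

// How a schema might use it (sketch):
class MySchema implements MetricRegisteringDeserializationSchema {

    private transient Counter deserializationErrors;

    @Override
    public void registerMetrics(MetricGroup metricGroup) {
        // incremented by the schema whenever a record fails to deserialize
        this.deserializationErrors = metricGroup.counter("deserializationErrors");
    }
}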

I see that information such as topic name, partition, and offset is important for debugging. However, I don't think metrics would be a good way to capture it.
In that case, log files might be a better approach.

I'm not sure to what extent the source functions (Kafka, Kinesis) support such error tracking.
Adding Gordon, who knows the internals of the connectors, to the thread.

Best, Fabian

Re: Tracking deserialization errors

Tzu-Li (Gordon) Tai
Hi,

These are valid concerns. And yes, AFAIK users have been writing to logs within the deserialization schema to track this. As of now, the connectors do no logging themselves when a record is skipped.

I think we can implement both logging and metrics to track this, most of which you have already brought up.
For logging, the information should contain topic, partition, and offset for debugging.
For metrics, we should be able to use the user variable functionality to have skip counters that can be grouped by topic / partition / offset.
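A sketch of what such a skip counter could look like with user variables; this is illustrative only, the connectors do not register anything like it today.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Sketch: register a skip counter tagged with user variables so it can be
// grouped by topic and partition in the metrics backend.
public final class SkipMetrics {

    private SkipMetrics() {}

    public static Counter skippedRecords(RuntimeContext ctx, String topic, int partition) {
        MetricGroup group = ctx.getMetricGroup()
                .addGroup("topic", topic)                           // user variable "topic"
                .addGroup("partition", String.valueOf(partition));  // user variable "partition"
        return group.counter("skippedRecords");
    }
}

The counter would be incremented each time deserialize() returns null. Tagging by offset the same way is possible but unbounded in cardinality, so the offset is probably better left to the log line.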

Though, I’m not sure how helpful this would be in practice.
I’ve opened a JIRA for this issue for further discussion: https://issues.apache.org/jira/browse/FLINK-9204

Cheers,
Gordon

Re: Tracking deserialization errors

Alexander Smirnov
Ouch, I forgot to mention that I opened https://issues.apache.org/jira/browse/FLINK-9155 to track this. Should it be marked as a duplicate of FLINK-9204 then?

Re: Tracking deserialization errors

Elias Levy
In reply to this post by Fabian Hueske-2
Either proposal would work. In the latter case, at a minimum we'd need a way to identify the source within the metric. The basic error metric would then let us go to the logs to determine the cause of the error, since we already record the offending message in the log.
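On identifying the source: the operator name given to the source is already part of the metric scope, so simply naming the source may be enough to attribute such a counter. A sketch under that assumption; topic, server, and group names are illustrative.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class NamedSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
        props.setProperty("group.id", "example");                 // illustrative

        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props))
                // The operator name becomes part of the metric scope, so a counter
                // registered under this source's metric group is attributable to it.
                .name("events-kafka-source");

        stream.print();
        env.execute("named-source-example");
    }
}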


Re: Tracking deserialization errors

Tzu-Li (Gordon) Tai
@Alexander
Sorry about that, that would be my mistake. I'll close FLINK-9204 as a duplicate and leave my thoughts on FLINK-9155. Thanks for pointing it out!


Re: Tracking deserialization errors

Alexander Smirnov
That's absolutely no problem, Tzu-Li. Either of them would work. Thank you!
