Custom Metrics outside RichFunctions

David Magalhães
Hi, I want to create a custom metric that shows the number of messages that couldn't be deserialized by a custom deserializer inside FlinkKafkaConsumer.

Looking at the Metrics page ( https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html ), that doesn't seem to be possible, because it isn't a RichFunction.

Does anyone know another way to achieve this?

Thanks,
David

Re: Custom Metrics outside RichFunctions

Yun Tang
Hi David

FlinkKafkaConsumer is itself a RichParallelSourceFunction, so you could call the function below to register your metric group:

getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")


Best
Yun Tang

From: David Magalhães <[hidden email]>
Sent: Tuesday, January 21, 2020 3:45
To: user <[hidden email]>
Subject: Custom Metrics outside RichFunctions
 
Hi, I want to create a custom metric that shows the number of messages that couldn't be deserialized by a custom deserializer inside FlinkKafkaConsumer.

Looking at the Metrics page ( https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html ), that doesn't seem to be possible, because it isn't a RichFunction.

Does anyone know another way to achieve this?

Thanks,
David

Re: Custom Metrics outside RichFunctions

David Magalhães
Hi Yun, I'm trying to use it inside a custom DeserializationSchema. Here is the constructor of FlinkKafkaConsumer. Inside a DeserializationSchema I can't use getRuntimeContext().

FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)

On Wed, Jan 22, 2020 at 3:21 AM Yun Tang <[hidden email]> wrote:
Hi David

FlinkKafkaConsumer is itself a RichParallelSourceFunction, so you could call the function below to register your metric group:

getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")


Best
Yun Tang


Re: Custom Metrics outside RichFunctions

Chesnay Schepler
It is not possible to access metrics from within a schema.

I can't think of a non-hacky workaround (the hacky one being to create a custom Kafka consumer that checks the schema class, casts it to your specific class, and then calls a method on your schema that accepts a metric group).
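
For readers who want to see the shape of that workaround, here is a minimal sketch of the check-cast-inject pattern. It is not Flink code: Counter, MetricGroup and DeserializationSchema below are simplified stand-ins declared locally so the example runs on its own, and CountingSchema, registerMetrics, MetricAwareConsumer, InMemoryMetricGroup and the metric name "deserializationFailures" are all hypothetical names. How a real subclass of FlinkKafkaConsumer would reach its schema is not covered in this thread and would need to be checked against the Flink version in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for Flink's Counter, MetricGroup and DeserializationSchema,
// declared here only so the sketch compiles without Flink on the classpath.
interface Counter { void inc(); long getCount(); }
interface MetricGroup { Counter counter(String name); }
interface DeserializationSchema<T> { T deserialize(byte[] message); }

// Hypothetical schema with an extra method that accepts a metric group.
class CountingSchema implements DeserializationSchema<Integer> {
    private Counter failures; // null until a metric group has been injected

    void registerMetrics(MetricGroup group) {
        failures = group.counter("deserializationFailures");
    }

    @Override
    public Integer deserialize(byte[] message) {
        try {
            return Integer.valueOf(new String(message, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            if (failures != null) {
                failures.inc();
            }
            return null; // no record is produced for input that cannot be parsed
        }
    }
}

// Hypothetical consumer: checks the schema class, casts it, hands over the group.
class MetricAwareConsumer<T> {
    private final DeserializationSchema<T> schema;

    MetricAwareConsumer(DeserializationSchema<T> schema) {
        this.schema = schema;
    }

    void open(MetricGroup group) {
        if (schema instanceof CountingSchema) {
            ((CountingSchema) schema).registerMetrics(group);
        }
    }

    T handle(byte[] record) {
        return schema.deserialize(record);
    }
}

// Minimal in-memory metric group used only to exercise the sketch.
class InMemoryMetricGroup implements MetricGroup {
    final Map<String, long[]> counts = new HashMap<>();

    @Override
    public Counter counter(String name) {
        long[] cell = counts.computeIfAbsent(name, k -> new long[1]);
        return new Counter() {
            @Override public void inc() { cell[0]++; }
            @Override public long getCount() { return cell[0]; }
        };
    }
}

class SchemaMetricsDemo {
    // Feeds the records through the consumer and returns the failure count.
    static long countFailures(String... records) {
        InMemoryMetricGroup group = new InMemoryMetricGroup();
        MetricAwareConsumer<Integer> consumer = new MetricAwareConsumer<>(new CountingSchema());
        consumer.open(group);
        for (String r : records) {
            consumer.handle(r.getBytes(StandardCharsets.UTF_8));
        }
        return group.counter("deserializationFailures").getCount();
    }

    public static void main(String[] args) {
        System.out.println(countFailures("1", "oops", "3"));
    }
}
```

The null check in deserialize keeps the schema usable with a consumer that never injects a metric group, which is part of why the approach is fragile: nothing forces the two classes to stay in sync.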

On 22/01/2020 14:33, David Magalhães wrote:
Hi Yun, I'm trying to use it inside a custom DeserializationSchema. Here is the constructor of FlinkKafkaConsumer. Inside a DeserializationSchema I can't use getRuntimeContext().

FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)


Re: Custom Metrics outside RichFunctions

David Magalhães
Thanks for the feedback. I will use elastalert to generate an alarm from the logs.
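
A rough sketch of what the log-based approach could look like on the deserializer side, assuming the alerting tool matches on a fixed message prefix. LoggingIntDeserializer and the message text are hypothetical, and java.util.logging is used only to keep the example self-contained; a real job would likely use whatever logging framework the Flink setup already has.

```java
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical deserializer that logs every record it cannot parse,
// so a log-based alerting tool can count or match those lines.
class LoggingIntDeserializer {
    static final Logger LOG = Logger.getLogger("LoggingIntDeserializer");

    Integer deserialize(byte[] message) {
        String text = new String(message, StandardCharsets.UTF_8);
        try {
            return Integer.valueOf(text.trim());
        } catch (NumberFormatException e) {
            // Fixed prefix gives the alert rule a stable string to match on.
            LOG.log(Level.WARNING, "Failed to deserialize record: {0}", text);
            return null; // no record is produced for input that cannot be parsed
        }
    }
}
```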

On Wed, Jan 22, 2020, 15:03 Chesnay Schepler <[hidden email]> wrote:
It is not possible to access metrics from within a schema.

I can't think of a non-hacky workaround (the hacky one being to create a custom Kafka consumer that checks the schema class, casts it to your specific class, and then calls a method on your schema that accepts a metric group).
