Custom metrics output

Custom metrics output

Joris Geer
Hi,

We want to collect metrics for stream processing, typically counts aggregated over 1-minute buckets. However, we want these 1-minute boundaries determined by timestamps within the data records. Flink metrics do not handle this, so we want to roll our own. How should we proceed? Some of our team members believe we can add methods to the operator class code that can be called from the main Flink program, whilst I am not sure this is supposed to be possible. Others are considering a side-output stream with one record per input record, using Flink operators to do the aggregation. That may double the number of records processed.

Can we extend the Flink metrics to provide such aggregation?

Regards,

Joris

Re: Custom metrics output

Fabian Hueske-2
Hi Joris,

I don't think that the approach of "add methods in operator class code that can be called from the main Flink program" will work.

The most efficient approach would be to implement a ProcessFunction that counts records in 1-minute event-time buckets and updates the metrics.
If you need the metric values to be exact, you can keep the intermediate counts as operator state.
I would not use a KeyedProcessFunction, since you didn't mention a key, and this also saves the overhead of a shuffle.
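As a minimal sketch of the bucketing arithmetic such a ProcessFunction could use (assuming epoch-millisecond event timestamps; the class and method names here are illustrative, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Counts records in 1-minute event-time buckets keyed by the record's own
// timestamp. In Flink this logic would sit inside a ProcessFunction, with the
// map held as operator state if exact counts must survive failures.
public class MinuteBucketCounter {
    private static final long BUCKET_MS = 60_000L;
    private final Map<Long, Long> counts = new HashMap<>();

    // Start of the 1-minute bucket (epoch millis) containing the event timestamp.
    static long bucketStart(long eventTimestampMs) {
        return (eventTimestampMs / BUCKET_MS) * BUCKET_MS;
    }

    // Record one event; returns the running count for its bucket.
    public long add(long eventTimestampMs) {
        return counts.merge(bucketStart(eventTimestampMs), 1L, Long::sum);
    }

    public long countFor(long bucketStartMs) {
        return counts.getOrDefault(bucketStartMs, 0L);
    }
}
```

Because the bucket is derived from the record's own timestamp rather than wall-clock time, late records still land in the correct minute.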

You can integrate the ProcessFunctions in different ways in your job.

1) Just embed it into the regular flow. The ProcessFunction would count and forward every record it receives.
2) Fork off a stream of records that just hold the timestamp to a side output and apply the ProcessFunction on the forked-off stream.

I think the first approach is simpler and more efficient. The ProcessFunction would act as an identity function on your actual data, just counting and reporting metrics.
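A rough pure-Java sketch of that first option (hedged: in a real job this would be a Flink ProcessFunction whose processElement() forwards each record via the Collector; the generic wrapper below only illustrates the count-and-forward shape):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Identity step that forwards every record unchanged while counting it into a
// 1-minute bucket derived from the record's embedded event timestamp. In Flink,
// processElement() would emit via Collector.collect(value) and report the
// counts through the operator's metric group.
public class CountingForwarder<T> {
    private final Map<Long, Long> bucketCounts = new HashMap<>();

    // Forward `value` downstream and count it by its event timestamp.
    public void processElement(T value, long eventTimestampMs, Consumer<T> out) {
        long bucket = (eventTimestampMs / 60_000L) * 60_000L;
        bucketCounts.merge(bucket, 1L, Long::sum);
        out.accept(value); // pass the record through unchanged
    }

    public Map<Long, Long> counts() {
        return bucketCounts;
    }
}
```

The downstream pipeline sees exactly the records it would have seen without the step, so no extra records are created, unlike the side-output variant.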

Best, Fabian

On Mon, Jul 20, 2020 at 01:30, Joris Geer <[hidden email]> wrote: