Accumulators/Metrics

Nick Dimiduk
Hello,

I'm interested in exposing metrics from my UDFs. I see that FLINK-1501 exposes task manager metrics via a UI; it would be nice to plug into the same MetricRegistry to register my own metrics (e.g., gauges), but I don't see this exposed via the runtime context. Looking for it did lead me to the Accumulators API, which looks more oriented toward simple counts that are summed across the components of a batch job. In my case, I'd like to expose details of my stream processing vertices so that I can monitor their correctness and health with respect to runtime decisions. For instance, referring back to my previous thread, I would like to expose the number of filters loaded into my custom RichCoFlatMap so that I can easily monitor this value.

Thanks,
Nick

Re: Accumulators/Metrics

Maximilian Michels
Hi Nick,

I don't know if you have already come across the REST API. If not,
please have a look here:
https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html

I know that Christian Kreutzfeldt (cc) has been working on a
monitoring service which uses Akka messages to query the JobManager on
a job's status and accumulators. I'm wondering if you two could engage
in any way.

Cheers,
Max

On Wed, Nov 11, 2015 at 6:44 PM, Nick Dimiduk <[hidden email]> wrote:

> [...]

Re: Accumulators/Metrics

Ufuk Celebi
Hey Nick,

you can do the following for per-task stats (this is kind of a workaround):

Create an Accumulator with the subtask index in the name, e.g.

int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
IntCounter counter = getRuntimeContext().getIntCounter("counter-" + subtaskIndex);

This way you have one accumulator per subtask.

The web interface will display the values as they are set (I’m not sure if this is in yet). You can also gather the stats from the execution result, e.g.
JobExecutionResult res = env.execute();
Map<String, Object> accumulators = res.getAllAccumulatorResults();


You can furthermore add a custom Accumulator variant that simply sets a single value, if that is what you need.
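
One way to read the "custom Accumulator variant" idea is a holder that overwrites its value instead of summing. The sketch below is plain Java and only mirrors the method names of Flink's Accumulator interface (add, getLocalValue, resetLocal, merge) without implementing it. The class name LastValueGauge, the explicit-timestamp setter and the "newest reading wins" merge rule are assumptions for illustration, not something Flink provides.

```java
import java.io.Serializable;

/**
 * Hypothetical gauge-style accumulator: add() overwrites instead of summing.
 * It mirrors the method names of Flink's Accumulator interface but does not
 * implement it, so it compiles without Flink on the classpath.
 */
final class LastValueGauge implements Serializable {
    private static final long serialVersionUID = 1L;

    private long value;
    private long updatedAtMillis = Long.MIN_VALUE; // MIN_VALUE means "never set"

    /** Records a new reading, stamped with the wall clock. */
    void add(long newValue) {
        set(newValue, System.currentTimeMillis());
    }

    /** Records a new reading with an explicit timestamp (handy for tests). */
    void set(long newValue, long timestampMillis) {
        value = newValue;
        updatedAtMillis = timestampMillis;
    }

    long getLocalValue() {
        return value;
    }

    void resetLocal() {
        value = 0L;
        updatedAtMillis = Long.MIN_VALUE;
    }

    /** Assumed merge rule: the most recently updated reading wins. */
    void merge(LastValueGauge other) {
        if (other.updatedAtMillis > updatedAtMillis) {
            value = other.value;
            updatedAtMillis = other.updatedAtMillis;
        }
    }
}
```

With one accumulator name per subtask, as suggested above, each name has a single writer, so the merge rule would rarely come into play.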

Does this help?

In any case, I agree that it would be nice to expose a special API/accumulator for this via the runtime context.

– Ufuk

> On 12 Nov 2015, at 11:55, Maximilian Michels <[hidden email]> wrote:
> [...]

Re: Accumulators/Metrics

Nick Dimiduk
I'm much more interested in as-they-happen metrics than in job-completion summaries, since these are stream processing jobs that should "never end". Ufuk's suggestion of a subtask-unique counter, combined with rate-of-change functions in a tool like InfluxDB, will probably work for my needs. So would managing my own Dropwizard MetricRegistry.
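
To make the rate-of-change idea concrete, here is a minimal sketch of the arithmetic a monitoring tool would apply to two samples of a monotonically increasing per-subtask counter. Sample and CounterRate are hypothetical names, and treating a decrease as a counter reset (for example after a task restart) is an assumption.

```java
/** One observation of a counter: its value at a point in time. */
final class Sample {
    final long timestampMillis;
    final long value;

    Sample(long timestampMillis, long value) {
        this.timestampMillis = timestampMillis;
        this.value = value;
    }
}

final class CounterRate {
    private CounterRate() {}

    /**
     * Events per second between two samples of a monotonically increasing counter.
     * Assumption: a lower later value means the counter was reset, so the
     * later value is taken as the whole increase.
     */
    static double ratePerSecond(Sample earlier, Sample later) {
        long elapsedMillis = later.timestampMillis - earlier.timestampMillis;
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("samples must be in time order");
        }
        long delta = later.value >= earlier.value
                ? later.value - earlier.value
                : later.value; // counter reset
        return delta * 1000.0 / elapsedMillis;
    }
}
```

For example, a counter that goes from 100 to 600 over ten seconds yields a rate of 50 events per second.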

An observation: routing all online metrics through the heartbeat mechanism to a single host for display sounds like a scalability bottleneck. Doesn't this design limit the practical volume of metrics that can be exposed by the runtime and user applications?

On Thu, Nov 12, 2015 at 6:12 AM, Ufuk Celebi <[hidden email]> wrote:
> [...]

Re: Accumulators/Metrics

mnxfst
Hi Nick,

As Max mentioned in an earlier post on this topic, I started to work on a service to collect metrics from running stream processing jobs. We want to have all our metrics in one place, whatever application (type) they come from.

To integrate that behavior, I started to look at the accumulator API and learned from Max that all of this information is collected for each task and forwarded to the job manager. The job manager in turn provides a network-exposed interface, based on Akka, for interacting with it (see org.apache.flink.runtime.messages.JobManagerMessages for more).

What I did was request the list of all running jobs and then fetch more detailed information for each of them. The response includes the accumulator values that were previously set.

As the API currently provides only simple value counters, a basic average accumulator and a histogram (which I have not worked with yet), I started to extend it to allow the use of metrics somewhat similar to the gauges, meters, timers, histograms and counters defined by the Dropwizard metrics framework.

Unfortunately, integrating with that framework looks like it would require a fair amount of hacking. Therefore I decided to try out a smarter approach, which also keeps things simple on the Flink side.

If you know Graphite, you will know that it receives a metric identifier, the current value and a timestamp as input. Everything else is handled either by Graphite itself or by a StatsD instance placed in front of it.
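
For reference, Graphite's plaintext protocol takes one metric per line in the form "<path> <value> <timestamp>", with the timestamp in seconds since the Unix epoch. A minimal sketch of building such a line follows; the GraphiteLine class and the metric path used in the example are made-up names for illustration.

```java
final class GraphiteLine {
    private GraphiteLine() {}

    /** Formats one metric as "<path> <value> <epochSeconds>\n". */
    static String format(String path, double value, long epochSeconds) {
        // The path is a single space-delimited token, so whitespace is rejected.
        if (path.isEmpty() || path.chars().anyMatch(Character::isWhitespace)) {
            throw new IllegalArgumentException(
                    "metric path must be non-empty and contain no whitespace");
        }
        // String concatenation of a double always uses '.' as the decimal separator.
        return path + " " + value + " " + epochSeconds + "\n";
    }
}
```

Calling GraphiteLine.format("flink.job.filters.loaded", 42.0, 1447351260L) produces the line "flink.job.filters.loaded 42.0 1447351260", which a sender would then write to Carbon's plaintext listener (port 2003 in a default installation).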

To reduce the dependency on such external tools, I am currently working on a basic metric implementation which provides the metric types mentioned above. These are aggregated by the collector and may be forwarded to any metrics system, e.g. Graphite.

The overall idea is to keep things very simple, since providing overly complex metric types on the job side, which must then be transferred over the network, may lead to heavy network traffic. Keep it simple and do the aggregation on the collector side.
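
A minimal sketch of the "aggregate on the collector side" idea: jobs ship only raw (name, value) readings, and the collector keeps the running summaries. The MetricCollector class is hypothetical and is not the implementation described in this post; it leans on the JDK's java.util.LongSummaryStatistics for count, min, max and average.

```java
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class MetricCollector {
    private final Map<String, LongSummaryStatistics> byName = new ConcurrentHashMap<>();

    /** Called for every raw reading a job reports. */
    void record(String name, long value) {
        // ConcurrentHashMap.compute() runs atomically per key,
        // so concurrent reporters do not lose updates.
        byName.compute(name, (key, stats) -> {
            LongSummaryStatistics s = (stats == null) ? new LongSummaryStatistics() : stats;
            s.accept(value);
            return s;
        });
    }

    /**
     * Current summary for a metric, or null if nothing was reported.
     * Note: the returned object is live; reading it while writers are
     * active is not synchronized in this sketch.
     */
    LongSummaryStatistics summary(String name) {
        return byName.get(name);
    }
}
```

The job side stays trivial under this design: it only needs to emit a name and a number, while richer types such as timers or histograms would be derived in the collector.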

Your objection regarding the network traffic towards the job manager is valid and important. I haven't really thought about that so far, but maybe a more distributed approach must be found to avoid a bottleneck situation here.

If you are interested in the solution that will be used throughout the jobs running in our environment, I hope it will be released as open source soon, since the Otto Group believes in open source ;-) If you would like to know more about it, feel free to ask ;-)

Best
  Christian (Kreutzfeldt)


Re: Accumulators/Metrics

Nick Dimiduk-2
Hi Christian,

I've returned to this project and am interested in exploring options. Have you released any of your work yet? Have you considered an implementation where each Flink worker exposes its own metrics via a "well-known interface" -- such as HTTP or JMX -- and an external process pushes those metrics to a central data store? This is the architecture pursued by OpenTSDB and Sensu.
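
As a rough illustration of the "well-known interface" idea, the sketch below uses the HTTP server that ships with the JDK (com.sun.net.httpserver) to serve the current gauge values as plain text. Everything specific here is an assumption: the WorkerMetricsEndpoint class, the /metrics path and the "name value" line format are made up for this example and are not part of Flink.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.LongSupplier;

final class WorkerMetricsEndpoint {
    // Sorted, thread-safe map so the output order is stable.
    private final Map<String, LongSupplier> gauges = new ConcurrentSkipListMap<>();
    private HttpServer server;

    /** Registers a gauge; the supplier is read each time the page is requested. */
    void register(String name, LongSupplier gauge) {
        gauges.put(name, gauge);
    }

    /** Renders one "name value" line per gauge, in the map's iteration order. */
    static String render(Map<String, LongSupplier> gauges) {
        StringBuilder sb = new StringBuilder();
        gauges.forEach((name, gauge) ->
                sb.append(name).append(' ').append(gauge.getAsLong()).append('\n'));
        return sb.toString();
    }

    /** Serves the gauges under /metrics on localhost; port 0 picks a free port. */
    int start(int port) throws IOException {
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/metrics", exchange -> {
            byte[] body = render(gauges).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server.getAddress().getPort();
    }

    void stop() {
        if (server != null) {
            server.stop(0);
        }
    }
}
```

A UDF could call register("filters.loaded", () -> filters.size()) once at startup, and an external agent would then poll each worker and forward the readings to the central store, so no metric traffic has to pass through the job manager.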

Thanks,
Nick

On Thu, Nov 12, 2015 at 1:01 PM, mnxfst <[hidden email]> wrote:
> [...]

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accumulators-Metrics-tp3447p3459.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.