Hello,
I'm interested in exposing metrics from my UDFs. I see FLINK-1501 exposes task manager metrics via a UI; it would be nice to plug into the same MetricRegistry to register my own (i.e., gauges). I don't see this exposed via the runtime context.

This did lead me to discovering the Accumulators API. That looks oriented toward simple counts, which are summed across the components of a batch job. In my case, I'd like to expose details of my stream processing vertices so that I can monitor their correctness and health with respect to runtime decisions. For instance, referring back to my previous thread, I would like to expose the number of filters loaded into my custom RichCoFlatMap so that I can easily monitor this value.

Thanks,
Nick
Hi Nick,
I don't know if you have already come across the REST API. If not, please have a look here: https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html

I know that Christian Kreutzfeldt (cc) has been working on a monitoring service which uses Akka messages to query the JobManager for a job's status and accumulators. I'm wondering if you two could engage in some way.

Cheers,
Max

On Wed, Nov 11, 2015 at 6:44 PM, Nick Dimiduk <[hidden email]> wrote:
Hey Nick,
you can do the following for per-task stats (this is kind of a workaround): create an Accumulator with the subtask index in its name, e.g.

    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    IntCounter counter = getRuntimeContext().getIntCounter("counter-" + subtaskIndex);

This way you have one accumulator per subtask. The web interface will display the values as they are set (I'm not sure if that is in yet). You can also gather the stats from the execution result, e.g.

    JobExecutionResult res = env.execute();
    res.getAllAccumulatorResults();

You can furthermore add a custom Accumulator variant which simply sets one value, if that is what you need. Does this help?

In any case, I agree that it would be nice to expose a special API/accumulator for this via the runtime context.

– Ufuk

On 12 Nov 2015, at 11:55, Maximilian Michels <[hidden email]> wrote:
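Putting Ufuk's workaround together with Nick's use case, a sketch might look like the following (assuming a RichCoFlatMapFunction where one input carries filter definitions; the String type parameters and the accumulator name are illustrative, not anything prescribed by Flink):

```java
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class FilteringCoFlatMap extends RichCoFlatMapFunction<String, String, String> {

    private transient IntCounter filtersLoaded;

    @Override
    public void open(Configuration parameters) {
        // Suffix the accumulator name with the subtask index so each parallel
        // instance reports its own value rather than being summed together.
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        filtersLoaded = getRuntimeContext().getIntCounter("filters-loaded-" + subtaskIndex);
    }

    @Override
    public void flatMap1(String filterDefinition, Collector<String> out) {
        // Control stream: each element loads one filter (loading logic elided).
        filtersLoaded.add(1);
    }

    @Override
    public void flatMap2(String event, Collector<String> out) {
        // Data stream: apply the loaded filters here (elided); pass through for now.
        out.collect(event);
    }
}
```

This only runs inside a Flink job, so it is a sketch rather than a standalone program; the per-subtask accumulator values then show up under their distinct names in the web interface and in `getAllAccumulatorResults()`.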
I'm much more interested in as-they-happen metrics than job-completion summaries, as these are stream processing jobs that should "never end". Ufuk's suggestion of a subtask-unique counter, combined with rate-of-change functions in a tool like InfluxDB, will probably work for my needs. So too does managing my own Dropwizard MetricRegistry.

An observation: routing all online metrics through the heartbeat mechanism to a single host for display sounds like a scalability bottleneck. Doesn't this design limit the practical volume of metrics that can be exposed by the runtime and user applications?

On Thu, Nov 12, 2015 at 6:12 AM, Ufuk Celebi <[hidden email]> wrote:
Hi Nick,
as Max mentioned in an earlier post on this topic, I started to work on a service to collect metrics from running stream processing jobs. We want to have all our metrics in one place, whatever application (type) they come from.

To integrate that behavior, I started to look at the accumulator API and learned from Max that all this information is collected for each task and forwarded to the JobManager. The JobManager in turn provides a network-exposed interface to interact with it via Akka (see org.apache.flink.runtime.messages.JobManagerMessages for more). What I did was to request all running jobs and then fetch more detailed information for each of them; there you receive the accumulator values previously set.

As the API currently provides only simple value counters, a basic average accumulator and a histogram (which I have not worked with yet), I started to extend this to allow metrics somewhat similar to the gauges, meters, timers, histograms and counters defined by the Dropwizard metrics framework. Unfortunately, a direct integration with that framework looks like a rather hack-oriented task, so I decided to try a simpler approach which also keeps things simple on the Flink framework side.

If you know the Graphite application, you will know that it receives a metric identifier, the current value and a timestamp as input; everything else is handled either by Graphite or a switched-in statsd. To reduce dependencies on such external tools, I am currently working on a basic metric implementation which provides the metric types mentioned above. These are aggregated by the collector and may be forwarded to any metrics system, e.g. Graphite. The overall idea is to keep things very simple, since overly complex metric types computed on the job side would lead to heavy network traffic when transferred over the wire: keep it simple and do the aggregation on the collector side.

Your objection regarding the network traffic towards the JobManager is valid and important. I haven't really thought about that so far, but maybe a more distributed approach must be found to avoid a bottleneck here.

I hope the solution used throughout the jobs running in our environment will be released as open source soon, since the Otto Group believes in open source ;-) If you would like to know more about it, feel free to ask ;-)

Best
Christian (Kreutzfeldt)
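The "identifier, value, timestamp" scheme Christian describes is Graphite's plaintext line protocol. A minimal, stdlib-only sketch of what such a job-side reporter could look like (the class and method names are made up for illustration, not part of his service or of Flink):

```java
import java.io.PrintWriter;
import java.io.Writer;

public class PlaintextMetricReporter {

    private final PrintWriter out;

    // The sink would typically wrap a socket to the collector; any Writer works.
    public PlaintextMetricReporter(Writer sink) {
        this.out = new PrintWriter(sink, true);
    }

    /** Emits one "identifier value timestamp" line, Graphite's plaintext format. */
    public void report(String identifier, double value, long epochSeconds) {
        out.printf("%s %s %d%n", identifier, value, epochSeconds);
    }
}
```

Keeping the wire format down to three fields per sample is what lets the collector side own all aggregation, as Christian proposes.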
Hi Christian,
I've returned to this project and am interested in exploring options. Have you released any of your work yet?

Have you considered an implementation where each Flink worker exposes its own metrics via a "well known interface" -- such as HTTP or JMX -- letting an external process push those metrics to a central data store? This is the architecture pursued by OpenTSDB and Sensu.

Thanks,
Nick

On Thu, Nov 12, 2015 at 1:01 PM, mnxfst <[hidden email]> wrote:
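The scrape-friendly design Nick asks about can be sketched with nothing but the JDK's built-in HTTP server: each worker process serves its own gauges, and an external agent polls and forwards them. The class and metric names below are illustrative, not anything Flink provides:

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WorkerMetricsEndpoint {

    private final Map<String, Long> gauges = new ConcurrentHashMap<>();
    private HttpServer server;

    public void setGauge(String name, long value) {
        gauges.put(name, value);
    }

    /** Starts the endpoint on an ephemeral port and returns the bound port. */
    public int start() throws IOException {
        server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/metrics", exchange -> {
            // One "name value" line per gauge, easy for an external agent to scrape.
            StringBuilder sb = new StringBuilder();
            gauges.forEach((k, v) -> sb.append(k).append(' ').append(v).append('\n'));
            byte[] body = sb.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server.getAddress().getPort();
    }

    public void stop() {
        server.stop(0);
    }
}
```

A UDF would update its gauge (e.g. the filters-loaded count) from `open()` or the flatMap methods, and the scraping agent, not the JobManager, carries the reporting load.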