Hi folks,
I’ve recently started working with Flink, and I’m in the process of adding metrics to my existing pipeline with the hope of building some Grafana dashboards from them to help with observability. Initially I looked at the built-in Flink metrics, but I didn’t see an easy mechanism for setting/using labels with them. Essentially, I have two properties on the messages coming through the pipeline that I’d like to keep track of (tenant/source) across several metrics (e.g. total_messages with tenant / source labels, etc.). I didn’t see an easy way to do this out of the box, and I wasn’t aware of a good pattern for handling it. I’ve used the Prometheus Client metrics [0] to accomplish this in the past, but I wasn’t entirely sure how they would/could mesh with Flink. Does anyone have experience working with these, or know if they are supported?

Secondly, when using the Flink metrics, I noticed I was receiving a separate metric for each task that was spun up. Is there an “easy button” for aggregating these, so that a single metric (e.g. total_messages) reflects the total processed across all of the tasks instead of each individual one?

Any recommendations / resources / advice would be greatly appreciated!

Thanks,

Rion
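P.S. For context, the kind of pattern I have in mind from the Prometheus client library looks roughly like the following (just a rough Kotlin sketch; the metric name, help text, and label values are placeholders I made up):

import io.prometheus.client.Counter

// Placeholder metric: a counter labeled by tenant and source
val totalMessages: Counter = Counter.build()
    .name("total_messages")
    .help("Total messages processed, by tenant and source")
    .labelNames("tenant", "source")
    .register()

// Somewhere in the pipeline, once per message:
totalMessages.labels("tenant-a", "source-x").inc()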
Rion,

Prasanna.

On Sat, Feb 27, 2021 at 9:01 PM Rion Williams <[hidden email]> wrote:
Hi Prasanna,

Thanks for that. It’s what I was doing previously as a workaround, but I was curious whether there was any Flink-specific functionality to handle this before the metrics reach Prometheus. Additionally, from the docs on metrics [0], it seems there’s a pattern in place for using supported third-party metrics, such as those from Codahale/DropWizard, via a Maven package (flink-metrics-dropwizard). I do see a similarly named package for Prometheus (flink-metrics-prometheus), which may be what I’m looking for, so I may give that a try.

Thanks,

Rion

On Feb 28, 2021, at 12:20 AM, Prasanna kumar <[hidden email]> wrote:
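P.S. In case it helps anyone else following along: from those docs, the DropWizard wrapper pattern looks roughly like the sketch below, if I’m reading them right (the class name, metric name, and reservoir size here are just placeholders I picked):

import com.codahale.metrics.SlidingWindowReservoir
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
import org.apache.flink.metrics.Histogram

class LatencyMapper : RichMapFunction<String, String>() {
    private lateinit var histogram: Histogram

    override fun open(parameters: Configuration) {
        // Wrap a Codahale/DropWizard histogram so Flink can register and report it
        histogram = runtimeContext.metricGroup.histogram(
            "myHistogram",
            DropwizardHistogramWrapper(
                com.codahale.metrics.Histogram(SlidingWindowReservoir(500))
            )
        )
    }

    override fun map(value: String): String {
        histogram.update(value.length.toLong())
        return value
    }
}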
Hi Rion,
Regarding the question about adding Prometheus labels out of the box: this is a common ask of all exporters, but the Prometheus philosophy sees it as an "anti-pattern", since the metrics source can often be ambivalent about its context. See [0] for an example of such a discussion.

Instead, we can establish context during service discovery. If, for example, we run clusters for tenants on Kubernetes, then within the kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add the Kubernetes labels from the pods, such as "tenant-id: foo" and "environment: staging", to each incoming metric it processes. This isn't limited to Kubernetes; each of the service discovery configs is designed to accommodate translating metadata from its context into metric labels.

If this doesn't work for you, then consider encoding the tenant identifier into job names and extracting it with a metric_relabel_config [2].
[2]: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
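To make that concrete, a metric_relabel_config along these lines could pull a tenant label out of the job name. This is only an illustration: it assumes jobs are named like "tenant-foo-pipeline", and that the Flink job name arrives on a job_name label (substitute whichever label actually carries the name in your setup, and your own naming convention in the regex):

metric_relabel_configs:
  # Illustrative only: jobs assumed to be named "tenant-<id>-pipeline"
  - source_labels: [job_name]
    regex: 'tenant-(.+)-pipeline'
    target_label: tenant
    replacement: '$1'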
Thanks Dylan,
Totally understandable. I already have the appropriate exporters / monitors in place for scraping metrics from Flink, including custom ones, into Prometheus. The labeling challenge is really the big one: while I see lots of labels on the metrics being exported (e.g. job id, worker, etc.), I didn’t see a mechanism for injecting my own into those coming from Flink.

Additionally, in my specific use case I’m dealing with a multi-tenant pipeline (i.e. reading messages from a single multi-tenant Kafka topic), which is where the labeling comes in. I’d love to be able to have a counter (among other types of metrics) with the appropriate labels for each tenant. I suppose I could implement a custom counter, or a series of counters (one for each tenant), that would each be responsible for keeping track of their own respective tenant's values. In my case I’m dealing with a KeyedProcessFunction, so I only have access to the key (tenant) within the processElement function, as opposed to when the function is initially opened, which is where I understand you would typically register a metric.

Sorry for the somewhat convoluted response, I’m still getting accustomed to some of the Flink APIs, specifically around metrics.

Thanks,

Rion

On Feb 28, 2021, at 8:02 AM, Meissner, Dylan <[hidden email]> wrote:
It looks like I was finally able to get the expected labeling behavior I was looking for by simply storing a reference to the underlying MetricGroup and keeping track of any new metrics that I needed to dynamically create and use downstream:

class MagicMetricRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Reference for all of the registered metrics
    private val registeredMetrics: HashMap<String, Counter> = hashMapOf()

    // Increments a given metric by key
    fun inc(metric: String, tenant: String, source: String, amount: Long = 1) {
        // Store a key
        val key = "$metric-$tenant-$source"
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .addGroup("source", source)
                .counter(metric)
        }

        // Update the metric by a given amount
        registeredMetrics[key]!!.inc(amount)
    }
}

And then, within the open function call in my KeyedProcessFunction, I simply stored a reference to it and registered any new combinations (in this case tenant/source pairs) as they came in:

class MagicWindowFunction : KeyedProcessFunction<...>() {
    @Transient private lateinit var metrics: MagicMetricRegistry

    override fun open(parameters: Configuration) {
        metrics = MagicMetricRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(...) {
        // Omitted for brevity
        metrics.inc("logs_seen", "my-tenant", "my-source")
    }

    // Omitted for brevity
}

This appears to be working as expected as far as I can tell at this point. I can see all of the expected labels appearing within Prometheus and further downstream in Grafana!

Thanks again,

Rion

On Sun, Feb 28, 2021 at 8:15 AM Rion Williams <[hidden email]> wrote: