Hi all,
Recently, I was working on adding some custom metrics to a Flink job that required the use of dynamic labels (i.e. capturing various counters that were "sliceable" by things like tenant / source, etc.). I ended up handling it in a very naive fashion: keeping a dictionary of metrics that had already been registered and updating them accordingly, which looked something like this:

    class MyCustomProcessFunction: ProcessFunction<Event, Unit>() {

Basically, it registers and updates new metrics for tenants as they are encountered, and I've seen them being emitted as expected by hitting the appropriately configured metrics endpoint (using a PrometheusReporter).

However, while trying to write a few unit tests for this, I ran into an issue. I was following a Stack Overflow post answered by [hidden email] [0] that described the use of an in-memory/embedded Flink cluster and a custom reporter that would statically expose the underlying metrics. So I took a shot at implementing something similar, as follows:

Flink Cluster Definition

    private val metricsConfiguration = Configuration.fromMap(mutableMapOf(

Custom Reporter

    class MockCustomMetricsReporter : MetricReporter {

Example Test

    @Test

While this test will pass, the problem is that the custom metrics defined dynamically (via the CustomMetricsRegistry implementation) do not appear within the registeredCustomMetrics collection. In fact, 21 metrics get registered, but all of them appear to be classic out-of-the-box metrics such as CPU usage, number of task managers, load, and various other Netty and JVM stats; no custom metrics are included.

I've tried multiple different configurations, implementations via a custom TestHarness, etc., but for some reason the custom metrics being defined never trigger the notifyOfAddedMetric function, which would be responsible for adding them to the static collection to be asserted against. Any ideas / guidance would be more than welcome.
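The "dictionary of already-registered metrics" approach described above can be sketched as a lazy, thread-safe per-tenant cache. This is a minimal stdlib-only sketch under stated assumptions, not Rion's actual code (which is truncated in the archive): `TenantCounters` and `SimpleCounter` are hypothetical names, and the `register` function stands in for a Flink call such as `getRuntimeContext().getMetricGroup().addGroup("tenant", tenant).counter("events_seen")`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-in for a Flink Counter: a thread-safe, monotonically increasing count.
final class SimpleCounter {
    private final AtomicLong count = new AtomicLong();

    void inc() {
        count.incrementAndGet();
    }

    long getCount() {
        return count.get();
    }
}

// Lazily registers one counter per tenant the first time that tenant is seen,
// then reuses the cached counter for every later event from the same tenant.
final class TenantCounters {
    private final Map<String, SimpleCounter> counters = new ConcurrentHashMap<>();
    // Hypothetical registration hook. In a Flink function this is roughly where a call like
    // getRuntimeContext().getMetricGroup().addGroup("tenant", tenant).counter("events_seen")
    // would go (returning a Flink Counter rather than a SimpleCounter).
    private final Function<String, SimpleCounter> register;

    TenantCounters(Function<String, SimpleCounter> register) {
        this.register = register;
    }

    // computeIfAbsent applies register at most once per tenant key, even under concurrency.
    void record(String tenant) {
        counters.computeIfAbsent(tenant, register).inc();
    }

    // Returns 0 for tenants that have never been recorded.
    long count(String tenant) {
        SimpleCounter counter = counters.get(tenant);
        return counter == null ? 0L : counter.getCount();
    }

    int tenantCount() {
        return counters.size();
    }
}
```

Caching the counter object matters because the metric should be registered once per tenant and then only incremented; re-registering on every event is wasteful at best.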
Perhaps a different approach? Based on the examples I've encountered, the code seems like it should "just work".

Thanks much,
Rion
Was there anything in the logs (ideally on debug)?
Have you debugged the execution and followed the counter() calls all the way to the reporter?
Do you only see JobManager metrics, or is there also something about the TaskManager somewhere?

I can see several issues with your code, but none that would fully explain the issue:
a) your reporter is not thread-safe;
b) you only differentiate metrics by name, which will lead to quite a few collisions.

Also be aware that there will be two reporter instances: one for the JM and one for the TM. To remedy this, I would recommend creating a factory that returns a static reporter instance instead; overall, this tends to be cleaner.

Alternatively, when using the testing harnesses, IIRC you can also set a custom MetricGroup implementation.
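Points a) and b) and the static-instance suggestion above can be sketched without Flink on the classpath. Assumptions: `SharedMetricStore` is a hypothetical name; in a real reporter, `notifyOfAddedMetric(Metric, String, MetricGroup)` would forward `group.getMetricIdentifier(metricName)` (the scoped identifier rather than the bare name), and a `MetricReporterFactory` would hand out reporters that all write into this one static store, so the JM-side and TM-side instances share a single collection.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical process-wide store that every test-reporter instance writes into.
// Because it is a single static instance, the JobManager-side and TaskManager-side
// reporters share one collection instead of each keeping its own.
final class SharedMetricStore {
    static final SharedMetricStore INSTANCE = new SharedMetricStore();

    // Thread-safe map, since reporters may be notified from different threads.
    // Keyed by the fully qualified identifier (scope + name) so that two metrics
    // with the same short name in different groups do not overwrite each other.
    private final Map<String, Object> metrics = new ConcurrentHashMap<>();

    private SharedMetricStore() {}

    // Intended to be called from MetricReporter#notifyOfAddedMetric with
    // group.getMetricIdentifier(metricName) as the key.
    void added(String metricIdentifier, Object metric) {
        metrics.put(metricIdentifier, metric);
    }

    // Intended to be called from MetricReporter#notifyOfRemovedMetric.
    void removed(String metricIdentifier) {
        metrics.remove(metricIdentifier);
    }

    // Identifiers whose last segment equals name (assumes the default "." scope delimiter).
    Set<String> identifiersNamed(String name) {
        return metrics.keySet().stream()
                .filter(id -> id.equals(name) || id.endsWith("." + name))
                .collect(Collectors.toSet());
    }

    int size() {
        return metrics.size();
    }

    // Reset between tests, since the static instance outlives any single test.
    void clear() {
        metrics.clear();
    }
}
```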
On 3/16/2021 4:13 AM, Rion Williams wrote:
Hi Chesnay,

Thanks for the prompt response and feedback; it's very much appreciated. Please see the inline responses to your questions below.

Was there anything in the logs (ideally on debug)?
I didn't see anything in the logs that indicated anything out of the ordinary. I'm currently using a MiniClusterResource for this and attempted to set the logging level to pick up everything (i.e. ALL), but if there's a way to expose more, I'm not aware of it.

Have you debugged the execution and followed the counter() calls all the way to the reporter?
With the debugger, I traced one of the counter initializations, and it seems that no reporters were found within the register call in the MetricRegistryImpl (i.e. this.reporters has no registered reporters):

    if (this.reporters != null) {

Perhaps this is an error on my part, as I had assumed the following would be sufficient to register my reporter (within a local / MiniCluster environment):

    private val metricsConfiguration = Configuration.fromMap(mutableMapOf(

However, the reporter is clearly being picked up for the built-in metrics, since those trigger the notifyOfAddedMetric() function within the reporter itself; it just never fires for the custom ones being registered.

Do you only see JobManager metrics, or is there also something about the TaskManager somewhere?
It looks like there are metrics coming from both the JobManager and the TaskManagers, judging by the following examples that were emitted:

    localhost.jobmanager.numRegisteredTaskManagers
    .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
    .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
    localhost.jobmanager.Status.JVM.Memory.Direct.Count

I do agree that a factory implementation with a static reporter would likely be a better approach, so I may explore that a bit more, as well as make some changes to the existing, admittedly hacky, implementation for handling the dynamic metrics.
I did see several references to a MetricRegistry class; however, I wasn't sure whether that was the most appropriate place to add this type of functionality, or whether it was needed at all.

Thanks much,
Rion

On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler <[hidden email]> wrote:
Are you actually running a job, or are you using a harness for testing your function?
On 3/16/2021 3:24 PM, Rion Williams wrote:
In this case, I was using a harness to test the function. Honestly, though, I care much less about unit-testing the metrics than about having something that will actually run and work as intended within a job. The only real problem I want to solve is building metrics whose labels vary based on the incoming data (e.g. keeping track of the events I've seen for a given tenant, or some other properties). Something like:

    <metric prefix>_events_seen { tenant = "tenant-1" } 1.0
    <metric prefix>_events_seen { tenant = "tenant-2" } 200.0

Hopefully that makes sense. I've used the Prometheus client previously to accomplish these types of metrics, but since I'm fairly new to the Flink world, I was trying to use the built-in constructs available (thus the dynamic groups / metrics being added).

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler <[hidden email]> wrote:
The test harness is fully independent of the MiniClusterResource; it isn't actually running a job. That's why your metrics never arrive at the reporter.

You can either:
a) use the test harness with a custom MetricGroup implementation that intercepts registered metrics, set in the MockEnvironment, or
b) use the function as part of a job with the custom reporter approach (essentially, fromElements -> function -> discarding sink).
The following would work for a), but it must be noted that this relies on quite a few things that are internal to Flink:
...
    InterceptingOperatorMetricGroup operatorMetricGroup = new InterceptingOperatorMetricGroup();

    // Hand back the same intercepting operator group for every operator created on this task,
    // so metrics registered by the function under test end up in operatorMetricGroup.
    InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
        @Override
        public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
            return operatorMetricGroup;
        }
    };

    new MockEnvironmentBuilder()
        .setMetricGroup(taskMetricGroup)
        ...

On 3/16/2021 3:42 PM, Rion Williams wrote:
Actually, you'd have to further subclass the operatorMetricGroup such that addGroup works as expected. This is admittedly a bit of a drag :/
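Why addGroup needs the extra subclassing can be illustrated with a small stand-in: an intercepting group only sees metrics registered on subgroups if its addGroup returns another intercepting group that records into the same place. `RecordingGroup` below is hypothetical and deliberately simplified; it is not Flink's `InterceptingOperatorMetricGroup` and does not implement the real `MetricGroup` interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified metric group that records every counter registered through it
// or through any subgroup it creates.
final class RecordingGroup {
    private final List<String> scope;                 // e.g. [operator, tenant, tenant-1]
    private final Map<String, AtomicLong> registered; // shared by the root and all subgroups

    RecordingGroup(String rootName) {
        this(Collections.singletonList(rootName), new ConcurrentHashMap<String, AtomicLong>());
    }

    private RecordingGroup(List<String> scope, Map<String, AtomicLong> registered) {
        this.scope = scope;
        this.registered = registered;
    }

    // The key detail: the child shares the parent's map instead of starting a fresh one
    // (or being an ordinary, non-intercepting group).
    RecordingGroup addGroup(String key, String value) {
        List<String> child = new ArrayList<>(scope);
        child.add(key);
        child.add(value);
        return new RecordingGroup(child, registered);
    }

    // Simplification: re-registering the same identifier returns the existing counter.
    AtomicLong counter(String name) {
        String id = String.join(".", scope) + "." + name;
        return registered.computeIfAbsent(id, k -> new AtomicLong());
    }

    Map<String, AtomicLong> registeredMetrics() {
        return Collections.unmodifiableMap(registered);
    }
}
```

Sharing the map is the design choice that matters here: if each subgroup kept its own state, per-tenant counters created via addGroup would not be visible from the root group a test inspects.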
On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
I've made a handful of tweaks to try to get them picked up as expected (i.e. adding logging to every available overload for the interceptors, etc.), using something similar to the following:

    fun create(): InterceptingTaskMetricGroup {

It still looks like only the built-in metrics are ever being registered, and none of the custom ones are hit during the TestHarness execution. I've even registered a simple test metric in the function's open() call to make sure the problem wasn't something unrelated happening in the processElement() calls / dynamic metrics. In other words, I can see the logs being hit in the InterceptingOperatorMetricGroup.addGroup() calls, but only for the internal metrics from the Task/JobManagers respectively; nothing custom.

Rion

On Tue, Mar 16, 2021 at 11:00 AM Chesnay Schepler <[hidden email]> wrote: