Hi mates! I may be doing something wrong, but I couldn't find a way to access registered accumulators and their values through the org.apache.flink.streaming.util.ProcessFunctionTestHarness wrapper that I use to test my functions. While digging through the code, I found that the required data is stored in the org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#metrics field, which is private and not accessible from tests. Flink evidently accesses this field somehow and exposes the counters in its Web UI. Could someone help me add a check for metric counting to my unit tests? If there is no such ability, I'm ready to help implement it, provided the community considers this acceptable. Thanks!
Hi Rinat,

First of all, sorry for some nitpicking at the beginning, but your message might be a bit misleading for some readers. If I understood it correctly, you are referring to metrics, not accumulators, which are a different concept [1]. Or were you indeed referring to accumulators?

referring to accumulators? Now on to the topic of accessing metrics. Personally I don't think it is a right way for testing by exposing metrics somehow in the ProcessFunctionTestHarness. The harness should primarily be used as a minimal execution environment for testing operators and such behaviours as e.g. checkpointing. I would not recommend using it for testing business logic and most definitely metrics. I'd either test that in an IT test using a MiniCluster and a metric reporter you can assert or I'd separate the business logic from the setup logic. Something like: private static class MyProcessFunction<IN, OUT>
extends ProcessFunction<IN, OUT> {
private MyLogic<IN, OUT> logic;
@Override
public void open(Configuration parameters)
throws Exception {
super.open(parameters);
this.logic = new
MyLogic<>(getRuntimeContext().getMetricGroup().counter("my-counter"));
}
@Override
public void processElement(IN value, Context
ctx, Collector<OUT> out) throws Exception {
out.collect(logic.doMyBusinessLogic(value));
}
}
private static class MyLogic<IN, OUT> {
private final Counter counter;
public MyLogic(Counter counter) {
this.counter = counter;
}
public OUT doMyBusinessLogic(IN value) {
// do the processing
}
That way you can easily test your MyLogic class, including its interactions with the counter, by passing a mock counter.

Best,
Dawid
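
As a minimal sketch of the approach described above, the logic class can be exercised with a hand-written counter and no Flink runtime at all. Everything here is an assumption made for illustration: the Counter interface is a local stand-in mirroring only inc() and getCount() from org.apache.flink.metrics.Counter, RecordingCounter is a hypothetical test double, and MyLogic is narrowed from the generic <IN, OUT> version to String and Integer with a made-up rule (empty inputs are counted and skipped). In a real project a mock or Flink's own SimpleCounter would probably take the place of RecordingCounter.

```java
// Local stand-in for a subset of org.apache.flink.metrics.Counter (assumption:
// the logic under test only needs inc() and getCount()).
interface Counter {
    void inc();
    long getCount();
}

// Hypothetical hand-written test double that just records increments.
class RecordingCounter implements Counter {
    private long count;

    @Override
    public void inc() {
        count++;
    }

    @Override
    public long getCount() {
        return count;
    }
}

// Business logic kept free of Flink runtime classes.
// The concrete rule (count and skip empty strings) is made up for illustration.
class MyLogic {
    private final Counter counter;

    MyLogic(Counter counter) {
        this.counter = counter;
    }

    // Returns the input length, or null for empty input, which is counted instead.
    Integer doMyBusinessLogic(String value) {
        if (value.isEmpty()) {
            counter.inc();
            return null;
        }
        return value.length();
    }
}

class MyLogicDemo {
    public static void main(String[] args) {
        RecordingCounter counter = new RecordingCounter();
        MyLogic logic = new MyLogic(counter);

        logic.doMyBusinessLogic("abc"); // processed, counter untouched
        logic.doMyBusinessLogic("");    // skipped, counter incremented

        System.out.println("skipped = " + counter.getCount());
    }
}
```

The point of the split is that the assertions target the counter object handed to the logic, so nothing needs to be read back out of a metric group.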
On 27/10/2020 08:02, Sharipov, Rinat wrote:
Hi Dawid,

Thanks for your reply, and sorry for the ambiguous question! You understood me correctly: I would like to get counter values by name after completing all operations with the harness component. I think this would be useful because most of our functions are implementations of Flink functions and create their counters in the open phase. I expect that the following API could be useful in AbstractStreamOperatorTestHarness:

public static class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
    // gives access to all registered counters, metric sub-groups, gauges, etc.
    public MetricGroup getMetricGroup() {
        return environment.getMetricGroup();
    }
}

Here is an example of usage, based on your example:

private static class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {
    private Counter myCustomCounter1;
    private Counter myCustomCounter2;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.myCustomCounter1 = getRuntimeContext().getMetricGroup().counter("myCustomCounter1");
        this.myCustomCounter2 = getRuntimeContext().getMetricGroup().counter("myCustomCounter2");
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
        if (checkCase1(value)) {
            myCustomCounter1.inc();
            return;
        }
        if (checkCase2(value)) {
            myCustomCounter2.inc();
            return;
        }
        out.collect(logic.doMyBusinessLogic(value));
    }
}

public static class TestMyProcessFunction {
    public void processElement_should_incCounter1() {
        ProcessFunctionTestHarness harness = ...;
        harness.processElement(element);
        // getCount() returns a long, so compare against long literals
        assertThat(harness.getMetricGroup().counter("myCustomCounter1").getCount(), equalTo(1L));
        assertThat(harness.getMetricGroup().counter("myCustomCounter2").getCount(), equalTo(0L));
    }
}

What do you think about such a harness API proposal?

Thanks!

Fri, 30 Oct 2020 at 12:54, Dawid Wysakowicz <[hidden email]>:
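
The by-name lookup proposed in this message can be sketched without Flink. This only illustrates the idea and is not the harness API: CounterRegistry, its counter and getCount methods, and the Counter stand-in are all hypothetical names. One design point that seems worth noting is that MetricGroup#counter(String) in Flink is documented as creating and registering a new counter, so a test accessor would probably need a separate read-only lookup, which is what getCount models here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Local stand-in for a subset of org.apache.flink.metrics.Counter (assumption).
interface Counter {
    void inc();
    long getCount();
}

// Hypothetical registry: remembers every counter by name so tests can read it.
class CounterRegistry {
    private final Map<String, Counter> counters = new HashMap<>();

    // Registration path, used by the function under test in its open phase.
    Counter counter(String name) {
        if (counters.containsKey(name)) {
            throw new IllegalArgumentException("counter already registered: " + name);
        }
        Counter created = new Counter() {
            private long count;

            @Override
            public void inc() {
                count++;
            }

            @Override
            public long getCount() {
                return count;
            }
        };
        counters.put(name, created);
        return created;
    }

    // Read-only lookup path, used by the test after processing elements.
    long getCount(String name) {
        Counter found = counters.get(name);
        if (found == null) {
            throw new NoSuchElementException("no counter named " + name);
        }
        return found.getCount();
    }
}

class CounterRegistryDemo {
    public static void main(String[] args) {
        CounterRegistry registry = new CounterRegistry();
        Counter c1 = registry.counter("myCustomCounter1");
        registry.counter("myCustomCounter2");

        c1.inc(); // stands in for processing one element that matches case 1

        System.out.println(registry.getCount("myCustomCounter1"));
        System.out.println(registry.getCount("myCustomCounter2"));
    }
}
```

Keeping registration and lookup as two separate methods means a typo in a counter name fails the test loudly instead of silently reading a fresh counter at zero.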