[Flink::Test] access registered accumulators via harness


Rinat
Hi mates!

I guess I'm doing something wrong, but I couldn't find a way to access registered accumulators and their values via the org.apache.flink.streaming.util.ProcessFunctionTestHarness function wrapper that I'm using to test my functions.

While digging through the code, I found that the required data is stored in the org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#metrics field, which is private and not accessible from tests. Flink obviously accesses this field somehow and exposes the counters in its Web UI.

So I hope someone can help me add a check for metric counts to my unit tests. If there is no such capability, I'm ready to help implement it, if the community considers this acceptable.

Thx!

Re: [Flink::Test] access registered accumulators via harness

Dawid Wysakowicz-2

Hi Rinat,

First of all, sorry for the nitpicking up front, but your message might be a bit misleading for some. If I understood you correctly, you are referring to metrics, not accumulators, which are a different concept [1]. Or were you indeed referring to accumulators?

Now on to the topic of accessing metrics. Personally, I don't think exposing metrics through the ProcessFunctionTestHarness is the right way to test them. The harness should primarily be used as a minimal execution environment for testing operators and behaviours such as checkpointing. I would not recommend using it for testing business logic, and most definitely not metrics. I'd either test that in an integration test using a MiniCluster and a metric reporter you can assert against, or I'd separate the business logic from the setup logic. Something like:

    private static class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {

        private MyLogic<IN, OUT> logic;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Setup only: create the counter and hand it to the business logic.
            this.logic = new MyLogic<>(getRuntimeContext().getMetricGroup().counter("my-counter"));
        }

        @Override
        public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
            out.collect(logic.doMyBusinessLogic(value));
        }
    }

    // Depends only on the Counter interface, so it can be tested without a harness.
    private static class MyLogic<IN, OUT> {

        private final Counter counter;

        public MyLogic(Counter counter) {
            this.counter = counter;
        }

        public OUT doMyBusinessLogic(IN value) {
            // do the processing
        }

    }

That way you can easily test your MyLogic class, including its interactions with the counter, by passing in a mock counter.
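As a minimal sketch of such a test: the snippet below declares its own small `Counter` interface with the same method names as `org.apache.flink.metrics.Counter`, so that it compiles without Flink on the classpath. In a real project you would import Flink's interface instead and could pass a mocking-library mock. `RecordingCounter`, `UppercaseLogic`, and the rule "one increment per processed value" are assumptions made purely for illustration and are not part of the example above.

```java
import java.util.Locale;

// Stand-in for org.apache.flink.metrics.Counter (same method names),
// declared locally so that this sketch is self-contained.
interface Counter {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

// Hand-written test double that keeps the count in a plain field.
final class RecordingCounter implements Counter {
    private long count;

    @Override public void inc() { count++; }
    @Override public void inc(long n) { count += n; }
    @Override public void dec() { count--; }
    @Override public void dec(long n) { count -= n; }
    @Override public long getCount() { return count; }
}

// Hypothetical business logic: upper-cases a value and counts every call.
final class UppercaseLogic {
    private final Counter counter;

    UppercaseLogic(Counter counter) {
        this.counter = counter;
    }

    String doMyBusinessLogic(String value) {
        counter.inc();
        return value.toUpperCase(Locale.ROOT);
    }
}

final class UppercaseLogicDemo {
    public static void main(String[] args) {
        RecordingCounter counter = new RecordingCounter();
        UppercaseLogic logic = new UppercaseLogic(counter);

        String first = logic.doMyBusinessLogic("a");
        String second = logic.doMyBusinessLogic("b");

        // Both the returned values and the counter can be checked directly.
        System.out.println(first + second + " processed=" + counter.getCount());
    }
}
```

Because the logic only sees the `Counter` interface, the test needs neither a harness nor a running job; the function's `open` method remains the only place that touches the runtime context.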

Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#accumulators--counters

(In reply to Sharipov, Rinat, 27/10/2020 08:02.)

Re: [Flink::Test] access registered accumulators via harness

Rinat
Hi Dawid, thanks for your reply, and sorry for the ambiguous question!

You understood me correctly: I would like to get counter values by name after completing all operations with the harness.

I think this would be useful because most of our functions are implementations of Flink functions and create their counters in the open phase.

I expect the following API would be useful in AbstractStreamOperatorTestHarness:

public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {

  // proposed addition: gives access to all registered counters, metric sub-groups, gauges, etc.
  public MetricGroup getMetricGroup() {
    return environment.getMetricGroup();
  }
}

Here is a usage example, based on yours:

private static class MyProcessFunction<IN, OUT> extends ProcessFunction<IN, OUT> {

    private Counter myCustomCounter1;
    private Counter myCustomCounter2;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.myCustomCounter1 = getRuntimeContext().getMetricGroup().counter("myCustomCounter1");
        this.myCustomCounter2 = getRuntimeContext().getMetricGroup().counter("myCustomCounter2");
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
        // checkCase1 and checkCase2 are placeholders for the function's own checks
        if (checkCase1(value)) {
            myCustomCounter1.inc();
            return;
        }
        if (checkCase2(value)) {
            myCustomCounter2.inc();
            return;
        }

        // logic is the MyLogic instance from the previous example; its field is not shown here
        out.collect(logic.doMyBusinessLogic(value));
    }
}

public class TestMyProcessFunction {

    @Test
    public void processElement_should_incCounter1() throws Exception {
        ProcessFunctionTestHarness harness = ...;
        harness.processElement(element);
        // getMetricGroup() is the proposed harness method; getCount() returns a long
        assertThat(harness.getMetricGroup().counter("myCustomCounter1").getCount(), equalTo(1L));
        assertThat(harness.getMetricGroup().counter("myCustomCounter2").getCount(), equalTo(0L));
    }
}
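The test above relies on looking a counter up by the same name a second time and getting back the instance that the function registered. A minimal sketch of those lookup semantics, in plain Java with hypothetical names (`RecordingMetricGroup`, `LongCounter`), is shown below. It does not use Flink's classes, and it makes no claim about what Flink's own `MetricGroup#counter` returns when a name is requested again; that behaviour would need to be checked before relying on assertions of this shape.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical counter with only the operations needed for this sketch.
final class LongCounter {
    private long count;

    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical test-side metric group that remembers every counter it hands out.
final class RecordingMetricGroup {
    private final Map<String, LongCounter> counters = new LinkedHashMap<>();

    // Returns the counter registered under the name, creating it on first use.
    LongCounter counter(String name) {
        return counters.computeIfAbsent(name, key -> new LongCounter());
    }

    // Read-only view of everything registered so far, for assertions.
    Map<String, LongCounter> counters() {
        return Collections.unmodifiableMap(counters);
    }
}

final class RecordingMetricGroupDemo {
    public static void main(String[] args) {
        RecordingMetricGroup group = new RecordingMetricGroup();

        // What a function would do in open(): register its counters once.
        LongCounter counter1 = group.counter("myCustomCounter1");
        group.counter("myCustomCounter2");

        // What processElement() would do for an element matching case 1.
        counter1.inc();

        // What the test would do afterwards: look the counters up by name.
        System.out.println(group.counter("myCustomCounter1").getCount());
        System.out.println(group.counter("myCustomCounter2").getCount());
    }
}
```

With get-or-create semantics like these, the two lookups see the values written by the function (1 and 0 here). A group that created a fresh counter on every call would always report 0, which is why the lookup behaviour matters for the proposed API.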



What do you think about this harness API proposal?

Thx!

(In reply to Dawid Wysakowicz <[hidden email]>, Fri, 30 Oct 2020, 12:54.)