Unit testing Async Operator

Arpith techy
Hi,

I am trying to unit test an async operator that takes a Tuple1 as input and produces a Tuple3 as output, but while creating a test harness I couldn't find the right TupleSerializer to pass. Can anyone help me with this?


public class GetMetadataAsyncProcess extends RichAsyncFunction<Tuple1<Map<String, List<String>>>, Tuple3<Map<String, List<String>>, Map<String, String>, List<Map<String, Integer>>>> {
    ...
}

private OneInputStreamOperatorTestHarness createTestHarness(GetMetadataAsyncProcess asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode mode) throws Exception {
    OneInputStreamOperatorTestHarness tuple1OUTOneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(
            new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode), <TYPESERIALISER_TO_FILL?>);
    return tuple1OUTOneInputStreamOperatorTestHarness;
}

Creating the harness without passing a TypeSerializer results in the following error:

java.lang.NullPointerException
    at java.util.Objects.requireNonNull(Objects.java:203)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:138)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:284)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:273)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:389)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:296)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:304)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:479)
    at GetMetadataAsyncProcessTest.testRichAsyncFunction(GetMetadataAsyncProcessTest.java:40)

Re: Unit testing Async Operator

Till Rohrmann
Hi Arpith,

Looking at the definition of the GetMetadataAsyncProcess function, you need to specify the TypeSerializer for its input type, Tuple1<Map<String, List<String>>>. To avoid creating the serializer manually, you could try:

TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>(){}).createSerializer(new ExecutionConfig())

This should hopefully create the correct serializer.
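For reference, here is a sketch of the harness factory from the question with this suggestion filled in. It assumes the Flink version used in the question, where AsyncWaitOperator still has the four-argument constructor; in newer releases the operator is usually created through AsyncWaitOperatorFactory, so the construction line may need adjusting. The class name AsyncHarnessSketch and the generic OUT parameter are illustrative only. Note that the serializer handed to the harness is for the operator's input type (the Tuple1), not for the Tuple3 output.

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

// Hypothetical helper class; only the serializer call comes from the thread.
public class AsyncHarnessSketch {

    // Serializer for the INPUT type of GetMetadataAsyncProcess.
    // The anonymous TypeHint subclass captures the full generic type at compile time.
    static TypeSerializer<Tuple1<Map<String, List<String>>>> inputSerializer() {
        return TypeInformation.of(new TypeHint<Tuple1<Map<String, List<String>>>>() {})
                .createSerializer(new ExecutionConfig());
    }

    // Generic over OUT so any async function with this input type can be plugged in
    // (e.g. GetMetadataAsyncProcess, whose OUT is the Tuple3 from the question).
    static <OUT> OneInputStreamOperatorTestHarness<Tuple1<Map<String, List<String>>>, OUT> createTestHarness(
            AsyncFunction<Tuple1<Map<String, List<String>>>, OUT> asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode mode) throws Exception {
        // Assumes the four-argument AsyncWaitOperator constructor used in the question.
        return new OneInputStreamOperatorTestHarness<>(
                new AsyncWaitOperator<>(asyncFunction, timeout, capacity, mode),
                inputSerializer());
    }
}
```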

Cheers,
Till



On Tue, Feb 16, 2021 at 11:33 AM Arpith techy <[hidden email]> wrote:

Re: Unit testing Async Operator

Arvid Heise
Hi Arpith,

The operator test harness is meant more for use cases where you implement your own operator (which is quite advanced).

If you just want to test your AsyncFunction, I'd strongly recommend building a small ITCase like [1]; then you don't have to fiddle with these things at all. Such tests run very fast and cover much more than the operator harness, which only partially simulates the execution.
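A minimal sketch of such an ITCase, assuming Flink 1.12 or later (for DataStream#executeAndCollect). UppercaseAsyncFunction is a hypothetical stand-in for GetMetadataAsyncProcess, and the example referenced as [1] may be structured differently.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncFunctionITCaseSketch {

    // Hypothetical async function: "looks up" a value by upper-casing the input
    // on another thread, then completes the ResultFuture from that thread.
    public static class UppercaseAsyncFunction extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> input.toUpperCase())
                    .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
        }
    }

    // Runs a tiny pipeline on a local mini cluster and returns what the sink saw.
    static List<String> runPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> input = env.fromElements("a", "b", "c");

        // orderedWait keeps the output in input order; 5 s timeout, at most 10 in-flight requests.
        DataStream<String> output = AsyncDataStream.orderedWait(
                input, new UppercaseAsyncFunction(), 5, TimeUnit.SECONDS, 10);

        // Executes the job and collects up to 3 results back to the client.
        return output.executeAndCollect(3);
    }
}
```

Because the job runs end to end on a local cluster, the function's lifecycle and the serialization of its inputs and outputs are exercised as in a real job, with no serializer to wire up by hand. In a JUnit test class, Flink's testing documentation suggests sharing a MiniClusterWithClientResource across tests; that is omitted here to keep the sketch short.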


On Tue, Feb 16, 2021 at 1:47 PM Till Rohrmann <[hidden email]> wrote:

Re: Unit testing Async Operator

Arpith techy
Thanks, Till, your solution worked perfectly.

Arpith

On Wed, Feb 17, 2021 at 12:53 AM Arvid Heise <[hidden email]> wrote: