Hi,
Im trying to test my RichAsyncFunction implementation with OneInputStreamOperatorTestHarness based on [1]. I'm using Flink 1.9.2 My test setup is: this.processFunction = new MyRichAsyncFunction(); this.testHarness = new OneInputStreamOperatorTestHarness<>( new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.ORDERED)); this.testHarness.open(); I'm having below exception when calling this.testHarness.open(); java.lang.NullPointerException at java.base/java.util.Objects.requireNonNull(Objects.java:221) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64) at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:142) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:287) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:275) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:393) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:300) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:308) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:483) I will appreciate help with this one. Additionally even though I add all necessary dependencies defiend in [1] I cannot see ProcessFunctionTestHarnesses class. Thanks. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
I've debug it a little bit and I found that it fails in
InstantiationUtil.readObjectFromConfig method when we execute byte[] bytes = config.getBytes(key, (byte[])null); This returns null. The key that it is looking for is "edgesInOrder". In the config map, there are only two entries though. For "checkpointing -> {Boolean@6347} true" and "operatorID -> {byte[16]@6351} " -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
I think I got this to work, although with "nasty" workaround.
I've debugged that configuration for this testHarnes operator was missing two entries: "edgesInOrder" "typeSerializer_in_1" I added conditional break points to InstantiationUtils.readObjectFromConfig method for those two keys and I ran my "real" FlinkJob from IntelliJ. I saw that for "edgesInOrder" an empty array of StreamEdge object was added and for "typeSerializer_in_1" the instance of PojoSerializer class. I took the byte[] for those two and simply added those to arrays to my TestHarnes setup under appropriate keys, like this: Configuration configuration = new Configuration(); configuration.setBytes("edgesInOrder", emptyEdgesListBytes); configuration.setBytes("typeSerializer_in_1", pojoSerializerBytes); MockEnvironment environment = MockEnvironment.builder().build(); environment.getTaskConfiguration().addAll(configuration); Then I used this mock environment to initialize OneInputStreamOperatorTestHarness for AsyncWaitOperator. That seems work, but its a workaround though. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi,
another update on this one. I managed to make the workaround a little bit cleaner. The test setup I have now is like this: ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream(); ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes); oosStreamEdges.writeObject(Collections.<StreamEdge>emptyList()); KryoSerializer<MyMessage> kryoSerializer = new KryoSerializer<>( MyMessage.class, executionConfig); ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream(); ObjectOutputStream oosKryoSerializer = new ObjectOutputStream(kryoSerializerBytes); oosKryoSerializer.writeObject(kryoSerializer); Configuration configuration = new Configuration(); configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray()); configuration.setBytes("typeSerializer_in_1", kryoSerializerBytes.toByteArray()); MockEnvironment environment = MockEnvironment.builder().build(); ExecutionConfig executionConfig = environment.getExecutionConfig(); environment.getTaskConfiguration().addAll(configuration); this.testHarness = new OneInputStreamOperatorTestHarness<>( new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.UNORDERED), environment); With this setup, this.testHarness.open(); works. However there is another problem, When calling: testHarness.processElement(myMessage, 1L); it throws another exception: java.lang.AssertionError at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400) at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107) -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
HI :) I have finally figured it out :)
On top of changes from last email, in my test method, I had to wrap "testHarness.processElement" in synchronized block, like this: @Test public void foo() throws Exception { synchronized (this.testHarness.getCheckpointLock()) { testHarness.processElement(MyMessage.builder().build(), 1L); } } That worked. I think that this could be added to official documentation in [1]. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
In reply to this post by KristoffSC
Additionally even though I add all necessary dependencies defiend in [1] I That class was added in Flink 1.10 [1]. On Fri, Mar 27, 2020 at 10:13 PM KristoffSC <[hidden email]> wrote: Hi, |
Thanks,
I would suggest adding my "tutorial" about using testHarnes for AsynOperators, to the documentation. Or maybe build something based on this use case, that could be helpful for others in the future :) Thanks, Krzysztof -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |