Re: Testing RichAsyncFunction with TestHarness

Posted by KristoffSC on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Testing-RichAsyncFunction-with-TestHarness-tp33947p33960.html

Hi,
another update on this one.
I managed to make the workaround a little bit cleaner.

The test setup I have now is like this:

ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosStreamEdges = new
ObjectOutputStream(streamEdgesBytes);
    oosStreamEdges.writeObject(Collections.<StreamEdge>emptyList());

    KryoSerializer<MyMessage> kryoSerializer = new KryoSerializer<>(
        MyMessage.class, executionConfig);
    ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream();
    ObjectOutputStream oosKryoSerializer = new
ObjectOutputStream(kryoSerializerBytes);
    oosKryoSerializer.writeObject(kryoSerializer);

Configuration configuration = new Configuration();
    configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray());
    configuration.setBytes("typeSerializer_in_1",
kryoSerializerBytes.toByteArray());

    MockEnvironment environment = MockEnvironment.builder().build();
    ExecutionConfig executionConfig = environment.getExecutionConfig();
    environment.getTaskConfiguration().addAll(configuration);

this.testHarness = new OneInputStreamOperatorTestHarness<>(
        new AsyncWaitOperator<>(processFunction, 2000, 1,
OutputMode.UNORDERED), environment);

With this setup, this.testHarness.open(); works.
However there is another problem,
When calling:
testHarness.processElement(myMessage, 1L);
it throws another exception:

java.lang.AssertionError
        at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400)
        at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
        at
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112)
        at
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/