Help with OneInputStreamOperatorTestHarness

Posted by Chris Schneider on Apr 19, 2018; 1:26am
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Help-with-OneInputStreamOperatorTestHarness-tp19621.html

Hi Gang,

I’m having trouble getting my streaming unit test to work. The following code:

    @Test
    public void testDemo() throws Throwable {
        OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
                new StreamFlatMap<>(new DomainDBFunction()),
                new PldKeySelector<CrawlStateUrl>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
            testHarness.processElement(new StreamRecord<>(url));
        }
        testHarness.snapshot(0L, 0L);
    }


Generates the following exception:

DomainDBFunctionTest.testDemo
testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 26 more

I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but that didn’t seem to help. I could provide more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, etc.), but that doesn’t seem like it would have much to do with the problem.

Any advice would be most welcome.

Thanks,

- Chris

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------