Help with OneInputStreamOperatorTestHarness

Chris Schneider
Hi Gang,

I’m having trouble getting my streaming unit test to work. The following code:

    @Test
    public void testDemo() throws Throwable {
        OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
                new StreamFlatMap<>(new DomainDBFunction()),
                new PldKeySelector<CrawlStateUrl>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
            testHarness.processElement(new StreamRecord<>(url));
        }
        testHarness.snapshot(0L, 0L);
    }


Generates the following exception:

DomainDBFunctionTest.testDemo
testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
... 26 more

I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but that didn’t seem to help. I could provide more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, etc.), but that doesn’t seem like it would have much to do with the problem.
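
For anyone reading a trace like the one above: the outer java.lang.Exception only wraps the failure, and the NullPointerException under "Caused by" is the part worth chasing. Below is a minimal, Flink-independent sketch of walking the cause chain so a test can report the innermost exception. The class name RootCauseDemo and the method rootCause are hypothetical helpers invented for illustration; they are not part of Flink or JUnit, and the wrapped exception in main is only a stand-in for the real failure.

```java
// Hypothetical helper (not part of Flink or JUnit): follows Throwable.getCause()
// down to the innermost exception so the real failure can be inspected.
final class RootCauseDemo {

    /** Returns the innermost cause of t, or t itself if it has no cause. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // getCause() returns null once the end of the chain is reached.
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for the failure above: an NPE wrapped in a generic Exception.
        Exception wrapped = new Exception(
            "Could not complete snapshot 0 for operator MockTask (1/1).",
            new NullPointerException());

        Throwable root = rootCause(wrapped);
        System.out.println(root.getClass().getName()); // prints java.lang.NullPointerException
    }
}
```

In a real test one could catch the exception thrown by testHarness.snapshot(...) and pass it to such a helper, though that only identifies the failing check; it does not explain why the checked value was null.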

Any advice would be most welcome.

Thanks,

- Chris

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------

Re: Help with OneInputStreamOperatorTestHarness

Chris Schneider
Hi Ted,

I should have written that we’re using Flink 1.4.0.

Thanks for the suggestion re: FLINK-8268; it could well be the issue (though the pull request appears fairly complex so I’ll need some time to study it).

Best Regards,

- Chris

On Apr 18, 2018, at 6:33 PM, Ted Yu <[hidden email]> wrote:

Which release are you using?

See if the workaround from FLINK-8268 helps.

Cheers

On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <[hidden email]> wrote:

Re: Help with OneInputStreamOperatorTestHarness

Chris Schneider
Hi Gang,

FWIW, the test code from my original post works just fine using Flink 1.5-SNAPSHOT. I also tried cherry-picking the commit that fixed FLINK-8268 onto Flink 1.4.0, but that resulted in the same failure mode.

I guess the takeaway is that the streaming test harness support (which everyone should note is not yet part of the public Flink API) was apparently fragile in 1.4.0.

FYI,

- Chris

On Apr 18, 2018, at 8:07 PM, Chris Schneider <[hidden email]> wrote:


Re: Help with OneInputStreamOperatorTestHarness

Fabian Hueske-2
Thanks for reporting the issue, Chris!
Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?

Thank you, Fabian

[1] https://issues.apache.org/jira/browse/FLINK

2018-04-25 21:11 GMT+02:00 Chris Schneider <[hidden email]>:

Re: Help with OneInputStreamOperatorTestHarness

Chris Schneider
Hi Fabian,

I created FLINK-9262.

FYI,

- Chris

On Apr 26, 2018, at 3:07 AM, Fabian Hueske <[hidden email]> wrote:
