Hi Gang,
I’m having trouble getting my streaming unit test to work. The following code:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.operators.StreamFlatMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.junit.Test;
    // (plus imports for the project's own classes, not shown)

    @Test
    public void testDemo() throws Throwable {
        OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
                new StreamFlatMap<>(new DomainDBFunction()),
                new PldKeySelector<CrawlStateUrl>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1, 1, 0); // maxParallelism, numSubtasks, subtaskIndex
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
            testHarness.processElement(new StreamRecord<>(url));
        }

        testHarness.snapshot(0L, 0L);
    }

generates the following exception:

    DomainDBFunctionTest.testDemo
    testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
    java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
        at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
    Caused by: java.lang.NullPointerException
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
        ... 26 more

I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but that didn’t seem to help.
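For anyone trying to reproduce this without the project classes: the loop in the test feeds ten URLs from ten distinct domains, so a domain-based key selector should put each element under its own key. Below is a minimal, self-contained sketch of that input only. It assumes, hypothetically, that the key is simply the URL host; the real PldKeySelector wasn't posted, and a true pay-level-domain extraction would differ for hosts like www.example.co.uk. The class and method names (DemoUrlKeys, hostKey, testKeys) are illustrative, not part of Flink or the crawler.

```java
import java.net.URI;
import java.util.LinkedHashSet;
import java.util.Set;

public class DemoUrlKeys {

    // Hypothetical stand-in for PldKeySelector: keys each URL by its host.
    // For these test URLs the host equals the pay-level domain; in general it does not.
    static String hostKey(String url) {
        return URI.create(url).getHost();
    }

    // Builds the same URLs as the test loop and collects their distinct keys,
    // preserving insertion order.
    static Set<String> testKeys(int count) {
        Set<String> keys = new LinkedHashSet<>();
        for (int i = 0; i < count; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            keys.add(hostKey(urlString));
        }
        return keys;
    }

    public static void main(String[] args) {
        Set<String> keys = testKeys(10);
        // Prints "10 distinct keys, first: domain-0.com"
        System.out.println(keys.size() + " distinct keys, first: " + keys.iterator().next());
    }
}
```

In other words, by the time snapshot(0L, 0L) is called the keyed harness has seen ten different keys, one element each.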
I could provide more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, etc.), but I doubt it has much to do with the problem.

Any advice would be most welcome.

Thanks,

- Chris

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------
Hi Ted,
I should have mentioned that we’re using Flink 1.4.0. Thanks for the suggestion re: FLINK-8268; it could well be the issue (though the pull request appears fairly complex, so I’ll need some time to study it).

Best Regards,

- Chris
Hi Gang,
FWIW, the test code from my original message works just fine using Flink 1.5-SNAPSHOT. I also tried cherry-picking the commit that fixed FLINK-8268 onto Flink 1.4.0, but that resulted in the same failure mode. I guess the takeaway is that the streaming test harness support (which, everyone should note, is not yet part of the public Flink API) was fragile in 1.4.0.

FYI,

- Chris
Thanks for reporting the issue, Chris! Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?

[1] https://issues.apache.org/jira/browse/FLINK

2018-04-25 21:11 GMT+02:00 Chris Schneider <[hidden email]>:
Hi Fabian,
I created FLINK-9262.

FYI,

- Chris