On Apr 18, 2018, at 8:07 PM, Chris Schneider <[hidden email]> wrote:

Hi Ted,

I should have written that we’re using Flink 1.4.0.

Thanks for the suggestion re: FLINK-8268; it could well be the issue (though the pull request appears fairly complex so I’ll need some time to study it).

Best Regards,

- Chris

On Apr 18, 2018, at 6:33 PM, Ted Yu <[hidden email]> wrote:

Which release are you using?

See if the work around from FLINK-8268 helps.

Cheers

On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <[hidden email]> wrote:

Hi Gang,

I’m having trouble getting my streaming unit test to work. The following code:

    @Test
    public void testDemo() throws Throwable {
        OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
            new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
                new StreamFlatMap<>(new DomainDBFunction()),
                new PldKeySelector<CrawlStateUrl>(),
                BasicTypeInfo.STRING_TYPE_INFO,
                1,
                1,
                0);
        testHarness.setup();
        testHarness.open();

        for (int i = 0; i < 10; i++) {
            String urlString = String.format("https://domain-%d.com/page1", i);
            CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
            testHarness.processElement(new StreamRecord<>(url));
        }
        testHarness.snapshot(0L, 0L);
    }

Generates the following exception:

    DomainDBFunctionTest.testDemo
    testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
    java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
        at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
    Caused by: java.lang.NullPointerException
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
        ... 26 more

I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but that didn’t seem to help. I could provide more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, etc.), but that doesn’t seem like it would have much to do with the problem.

Any advice would be most welcome.

Thanks,

- Chris

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------