Hi ,
I have a function called windowing() which basically gets streams from kafka-0.9 and do transformations as below : public void windowing() throws Exception { DataStream<EnrichedEnergyMetricSchema> timeStampStream = dataStream.assignTimestampsAndWatermarks(new MessageTimeStampExtractor(maxTimeLag)); KeyedStream<EnrichedEnergyMetricSchema,Tuple> ks=timeStampStream.keyBy(StreamingConstants.KEY_BY_VARIABLE); ks.timeWindow(Time.seconds(120)).sum("totalConsumed").addSink(sinkFunction); env.execute("test program"); } Unit testing for this code is as follows : I used env.readFile() function to read data from file and try to generate streams : @org.junit.Test public void testWindowing() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); EnergyInputFormat format = new EnergyInputFormat(); DataStream<EnrichedEnergyMetricSchema> dataStreamSource = env.readFile(format,"teststream"); Windowing windowing = new Windowing(env,dataStreamSource,sinkFunctionTest); windowing.windowing(); ObjectMapper objectMapper = new ObjectMapper(); EnrichedEnergyMetricSchema aggregatedValue = objectMapper.readValue(result,EnrichedEnergyMetricSchema.class); Long actual = aggregatedValue.getTotalConsumed(); Long expected = 18L; Assert.assertEquals(expected,actual); } Exception what I am getting is : Connected to JobManager at Actor[akka://flink/user/jobmanager_1#991060241] 11/28/2016 16:37:18 Job execution switched to status RUNNING. 11/28/2016 16:37:18 Source: Custom File Source(1/1) switched to SCHEDULED 11/28/2016 16:37:18 Source: Custom File Source(1/1) switched to DEPLOYING 11/28/2016 16:37:18 FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to SCHEDULED 11/28/2016 16:37:18 FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to DEPLOYING 11/28/2016 16:37:18 TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to SCHEDULED 11/28/2016 16:37:18 TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to DEPLOYING 11/28/2016 16:37:18 TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to RUNNING 11/28/2016 16:37:18 Source: Custom File Source(1/1) switched to RUNNING 11/28/2016 16:37:18 FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to RUNNING 11/28/2016 16:37:18 Source: Custom File Source(1/1) switched to FINISHED 11/28/2016 16:37:19 TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to FINISHED 11/28/2016 16:37:19 FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to FINISHED 11/28/2016 16:37:19 Job execution switched to status FINISHED. java.lang.NullPointerException at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:876) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2146) at com.philips.lighting.cheetah.flink.WindowingTest.testWindowing(WindowingTest.java:33) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59) at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98) at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79) at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87) at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77) at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42) at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88) at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51) at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44) at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27) at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37) at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42) at org.junit.runner.JUnitCore.run(JUnitCore.java:130) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) I have 2 records in my file and I am injecting the objects using google guice. Problem is, Windowing is not happening after keyedstream. It's just coming out with NullPointer Exception. Help is much appreciated!!! |
Free forum by Nabble | Edit this page |