Windowing() is giving me NullPointer exception after keyedstream<> in Apache Flink

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Windowing() is giving me NullPointer exception after keyedstream<> in Apache Flink

pavan.530530
Hi ,

I have a function called windowing() which basically gets streams from kafka-0.9 and do transformations as below :

       public void windowing() throws Exception {
                DataStream<EnrichedEnergyMetricSchema> timeStampStream = dataStream.assignTimestampsAndWatermarks(new MessageTimeStampExtractor(maxTimeLag));
                KeyedStream<EnrichedEnergyMetricSchema,Tuple> ks=timeStampStream.keyBy(StreamingConstants.KEY_BY_VARIABLE);
                ks.timeWindow(Time.seconds(120)).sum("totalConsumed").addSink(sinkFunction);
                env.execute("test program");
       
            }

Unit testing for this code is as follows :
I used env.readFile() function to read data from file and try to generate streams :

     @org.junit.Test
        public void testWindowing() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            EnergyInputFormat format = new EnergyInputFormat();
            DataStream<EnrichedEnergyMetricSchema> dataStreamSource = env.readFile(format,"teststream");
            Windowing windowing = new Windowing(env,dataStreamSource,sinkFunctionTest);
            windowing.windowing();
            ObjectMapper objectMapper = new ObjectMapper();
            EnrichedEnergyMetricSchema aggregatedValue = objectMapper.readValue(result,EnrichedEnergyMetricSchema.class);
            Long actual = aggregatedValue.getTotalConsumed();
            Long expected = 18L;
            Assert.assertEquals(expected,actual);
        }



Exception what I am getting is :


    Connected to JobManager at Actor[akka://flink/user/jobmanager_1#991060241]
    11/28/2016 16:37:18    Job execution switched to status RUNNING.
    11/28/2016 16:37:18    Source: Custom File Source(1/1) switched to SCHEDULED
    11/28/2016 16:37:18    Source: Custom File Source(1/1) switched to DEPLOYING
    11/28/2016 16:37:18    FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to SCHEDULED
    11/28/2016 16:37:18    FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to DEPLOYING
    11/28/2016 16:37:18    TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to SCHEDULED
    11/28/2016 16:37:18    TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to DEPLOYING
    11/28/2016 16:37:18    TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to RUNNING
    11/28/2016 16:37:18    Source: Custom File Source(1/1) switched to RUNNING
    11/28/2016 16:37:18    FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to RUNNING
    11/28/2016 16:37:18    Source: Custom File Source(1/1) switched to FINISHED
    11/28/2016 16:37:19    TriggerWindow(TumblingEventTimeWindows(120000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@72c21b09, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@433d61fb}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:347)) -> Sink: Unnamed(1/1) switched to FINISHED
    11/28/2016 16:37:19    FileSplitReader_Custom File Source -> Timestamps/Watermarks(1/1) switched to FINISHED
    11/28/2016 16:37:19    Job execution switched to status FINISHED.
   
    java.lang.NullPointerException
                at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:876)
                at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2146)
                at com.philips.lighting.cheetah.flink.WindowingTest.testWindowing(WindowingTest.java:33)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
                at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
                at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
                at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
                at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
                at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
                at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
                at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
                at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
                at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
                at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
                at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
                at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
                at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
                at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
                at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
                at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)



I have 2 records in my file and I am injecting the objects using google guice.


Problem is, Windowing is not happening after keyedstream. It's just coming out with NullPointer Exception.

Help is much appreciated!!!