Hey community,
I am not sure whether this is a bug or I am doing something wrong. I have a little snippet produced by our project (see below). When I execute it in Eclipse, everything works fine. However, when I deploy the JAR to the local Flink installation, I get NullPointerExceptions after the program has already finished. I found out that this happens exactly after the window trigger interval (10 seconds in this example) has elapsed. So it seems that there is still a thread running, although the program has already finished. My guess is that the thread no longer gets any input, since the file has already been read completely, and thus produces NullPointerExceptions when trying to write these null elements. But I think you know more about this.

FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11.

Here is the code:

import org.apache.flink.streaming.api.scala._

And here is the log output:

Exception in thread "Thread-32" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException

And a part of the .out file:

13:28:48,696 INFO org.apache.flink.runtime.jobmanager.JobManager - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).

Best Regards,
Philipp
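The suspected failure mode can be sketched without Flink. This is a minimal, hypothetical model and not Flink's actual implementation: `ToySink` and `LateTriggerSketch` are made-up names, and the assumption (taken from the description above) is that a periodic trigger thread keeps firing after the sink has already been closed. The sketch only shows why such a late firing would end in a NullPointerException, and that stopping the timer before closing the sink avoids it.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a file sink: the writer is set to null on close,
// so any write that arrives afterwards throws a NullPointerException.
class ToySink {
  @volatile private var writer: java.io.StringWriter = new java.io.StringWriter
  def write(s: String): Unit = writer.write(s)
  def close(): Unit = { writer = null }
}

object LateTriggerSketch {
  // Starts a periodic "window trigger", closes the sink, waits a little,
  // and returns how many trigger firings failed with a NullPointerException.
  def run(cancelTimerOnClose: Boolean): Int = {
    val sink = new ToySink
    val failures = new AtomicInteger(0)
    val timer = Executors.newSingleThreadScheduledExecutor()

    val trigger = new Runnable {
      def run(): Unit =
        try sink.write("window\n")
        catch { case _: NullPointerException => failures.incrementAndGet() }
    }
    timer.scheduleAtFixedRate(trigger, 50L, 50L, TimeUnit.MILLISECONDS)

    if (cancelTimerOnClose) {
      // Stop the trigger thread first and wait for it to go quiet.
      timer.shutdownNow()
      timer.awaitTermination(1L, TimeUnit.SECONDS)
    }
    sink.close()        // the "job" is finished from here on

    Thread.sleep(200L)  // give a leftover trigger time to fire
    timer.shutdownNow() // clean up in the non-cancelling case
    failures.get
  }
}
```

Calling `LateTriggerSketch.run(cancelTimerOnClose = false)` models the reported behaviour (the trigger fires into a closed sink), while `run(cancelTimerOnClose = true)` models the cleanup one would expect when the task finishes. Whether Flink's `TimeTriggerPolicy` thread behaves exactly like this is an assumption based on the stack trace, not something confirmed here.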
Hey Philipp,
thanks for reporting the problem. I think your assessment is correct. If the program is already finished, the threads throwing the exceptions should have been cleaned up as well.

I am not sure, but I think parts of the system touching this mechanism could have been reworked by Aljoscha in the current master branch. Is it possible for you to try it out? If yes, it would be great to know if it is fixed there. As far as I know, there were no API-breaking changes in the meantime.

@Aljoscha: do you think this is fixed with your latest changes in 0.10-SNAPSHOT?

– Ufuk

On 28 Jul 2015, at 14:02, Philipp Goetze <[hidden email]> wrote:

> Hey community,
>
> I am not sure whether this is a bug or I am doing something wrong. I have a little snippet produced by our project (see below). When I execute it in Eclipse, everything works fine. However, when I deploy the JAR to the local Flink installation, I get NullPointerExceptions after the program has already finished. I found out that this happens exactly after the window trigger interval (10 seconds in this example) has elapsed. So it seems that there is still a thread running, although the program has already finished. My guess is that the thread no longer gets any input, since the file has already been read completely, and thus produces NullPointerExceptions when trying to write these null elements. But I think you know more about this.
>
> FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11.
>
> Here is the code:
>
> import org.apache.flink.streaming.api.scala._
> import dbis.flink._
> import java.util.concurrent.TimeUnit
> import org.apache.flink.streaming.api.windowing.helper._
> import org.apache.flink.util.Collector
>
>
> object windowCount {
>
>   def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
>     out.collect(ts.groupBy(t => t(0)).flatMap(x => List(x._1,x._2)).toList)
>   }
>
>   def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
>     ts.foreach { t => out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
>   }
>
>   def tuplecntdToString(t: List[Any]): String = {
>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>
>     val sb = new StringBuilder
>     sb.append(t(0))
>       .append(",")
>       .append(t(1))
>     sb.toString
>   }
>
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val input = PigStorage().load(env, "src/it/resources/mary.txt")
>     val words = input.flatMap(t => PigFuncs.tokenize(t(0).toString)).map(t => List(t))
>
>     val win = words.window(Time.of(10, TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
>     val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
>     val cntd = grpd.mapWindow(customcntdMap _).flatten()
>
>     cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
>     env.execute("Starting Query")
>   }
> }
>
> And here is the log output:
>
> Exception in thread "Thread-32" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 7 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 14 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 18 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 21 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 25 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 28 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 32 more
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 35 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
>
> And a part of the .out file:
>
> 13:28:48,696 INFO org.apache.flink.runtime.jobmanager.JobManager - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
> 13:28:48,698 INFO org.apache.flink.runtime.jobmanager.JobManager - Scheduling job Starting Query.
> 13:28:48,698 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from CREATED to SCHEDULED
> 13:28:48,698 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from SCHEDULED to DEPLOYING
> 13:28:48,698 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (attempt #0) to localhost
> 13:28:48,699 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,699 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to RUNNING.
> 13:28:48,705 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,707 INFO org.apache.flink.runtime.blob.BlobCache - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from localhost/127.0.0.1:47210
> 13:28:48,709 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) [DEPLOYING]
> 13:28:48,709 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend for state checkpoints is set to jobmanager.
> 13:28:48,759 INFO org.apache.flink.runtime.taskmanager.Task - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to RUNNING
> 13:28:48,759 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner - Assigning remote split to host localhost
> 13:28:48,759 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from DEPLOYING to RUNNING
> 13:28:48,786 INFO org.apache.flink.runtime.taskmanager.Task - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to FINISHED
> 13:28:48,786 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,786 INFO org.apache.flink.runtime.taskmanager.TaskManager - Unregistering task and sending final execution state FINISHED to JobManager for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (ff2bb914c620859de94262af78ac9269)
> 13:28:48,787 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from RUNNING to FINISHED
> 13:28:48,787 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to FINISHED.
> 13:28:58,793 ERROR org.apache.flink.streaming.api.functions.sink.FileSinkFunction - Error while writing element.
> java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> 13:28:58,794 ERROR org.apache.flink.streaming.runtime.tasks.OutputHandler - Could not forward element to operator.
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
> [...]
>
> Best Regards,
> Philipp
Hi,

no, this is unfortunately not fixed in the current master.

Cheers,
Aljoscha

On Tue, 28 Jul 2015 at 15:29 Ufuk Celebi <[hidden email]> wrote:
> Hey Philipp,
Hey Ufuk,
I tried it with the newest 0.10-SNAPSHOT, but it leads to the same NullPointerExceptions. By the way, I have not touched the configuration, so the parallelism is 1. But given Aljoscha's response, this is expected.

Best Regards,
Philipp

On 28.07.2015 15:29, Ufuk Celebi wrote:
> Hey Philipp,
>
> thanks for reporting the problem. I think your assessment is correct. If the program is already finished, the threads throwing the exceptions should have been cleaned up as well.
>
> I am not sure, but I think parts of the system touching this mechanism could have been reworked by Aljoscha in the current master branch. Is it possible for you to try it out? If yes, it would be great to know if it is fixed there. As far as I know, there were no API-breaking changes in the meantime.
>
> @Aljoscha: do you think this is fixed with your latest changes in 0.10-SNAPSHOT?
>
> – Ufuk
>
> On 28 Jul 2015, at 14:02, Philipp Goetze <[hidden email]> wrote: