NullPointerException when working with Windows

Philipp Goetze
Hey community,

I am not sure whether this is a bug or whether I am doing something wrong. I have a small snippet produced by our project (see below). When I execute it in Eclipse, everything works fine. However, when I deploy the JAR to the local Flink installation, I get NullPointerExceptions after the program has already finished. I found out that this happens exactly when the window trigger interval elapses (10 seconds in this example). So it seems that a thread is still running although the program has already finished. My guess is that the thread no longer receives any input, since the file has already been read completely, and therefore produces NullPointerExceptions when it tries to write these null elements. But I think you know more about this.

FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11

Here is the code:

import org.apache.flink.streaming.api.scala._
import dbis.flink._
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.windowing.helper._
import org.apache.flink.util.Collector


object windowCount {

  def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
    out.collect(ts.groupBy(t => t(0)).flatMap(x => List(x._1,x._2)).toList)
  }

  def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
    ts.foreach { t => out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
  }

  def tuplecntdToString(t: List[Any]): String = {
    implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]

    val sb = new StringBuilder
    sb.append(t(0))
    .append(",")
    .append(t(1))
    sb.toString
  }

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = PigStorage().load(env, "src/it/resources/mary.txt")
    val words = input.flatMap(t => PigFuncs.tokenize(t(0).toString)).map(t => List(t))

    val win = words.window(Time.of(10, TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
    val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
    val cntd = grpd.mapWindow(customcntdMap _).flatten()

    cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
    env.execute("Starting Query")
  }
}

And here is the log output:

Exception in thread "Thread-32" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    ... 7 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
    at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    ... 14 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    ... 18 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
    at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    ... 21 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    ... 25 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
    at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    ... 28 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
    ... 32 more
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
    ... 35 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
    ... 38 more
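
A note for reading this trace: the same NullPointerException is wrapped in nine RuntimeExceptions, so the frame that matters is the innermost "Caused by" at the bottom. The following is only a sketch in plain Scala and is not part of Flink. The names RootCause and rootCause are made up for illustration. It follows getCause down to the innermost exception, which can help when logging such chains.

```scala
import scala.annotation.tailrec

// Hypothetical helper, not a Flink API.
object RootCause {

  // Follow getCause until the innermost exception is reached.
  // The self-reference check guards against a Throwable that is its own cause.
  @tailrec
  def rootCause(t: Throwable): Throwable = {
    val cause = t.getCause
    if (cause == null || (cause eq t)) t else rootCause(cause)
  }

  def main(args: Array[String]): Unit = {
    // Build a chain shaped like the one in the log: nine RuntimeExceptions around an NPE.
    val npe: Throwable = new NullPointerException()
    val wrapped = (1 to 9).foldLeft(npe)((inner, _) => new RuntimeException(inner))

    println(rootCause(wrapped).getClass.getName) // prints java.lang.NullPointerException
  }
}
```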

And part of the .out file:

13:28:48,696 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
13:28:48,698 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job Starting Query.
13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from CREATED to SCHEDULED
13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from SCHEDULED to DEPLOYING
13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (attempt #0) to localhost
13:28:48,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
13:28:48,699 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to RUNNING.
13:28:48,705 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
13:28:48,707 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from localhost/127.0.0.1:47210
13:28:48,709 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) [DEPLOYING]
13:28:48,709 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend for state checkpoints is set to jobmanager.
13:28:48,759 INFO  org.apache.flink.runtime.taskmanager.Task                     - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to RUNNING
13:28:48,759 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner    - Assigning remote split to host localhost
13:28:48,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from DEPLOYING to RUNNING
13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to FINISHED
13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Unregistering task and sending final execution state FINISHED to JobManager for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (ff2bb914c620859de94262af78ac9269)
13:28:48,787 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from RUNNING to FINISHED
13:28:48,787 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to FINISHED.
13:28:58,793 ERROR org.apache.flink.streaming.api.functions.sink.FileSinkFunction  - Error while writing element.
java.lang.NullPointerException
    at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
    at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
    at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
    at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
    at java.lang.Thread.run(Thread.java:745)
13:28:58,794 ERROR org.apache.flink.streaming.runtime.tasks.OutputHandler        - Could not forward element to operator.
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
    at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
    at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
    at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
    at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
    at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
    at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
    at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
    at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
    at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
    ... 38 more
[...]



Best Regards,
Philipp

Re: NullPointerException when working with Windows

Ufuk Celebi
Hey Philipp,

Thanks for reporting the problem. I think your assessment is correct. If the program is already finished, the threads throwing the exceptions should have been cleaned up as well.

I am not sure, but I think parts of the system touching this mechanism could have been reworked by Aljoscha in the current master branch. Is it possible for you to try it out? If yes, it would be great to know whether it is fixed there. As far as I know, there were no API-breaking changes in the meantime.
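
To make "cleaned up" concrete, here is a minimal sketch in plain Scala on top of java.util.concurrent. It is not Flink's TimeTriggerPolicy and says nothing about how Flink implements or fixes this. PeriodicTrigger and its close() method are hypothetical names chosen for illustration. The idea is that whoever owns a time-based trigger stops its timer thread when the task finishes, so that no firing can reach an already closed sink.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a time-based window trigger (assumption, not Flink code).
final class PeriodicTrigger(periodMillis: Long)(onFire: () => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val task: Runnable = new Runnable { def run(): Unit = onFire() }

  // Fire the callback every periodMillis, like a window that triggers on wall-clock time.
  scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS)

  // Stop the timer thread and wait briefly for a firing that is already in progress.
  def close(): Unit = {
    scheduler.shutdownNow()
    scheduler.awaitTermination(1, TimeUnit.SECONDS)
  }
}

object TriggerCleanupDemo {
  def main(args: Array[String]): Unit = {
    val fired = new AtomicInteger(0)
    val trigger = new PeriodicTrigger(50)(() => fired.incrementAndGet())

    Thread.sleep(180)   // the "job" is running and the trigger fires a few times
    trigger.close()     // the job finishes, so the trigger thread is stopped with it
    val atClose = fired.get()

    Thread.sleep(150)   // without close(), the trigger would keep firing during this time
    println(s"fired before close: $atClose, after close: ${fired.get() - atClose}")
  }
}
```

If close() is never called, the timer thread outlives its owner and keeps invoking the callback, which matches the pattern in the report: the error appears one trigger interval after the job switched to FINISHED.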

@Aljoscha: do you think this is fixed with your latest changes in 0.10-SNAPSHOT?

– Ufuk

On 28 Jul 2015, at 14:02, Philipp Goetze <[hidden email]> wrote:

> Hey community,
>
> I am not sure whether it is a bug or I am doing something wrong. I have a little snippet produced by our project (see below). When I execute it in Eclipse everything works fine. However, when deploying the Jar to the local flink installation I get NullPointer Exceptions after the program had already finished. I found out that it happens exactly after the time of the window trigger elapsed (10 seconds in this example). So it seems that there is still a thread running, although the program has already finished. I guess the thread does not get anymore input since the file was completely read already and thus produces NullPointer Exceptions when trying to write these null elements. But I think you know more about this.
>
> FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11
>
> So here the code:
>
> import org.apache.flink.streaming.api.scala._
> import dbis.flink._
> import java.util.concurrent.TimeUnit
> import org.apache.flink.streaming.api.windowing.helper._
> import org.apache.flink.util.Collector
>
>
> object windowCount {
>
>   def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
>     out.collect(ts.groupBy(t => t(0)).flatMap(x => List(x._1,x._2)).toList)
>   }
>
>   def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
>     ts.foreach { t => out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
>   }
>
>   def tuplecntdToString(t: List[Any]): String = {
>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>
>     val sb = new StringBuilder
>     sb.append(t(0))
>     .append(",")
>     .append(t(1))
>     sb.toString
>   }
>
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val input = PigStorage().load(env, "src/it/resources/mary.txt")
>     val words = input.flatMap(t => PigFuncs.tokenize(t(0).toString)).map(t => List(t))
>
>     val win = words.window(Time.of(10, TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
>     val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
>     val cntd = grpd.mapWindow(customcntdMap _).flatten()
>
>     cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
>     env.execute("Starting Query")
>   }
> }
>
> And here the log output:
>
> Exception in thread "Thread-32" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 7 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 14 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 18 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 21 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 25 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 28 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 32 more
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 35 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
>
> And here is part of the .out file:
>
> 13:28:48,696 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
> 13:28:48,698 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job Starting Query.
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from CREATED to SCHEDULED
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from SCHEDULED to DEPLOYING
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (attempt #0) to localhost
> 13:28:48,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,699 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to RUNNING.
> 13:28:48,705 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,707 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from localhost/127.0.0.1:47210
> 13:28:48,709 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) [DEPLOYING]
> 13:28:48,709 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend for state checkpoints is set to jobmanager.
> 13:28:48,759 INFO  org.apache.flink.runtime.taskmanager.Task                     - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to RUNNING
> 13:28:48,759 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner    - Assigning remote split to host localhost
> 13:28:48,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from DEPLOYING to RUNNING
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to FINISHED
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Unregistering task and sending final execution state FINISHED to JobManager for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (ff2bb914c620859de94262af78ac9269)
> 13:28:48,787 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from RUNNING to FINISHED
> 13:28:48,787 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to FINISHED.
> 13:28:58,793 ERROR org.apache.flink.streaming.api.functions.sink.FileSinkFunction  - Error while writing element.
> java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> 13:28:58,794 ERROR org.apache.flink.streaming.runtime.tasks.OutputHandler        - Could not forward element to operator.
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
> [...]
>
>
>
> Best Regards,
> Philipp
>


Re: NullPointerException when working with Windows

Aljoscha Krettek
Hi,
no, this is unfortunately not fixed in the current master.

Cheers,
Aljoscha
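
For readers following the thread: the log above shows the task switching to FINISHED at 13:28:48 and the first NullPointerException at 13:28:58, on a thread whose stack trace starts in TimeTriggerPolicy$TimeCheck.run. Below is a minimal sketch of that kind of failure, using only java.util.concurrent. It is not Flink code. ToySink, ToyTrigger and LeakedTriggerDemo are hypothetical names made up for illustration, and the idea that the sink drops its writer on close is an assumption, not something confirmed in this thread.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a file sink (not a Flink class): after close()
// the underlying buffer is null, so a late write throws NullPointerException.
final class ToySink {
  private var out: java.lang.StringBuilder = new java.lang.StringBuilder
  def write(s: String): Unit = synchronized { out.append(s); () }
  def close(): Unit = synchronized { out = null }
}

// Hypothetical stand-in for a time-based window trigger that fires on its
// own thread, independently of the task that created it.
final class ToyTrigger(sink: ToySink, periodMs: Long) {
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val firstFailure = new AtomicReference[Throwable]()
  private val failed = new CountDownLatch(1)

  def start(): Unit = {
    val fire = new Runnable {
      def run(): Unit =
        try sink.write("window\n")
        catch {
          // Record only the first failure and wake up anyone waiting for it.
          case e: RuntimeException =>
            if (firstFailure.compareAndSet(null, e)) failed.countDown()
        }
    }
    timer.scheduleAtFixedRate(fire, periodMs, periodMs, TimeUnit.MILLISECONDS)
    ()
  }

  // Cancel future firings and wait for one that may still be in flight.
  def stop(): Unit = {
    timer.shutdownNow()
    timer.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }

  def awaitFailure(): Boolean = failed.await(5, TimeUnit.SECONDS)
  def failure: Option[Throwable] = Option(firstFailure.get)
}

object LeakedTriggerDemo {
  // Returns the first exception seen by the trigger thread, if any.
  def run(stopTriggerFirst: Boolean): Option[Throwable] = {
    val sink = new ToySink
    val trigger = new ToyTrigger(sink, periodMs = 50)
    trigger.start()
    if (stopTriggerFirst) trigger.stop() // clean shutdown: trigger before sink
    sink.close()                         // the "job" is finished from here on
    if (!stopTriggerFirst) trigger.awaitFailure() // let the leaked trigger fire
    trigger.stop()
    trigger.failure
  }

  def main(args: Array[String]): Unit = {
    println(s"trigger left running:  ${run(stopTriggerFirst = false)}")
    println(s"trigger stopped first: ${run(stopTriggerFirst = true)}")
  }
}
```

With stopTriggerFirst = false the result should hold a NullPointerException raised on the timer thread after the sink was closed. With stopTriggerFirst = true it should be empty, because nothing can fire once stop() has returned. The analogous fix in Flink would presumably be to stop the trigger thread when the task shuts down, but where that belongs in 0.9 is not something this sketch claims.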

On Tue, 28 Jul 2015 at 15:29 Ufuk Celebi <[hidden email]> wrote:
Hey Philipp,

thanks for reporting the problem. I think your assessment is correct. If the program is already finished, the threads throwing the Exceptions should have been cleaned up as well.

I am not sure, but I think Aljoscha may have reworked the parts of the system that touch this mechanism in the current master branch. Is it possible for you to try it out? If so, it would be great to know whether it is fixed there. As far as I know, there were no API-breaking changes in the meantime.

@Aljoscha: do you think this is fixed with your latest changes in 0.10-SNAPSHOT?

– Ufuk

On 28 Jul 2015, at 14:02, Philipp Goetze <[hidden email]> wrote:

> Hey community,
>
> I am not sure whether this is a bug or whether I am doing something wrong. I have a small snippet generated by our project (see below). When I execute it in Eclipse, everything works fine. However, when I deploy the JAR to the local Flink installation, I get NullPointerExceptions after the program has already finished. I found that they occur exactly when the window trigger interval elapses (10 seconds in this example). So it seems that a thread is still running although the program has already finished. My guess is that the thread no longer receives any input, since the file has already been read completely, and therefore produces NullPointerExceptions when it tries to write these null elements. But I think you know more about this.
>
> FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11
>
> So here is the code:
>
> import org.apache.flink.streaming.api.scala._
> import dbis.flink._
> import java.util.concurrent.TimeUnit
> import org.apache.flink.streaming.api.windowing.helper._
> import org.apache.flink.util.Collector
>
>
> object windowCount {
>
>   def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
>     out.collect(ts.groupBy(t => t(0)).flatMap(x => List(x._1,x._2)).toList)
>   }
>
>   def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = {
>     ts.foreach { t => out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
>   }
>
>   def tuplecntdToString(t: List[Any]): String = {
>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>
>     val sb = new StringBuilder
>     sb.append(t(0))
>     .append(",")
>     .append(t(1))
>     sb.toString
>   }
>
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val input = PigStorage().load(env, "src/it/resources/mary.txt")
>     val words = input.flatMap(t => PigFuncs.tokenize(t(0).toString)).map(t => List(t))
>
>     val win = words.window(Time.of(10, TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
>     val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
>     val cntd = grpd.mapWindow(customcntdMap _).flatten()
>
>     cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
>     env.execute("Starting Query")
>   }
> }
>
> And here is the log output:
>
> Exception in thread "Thread-32" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 7 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 14 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 18 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 21 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 25 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 28 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 32 more
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 35 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
>
> And here is part of the .out file:
>
> 13:28:48,696 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
> 13:28:48,698 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job Starting Query.
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from CREATED to SCHEDULED
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from SCHEDULED to DEPLOYING
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (attempt #0) to localhost
> 13:28:48,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,699 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to RUNNING.
> 13:28:48,705 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,707 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from localhost/127.0.0.1:47210
> 13:28:48,709 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) [DEPLOYING]
> 13:28:48,709 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend for state checkpoints is set to jobmanager.
> 13:28:48,759 INFO  org.apache.flink.runtime.taskmanager.Task                     - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to RUNNING
> 13:28:48,759 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner    - Assigning remote split to host localhost
> 13:28:48,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from DEPLOYING to RUNNING
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to FINISHED
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Unregistering task and sending final execution state FINISHED to JobManager for task Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (ff2bb914c620859de94262af78ac9269)
> 13:28:48,787 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from RUNNING to FINISHED
> 13:28:48,787 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed to FINISHED.
> 13:28:58,793 ERROR org.apache.flink.streaming.api.functions.sink.FileSinkFunction  - Error while writing element.
> java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> 13:28:58,794 ERROR org.apache.flink.streaming.runtime.tasks.OutputHandler        - Could not forward element to operator.
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
> [...]
>
>
>
> Best Regards,
> Philipp
>


Re: NullPointerException when working with Windows

Philipp Goetze
In reply to this post by Ufuk Celebi
Hey Ufuk,

I tried it with the newest 0.10-SNAPSHOT, but it leads to the same
NullPointerExceptions. By the way: I have not touched the configuration,
so the parallelism is 1.

But judging by Aljoscha's response, this is expected.
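
To illustrate the suspected mechanism, here is a minimal Scala sketch. It is not Flink code: `ToySink` and `LateTriggerSketch` are made-up names, and it assumes that the time trigger runs on its own timer thread and that the sink drops its writer once the task has finished. Under those assumptions, a trigger that fires after the job is done hits a null writer, while cancelling the timer before closing the sink avoids the late call.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a file sink: `writer` becomes null once the sink is closed.
final class ToySink {
  @volatile private var writer: java.lang.StringBuilder = new java.lang.StringBuilder

  // Throws a NullPointerException if called after close().
  def write(record: String): Unit = {
    writer.append(record).append('\n')
  }

  def close(): Unit = {
    writer = null
  }
}

object LateTriggerSketch {

  /** Runs a "job" that finishes immediately while a time trigger is still pending.
    * Returns how many trigger firings failed with a NullPointerException.
    */
  def run(stopTimerOnClose: Boolean, triggerMillis: Long = 100L): Int = {
    val sink = new ToySink
    val failures = new AtomicInteger(0)
    val timer = Executors.newSingleThreadScheduledExecutor()

    // The window trigger: fires once after `triggerMillis` on the timer thread.
    val trigger = timer.schedule(new Runnable {
      def run(): Unit = {
        try {
          sink.write("window result")
        } catch {
          case _: NullPointerException => failures.incrementAndGet()
        }
      }
    }, triggerMillis, TimeUnit.MILLISECONDS)

    // The bounded input is exhausted right away, so the task finishes and closes its sink.
    if (stopTimerOnClose) {
      trigger.cancel(false)
    }
    sink.close()

    // Wait past the trigger time to see whether the late firing still happens.
    timer.shutdown()
    timer.awaitTermination(5 * triggerMillis, TimeUnit.MILLISECONDS)
    failures.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"timer left running: ${run(stopTimerOnClose = false)} late NPE(s)")
    println(s"timer cancelled:    ${run(stopTimerOnClose = true)} late NPE(s)")
  }
}
```

In this toy version the late failure only shows up when the timer outlives the sink, which matches the 10-second gap between FINISHED and the first ERROR in the log above. Whether Flink's trigger thread behaves the same way internally is an assumption on my side.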

Best Regards,
Philipp


On 28.07.2015 15:29, Ufuk Celebi wrote:

> Hey Philipp,
>
> thanks for reporting the problem. I think your assessment is correct. If the program has already finished, the threads throwing the exceptions should have been cleaned up as well.
>
> I am not sure, but I think Aljoscha may have reworked the parts of the system that touch this mechanism in the current master branch. Is it possible for you to try it out? If yes, it would be great to know whether it is fixed there. As far as I know, there were no API-breaking changes in the meantime.
>
> @Aljoscha: do you think this is fixed with your latest changes in 0.10-SNAPSHOT?
>
> – Ufuk
>
> On 28 Jul 2015, at 14:02, Philipp Goetze <[hidden email]> wrote: