Mihail

Hi,

we get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to the local FS. There are around 170 million messages to be processed, but Flink 0.10.1 stops at ~8 million. Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24 Job execution switched to status RUNNING.
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)

Any ideas on what could cause this behaviour?

Best,