NPE with Flink Streaming from Kafka
Posted by
Vieru, Mihail on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/NPE-with-Flink-Streaming-from-Kafka-tp3821.html
Hi,
we get the following NullPointerException after about 50 minutes of running a streaming job that uses windowing and state, reads data from Kafka, and writes the results to the local file system.
There are around 170 million messages to process, but Flink 0.10.1 stops after about 8 million.
Flink runs locally and was started with the "start-cluster-streaming.sh" script.
12/01/2015 15:06:24 Job execution switched to status RUNNING.
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
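The "Caused by" frames end in FlinkKafkaConsumer$PeriodicOffsetCommitter.run, so the NPE was raised on a background thread that periodically writes offsets to ZooKeeper. It then appears to have been rethrown from LegacyFetcher.run wrapped in a java.lang.Exception. The pure-Java sketch below illustrates that general pattern for readers who have not seen it: a periodic committer thread records its first failure, and the fetch loop rethrows it on its own thread. It is not Flink's implementation. OffsetStore, recordOffset, runDemo and the simulated NPE are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class PeriodicCommitSketch {

    // Hypothetical stand-in for a ZooKeeper-backed offset store (not a Kafka or Flink API).
    interface OffsetStore {
        void commit(Map<Integer, Long> offsets) throws Exception;
    }

    private final Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
    private final AtomicReference<Throwable> committerError = new AtomicReference<>();
    private final ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();

    // Starts a background thread that commits a snapshot of the offsets every intervalMs.
    void start(OffsetStore store, long intervalMs) {
        committer.scheduleAtFixedRate(() -> {
            try {
                store.commit(new HashMap<>(currentOffsets));
            } catch (Throwable t) {
                // Keep only the first failure; the fetch loop rethrows it.
                committerError.compareAndSet(null, t);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Called by the fetch loop for every record; surfaces a committer failure on this thread.
    void recordOffset(int partition, long offset) throws Exception {
        currentOffsets.put(partition, offset);
        Throwable t = committerError.get();
        if (t != null) {
            throw new Exception("offset committer failed", t);
        }
    }

    void stop() {
        committer.shutdownNow();
    }

    // Simulates a store whose third commit throws an NPE; returns what the fetch loop caught.
    static Throwable runDemo() {
        PeriodicCommitSketch fetcher = new PeriodicCommitSketch();
        AtomicInteger calls = new AtomicInteger();
        fetcher.start(offsets -> {
            if (calls.incrementAndGet() >= 3) {
                throw new NullPointerException("simulated store failure");
            }
        }, 10);
        try {
            for (long offset = 0; offset < 10_000; offset++) {
                fetcher.recordOffset(0, offset);
                Thread.sleep(1);
            }
            return null;
        } catch (Exception e) {
            return e;
        } finally {
            fetcher.stop();
        }
    }

    public static void main(String[] args) {
        Throwable caught = runDemo();
        System.out.println(caught + "\n  caused by " + caught.getCause());
    }
}
```

Because the commit runs on its own thread, the cause's stack frames share nothing with the frames of the thread that finally throws. This is consistent with how the two halves of the trace above look.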
Any ideas on what could cause this behaviour?
Best,
Mihail