Re: NPE with Flink Streaming from Kafka

Posted by rmetzger0 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/NPE-with-Flink-Streaming-from-Kafka-tp3821p3823.html

Hi Mihail,

the issue is actually caused by a bug in Kafka. We also have a JIRA issue for it in Flink: https://issues.apache.org/jira/browse/FLINK-3067

Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a fix.

Since the Kafka connector is not contained in the Flink binary, you can simply set its version in your Maven pom file to 0.10-SNAPSHOT. Maven will then download the latest snapshot build of the 0.10 branch, which contains the code planned for the 0.10.2 release.
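A minimal sketch of the relevant pom.xml parts. It assumes your project already depends on the `flink-connector-kafka` artifact (keep whatever artifactId your pom already uses and change only the version), and that the other Flink dependencies stay at 0.10.1 to match the running cluster. Snapshot artifacts are not published to Maven Central, so the Apache snapshot repository has to be declared as well; the repository `id` is just a label.

```xml
<!-- Inside <project>: Apache snapshot repository, needed to resolve -SNAPSHOT artifacts -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

<dependencies>
  <!-- Only the connector moves to the snapshot; the artifactId is assumed, keep your existing one -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>0.10-SNAPSHOT</version>
  </dependency>
</dependencies>
```

Because snapshots change over time, running the build with `mvn -U clean package` forces Maven to check the repository for the newest snapshot instead of reusing a cached one.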

On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <[hidden email]> wrote:
Hi,

we get the following NullPointerException after about 50 minutes when running a streaming job that reads data from Kafka, uses windowing and state, and writes the results to the local file system.
There are around 170 million messages to process, but Flink 0.10.1 stops at about 8 million.
Flink runs locally and was started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24    Job execution switched to status RUNNING.
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)


Any ideas on what could cause this behaviour?

Best,
Mihail