Hello,
When I try to run a storm topology with a Kafka Spout on top of Flink, I get an NPE at: 15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error closing stream operators after an exception. java.lang.NullPointerException at storm.kafka.KafkaSpout.close(KafkaSpout.java:130) at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75) at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) at java.lang.Thread.run(Thread.java:745) 15:00:32,855 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend for state checkpoints is set to jobmanager. 15:00:32,855 INFO org.apache.flink.runtime.taskmanager.Task - event_deserializer (5/5) switched to RUNNING 15:00:32,859 INFO org.apache.flink.runtime.taskmanager.Task - Source: ads (1/1) switched to FAILED with exception. java.lang.NullPointerException at java.util.HashMap.putMapEntries(HashMap.java:500) at java.util.HashMap.<init>(HashMap.java:489) at storm.kafka.KafkaSpout.open(KafkaSpout.java:73) at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) at java.lang.Thread.run(Thread.java:745) Has someone seen this before? or Have a fix? I am using 0.10beta1 for all storm packages and a 0.10-snapshot (latest compiled) for all flink packages. Sample of the kafka code I am using: Broker broker = new Broker("localhost", 9092); |
Hi Jerry, the issue occurs because Flink's storm compatibility layer does not support custom configuration parameters currently. There is this JIRA which aims to add the missing feature to Flink: https://issues.apache.org/jira/browse/FLINK-2525 Maybe (but its unlikely) passing an empty Map in the AbstractStormSpoutWrapper: this.spout.open(null,would fix the issue. But I suspect that the KafkaSpout needs some configuration parameters, so we have to wait for FLINK-2525. Best, Robert On Wed, Sep 2, 2015 at 7:58 PM, Jerry Peng <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |