NPE thrown when using Storm Kafka Spout with Flink


Jerry Peng
Hello,

When I try to run a storm topology with a Kafka Spout on top of Flink, I get an NPE at:

15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error closing stream operators after an exception.
java.lang.NullPointerException
    at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)
    at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)
15:00:32,855 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend for state checkpoints is set to jobmanager.
15:00:32,855 INFO  org.apache.flink.runtime.taskmanager.Task                     - event_deserializer (5/5) switched to RUNNING
15:00:32,859 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: ads (1/1) switched to FAILED with exception.
java.lang.NullPointerException
    at java.util.HashMap.putMapEntries(HashMap.java:500)
    at java.util.HashMap.<init>(HashMap.java:489)
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)
    at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)



Has anyone seen this before, or have a fix? I am using 0.10-beta1 for all Storm packages and a 0.10-SNAPSHOT (latest compiled) for all Flink packages. A sample of the Kafka code I am using:

Broker broker = new Broker("localhost", 9092);
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, broker);
StaticHosts hosts = new StaticHosts(partitionInfo);

SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff", UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

builder.setSpout("kafkaSpout", kafkaSpout, 1);

Re: NPE thrown when using Storm Kafka Spout with Flink

rmetzger0
Hi Jerry,

the issue occurs because Flink's Storm compatibility layer currently does not support custom configuration parameters.
There is a JIRA which aims to add the missing feature to Flink: https://issues.apache.org/jira/browse/FLINK-2525
Maybe (but it's unlikely) passing an empty Map instead of the null in AbstractStormSpoutWrapper:

this.spout.open(null,
        StormWrapperSetupHelper.convertToTopologyContext(
                (StreamingRuntimeContext) super.getRuntimeContext(), true),
        new SpoutOutputCollector(this.collector));
would fix the issue. But I suspect that the KafkaSpout needs some configuration parameters, so we have to wait for FLINK-2525.
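For reference, the second trace is the standard NPE that HashMap's copy constructor throws when given a null source map: KafkaSpout.open() copies the conf map it receives, and the wrapper currently hands it null. A minimal standalone demo (hypothetical class name, no Storm or Flink dependencies):

```java
import java.util.HashMap;
import java.util.Map;

public class NullConfDemo {
    public static void main(String[] args) {
        // The wrapper currently passes null as the conf map to open().
        Map<String, Object> conf = null;
        try {
            // Same pattern as inside KafkaSpout.open(): copy the conf map.
            new HashMap<>(conf);
        } catch (NullPointerException e) {
            // Matches the java.util.HashMap.putMapEntries frame in the trace.
            System.out.println("NPE from HashMap copy constructor");
        }
    }
}
```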

Best,
Robert


On Wed, Sep 2, 2015 at 7:58 PM, Jerry Peng <[hidden email]> wrote: