Re: setSlotSharing NPE: Starting a stream consumer in a thread

Posted by jay vyas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/setSlotSharing-NPE-Starting-a-stream-consumer-in-a-thread-tp3021p3022.html

Here is a distilled example of the issue, which should be easier to debug for anyone interested... :)
public static void main(String[] args) {

    ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
            com.github.rnowling.bps.datagenerator.datamodels.Product.class,
            new FlinkBPSGenerator.ProductSerializer());
    ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
            com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
            new FlinkBPSGenerator.TransactionSerializer());

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // when running "env.execute" this stream should start consuming...
    DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
    dataStream.iterate().map(new MapFunction<String, String>() {
        public String map(String value) throws Exception {
            System.out.println(value);
            return ">>> > > > > " + value + " < < < < <<<";
        }
    });
    try {
        env.execute();
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}


On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <[hidden email]> wrote:
Hi flink !

Looks like "setSlotSharing" is throwing an NPE when I try to start a Thread which runs a streaming job.

I'm trying to do this by creating a dataStream from env.readFileStream, and then later starting a job which writes files out ...

However, I get

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)

What is the right way to create a stream and batch job all in one environment?
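
To make the intended shape concrete, here is a minimal sketch of the "consumer in a background thread, writer in the main thread" pattern, using plain JDK classes only. It makes no Flink calls and does not reproduce or fix the NPE above. The names ThreadedConsumerSketch, NewFilePoller and demo, and the 50 ms poll interval, are hypothetical and chosen only for illustration; the poller is a stand-in for the blocking streaming job, assuming it should report only files that appear after it starts.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadedConsumerSketch {

    // Hypothetical stand-in for the blocking streaming job:
    // polls a directory and reports files it has not seen before.
    static class NewFilePoller implements Runnable {
        private final Path dir;
        private final long intervalMillis;
        private final List<String> sink;
        private final Set<Path> seen = new HashSet<>();

        NewFilePoller(Path dir, long intervalMillis, List<String> sink) throws IOException {
            this.dir = dir;
            this.intervalMillis = intervalMillis;
            this.sink = sink;
            // Remember files that already exist so only new ones are reported.
            seen.addAll(list(dir));
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    for (Path p : list(dir)) {
                        if (seen.add(p)) {
                            sink.add(">>> " + p.getFileName() + " <<<");
                        }
                    }
                    Thread.sleep(intervalMillis);
                }
            } catch (InterruptedException e) {
                // Restore the flag and fall through so the thread ends cleanly.
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Lists the entries currently present in a directory.
    static List<Path> list(Path dir) throws IOException {
        List<Path> out = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir)) {
            for (Path p : ds) {
                out.add(p);
            }
        }
        return out;
    }

    // Starts the poller in a background thread, writes one file from the
    // calling thread, waits until the poller reports it, then stops the poller.
    static List<String> demo(Path dir, String fileName) throws Exception {
        List<String> sink = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(new NewFilePoller(dir, 50, sink));

        // The "later job which writes files out", reduced to a single write.
        Files.write(dir.resolve(fileName), "hello".getBytes());

        long deadline = System.currentTimeMillis() + 5000;
        while (sink.isEmpty() && System.currentTimeMillis() < deadline) {
            Thread.sleep(20);
        }
        pool.shutdownNow();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return sink;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("poller-demo");
        System.out.println(demo(dir, "a.txt"));
    }
}
```

The consumer is started before anything is written, and the writer runs on the calling thread, so neither blocks the other. Whether a streaming environment and a batch environment can share one process this way is exactly the open question here.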

For reference, here is a gist of the code: https://gist.github.com/jayunit100/c7ab61d1833708d290df
The offending line is:
DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);

Thanks again !

--
jay vyas


