setSlotSharing NPE: Starting a stream consumer in a thread

setSlotSharing NPE: Starting a stream consumer in a thread

jay vyas
Hi Flink!

It looks like "setSlotSharing" is throwing an NPE when I try to start a Thread that runs a streaming job.

I'm trying to do this by creating a DataStream from env.readFileStream and then later starting a job that writes files out.

However, I get

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)

What is the right way to create a stream and batch job all in one environment?

For reference, here is a gist of the code: https://gist.github.com/jayunit100/c7ab61d1833708d290df. The offending line is:
DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);

Thanks again!

--
jay vyas

Re: setSlotSharing NPE: Starting a stream consumer in a thread

jay vyas
Here is a distilled example of the issue; it should be easier to debug for anyone interested. :)
public static void main(String[] args) {
    ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
            com.github.rnowling.bps.datagenerator.datamodels.Product.class,
            new FlinkBPSGenerator.ProductSerializer());
    ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(
            com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
            new FlinkBPSGenerator.TransactionSerializer());

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // when running "env.execute" this stream should start consuming...
    DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000,
            FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
    dataStream.iterate().map(new MapFunction<String, String>() {
        public String map(String value) throws Exception {
            System.out.println(value);
            return ">>> > > > > " + value + " < < < < <<<";
        }
    });

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


--
jay vyas

Re: setSlotSharing NPE: Starting a stream consumer in a thread

Márton Balassi
Hi Jay,

Creating a batch and a streaming environment in a single Java source file is fine; they just run separately. (If you run it locally from an IDE they might conflict, as the second one would try to launch a local executor on a port that is most likely already taken by the first one.) For exactly that reason I would currently suggest keeping these jobs in separate files.

Looking at your code, ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class, new FlinkBPSGenerator.ProductSerializer()); does not do much good for you. You need to register your serializers with the environment you are actually using, so currently you would need to register them on the streaming env variable. If you also want to assemble a batch job, you need to register them there as well.
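As a minimal sketch (reusing the class and serializer names from Jay's snippet, which are assumed to come from his gist), the registration could look like this:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Register the custom Kryo serializers on the environment that actually runs the job.
env.registerTypeWithKryoSerializer(
        com.github.rnowling.bps.datagenerator.datamodels.Product.class,
        new FlinkBPSGenerator.ProductSerializer());
env.registerTypeWithKryoSerializer(
        com.github.rnowling.bps.datagenerator.datamodels.Transaction.class,
        new FlinkBPSGenerator.TransactionSerializer());

// If a separate batch job is assembled as well, repeat the registration on its environment:
final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
batchEnv.registerTypeWithKryoSerializer(
        com.github.rnowling.bps.datagenerator.datamodels.Product.class,
        new FlinkBPSGenerator.ProductSerializer());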

As for the streaming job, I assume you are using Flink version 0.9.1; I am checking out the problem shortly.

Best,

Marton  


Re: setSlotSharing NPE: Starting a stream consumer in a thread

Márton Balassi
Hi Jay,

As for the NPE: the file monitoring function throws it when the monitored location is empty. Try running the data generator first! :) This behaviour is undesirable though, so I am adding a JIRA ticket for it.
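Until that is fixed, a hedged workaround sketch (assuming the local filesystem and the /tmp/a path from the example; the seed file name is made up) is to make sure the monitored directory exists and contains at least one file before env.execute() is called:

// Hypothetical workaround: run this before building the stream, inside a method
// that declares or handles IOException.
java.io.File dir = new java.io.File("/tmp/a");
dir.mkdirs();
java.io.File seed = new java.io.File(dir, "seed.txt");
if (!seed.exists()) {
    java.nio.file.Files.write(seed.toPath(), "bootstrap\n".getBytes());
}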

Best,

Marton


Re: setSlotSharing NPE: Starting a stream consumer in a thread

Suneel Marthi-2
While we are on that, Marton, would it make sense to have a dataStream.writeAsJson() method?


Re: setSlotSharing NPE: Starting a stream consumer in a thread

Márton Balassi
Suneel, Flink comes with a built-in AvroOutputFormat. Is that good enough for you?
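Until something like writeAsJson() exists, one hedged sketch of a workaround (not an existing DataStream method; the MyEvent type and output path are placeholders, and Jackson is assumed to be on the classpath) is to map each element to a JSON string and write it out as text:

dataStream
    .map(new MapFunction<MyEvent, String>() {
        // Created lazily inside the function so the closure stays serializable.
        private transient com.fasterxml.jackson.databind.ObjectMapper mapper;

        public String map(MyEvent value) throws Exception {
            if (mapper == null) {
                mapper = new com.fasterxml.jackson.databind.ObjectMapper();
            }
            return mapper.writeValueAsString(value);
        }
    })
    .writeAsText("/tmp/out.json");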
