Hi Jay,

As for the NPE: the file monitoring function throws it when the location is empty. Try running the datagenerator first! :) This behaviour is unwanted though; I am adding a JIRA ticket for it.

Best,
Marton

On Sun, Oct 4, 2015 at 5:28 AM, Márton Balassi <[hidden email]> wrote:

Hi Jay,

Creating a batch and a streaming environment in a single Java source file is fine; they just run separately. (If you run it from an IDE locally they might conflict, as the second one would try to launch a local executor on a port that is most likely already taken by the first one.) I would suggest keeping these jobs in separate files for exactly that reason.

Looking at your code,

ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class, new FlinkBPSGenerator.ProductSerializer());

does not do much good for you. You need to register your serializers on the environment that you actually use. Currently you would need to register them on the streaming env variable (see the sketch after the example below). If you would like to also assemble a batch job, you need to add them there too.

As for the streaming job: I assume you are using Flink version 0.9.1, and I am checking out the problem shortly.

Best,
Marton

On Sun, Oct 4, 2015 at 3:37 AM, jay vyas <[hidden email]> wrote:

Here is a distilled example of the issue; should be easier to debug for folks interested... :)

public static void main(String[] args) {
    ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Product.class, new FlinkBPSGenerator.ProductSerializer());
    ExecutionEnvironment.getExecutionEnvironment().registerTypeWithKryoSerializer(com.github.rnowling.bps.datagenerator.datamodels.Transaction.class, new FlinkBPSGenerator.TransactionSerializer());
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // when running "env.execute" this stream should start consuming...
    DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
    dataStream.iterate().map(new MapFunction<String, String>() {
        public String map(String value) throws Exception {
            System.out.println(value);
            return ">>> > > > > " + value + " < < < < <<<";
        }
    });
    try {
        env.execute();
    }
    catch (Exception e) {
        e.printStackTrace();
    }
}
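To make Márton's registration advice concrete: a minimal sketch of registering the serializers on the environment instances that actually run the jobs, assuming Flink 0.9.x (where both environments expose registerTypeWithKryoSerializer) and assuming the FlinkBPSGenerator serializer classes extend Kryo's Serializer and implement Serializable. The class name RegistrationSketch is hypothetical, and the import for FlinkBPSGenerator is omitted since its package comes from Jay's gist.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.github.rnowling.bps.datagenerator.datamodels.Product;
import com.github.rnowling.bps.datagenerator.datamodels.Transaction;

public class RegistrationSketch {
    public static void main(String[] args) throws Exception {
        // Register on the streaming environment that runs the streaming job,
        // not on a throwaway ExecutionEnvironment instance.
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.registerTypeWithKryoSerializer(Product.class, new FlinkBPSGenerator.ProductSerializer());
        streamEnv.registerTypeWithKryoSerializer(Transaction.class, new FlinkBPSGenerator.TransactionSerializer());

        // If a batch job is assembled as well (ideally in a separate file),
        // its own environment needs the same registrations.
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        batchEnv.registerTypeWithKryoSerializer(Product.class, new FlinkBPSGenerator.ProductSerializer());
        batchEnv.registerTypeWithKryoSerializer(Transaction.class, new FlinkBPSGenerator.TransactionSerializer());

        // ... build and execute the jobs on streamEnv / batchEnv as usual.
    }
}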
--

On Sat, Oct 3, 2015 at 9:08 PM, jay vyas <[hidden email]> wrote:

Hi flink !

Looks like "setSlotSharing" is throwing an NPE when I try to start a Thread which runs a streaming job. I'm trying to do this by creating a dataStream from env.readFileStream, and then later starting a job which writes files out ... However, I get:
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharing(StreamingJobGraphGenerator.java:361)

What is the right way to create a stream and batch job all in one environment?

For reference, here is a gist of the code: https://gist.github.com/jayunit100/c7ab61d1833708d290df, and the offending line is the

DataStream<String> dataStream = env.readFileStream("/tmp/a", 1000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);

line.

Thanks again !
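Márton's reply at the top of the thread attributes this NPE to the file monitoring function running against an empty location, so a pre-flight check before submitting the job makes that failure mode obvious. A hypothetical guard for the /tmp/a path used in the example (the PreflightCheck class name is made up):

import java.io.File;

public class PreflightCheck {
    public static void main(String[] args) {
        // Fail fast if the datagenerator has not produced anything yet.
        File monitored = new File("/tmp/a");
        String[] contents = monitored.list();
        if (contents == null || contents.length == 0) {
            throw new IllegalStateException(
                    "Run the datagenerator first: " + monitored + " is missing or empty.");
        }
    }
}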
--
jay vyas
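On the "stream and batch job all in one environment" question: Márton's answer above is to keep the jobs in separate files. A minimal sketch of that layout, assuming Flink 0.9.x; the class names, paths, and job names here are hypothetical.

// StreamingJob.java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream =
                env.readFileStream("/tmp/a", 1000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
        dataStream.print(); // attach a sink so the stream actually materializes
        env.execute("streaming job");
    }
}

// BatchJob.java, submitted separately
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchJob {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.readTextFile("/tmp/a/input.txt"); // hypothetical input file
        data.writeAsText("/tmp/batch-out");
        env.execute("batch job");
    }
}

Running each job from its own main avoids the local-executor port clash Márton describes when both environments start from the same IDE run.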