adding source not serializable exception in streaming implementation

adding source not serializable exception in streaming implementation

subashbasnet
Hello all,

I need to re-read a CSV file from a file path at certain time intervals and process its data. The CSV file gets updated at regular intervals.
Below is my code:
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = getCsvDataStream(see);
DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));

In FetchStock.java:
public class FetchStock extends RichSourceFunction<Stock> {
    public FetchStock(String csvPath) {
        this.csvPath = csvPath;
    }
}

I am trying to adapt the code from WikipediaAnalysis, but I get the following not-serializable exception when adding the source:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.FetchStock@d7b1517 not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
    at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    ... 6 more
 

I have attached Stock.java, which is just a model with getters and setters. I am not sure what I am doing wrong.

Best Regards,
Subash Basnet

Re: adding source not serializable exception in streaming implementation

Till Rohrmann

I assume that the provided FetchStock code is not complete. As the exception indicates, you somehow store a LocalStreamEnvironment in your source function. StreamExecutionEnvironments are not serializable and cannot be part of the source function’s closure.
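
To illustrate the mechanism, here is a minimal sketch in plain Java, with no Flink dependency. It assumes the problem is a field on the source object that holds the environment; the complete FetchStock class was not posted, so this is a guess at its shape, not a reproduction. FakeEnvironment, BadSource, GoodSource and isSerializable are hypothetical names invented for this example. The helper only mimics the check visible in the stack trace (writing the object through an ObjectOutputStream) and is not Flink's actual ClosureCleaner.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationCheck {

    // Hypothetical stand-in for LocalStreamEnvironment: deliberately NOT Serializable.
    static class FakeEnvironment { }

    // Assumed shape of the bug: a serializable function object that keeps
    // a reference to the environment in an instance field.
    static class BadSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;
        private final FakeEnvironment env; // dragged along during serialization

        BadSource(String csvPath, FakeEnvironment env) {
            this.csvPath = csvPath;
            this.env = env;
        }
    }

    // Same object without the environment field: only the path is stored.
    static class GoodSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;

        GoodSource(String csvPath) {
            this.csvPath = csvPath;
        }
    }

    // Tries standard Java serialization and reports whether it succeeded.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some reachable field is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BadSource("path/to/csv", new FakeEnvironment())));
        System.out.println(isSerializable(new GoodSource("path/to/csv")));
    }
}
```

If FetchStock does hold such a reference, removing the field (or keeping only serializable state such as the CSV path) should let addSource pass the serializability check.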

Cheers,
Till


On Tue, Apr 19, 2016 at 2:32 PM, subash basnet <[hidden email]> wrote: