Hello all,
My requirement is to re-read a CSV file from a file path at certain time intervals and process the CSV data. The CSV file gets updated at regular intervals. Below is my code:

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> dataStream = getCsvDataStream(see);
    DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));

In FetchStock.java:

    public class FetchStock extends RichSourceFunction<Stock> {
        public FetchStock(String csvPath) {
            this.csvPath = csvPath;
        }
    }

I am trying to adapt code from WikipediaAnalysis, but I get the following "not serializable" exception when adding the source:

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.FetchStock@d7b1517 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
        at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
    Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        ... 6 more

I have attached Stock.java, which is just a model with getters and setters. I am not sure what I am doing wrong.

Best Regards,
Subash Basnet
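The "Caused by" line in the trace shows that Java serialization reached a LocalStreamEnvironment while writing the FetchStock object, so the source instance appears to hold a reference to the environment somewhere in its object graph. The posted FetchStock snippet does not show such a field, so this is an assumption about the full class. Below is a minimal sketch of that failure mode using only java.io, with no Flink dependency. Env, BadSource, GoodSource and isSerializable are hypothetical names made up for illustration; Env merely stands in for a non-serializable object such as the environment.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    // Hypothetical stand-in for a non-serializable object
    // (plays the role of LocalStreamEnvironment in the trace).
    static class Env { }

    // Fails to serialize: every non-transient field is written,
    // and Env does not implement Serializable.
    static class BadSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;
        private final Env env;

        BadSource(String csvPath, Env env) {
            this.csvPath = csvPath;
            this.env = env;
        }
    }

    // Serializes fine: the Env reference is transient, so it is skipped.
    // Only the csvPath string travels with the object.
    static class GoodSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;
        private transient Env env;

        GoodSource(String csvPath, Env env) {
            this.csvPath = csvPath;
            this.env = env;
        }
    }

    // Tries plain Java serialization, the mechanism visible in the stack trace
    // (ObjectOutputStream.writeObject), and reports whether it succeeded.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BadSource("path/to/csv", new Env())));  // prints false
        System.out.println(isSerializable(new GoodSource("path/to/csv", new Env()))); // prints true
    }
}
```

If the real FetchStock does keep the environment (or anything else non-serializable) in a field, removing that field or marking it transient would be the analogous change. A RichSourceFunction can create such objects in its open() method rather than receiving them through the constructor.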
I assume that the provided

Cheers,

On Tue, Apr 19, 2016 at 2:32 PM, subash basnet <[hidden email]> wrote: