*** Original post edited ***
I'm trying to build a sample application using Flink that does the following:
1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue
2. For each symbol, performs a real-time lookup of current prices and streams the values

I moved the map function into a separate class, "StockSymbolToPriceMapFunction", which accesses the stream execution environment using "StreamExecutionEnvironment.getExecutionEnvironment()". I no longer get the run-time error message "The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields".

I'm now facing a new issue: the Kafka topic "stockprices" that I'm trying to write the prices to is not receiving them. I'm troubleshooting and will post any updates.

The updated code snippet is provided below:

public class RetrieveStockPrices {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stocks");

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(
                new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));

        DataStream<String> stockPrice = streamOfStockSymbols
                //get unique keys
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String trend) throws Exception {
                        return trend;
                    }
                })
                //collect events over a window
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                //return the last event from the window...all elements are the same "Symbol"
                .apply(new WindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<String> input,
                            Collector<String> out) throws Exception {
                        out.collect(input.iterator().next().toString());
                    }
                })
                .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices");
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector())
                .addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        //To research: how the stock prices returned by "LookupStockPrice()" can be returned so that the sink can be moved to the main program.
        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}

public class LookupStockPrice extends RichSourceFunction<String> {
    public String stockSymbol = null;
    public boolean isRunning = true;

    public LookupStockPrice(String inSymbol) {
        stockSymbol = inSymbol;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        String stockPrice = "0";
        while (isRunning) {
            //TODO: query Google Finance API
            stockPrice = Integer.toString((new Random()).nextInt(100)+1);
            ctx.collect(stockPrice);
            Thread.sleep(10000);
        }
    }
}
The runtime error occurs because you have non-serializable code in your 'map' operator.

DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
stockPrices.print();

Thanks,
Sandeep

On Fri, Dec 16, 2016 at 12:16 PM, hnadathur <[hidden email]> wrote:
I'm trying to build a sample application using Flink that does the following:
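
To illustrate the serialization point raised in the reply: Flink ships function objects to the workers using Java serialization, so every non-transient field a function holds must itself be serializable. The following is a minimal sketch using only the JDK, not Flink itself. The names `PriceClient`, `SimpleMapFunction`, `BrokenMapper`, and `WorkingMapper` are hypothetical and made up for illustration; `SimpleMapFunction` merely stands in for Flink's `MapFunction`, and the fixed price is a placeholder.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableFunctionSketch {

    /** Hypothetical non-serializable helper (think: an HTTP client or an environment handle). */
    static class PriceClient {
        String lookup(String symbol) {
            return symbol + ":100"; // placeholder price, not a real lookup
        }
    }

    /** Hypothetical stand-in for a function interface that must be serializable. */
    interface SimpleMapFunction<I, O> extends Serializable {
        O map(I value);
    }

    /** Holds the helper in a regular field, so serializing the function fails. */
    static class BrokenMapper implements SimpleMapFunction<String, String> {
        private final PriceClient client = new PriceClient();

        @Override
        public String map(String symbol) {
            return client.lookup(symbol);
        }
    }

    /** Marks the helper transient and creates it lazily after deserialization. */
    static class WorkingMapper implements SimpleMapFunction<String, String> {
        private transient PriceClient client;

        @Override
        public String map(String symbol) {
            if (client == null) {
                client = new PriceClient();
            }
            return client.lookup(symbol);
        }
    }

    /** Returns true if the object survives Java serialization, false otherwise. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BrokenMapper()));  // prints "false"
        System.out.println(isSerializable(new WorkingMapper())); // prints "true"
    }
}
```

In an actual Flink job, the usual place for this kind of initialization is the `open()` method of a `RichMapFunction` rather than a lazy check inside `map()`. Note that this only addresses the serialization error; as far as I know it does not make it possible to build a nested streaming job from inside `map()`.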