How do I use values from a data stream to create a new streaming data source


hnadathur
*** Original post edited ***

I'm trying to build a sample application using Flink that does the following:
1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue
2. For each symbol, performs a real-time lookup of the current price and streams the values

I moved the map function into a separate class, "StockSymbolToPriceMapFunction", and am now accessing the stream execution environment via "StreamExecutionEnvironment.getExecutionEnvironment()". I no longer get the run-time error message "The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields".

I'm now facing a new issue: the Kafka topic "stockprices" that I'm trying to write the prices to is not receiving them. I'm troubleshooting and will post any updates.
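As a sanity check while troubleshooting, a minimal job along these lines (untested sketch, reusing the same Kafka 0.8 properties as in the main program) should print anything that actually lands on the "stockprices" topic:

public class VerifyStockPrices {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stockprices-check");

        // Print every record that arrives on the output topic.
        env.addSource(new FlinkKafkaConsumer08<String>("stockprices", new SimpleStringSchema(), properties))
           .print();

        env.execute("Verify Stock Prices");
    }
}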

Updated code snippet is provided below:

public class RetrieveStockPrices {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stocks");

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));

        DataStream<String> stockPrice =
            streamOfStockSymbols
            //get unique keys
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String trend) throws Exception {
                    return trend;
                }
            })
            //collect events over a window
            .window(TumblingEventTimeWindows.of(Time.seconds(60)))
            //return the last event from the window...all elements are the same "Symbol"
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
                    out.collect(input.iterator().next().toString());
                }
            })
            .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices");
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        //To research: how the stock prices returned by "LookupStockPrice()" can be returned so that the sink can be moved to the main program (one possible variant is sketched after the code below).
        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}


public class LookupStockPrice extends RichSourceFunction<String> {
    public String stockSymbol = null;
    public boolean isRunning = true;

    public LookupStockPrice(String inSymbol) {
            stockSymbol = inSymbol;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
            isRunning = true;
    }


    @Override
    public void cancel() {
            isRunning = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx)
                    throws Exception {
            String stockPrice = "0";
            while (isRunning) {
                //TODO: query Google Finance API
                stockPrice = Integer.toString((new Random()).nextInt(100)+1);
                ctx.collect(stockPrice);
                Thread.sleep(10000);
            }
    }
}
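
One variant I'm considering for the "To research" note above: do the price lookup directly inside the map function and return the result, so that the Kafka sink can stay in the main program. A rough, untested sketch (fetchPrice() is only a placeholder for the real Google Finance call):

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        // Look the price up directly and return it, instead of building a new
        // stream/environment inside the operator.
        return stockSymbol + "," + fetchPrice(stockSymbol);
    }

    // Placeholder for the real lookup; random value for now, as in LookupStockPrice.
    private String fetchPrice(String stockSymbol) {
        return Integer.toString(new Random().nextInt(100) + 1);
    }
}

The main program would then end with something like:

    .map(new StockSymbolToPriceMapFunction())
    .addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));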

Re: How do I use values from a data stream to create a new streaming data source

sandeep6
The runtime error is because you have non-serializable code in your 'map' operator:
DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
stockPrices.print();

The approach you took will create an unbounded number of stock-price sources inside the map operator, and I don't think that will be an easy DAG for Flink to resolve. One approach could be to have a separate keyed stream for stock prices, window it for a specific period, and look up the price in that window. This is an interesting use case; I'm sure the Flink team has tried this.
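
Roughly, something along these lines (untested sketch; PriceFeedSource is a made-up stand-in for a source that emits "SYMBOL,PRICE" records for the symbols you care about):

DataStream<String> symbols = streamExecEnv.addSource(
        new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
// Made-up standalone price source emitting "SYMBOL,PRICE" records.
DataStream<String> prices = streamExecEnv.addSource(new PriceFeedSource());

symbols.join(prices)
    // key the symbol stream by the symbol string itself
    .where(new KeySelector<String, String>() {
        @Override
        public String getKey(String symbol) { return symbol; }
    })
    // key the price stream by the symbol part of "SYMBOL,PRICE"
    .equalTo(new KeySelector<String, String>() {
        @Override
        public String getKey(String priceRecord) { return priceRecord.split(",")[0]; }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    // for every symbol seen in the window, emit the matching price record
    .apply(new JoinFunction<String, String, String>() {
        @Override
        public String join(String symbol, String priceRecord) { return priceRecord; }
    })
    .addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

That way all sources and sinks live in the main program, and the lookup side is just another stream.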

Thanks,
Sandeep




On Fri, Dec 16, 2016 at 12:16 PM, hnadathur <[hidden email]> wrote:
I'm trying to build a sample application using Flink that does the following:
1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue
2. For each symbol performs a real-time lookup of current prices and streams
the values

The program compiles fine but I get the following run-time error message:
"The implementation of the MapFunction is not serializable. The object
probably contains or references non serializable fields".

I suspect the problem is due to the way I'm accessing the
StreamExecutionEnvironment. Could someone please provide pointers to how I
can use values from a data stream to create a new streaming data source? Any
response is appreciated.

Relevant code snippet is provided below:

public class RetrieveStockPrices {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stocks");

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
        streamOfStockSymbols.map(new MapFunction<String, String>() {
            @Override
            public String map(String stockSymbol) throws Exception {
                DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
                stockPrices.print();
                return null;
            }
        });

        streamExecEnv.execute("Retrieve Stock Prices");
    }
}


public class LookupStockPrice extends RichSourceFunction<String> {
    public String stockSymbol = null;
    public boolean isRunning = true;

    public LookupStockPrice(String inSymbol) {
        stockSymbol = inSymbol;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            //TODO: query Google Finance API
            ctx.collect("12.5");
        }
    }
}



