Hi,
I've run into a problem trying to model our platform using Flink streams. Let me describe our model:

    // Stream of data, e.g. stocks: (AAPL, 100.0), (GZMP, 100.0), etc.
    val realData: DataStream[(K, V)] = env.addSource(…)

    // Stream of forecasts (same format) based on some window aggregates
    val forecastedData: DataStream[(K, V)] = realData
      .keyBy(0)
      .timeWindow(Time.minutes(FORECAST_INTERVAL))
      .apply(new Forecaster(…))

I would like to construct a stream, errors, whose values are simply the differences between each realData value and the latest available forecast for its key in the forecastedData stream:

    // I suppose this solution does not guarantee that every realData value
    // will have a corresponding forecast
    val errors: DataStream[(K, V)] = realData.join(forecastedData).where(0).equalTo(0)…

Could you advise on how to implement such a pattern? Do I have to write custom windows?

Artem
Hi,

right now, the streaming join only works on windows, so it would not work for your case. You can implement an approximation of this using connected streams, i.e.:

    forecastedData.connect(realData).flatMap( new CoFlatMapFunction() );

The CoFlatMapFunction would receive the forecasts on the first input and the real data on the second input. You would store the elements of one side internally, and when an element from the other side arrives you can do the comparison and emit a result. You can also use the partitioned state abstraction for keeping the state in the function.

I have also recently started thinking about how we can put something like this into the API in a more usable way. The current thoughts on this are collected here: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing

-Aljoscha

(In reply to Artem Bogachev's message of Thu, 19 May 2016 at 16:01.)
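
For illustration, here is a minimal, dependency-free Scala sketch of the per-key logic that the two-input function suggested in the reply could hold. It is not Flink code and not necessarily how the thread's participants implemented it: the names ForecastErrorTracker, onForecast and onReal are hypothetical, the value type is assumed to be Double, and "error" is assumed to mean real minus forecast. In an actual job the two methods would presumably correspond to flatMap1 and flatMap2 of a CoFlatMapFunction, and the in-memory map would be replaced by partitioned (keyed) state.

```scala
import scala.collection.mutable

// Plain-Scala model of the "remember the latest forecast, compare on arrival" idea.
// Hypothetical names; no Flink classes are used here.
final class ForecastErrorTracker[K] {
  // Latest forecast seen so far for each key (stand-in for keyed state).
  private val latestForecast = mutable.Map.empty[K, Double]

  // First input (forecasts): a new forecast replaces the previous one for its key.
  def onForecast(key: K, forecast: Double): Unit =
    latestForecast.update(key, forecast)

  // Second input (real data): emit (key, real - forecast) if a forecast is
  // already known for this key; otherwise emit nothing.
  def onReal(key: K, real: Double): Option[(K, Double)] =
    latestForecast.get(key).map(f => (key, real - f))
}

object ForecastErrorDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ForecastErrorTracker[String]
    println(tracker.onReal("AAPL", 100.0)) // None: no forecast has arrived yet
    tracker.onForecast("AAPL", 98.0)
    println(tracker.onReal("AAPL", 100.0)) // Some((AAPL,2.0))
  }
}
```

Note that real values arriving before the first forecast for their key produce no error here; buffering them until a forecast arrives would be an alternative, at the cost of holding more state.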