"Last One" Window

Artem Bogachev
Hi,

I’ve run into a problem while trying to model our platform using Flink streams.

Let me describe our model:

// Stream of data, e.g. stocks: (AAPL, 100.0), (GZMP, 100.0) etc.
val realData: DataStream[(K, V)] = env.addSource(…)

// Stream of forecasts (same format) based on some window aggregates
val forecastedData: DataStream[(K, V)] = realData.keyBy(0).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new Forecaster(…))

I would like to construct a stream, errors, whose values are the differences between each realData value and the latest available forecast for the same key in the forecastedData stream.

// I suppose this solution does not guarantee that every realData value will have a corresponding forecast
val errors: DataStream[(K, V)] = realData.join(forecastedData).where(0).equalTo(0)…

Could you give me some advice on how to implement such a pattern? Do I have to write custom windows?

Artem

Re: "Last One" Window

Aljoscha Krettek
Hi,
Right now the streaming join only works on windows, so it would not work for your case.

You can implement an approximation of this using connected streams, i.e.:

forecastedData.connect(realData).flatMap( new CoFlatMapFunction() );

The CoFlatMapFunction would receive the forecasts on the first input and the real data on the second input. You would store the elements from one side internally, and when an element from the other side arrives you can do the comparison and emit a result. You can also use the partitioned state abstraction to keep the state in the function.
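
A minimal sketch of that store-and-compare logic, written as plain Scala with no Flink dependency so that it runs on its own. The class name LatestForecastErrors, the Double value type, and the mutable Map standing in for partitioned state are assumptions made for illustration; only the method names flatMap1 and flatMap2 mirror the two callbacks of CoFlatMapFunction.

```scala
import scala.collection.mutable

// Plain-Scala model of the two CoFlatMapFunction callbacks.
// Hypothetical class: a mutable Map stands in for per-key state.
final class LatestForecastErrors[K] {
  private val latestForecast = mutable.Map.empty[K, Double]

  // First input: a new forecast replaces the previous one for its key.
  // Nothing is emitted for forecasts.
  def flatMap1(forecast: (K, Double)): Seq[(K, Double)] = {
    latestForecast(forecast._1) = forecast._2
    Seq.empty
  }

  // Second input: emit (key, real - latest forecast) if a forecast
  // has already been seen for this key, otherwise emit nothing.
  def flatMap2(real: (K, Double)): Seq[(K, Double)] =
    latestForecast.get(real._1) match {
      case Some(f) => Seq((real._1, real._2 - f))
      case None    => Seq.empty
    }
}

object LatestForecastErrorsDemo {
  def main(args: Array[String]): Unit = {
    val errors = new LatestForecastErrors[String]
    // Real value before any forecast: no output.
    assert(errors.flatMap2(("AAPL", 100.0)).isEmpty)
    errors.flatMap1(("AAPL", 100.0))
    // Real value after a forecast: the difference is emitted.
    assert(errors.flatMap2(("AAPL", 105.0)) == Seq(("AAPL", 5.0)))
    // A forecast for one key does not affect another key.
    assert(errors.flatMap2(("GZMP", 100.0)).isEmpty)
  }
}
```

In this sketch a real value that arrives before the first forecast for its key is dropped, and the result depends on the order in which elements from the two inputs arrive, which is why this is only an approximation.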

I have also recently started thinking about how we can put something like this into the API in a more usable way. The current thoughts on this are collected here: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing

-Aljoscha

In reply to Artem Bogachev's message of Thu, 19 May 2016, 16:01.