join stream with last available element of other stream

Artem Bogachev
Hi,

I’ve run into a problem while trying to model our platform with Flink streams.

Let me describe our model:

// Stream of data, e.g. stocks: (AAPL, 100.0), (GZMP, 100.0), etc.
val realData: DataStream[(K, V)] = env.addSource(…)

// Stream of forecasts (same format) based on some window aggregates
val forecastedData: DataStream[(K, V)] = realData.keyBy(0).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new Forecaster(…))

I would like to construct a stream, errors, whose values are the differences between each value in the realData stream and the latest available forecast for the same key in the forecastedData stream.

// I suspect this solution does not guarantee that every realData value will have a corresponding forecast
val errors: DataStream[(K, V)] = realData.join(forecastedData).where(_._1).equalTo(_._1)…

Could you give me some advice on how to implement this pattern? Do I have to write custom windows?

Artem

Re: join stream with last available element of other stream

Ufuk Celebi
Aljoscha answered this in the other thread you started on this topic
("'Last One' Window").

On Fri, May 20, 2016 at 12:43 PM, Artem Bogachev
<[hidden email]> wrote:

> [...]
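
For readers who land on this thread first, here is a minimal plain-Scala sketch of the semantics asked about: each real value is paired with the latest forecast seen so far for the same key. It is not the answer from the other thread and does not use the Flink API. The names Event, Real, Forecast, and LatestForecastJoin are hypothetical, and K and V are assumed to be String and Double. In Flink itself, this kind of "remember the latest value per key" logic is commonly written with connected keyed streams and keyed state, but treat that as a pointer to investigate rather than a confirmed solution for this setup.

```scala
// Hypothetical event types, introduced only for this sketch.
sealed trait Event { def key: String; def value: Double }
final case class Real(key: String, value: Double) extends Event
final case class Forecast(key: String, value: Double) extends Event

object LatestForecastJoin {
  /** Walks the events in arrival order and remembers the most recent
    * forecast per key. For every real value whose key already has a
    * forecast, emits (key, real - forecast). Real values that arrive
    * before any forecast for their key are skipped.
    */
  def errors(events: Seq[Event]): Seq[(String, Double)] = {
    val start = (Map.empty[String, Double], Vector.empty[(String, Double)])
    val (_, out) = events.foldLeft(start) {
      // A new forecast replaces the previous one for that key.
      case ((latest, acc), Forecast(k, v)) =>
        (latest.updated(k, v), acc)
      // A real value is compared against the latest forecast, if any.
      case ((latest, acc), Real(k, v)) =>
        latest.get(k) match {
          case Some(f) => (latest, acc :+ (k -> (v - f)))
          case None    => (latest, acc)
        }
    }
    out
  }
}
```

Unlike a windowed join, nothing here depends on the two elements falling into the same window. The only real values without an error are those that arrive before the first forecast for their key, and whether to drop or buffer those is a design choice this sketch makes arbitrarily (it drops them).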