Different Window Sizes in keyed stream

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Different Window Sizes in keyed stream

hassahma
Hi All,

I want to know if flink allows to define sliding window size and slide time on the fly. For example I want to configure sliding window of size 2 min and slide 1 min for tenant A but size 10 min and slide min for tenant B in a keyed stream and so on for other tenants. My code is below.

final DataStream<WindowStats> eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.minutes(2,Time.minute(1)))
.fold(new WindowStats(), newProductAggregationMapper(), newProductAggregationWindowFunction());

Can I do that for unlimited number of tenants in flink ?

Cheers,

Dr. Ahmad Hassan
Reply | Threaded
Open this post in threaded view
|

Re: Different Window Sizes in keyed stream

Fabian Hueske-2
Hi Ahmad,

that is not possible, at least not with Flink's built-in windows.
You can probably implement something like that on top of the DataStream API but I think it would quite a bit of effort.

IMO, the better approach would be to start a separate Flink job per tenant. This would also improve the isolation and failure behavior.

Best, Fabian

2017-06-22 19:43 GMT+02:00 Ahmad Hassan <[hidden email]>:
Hi All,

I want to know if flink allows to define sliding window size and slide time on the fly. For example I want to configure sliding window of size 2 min and slide 1 min for tenant A but size 10 min and slide min for tenant B in a keyed stream and so on for other tenants. My code is below.

final DataStream<WindowStats> eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.minutes(2,Time.minute(1)))
.fold(new WindowStats(), newProductAggregationMapper(), newProductAggregationWindowFunction());

Can I do that for unlimited number of tenants in flink ?

Cheers,

Dr. Ahmad Hassan

Reply | Threaded
Open this post in threaded view
|

Re: Different Window Sizes in keyed stream

hassahma
Thanks Fabian for the advice!

Best Regards,

Dr. Ahmad Hassan

On 23 June 2017 at 09:05, Fabian Hueske <[hidden email]> wrote:
Hi Ahmad,

that is not possible, at least not with Flink's built-in windows.
You can probably implement something like that on top of the DataStream API but I think it would quite a bit of effort.

IMO, the better approach would be to start a separate Flink job per tenant. This would also improve the isolation and failure behavior.

Best, Fabian

2017-06-22 19:43 GMT+02:00 Ahmad Hassan <[hidden email]>:
Hi All,

I want to know if flink allows to define sliding window size and slide time on the fly. For example I want to configure sliding window of size 2 min and slide 1 min for tenant A but size 10 min and slide min for tenant B in a keyed stream and so on for other tenants. My code is below.

final DataStream<WindowStats> eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.minutes(2,Time.minute(1)))
.fold(new WindowStats(), newProductAggregationMapper(), newProductAggregationWindowFunction());

Can I do that for unlimited number of tenants in flink ?

Cheers,

Dr. Ahmad Hassan


Reply | Threaded
Open this post in threaded view
|

Re: Different Window Sizes in keyed stream

Aljoscha Krettek
Hi Ahman,

You could in fact do this by writing a custom WindowAssigner. Have a look at the assignWindows() method here: https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java#L45-L45. Instead of assigning always the same windows you can type your WindowAssigner on the actual type of your stream and inspect the event when assigning windows.

Best,
Aljoscha

On 23. Jun 2017, at 10:32, Ahmad Hassan <[hidden email]> wrote:

Thanks Fabian for the advice!

Best Regards,

Dr. Ahmad Hassan

On 23 June 2017 at 09:05, Fabian Hueske <[hidden email]> wrote:
Hi Ahmad,

that is not possible, at least not with Flink's built-in windows.
You can probably implement something like that on top of the DataStream API but I think it would quite a bit of effort.

IMO, the better approach would be to start a separate Flink job per tenant. This would also improve the isolation and failure behavior.

Best, Fabian

2017-06-22 19:43 GMT+02:00 Ahmad Hassan <[hidden email]>:
Hi All,

I want to know if flink allows to define sliding window size and slide time on the fly. For example I want to configure sliding window of size 2 min and slide 1 min for tenant A but size 10 min and slide min for tenant B in a keyed stream and so on for other tenants. My code is below.

final DataStream<WindowStats> eventStream = inputStream
.keyBy(TENANT, CATEGORY)
.window(SlidingProcessingTimeWindows.of(Time.minutes(2,Time.minute(1)))
.fold(new WindowStats(), newProductAggregationMapper(), newProductAggregationWindowFunction());

Can I do that for unlimited number of tenants in flink ?

Cheers,

Dr. Ahmad Hassan