Multiple Window Operators

nico-2
Hi,

I am a little bit confused about windows in Flink. Is it possible to use multiple window operators in a single Flink job? In my example I receive events every 5 s, and they need to be enriched before further analysis. For this I use keyBy("id") followed by a sliding count window with size 2 and slide 1, countWindow(2, 1), so that I always have the previous and the current event for an ID. I need both events with the same ID to calculate the missing attribute.

For further analysis I would like to use a tumbling time window to analyze the events of the last 10 s. However, this doesn't seem to work, and I don't know why.

This is what the job looks like:

stream
    .keyBy("id")
    .countWindow(2, 1)              // the last two events per id
    .reduce(new Reduce())           // computes the missing attribute
    .keyBy("area")
    .timeWindow(Time.seconds(10))   // tumbling 10 s window
    .fold(new Fold())
    .print();


When I use both windows separately, it works:

stream.keyBy("id").countWindow(2, 1).reduce(new Reduce()).print();
stream.keyBy("area").timeWindow(Time.seconds(10)).fold(new Fold()).print();

Would it be better to use a stateful map operation instead of the count window?


Best regards,
Nico

Re: Multiple Window Operators

Aljoscha Krettek
Hi,
the problem is that the elements emitted from the count window operation all have a timestamp of Long.MAX_VALUE. The reason for this is that countWindow(2, 1) desugars to this:

input
  .keyBy(...)
  .window(GlobalWindows.create())
  .trigger(CountTrigger.of(1))   // fire on every element (the slide)
  .evictor(CountEvictor.of(2))   // keep only the last 2 elements (the size)
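To make the desugared version concrete, here is a small plain-Java model of what countWindow(2, 1) computes for a single key. It is a simplification, not Flink's actual window operator: the class CountWindowModel and its methods are hypothetical, the Deque plays the role of the window contents, and the BinaryOperator stands in for the ReduceFunction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model (no Flink) of countWindow(size, 1) for one key:
// the trigger fires on every element, and the evictor keeps only the `size`
// most recent elements before the reduce function runs.
public class CountWindowModel<T> {
    private final int size;                    // evictor limit, CountEvictor.of(size)
    private final BinaryOperator<T> reducer;   // stands in for the user's ReduceFunction
    private final Deque<T> contents = new ArrayDeque<>();

    public CountWindowModel(int size, BinaryOperator<T> reducer) {
        this.size = size;
        this.reducer = reducer;
    }

    // Called for each incoming element; with a slide of 1 the window fires every time.
    public T onElement(T element) {
        contents.addLast(element);
        while (contents.size() > size) {
            contents.removeFirst();            // evict the oldest element
        }
        Iterator<T> it = contents.iterator();  // oldest first
        T result = it.next();                  // never empty: we just added an element
        while (it.hasNext()) {
            result = reducer.apply(result, it.next());
        }
        return result;
    }

    // Feeds a sequence of elements through a size-2 window and collects every firing.
    public static List<String> run(List<String> input) {
        CountWindowModel<String> window = new CountWindowModel<>(2, (a, b) -> a + "+" + b);
        List<String> out = new ArrayList<>();
        for (String s : input) {
            out.add(window.onElement(s));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("e1", "e2", "e3")));
    }
}
```

In this model the window also fires for the first element of a key, when only that one element is present, so the reduce simply returns it unpaired. If Flink's CountTrigger.of(1) behaves the same way, the first event of each id leaves the count window without the missing attribute.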

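Elements emitted for a window are assigned the maximum timestamp of that window, and for a GlobalWindow that is Long.MAX_VALUE. With event time, the 10-second window after keyBy("area") assigns elements to windows based on their timestamps, so all reduced elements end up in the same window at the very end of the time range instead of in 10-second windows based on the original event times.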
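The effect on the downstream window can be illustrated with plain arithmetic. The sketch below assumes the usual tumbling-window alignment with a zero offset and non-negative timestamps, start = timestamp - (timestamp % size); the class WindowAlignment and its method are hypothetical helpers, not Flink API.

```java
// Hypothetical illustration (plain Java, no Flink): which 10 s tumbling window
// an element falls into, given its timestamp in milliseconds.
public class WindowAlignment {
    // Start of the tumbling window containing `timestamp`, assuming a zero
    // offset and a non-negative timestamp.
    public static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds in milliseconds
        // Original event timestamps, one event every 5 s: 0 and 5000 share the
        // window starting at 0, 10000 and 15000 share the one starting at 10000.
        for (long ts : new long[] {0L, 5_000L, 10_000L, 15_000L}) {
            System.out.println(ts + " -> window starting at " + windowStart(ts, size));
        }
        // Every element emitted by countWindow(2, 1) carries Long.MAX_VALUE,
        // so they all map to one window far beyond any real event time.
        System.out.println(Long.MAX_VALUE + " -> window starting at "
                + windowStart(Long.MAX_VALUE, size));
    }
}
```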

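Using a stateful map should indeed work, and I would encourage you to use that instead of a count window. A map or flatMap keeps the timestamp of its input element, so the enriched events keep their original event timestamps.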
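A minimal sketch of the per-key logic such a stateful map could run, written as plain Java so it can be tried without a Flink cluster. In a real job this would typically be a RichFlatMapFunction applied after keyBy("id"), holding the previous event in a ValueState obtained via getRuntimeContext().getState(new ValueStateDescriptor<>(...)); here a HashMap keyed by id stands in for that per-key state. The Event and Enriched types and the "delta" attribute are hypothetical placeholders for the real fields and the real missing attribute.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical plain-Java model of a keyed stateful map: for each id it remembers
// the previous event and combines it with the current one. In Flink, the HashMap
// would be a ValueState<Event>, automatically scoped to the current key.
public class PreviousEventEnricher {
    // Hypothetical event type; `area` and `value` stand in for the real fields.
    public record Event(String id, String area, long timestamp, double value) {}

    // Hypothetical output: the "missing attribute" is modeled as the change in
    // `value` since the previous event with the same id.
    public record Enriched(Event current, double delta) {}

    private final Map<String, Event> previousById = new HashMap<>();

    public Optional<Enriched> process(Event current) {
        // Store the current event and get back the previous one for this id, if any.
        Event previous = previousById.put(current.id(), current);
        if (previous == null) {
            return Optional.empty(); // first event for this id: nothing to pair with yet
        }
        // The output wraps the current event, so its timestamp is the current
        // event's timestamp rather than Long.MAX_VALUE.
        return Optional.of(new Enriched(current, current.value() - previous.value()));
    }

    public static void main(String[] args) {
        PreviousEventEnricher enricher = new PreviousEventEnricher();
        System.out.println(enricher.process(new Event("a", "north", 0L, 1.0)));
        System.out.println(enricher.process(new Event("a", "north", 5_000L, 4.0)));
    }
}
```

Unlike the count window, this version emits nothing for the first event of an id, which is a design choice: it only forwards events for which the missing attribute could actually be computed. With the enrichment in a flatMap, the rest of the original pipeline (keyBy("area"), timeWindow(Time.seconds(10)), fold) can stay as it is.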

Cheers,
Aljoscha

In reply to Nico, Thu, 8 Dec 2016 at 01:05.