Question about windowing

Jerry Peng
Hello,

I have a question about windowing and triggering. I am trying to
connect the dots between the simple windowing API, e.g.

stream.countWindow(1000, 100)

to the underlying representation using triggers and evictors api:

stream.window(GlobalWindows.create())
  .evictor(CountEvictor.of(1000))
  .trigger(CountTrigger.of(100))


How is the above equivalent to the semantics of a sliding window with a
length of 1000 tuples and a slide interval of 100 tuples?

And for time duration windows:

stream.timeWindow(Time.seconds(5), Time.seconds(1))

which maps to:

stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create())

Why isn't it mapped to something like:

stream.window(SlidingProcessingTimeWindows.create())
  .trigger(ProcessingTimeTrigger.of(1))
  .evictor(TimeEvictor.of(5))

?

Thanks for any help in advance!

Best,

Jerry

Re: Question about windowing

Tony Wei
Hi Jerry,

You can learn about Flink's windowing mechanics in this blog post (https://flink.apache.org/news/2015/12/04/Introducing-windows.html).

To my understanding, window() defines which WindowAssigner Flink uses to assign each element to the right windows, trigger() defines when a window fires, and evictor() defines which elements in the window are passed to the evaluation function.

Therefore, for time-based windows it is natural to use window() to describe which windows an element should be assigned to, and to fire each window when processing time reaches its end.
For the count window, we actually need to count the elements, so we insert all elements into a single global window, fire it every 100 elements, and pass only the last 1000 elements to the evaluation function.
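A small plain-Java simulation may make the count case concrete. It does not use Flink at all: the class SlidingCountWindowSim and its simulate method are hypothetical names, it models a single key whose elements are the numbers 1..n, and it assumes the evictor trims the window right before the window function runs (which I understand to be the default for CountEvictor.of(size)). The assigner, trigger and evictor steps are marked in the comments.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Plain-Java simulation (no Flink dependency) of
 * window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide)).
 * Simplified model: one key, elements 1..numElements, eviction before the window function.
 */
final class SlidingCountWindowSim {

    /** Returns one {firstElement, lastElement, elementCount} triple per trigger firing. */
    static List<long[]> simulate(long numElements, int size, int slide) {
        Deque<Long> globalWindow = new ArrayDeque<>(); // the single global window, never purged
        List<long[]> firings = new ArrayList<>();
        int triggerCount = 0;                          // counter kept by the count trigger

        for (long element = 1; element <= numElements; element++) {
            globalWindow.addLast(element);             // assigner: every element goes to the same window
            triggerCount++;
            if (triggerCount >= slide) {               // trigger: fire every `slide` elements...
                triggerCount = 0;                      // ...then reset the count (contents are kept)
                while (globalWindow.size() > size) {   // evictor: drop everything but the last `size`
                    globalWindow.removeFirst();
                }
                // "window function": record which elements it would see on this firing
                firings.add(new long[] {globalWindow.peekFirst(), globalWindow.peekLast(), globalWindow.size()});
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // countWindow(1000, 100) fed with 1500 elements
        for (long[] f : simulate(1500, 1000, 100)) {
            System.out.printf("fired: elements %d..%d (%d elements)%n", f[0], f[1], f[2]);
        }
    }
}
```

With simulate(1500, 1000, 100) the window fires 15 times. The first nine firings see only 100, 200, up to 900 elements, the tenth sees elements 1..1000, and each later firing slides by 100 (101..1100, and so on up to 501..1500). In this model the combination therefore behaves like countWindow(1000, 100), with the caveat that the early firings see partial windows.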

One more thing: in the sliding count window, each element is placed into exactly one window (the global window), but in the sliding time window each element is duplicated and inserted into multiple windows.
Using your case as an example, each element would be placed into five different windows (a 5 s window sliding by 1 s), each representing a different time range.
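For the time case, the duplication comes from the assigner itself. The sketch below is plain Java, not Flink code: the class SlidingTimeAssignerSketch and its assignWindows method are hypothetical names, it assumes a zero window offset and non-negative millisecond timestamps, and its arithmetic is modeled on (not copied from) what sliding time-window assigners do. It lists every window [start, start + size) that contains a given timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (no Flink dependency) of a sliding time-window assigner:
 * one timestamp is mapped to every window [start, start + size) that contains it.
 * Assumes offset 0 and timestamp >= 0.
 */
final class SlidingTimeAssignerSketch {

    /** Returns {start, end} pairs in ms (end exclusive) of all windows containing `timestamp`. */
    static List<long[]> assignWindows(long timestamp, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // start of the most recent window that contains the timestamp
        long lastStart = timestamp - (timestamp % slideMs);
        // walk back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // timeWindow(Time.seconds(5), Time.seconds(1)): size 5000 ms, slide 1000 ms
        for (long[] w : assignWindows(12_345L, 5_000L, 1_000L)) {
            System.out.printf("[%d, %d)%n", w[0], w[1]);
        }
    }
}
```

For timestamp 12345 ms with a 5 s size and a 1 s slide, it returns five windows, from [12000, 17000) back to [8000, 13000), so the element is stored once per window, whereas in the count case each element lives only in the single global window.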

Hope this will help you.

Regards,
Tony Wei



(Replying to Jerry Peng's message of 2017-08-23 8:22 GMT+08:00.)

Re: Question about windowing

Aljoscha Krettek
Yes, this is a very good explanation, Tony!

I'd like to add that "Evictor" is not really a good name for what it does. It should be something more like "Keeper" or "Retainer", because what "CountEvictor.of(1000)" really does is evict everything but the last 1000 elements, so it should be called "CountRetainer.of(1000)".

(The names Trigger and Evictor were originally inspired by IBM InfoSphere Streams, AFAIK.)

Best,
Aljoscha

(Replying to Tony Wei (魏偉哲), 23 Aug 2017, 09:56.)