Event-Time Windowing

Event-Time Windowing

Alexander Kolb

Hi Guys,

I'm trying to use the event-time windowing feature, but the windowing does not work as expected.

I wrote my own source, which implements EventTimeSourceFunction and uses the collectWithTimestamp method. Additionally, I emit a watermark after each element.

My test job looks like this:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.addSource(SourceWithEventTime)

stream
  .window(Time.of(5,TimeUnit.SECONDS))
  .sum(0)
  .flatten()
  .print()

env.execute()

The input consists of tuples with timestamps set 10 seconds apart:

value: (1,test) timestamp: 1444228980390
value: (2,foo) timestamp: 1444228990390
value: (3,bar) timestamp: 1444229000390

I expect each tuple to go into a separate window.
Instead, the actual output is the sum of all tuples, so all tuples are being collected in the same window.

Thanks in advance!
Alex
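
The expectation above can be checked by hand. The following is a minimal sketch in plain Scala (no Flink dependency), assuming 5-second tumbling windows aligned to the epoch; the object and helper names are hypothetical and exist only for illustration. Under that assumption, each of the three timestamps falls into a different window.

```scala
object TumblingWindowAssignment {
  // Start of the tumbling window containing `timestamp`, assuming windows
  // are aligned to the epoch (an assumption of this sketch) and that
  // timestamps are non-negative.
  def windowStart(timestamp: Long, sizeMillis: Long): Long =
    timestamp - (timestamp % sizeMillis)

  val sizeMillis: Long = 5000L

  // The three (value, timestamp) pairs from the post.
  val input: Seq[((Int, String), Long)] = Seq(
    ((1, "test"), 1444228980390L),
    ((2, "foo"), 1444228990390L),
    ((3, "bar"), 1444229000390L)
  )

  // Group the elements by window start and sum field 0 within each window.
  val sumsPerWindow: Map[Long, Int] =
    input
      .groupBy { case (_, ts) => windowStart(ts, sizeMillis) }
      .map { case (start, elems) => start -> elems.map(_._1._1).sum }

  def main(args: Array[String]): Unit =
    sumsPerWindow.toSeq.sortBy(_._1).foreach { case (start, sum) =>
      println(s"window [$start, ${start + sizeMillis}) sum = $sum")
    }
}
```

With timestamps 10 seconds apart and a 5-second window, the grouping yields three windows with one element each, which matches the behaviour the post expects from event-time windowing.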


Re: Event-Time Windowing

Aljoscha Krettek
Hi,
right now, 0.10-SNAPSHOT is in a bit of a weird state: the old windowing API is still in there alongside the new one. To make your example use the new API, which actually uses the timestamps and watermarks, you would use the following code:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.addSource(SourceWithEventTime)

stream
  .timeWindowAll(Time.of(5,TimeUnit.SECONDS))
  // or .windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .sum(0)
  .print()

The version for keyed streams would be:
stream
  .keyBy(...)
  .timeWindow(Time.of(5,TimeUnit.SECONDS))
  // or .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .sum(0)
  .print()

I hope this helps. :D

Cheers,
Aljoscha
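
To illustrate what "actually uses the timestamps and watermarks" means, here is a toy model in plain Scala, not Flink's implementation. It assumes epoch-aligned tumbling windows and assumes that a window fires once a watermark reaches the window's last timestamp; all type and function names are hypothetical.

```scala
import scala.collection.mutable

object WatermarkDrivenWindows {
  sealed trait StreamEvent
  final case class Element(value: Int, timestamp: Long) extends StreamEvent
  final case class Watermark(timestamp: Long) extends StreamEvent

  // Returns (windowStart, sum) pairs in the order the windows fire.
  def run(events: Seq[StreamEvent], sizeMillis: Long): Vector[(Long, Int)] = {
    // Open windows, keyed by window start and kept in ascending order.
    val open = mutable.TreeMap.empty[Long, Int]
    val fired = Vector.newBuilder[(Long, Int)]
    events.foreach {
      case Element(value, ts) =>
        // Assign the element to its window and update the running sum.
        val start = ts - (ts % sizeMillis)
        open(start) = open.getOrElse(start, 0) + value
      case Watermark(wm) =>
        // A window [start, start + size) is treated as complete once the
        // watermark reaches its last timestamp, start + size - 1.
        val ready = open.keys.takeWhile(start => start + sizeMillis - 1 <= wm).toList
        ready.foreach { start =>
          fired += (start -> open(start))
          open -= start
        }
    }
    fired.result()
  }

  // The input from the original post, with a watermark after each element.
  val events: Seq[StreamEvent] = Seq(
    Element(1, 1444228980390L), Watermark(1444228980390L),
    Element(2, 1444228990390L), Watermark(1444228990390L),
    Element(3, 1444229000390L), Watermark(1444229000390L)
  )

  def main(args: Array[String]): Unit =
    run(events, 5000L).foreach { case (start, sum) =>
      println(s"window starting at $start fired with sum $sum")
    }
}
```

In this model the first two windows fire separately, with sums 1 and 2. The third window stays open until a later watermark passes its end, so its result only appears if such a watermark arrives.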


In reply to Alexander Kolb's message of Wed, 7 Oct 2015, 16:54.


Re: Event-Time Windowing

Alexander Kolb

Thanks!

This works, with the exception that I have to use the reduceWindow() method to sum up the content of the window.
There still seems to be some work to do.

Once the API is finished, will I be able to switch from event time to processing time or ingestion time without having to adjust my code?

Best,
Alex
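
A reducer for summing field 0 might look like the following minimal sketch. It is shown on a plain Scala collection rather than on a Flink stream, because the thread does not show how the function is passed to reduceWindow(); the object and value names are hypothetical.

```scala
object SumFirstField {
  // Adds field 0 of both tuples and keeps the second field of the first tuple.
  val sumField0: ((Int, String), (Int, String)) => (Int, String) =
    (a, b) => (a._1 + b._1, a._2)

  // Contents of a single window holding all three tuples from the post.
  val windowContents: Seq[(Int, String)] = Seq((1, "test"), (2, "foo"), (3, "bar"))

  // Reduce the window contents pairwise, left to right.
  val result: (Int, String) = windowContents.reduce(sumField0)

  def main(args: Array[String]): Unit =
    println(result)
}
```

Keeping the second field of the first element is a design choice of this sketch; which non-summed fields a built-in aggregation retains is not stated in the thread.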


In reply to Aljoscha Krettek's message of Wed, 7 Oct 2015, 17:23.


Re: Event-Time Windowing

Aljoscha Krettek
Hi,
yes, once this PR is merged (https://github.com/apache/flink/pull/1238), you can switch between time characteristics and also use aggregation functions such as sum(...). I'm hoping to merge it by tonight; the tests are still running right now. :D

Cheers,
Aljoscha

In reply to Alexander Kolb's message of Wed, 7 Oct 2015, 17:45.

