timeWindow emits records before window ends?


NEKRASSOV, ALEXEI

Hello,

With the time characteristic set to IngestionTime, I expected “timeWindow(Time.minutes(3))” NOT to produce any records during the first 3 minutes of running the job, and yet it emits records before 3 minutes have elapsed.

Am I doing something wrong, or is my understanding of timeWindow incorrect?

For example, in the Flink UI I see:

TriggerWindow(TumblingEventTimeWindows(180000), AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@7c810ef9, aggFunction=nextgen.McdrAggregator@7d7758be}, EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:752)) -> Map

It shows a “duration” of 42s and “records sent” of 689516.

I expected no records to be sent until 180000 ms had elapsed.

Thanks,

Alex Nekrassov

Re: timeWindow emits records before window ends?

Nico Kruber
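
Hi Alex,

With IngestionTime, timestamps are assigned when records enter the
source, but the windows are still event-time windows (hence
TumblingEventTimeWindows and EventTimeTrigger in the UI). If you don't set
an offset for the window, it is aligned with the epoch [1]. For example,
assume you started your program at 00:02:18. The first window then starts
at 00:00:00 and ends at 00:02:59.999 by default, so it fires and emits
records 42 seconds after you started your program, not 3 minutes after.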
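
A minimal standalone sketch of that arithmetic follows. It assumes UTC and treats the example start time as milliseconds since midnight, which works because midnight UTC is itself a multiple of 3 minutes since the epoch. The `windowStart` formula is our reading of how Flink's `TimeWindow.getWindowStartWithOffset` computes window starts. The class and method names are hypothetical, and this is not Flink code:

```java
import java.time.Duration;
import java.time.LocalTime;

/**
 * Hypothetical illustration of epoch-aligned tumbling windows (offset = 0).
 * The start formula is assumed to match Flink's tumbling-window arithmetic.
 */
final class EpochAlignedWindows {

    /** Inclusive start of the tumbling window of length sizeMs containing timestampMs. */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    /** Exclusive end of that window; its last millisecond is end - 1 (e.g. 00:02:59.999). */
    static long windowEnd(long timestampMs, long offsetMs, long sizeMs) {
        return windowStart(timestampMs, offsetMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(3).toMillis();                     // 180000 ms
        long jobStart = LocalTime.of(0, 2, 18).toSecondOfDay() * 1000L;  // 00:02:18 -> 138000 ms (assumed UTC)
        long start = windowStart(jobStart, 0L, size);                     // no offset: aligned with the epoch
        long end = windowEnd(jobStart, 0L, size);
        System.out.println("first window: [" + start + ", " + end + ") ms");
        // The event-time trigger fires roughly when the watermark passes end - 1.
        System.out.println("first window closes " + (end - jobStart) / 1000 + " s after job start");
    }
}
```

With the example start time, the first window is [0, 180000) ms. It closes 42 s after the job starts, which matches the “duration” seen in the UI.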
If you need the window to count 3 minutes from some other point in time,
please use TumblingEventTimeWindows#of(Time size, Time offset).
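
As a rough sketch, assuming the same start arithmetic as above, an offset equal to the desired start instant modulo the window size puts a window boundary exactly on that instant. The job start time (00:02:18 as ms since midnight, UTC) and all names here are hypothetical. The Flink call in the comment uses `TumblingEventTimeWindows.of(Time size, Time offset)` as mentioned in the reply:

```java
import java.time.Duration;

/** Hypothetical sketch: choose a tumbling-window offset so a window starts at a given instant. */
final class OffsetAlignedWindows {

    /** Inclusive start of the window containing timestampMs (same arithmetic as the epoch case). */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    /** Offset in [0, sizeMs) that makes a window boundary fall exactly on anchorMs. */
    static long offsetFor(long anchorMs, long sizeMs) {
        return Math.floorMod(anchorMs, sizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(3).toMillis();  // 180000 ms
        long jobStart = 138_000L;                      // hypothetical start: 00:02:18, ms since midnight (UTC)
        long offset = offsetFor(jobStart, size);       // 138000 ms
        long start = windowStart(jobStart, offset, size);
        // In a Flink job, the offset would be passed along the lines of:
        // .window(TumblingEventTimeWindows.of(Time.minutes(3), Time.milliseconds(offset)))
        System.out.println("offset = " + offset + " ms, first window: [" + start + ", " + (start + size) + ") ms");
    }
}
```

Here the first window is [138000, 318000) ms, a full 3 minutes from the job start. An offset derived from the wall clock at submission time would change if the job were resubmitted, shifting all window boundaries. A fixed, explicitly chosen offset is usually easier to reason about.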


Nico


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#tumbling-windows

(In reply to NEKRASSOV, ALEXEI, 27/03/18 16:22)

--
Nico Kruber | Software Engineer
data Artisans