why doesn't the over-window aggregation sort the elements (considering the watermark) before processing?


why doesn't the over-window aggregation sort the elements (considering the watermark) before processing?

Yan Zhou [FDS Science]

Hi,


I use bounded over-window aggregation in my application. However, some input elements are sometimes "discarded" and generate no output. By reading the source code of RowTimeBoundedRangeOver.scala, I realized that a record is actually discarded if it arrives out of order. Please see the quoted code block below. Please help me understand why we don't sort the records first. Say we are using a BoundedOutOfOrdernessTimestampExtractor: we could use the watermark to select a portion of the elements to sort. When the watermark advances, process the elements that are before the watermark and extend the portion of elements being sorted, as in the sketch after the quoted code below.



Best

Yan



override def processElement(
    inputC: CRow,
    ctx: ProcessFunction[CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  val input = inputC.row

  // triggering timestamp for trigger calculation
  val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

  val lastTriggeringTs = lastTriggeringTsState.value

  // check if the data is expired, if not, save the data and register event time timer
  if (triggeringTs > lastTriggeringTs) {
    // put in cache, and register timer to process/clean
    // ...
  } else {
    // DISCARD
  }
}
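Concretely, the kind of sorting I have in mind would look roughly like this minimal sketch on a keyed stream (hypothetical code, not taken from Flink; the Event type and its fields are made up):

import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type for illustration.
case class Event(ts: Long, payload: String)

// Buffers events until the watermark passes their timestamp, then emits
// them in event-time order. Must run on a keyed stream, because it uses
// keyed state and event-time timers.
class WatermarkSortFunction extends ProcessFunction[Event, Event] {

  // Pending events, grouped by their event timestamp.
  private var buffer: MapState[JLong, JList[Event]] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getMapState(
      new MapStateDescriptor[JLong, JList[Event]](
        "pending-events", classOf[JLong], classOf[JList[Event]]))
  }

  override def processElement(
      event: Event,
      ctx: ProcessFunction[Event, Event]#Context,
      out: Collector[Event]): Unit = {
    // Compare against the watermark, not the last processed timestamp:
    // only records already behind the watermark are truly late.
    if (event.ts > ctx.timerService().currentWatermark()) {
      var sameTs = buffer.get(event.ts)
      if (sameTs == null) {
        sameTs = new JArrayList[Event]()
      }
      sameTs.add(event)
      buffer.put(event.ts, sameTs)
      // Fires once the watermark passes event.ts. Timers fire in
      // timestamp order, which gives us the sorting.
      ctx.timerService().registerEventTimeTimer(event.ts)
    }
    // else: truly late record; drop it (or route it to a side output).
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[Event, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    // The watermark has passed `timestamp`: emit its buffered events.
    val events = buffer.get(timestamp)
    if (events != null) {
      val it = events.iterator()
      while (it.hasNext) {
        out.collect(it.next())
      }
      buffer.remove(timestamp)
    }
  }
}

With a BoundedOutOfOrdernessTimestampExtractor assigning watermarks upstream, every record within the out-of-orderness bound would then be re-ordered instead of dropped.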

Re: why doesn't the over-window aggregation sort the elements (considering the watermark) before processing?

Fabian Hueske
The over window operates on an unbounded stream of data, so it is not possible to sort the complete stream.
Instead, we can sort ranges of the stream. Flink uses watermarks to define these ranges.

The operator processes, in timestamp order, the records that are not late, i.e., those with timestamps larger than the last watermark.
In principle, there are different ways to handle records that violate this condition. In the current implementation of the operator, we simply drop them.

At the current state, the only way to avoid records being dropped is to use more conservative watermarks, i.e., a larger out-of-orderness bound (see the sketch below). Note that this will increase the processing latency.
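For example, with a BoundedOutOfOrdernessTimestampExtractor you can simply increase the bound (a minimal sketch; the Event type and its ts field are placeholders, not part of any Flink API):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(ts: Long, value: Double) // placeholder event type

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val events: DataStream[Event] = env.fromElements(
  Event(1000L, 1.0), Event(3000L, 2.0), Event(2000L, 3.0)) // out of order

val withConservativeWatermarks = events.assignTimestampsAndWatermarks(
  // A 30-second bound: the watermark trails the highest timestamp seen
  // so far by 30s, so records up to 30s out of order are still accepted,
  // at the cost of up to 30s of extra latency before results are emitted.
  new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(30)) {
    override def extractTimestamp(element: Event): Long = element.ts
  })
// (add a sink and env.execute(...) to run this as a job)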

Best, Fabian

Re: why doesn't the over-window aggregation sort the elements (considering the watermark) before processing?

Yan Zhou [FDS Science]

Hi Fabian,


Thanks for the reply.

 

I think here is the problem: currently, the timestamp of an event is compared with the previously processed element's timestamp, rather than with the watermark, to determine whether it is late.


To my understanding, even if the preceding operator emits events perfectly sorted by event time, the arrival order at the current operator is not guaranteed because of shuffling, buffering, or other effects. And since the watermark is not considered within the over-window operator, I would imagine that a substantial portion of the elements might be dropped. How can I avoid that?


Looking forward to your reply.


Best

Yan



Re: why doesn't the over-window aggregation sort the elements (considering the watermark) before processing?

Yan Zhou [FDS Science]

Never mind, I figured it out. The event is not processed as soon as it arrives; it is cached, and a timer is registered so that it is processed in event time, once the watermark has passed its timestamp. That makes sense.
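In other words, lastTriggeringTs only advances in onTimer, after the watermark has passed a timestamp, so the triggeringTs > lastTriggeringTs check effectively compares against the watermark rather than against arrival order. A worked timeline (hypothetical timestamps, single key):

// Arrival order at the operator: ts=10, ts=7, ts=9, then watermark(11).
//
// processElement(ts=10): lastTriggeringTs = 0 -> cache row, register timer(10)
// processElement(ts=7):  lastTriggeringTs = 0 -> cache row, register timer(7)  (kept!)
// processElement(ts=9):  lastTriggeringTs = 0 -> cache row, register timer(9)  (kept!)
//
// watermark(11): timers fire in timestamp order 7, 9, 10; each onTimer call
// emits output for the rows cached under that timestamp and advances
// lastTriggeringTs, which ends at 10.
//
// processElement(ts=8) arriving after watermark(11): 8 <= 10 -> dropped,
// which is correct, because ts=8 is genuinely late w.r.t. the watermark.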


Best

Yan

