Sliding Windows Processing problem with Kafka Queue Event Time and BoundedOutOfOrdernessGenerator

Sujit Sakre
Hi,

Hope you are well.

We have encountered an issue when processing sliding windows: if the last record lies outside the end time of a sliding window, the record is not processed until the next sliding window is completely filled and triggered.

Please consider the example below:

The datastream is Kafka based and Keyed on Location (1, 2, 3, 4 etc.) 

Our previous sliding window
starts at
09-09-2016 12:54:00
and ends at
09-09-2016 13:00:00
with key 15 (Location).

Records in between with timestamps:
09-09-2016 12:56:33
09-09-2016 12:56:47
09-09-2016 12:58:04
09-09-2016 12:58:39
09-09-2016 12:58:45

However, the next window, which starts at
09-09-2016 13:04:00
and ends at
09-09-2016 13:10:00
with key 16 (Location)
and the following record timestamps:

09-09-2016 13:04:48
09-09-2016 13:06:07
09-09-2016 13:06:38
09-09-2016 13:07:25
09-09-2016 13:08:00
09-09-2016 13:08:20
09-09-2016 13:08:38


is not processed until records arrive for Location 17 with the timestamps:
09-09-2016 13:08:48
09-09-2016 13:08:55
09-09-2016 13:09:11
09-09-2016 13:11:48
The window that gets formed at that time has
Start Time: 09-09-2016 13:06:00 
End Time: 09-09-2016 13:12:00

We are using the standard BoundedOutOfOrdernessGenerator with a maximum out-of-orderness of 5 seconds (we have tried various other maxOutOfOrderness values, without success) and event-time based processing.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 5000; // 5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

To summarize the problem:

the records of the second-to-last window, which has a different location, are not processed until records arrive for a following window with timestamps later than the end time of the existing sliding window.
In other words, a window is not processed until the next set of records arrives with timestamps beyond that window's end time.

In a real situation, this means that we wait till the next set of records arrive, which may be after a very long duration (e.g. maybe 1 hour), and previous records are not processed till then.

Is this the expected behaviour?
Why does the sliding window not process the records it already holds, even when no further record has arrived for a substantial amount of time, e.g. 30 minutes?
How do we resolve this situation?

Please could you suggest how to resolve this.

Many thanks.



Sujit Sakre


This email is sent on behalf of Northgate Public Services (UK) Limited and its associated companies including Rave Technologies (India) Pvt Limited (together "Northgate Public Services") and is strictly confidential and intended solely for the addressee(s). 
If you are not the intended recipient of this email you must: (i) not disclose, copy or distribute its contents to any other person nor use its contents in any way or you may be acting unlawfully;  (ii) contact Northgate Public Services immediately on +44(0)1908 264500 quoting the name of the sender and the addressee then delete it from your system.
Northgate Public Services has taken reasonable precautions to ensure that no viruses are contained in this email, but does not accept any responsibility once this email has been transmitted.  You should scan attachments (if any) for viruses.

Northgate Public Services (UK) Limited, registered in England and Wales under number 00968498 with a registered address of Peoplebuilding 2, Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire, HP2 4NN.  Rave Technologies (India) Pvt Limited, registered in India under number 117068 with a registered address of 2nd Floor, Ballard House, Adi Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
Re: Sliding Windows Processing problem with Kafka Queue Event Time and BoundedOutOfOrdernessGenerator

Nico Kruber
Hi Sujit,
actually, according to
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#allowed-lateness
the sliding window should fire each time for each element arriving late.

Did you set the following for your window operator?
    .window(<window assigner>)
    .allowedLateness(<time>)

The expected behaviour should be:
1) fire once a watermark exceeding the sliding window end has been received
2) fire each time an event after the sliding window end is received as long as
no watermark is exceeding sliding window end + allowed lateness
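
The two rules above can be sketched in plain Java without any Flink dependency. The class, enum, and method names are hypothetical and only illustrate the decision; as far as I know the real window operator compares against the window's last millisecond (end minus 1 ms), a detail this sketch ignores.

```java
/** Plain-Java sketch of the firing rules; not Flink code, all names are hypothetical. */
class LatenessSketch {

    enum Action { BUFFER, LATE_FIRE, DROP }

    /** Decides what happens to an element of a window, given the current watermark (all values in ms). */
    static Action onElement(long windowEnd, long allowedLateness, long currentWatermark) {
        if (currentWatermark < windowEnd) {
            // Rule 1 has not applied yet: the element is kept until the watermark reaches the window end.
            return Action.BUFFER;
        }
        if (currentWatermark < windowEnd + allowedLateness) {
            // Rule 2: the window has already fired once, so a late element triggers another firing.
            return Action.LATE_FIRE;
        }
        // The watermark is beyond window end + allowed lateness: the element is too late.
        return Action.DROP;
    }

    public static void main(String[] args) {
        long windowEnd = 600_000L;     // 13:10:00, expressed as ms since 13:00:00
        long allowedLateness = 5_000L; // 5 seconds
        System.out.println(onElement(windowEnd, allowedLateness, 513_000L)); // watermark at 13:08:33
        System.out.println(onElement(windowEnd, allowedLateness, 602_000L)); // watermark at 13:10:02
        System.out.println(onElement(windowEnd, allowedLateness, 605_000L)); // watermark at 13:10:05
    }
}
```

In all three cases the outcome depends only on the watermark, not on how much wall-clock time has passed.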

In your case, if I see it correctly and do not misinterpret your example, the
late firing has nothing to do with the sliding window but rather your
BoundedOutOfOrdernessGenerator:
Since a sliding window is only closed and fired once the watermark is received,
its first time firing is actually 5s behind as per your maxOutOfOrderness.

It actually looks like you may be better off using allowed lateness for your
use case, depending on what you actually need.


Regards
Nico

Re: Sliding Windows Processing problem with Kafka Queue Event Time and BoundedOutOfOrdernessGenerator

Sujit Sakre
Hi Nico,

Thanks.

Yes, we are using the window assigner and allowedLateness parameters as below

.window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))   // 6-minute window sliding every 2 minutes
.allowedLateness(Time.seconds(5))
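
For reference, a minimal plain-Java sketch of how a single timestamp maps onto sliding windows with this size and slide. It does not use Flink; the class and method names are hypothetical, it assumes windows aligned to the epoch with no offset, and it assumes non-negative timestamps. Times are expressed as milliseconds since 13:00:00 to keep the arithmetic readable.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding-window assignment; not Flink code, names are hypothetical. */
class SlidingWindowSketch {

    /** Returns the start (ms) of every window of the given size and slide that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 8 * minute + 38_000L; // 13:08:38 as an offset from 13:00:00
        for (long start : windowStarts(ts, 6 * minute, 2 * minute)) {
            System.out.println("window starting at minute " + (start / minute)
                    + ", ending at minute " + ((start + 6 * minute) / minute));
        }
    }
}
```

With a 6-minute size and a 2-minute slide, every record belongs to three overlapping windows, so the record at 13:08:38 falls into the windows ending at 13:10:00, 13:12:00, and 13:14:00.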

We need to use the BoundedOutOfOrdernessGenerator to assign timestamps anyway.

Even if we use .allowedLateness(Time.seconds(5)), it gives the same output.

It seems to me that the watermark is not generated for the final set of records. If we don't use the timestamp and watermark function, the processing can't take place, so this processing depends on watermarks.

Please could you suggest how we can solve this?

Thank you.



Sujit Sakre


Re: Sliding Windows Processing problem with Kafka Queue Event Time and BoundedOutOfOrdernessGenerator

Aljoscha Krettek
Hi Sujit,
as Nico said, somewhat hidden in the middle of his mail: "Since a sliding window is only closed and fired once the watermark is received,
its first time firing is actually 5s behind as per your maxOutOfOrderness."

In your setup, the watermark is always lagging 5 seconds behind the maximum seen timestamp. Your second window that starts
09-09-2016 13:04:00
and ends at
09-09-2016 13:10:00

will only get triggered once the watermark has passed the end of that window, which is 13:10:00 here. For this to happen, your timestamp extractor needs to see an element with a timestamp of at least 13:10:05, which only happens once the elements that make up your third window arrive.
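
The arithmetic can be checked with a small plain-Java sketch. It has no Flink dependency, the class and method names are hypothetical, and it simplifies the firing condition to "watermark has reached the window end". Times are milliseconds since 13:00:00.

```java
/** Plain-Java sketch of the watermark lag; not Flink code, names are hypothetical. */
class WatermarkLagSketch {

    static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L; // same bound as in the generator from the question

    /** Watermark as computed by the generator: highest timestamp seen minus the bound. */
    static long watermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** Simplified firing condition: the watermark has reached the end of the window. */
    static boolean windowCanFire(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long windowEnd = 600_000L; // 13:10:00
        long[] maxSeen = {
            518_000L, // 13:08:38, last record of Location 16
            551_000L, // 13:09:11, Location 17
            708_000L  // 13:11:48, Location 17
        };
        for (long ts : maxSeen) {
            long wm = watermark(ts);
            System.out.println("max timestamp " + ts + " -> watermark " + wm
                    + ", window can fire: " + windowCanFire(wm, windowEnd));
        }
    }
}
```

Only the record at 13:11:48 pushes the watermark past 13:10:00, which matches the behaviour described in the question.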

In cases like this, I suggest using a somewhat more complex timestamp/watermark extractor that also advances the watermark when it has not seen data for quite a while, rather than advancing the watermark purely based on the elements seen.
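
One possible shape for such an extractor, as a plain-Java sketch under stated assumptions: the class name, the idle timeout, and the rule "advance event time at wall-clock speed while idle" are all assumptions for illustration, not part of Flink. In a real job, onEvent would presumably be called from extractTimestamp and currentWatermark from getCurrentWatermark, both passing System.currentTimeMillis().

```java
/** Plain-Java sketch of an idle-aware watermark; not Flink code, all names are hypothetical. */
class IdleAwareWatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;

    private long currentMaxTimestamp = Long.MIN_VALUE; // highest event timestamp seen so far
    private long lastEventWallClockMs;                 // wall-clock time of the last element
    private long lastWatermark = Long.MIN_VALUE;       // keeps the returned watermark monotonic

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records an element's event timestamp and the wall-clock time at which it was seen. */
    void onEvent(long eventTimestamp, long wallClockMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        lastEventWallClockMs = wallClockMs;
    }

    /** Returns the watermark for the given wall-clock time. */
    long currentWatermark(long wallClockMs) {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no element seen yet
        }
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        long idleFor = wallClockMs - lastEventWallClockMs;
        if (idleFor > idleTimeoutMs) {
            // No data for a while: assume event time moved on as fast as the wall clock.
            candidate += idleFor;
        }
        lastWatermark = Math.max(lastWatermark, candidate); // never move the watermark backwards
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch wm = new IdleAwareWatermarkSketch(5_000L, 60_000L);
        wm.onEvent(518_000L, 0L);                          // record at 13:08:38 (ms since 13:00:00)
        System.out.println(wm.currentWatermark(1_000L));   // still driven by the data
        System.out.println(wm.currentWatermark(120_000L)); // two idle minutes later
    }
}
```

The trade-off is that records arriving after an idle period with older timestamps may then be treated as late, so the idle timeout should be chosen with the expected gaps in the data in mind.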

Best,
Aljoscha
