event-time window cannot become earlier than the current watermark by merging


Vishal Santoshi
Hey folks,
I have a pipeline with sessionization that restarted and then failed after retries with the exception below. The only change I had made was to increase the allowed lateness by 12 hours (to one day) and restart from a savepoint, after which it ran for more than 12 hours without issue. I cannot imagine that increasing the lateness caused this, yet the way I solved it was to increase the lateness further. Could this happen if there are TaskManagers in the cluster whose clocks are off (as in, not synchronized)?

2021-04-21 11:27:58
java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
    at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)



Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
Well, it was not a solution after all. We now have a session window that is stuck with the same issue, albeit after the additional lateness. I had increased the lateness to 2 days, which masked the issue; it reared its head again once the 2 days of lateness were over (instead of the 1 day before). This is very disconcerting.

Caused by: java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
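
For reference, the epoch-millisecond values in the two exceptions can be decoded and compared against the configured lateness. The sketch below is plain Java and assumes (this is a reading of `WindowOperator`, not something stated in the thread) that a merged window is rejected when `maxTimestamp + allowedLateness <= watermark`, with `maxTimestamp = end - 1`. The class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper, not part of Flink: decodes the values printed in the exception.
class MergedWindowLateness {

    // Milliseconds by which the watermark has passed the window's cleanup time
    // (end - 1 + allowedLateness). A value >= 0 means the assumed check
    // "maxTimestamp + allowedLateness <= watermark" holds.
    static long millisPastCleanup(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return watermark - (maxTimestamp + allowedLatenessMs);
    }

    // Renders an epoch-millisecond value in US Eastern time, the zone used in the thread.
    static ZonedDateTime toEastern(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of("America/New_York"));
    }

    public static void main(String[] args) {
        // First failure: the allowed lateness was 1 day.
        long first = millisPastCleanup(
                1618880140466L, Duration.ofDays(1).toMillis(), 1618966593999L);
        // Second failure: the allowed lateness was 2 days.
        long second = millisPastCleanup(
                1618879580402L, Duration.ofDays(2).toMillis(), 1619053742129L);

        System.out.println("second window end: " + toEastern(1618879580402L));
        System.out.println("second watermark:  " + toEastern(1619053742129L));
        System.out.println("first failure, ms past cleanup:  " + first);
        System.out.println("second failure, ms past cleanup: " + second);
    }
}
```

Under that assumption, the first window was about 53.5 seconds past its cleanup time and the second just under 23 minutes past it. In both cases the watermark had only recently crossed the window end plus the configured lateness.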




Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
I saw https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current and it seems to suggest a straight-up filter, but I am not sure how that filter works: would it factor in the lateness when filtering?


Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
The only thing I can think of is to add the configured lateness to the filter, as in the code below: the timestamp on the element plus the lateness should always be greater than the current watermark. In the current failure we have

Window end: Mon Apr 19 20:46:20 EDT 2021
Watermark:  Wed Apr 21 21:09:02 EDT 2021

An event forced this merged window, and it likely has a timestamp of Mon Apr 19 20:46:20 EDT 2021. We would be filtering this event out so as not to hit https://github.com/aljoscha/flink/blob/2836eccc8498de7a1cad083e6102944471bbd350/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L125

Either way the solution is yucky, and I am not sure how this happened in the first place.



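import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Drops elements whose timestamp plus the allowed lateness is not ahead of the current watermark.
public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    private final long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Forward the element only while timestamp + lateness is still ahead of the watermark.
        if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}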
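
One rough way to sanity-check this predicate is to compare it with the condition the window operator itself appears to apply. The sketch below is plain Java with no Flink dependency. It assumes that the session assigner gives each element the window `[timestamp, timestamp + gap)`, that merging can only extend that window, that the operator rejects a merged window when `maxTimestamp + allowedLateness <= watermark`, and that the window operator's watermark is not ahead of the filter's when the element arrives. All names and sample values are hypothetical.

```java
// Hypothetical model, not Flink code: compares the proposed filter with the
// condition assumed for the window operator's merge check.
class LateFilterModel {

    // The predicate from LateEventFilter: the element is kept if this returns true.
    static boolean passesFilter(long timestamp, long allowedLatenessMs, long watermark) {
        return timestamp + allowedLatenessMs > watermark;
    }

    // Assumed operator check on the merged window; true means the exception would be thrown.
    static boolean mergedWindowTooLate(long mergedEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = mergedEnd - 1;
        return maxTimestamp + allowedLatenessMs <= watermark;
    }

    // End of the window after merging the element's own session window
    // [timestamp, timestamp + gap) with an existing window ending at existingEnd.
    static long mergedEnd(long timestamp, long gapMs, long existingEnd) {
        return Math.max(timestamp + gapMs, existingEnd);
    }

    public static void main(String[] args) {
        long gapMs = 30 * 60 * 1000L;            // hypothetical 30-minute session gap
        long latenessMs = 24 * 60 * 60 * 1000L;  // hypothetical 1-day allowed lateness
        long watermark = 1_000_000_000_000L;     // arbitrary watermark

        // Scan element timestamps around the lateness boundary.
        for (long ts = watermark - latenessMs - 2; ts <= watermark - latenessMs + 2; ts++) {
            // The existing session window ends exactly at the element's timestamp.
            long end = mergedEnd(ts, gapMs, ts);
            System.out.println("ts=" + ts
                    + " kept=" + passesFilter(ts, latenessMs, watermark)
                    + " wouldThrow=" + mergedWindowTooLate(end, latenessMs, watermark));
        }
    }
}
```

Under these assumptions, every element the filter keeps yields a merged window that passes the operator's check, because the merged window always ends after the element's own timestamp. The filter is somewhat stricter than the operator, though: it also drops elements up to roughly one session gap older than the operator itself would reject.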


Re: event-time window cannot become earlier than the current watermark by merging

Matthias
In reply to this post by Vishal Santoshi
Hi Vishal,
based on the error message and the behavior you described, introducing a filter for late events is the way to go - just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].

I hope that helps.
Matthias



Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
I can do that, but I am not certain this is the right filter. Can you please validate it? That aside, I already have the lateness configured on the session window (the normal allowedLateness()), and this looks like a session window that was not cleaned up and is still alive for some reason (a Flink bug?).

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
    out.collect(value);
}



Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
As in, this is essentially doing what the allowed lateness should have done, and I think that is a bug. My code is now as follows. Please look at the allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
        .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
        .name("late_filter").uid("late_filter");
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
        .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
        .name("late_data").uid("late_data");
SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
        .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
        .keyBy(value -> value.getKey())
        .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
        .allowedLateness(Time.minutes(lateNessInMinutes))
        .sideOutputLateData(lateOutputTag)
        .trigger(PurgingTrigger.of(CountTrigger.of(1)))
        .aggregate(new SortAggregate<KEY, VALUE>(),
                new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
        .name("session_aggregate").uid("session_aggregate");


Re: event-time window cannot become earlier than the current watermark by merging

Matthias
You're saying that you used `allowedLateness`/`sideOutputLateData` as described in [1] but without the `LateEventFilter`/`LateEventSideOutput` being added to your pipeline when running into the UnsupportedOperationException issue previously?







--

Matthias Pohl | Engineer


Follow us @VervericaData Ververica

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner

Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
Yes sir. The allowedLateness and the side output always existed.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <[hidden email]> wrote:
You're saying that you used `allowedLateness`/`sideOutputLateData` as described in [1] but without the `LateEventFilter`/`LateEventSideOutput` being added to your pipeline when running into the UnsupportedOperationException issue previously?


On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <[hidden email]> wrote:
As in, this is essentially doing what lateness should have done, and I think that is a bug. My code now is as follows. Please look at the allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
        .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
        .name("late_filter").uid("late_filter");
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
        .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
        .name("late_data").uid("late_data");
SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
        .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
        .keyBy(value -> value.getKey())
        .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
        .allowedLateness(Time.minutes(lateNessInMinutes))
        .sideOutputLateData(lateOutputTag)
        .trigger(PurgingTrigger.of(CountTrigger.of(1)))
        .aggregate(new SortAggregate<KEY, VALUE>(),
                new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
        .name("session_aggregate").uid("session_aggregate");
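
Since the exception is raised while session windows are being merged, a rough model of how a gap-based session grows may be useful here. The sketch below is plain Java and not Flink's implementation: the class name, the method name, the 30-minute gap and the sample timestamps are all assumptions chosen for illustration. It treats every event as opening a window [t, t + gap) and folds an event into the previous session when it falls inside or touches that session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionMergeSketch {
    /** Returns merged session windows as {start, end} pairs for the given event timestamps. */
    static List<long[]> sessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> windows = new ArrayList<>();
        for (long t : sorted) {
            long[] last = windows.isEmpty() ? null : windows.get(windows.size() - 1);
            if (last != null && t <= last[1]) {
                // Event falls inside (or touches) the open session: extend its end.
                last[1] = Math.max(last[1], t + gapMillis);
            } else {
                // Otherwise the event starts a new session [t, t + gap).
                windows.add(new long[] {t, t + gapMillis});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long gap = 30 * 60 * 1000L; // hypothetical 30-minute gap
        for (long[] w : sessions(new long[] {0L, 600_000L, 7_200_000L}, gap)) {
            System.out.println("session [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

In this model the first two events end up in one session and the third opens a new one. A very old event arriving late would likewise produce or extend a session whose end lies far behind the watermark, which is the situation the exception message describes.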

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <[hidden email]> wrote:
I can do that, but I am not certain this is the right filter. Can you please validate? That aside, I already have the lateness configured for the session window (the normal allowedLateness()), and it looks like a session window was not collected and is still alive for some reason (a Flink bug?).

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
    out.collect(value);
}


On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <[hidden email]> wrote:
Hi Vishal,
based on the error message and the behavior you described, introducing a filter for late events is the way to go - just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].

I hope that helps.
Matthias


On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <[hidden email]> wrote:
I saw https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current and it seems to suggest a straight-up filter, but I am not sure how that filter works, as in, would it factor in the lateness when filtering?


Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
And when I added the filter, the exception was no longer thrown. So the sequence of events was:

* Increased lateness from 12 hours (what it was initially running with) to 24 hours.
* The pipe ran as desired before it blew up with the exception.
* Masked the issue by increasing the lateness to 48 hours.
* It blew up again, but now after the added lateness, so essentially the same issue, except that the added lateness let the pipe run for another few hours.
* Added the filter upfront as below; the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is not pulled from the simulated side output (below).


public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    private final long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Forward only elements that are still within the allowed lateness of the current watermark.
        if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}


public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    private final long allowedLateness;

    public LateEventSideOutput(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Reverse predicate of LateEventFilter: forward only elements that are already too late.
        if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}
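
To make it easier to see that the two functions above split the stream cleanly, and that the lateness is factored into the comparison, here is a small plain-Java sketch of the two predicates. It does not use Flink; the class name, the method names, the watermark and the sample event times are all hypothetical values picked to exercise the boundary.

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessSplit {
    /** Same comparison as LateEventFilter: true while the event is within the allowed lateness. */
    static boolean onTime(long eventTime, long allowedLateness, long watermark) {
        return eventTime + allowedLateness > watermark;
    }

    /** Same comparison as LateEventSideOutput: the exact negation of onTime. */
    static boolean late(long eventTime, long allowedLateness, long watermark) {
        return eventTime + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long lateness = 1_000L;   // hypothetical allowed lateness in ms
        long watermark = 10_000L; // hypothetical current watermark
        long[] eventTimes = {8_999L, 9_000L, 9_001L, 12_000L};
        List<Long> kept = new ArrayList<>();
        List<Long> diverted = new ArrayList<>();
        for (long t : eventTimes) {
            if (onTime(t, lateness, watermark)) kept.add(t);
            if (late(t, lateness, watermark)) diverted.add(t);
        }
        System.out.println("kept=" + kept + " diverted=" + diverted);
    }
}
```

With these sample values the events at 8999 and 9000 are diverted and the events at 9001 and 12000 are kept, so every event lands in exactly one of the two outputs, and an event whose timestamp plus lateness equals the watermark counts as late.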



I am using RocksDB as the state backend, if that helps.


Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
<< Added the filter upfront as below; the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is not pulled from the simulated side output (below).

>> Added the filter upfront as below; the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is now pulled from the simulated side output, essentially the ProcessFunction with the reverse predicate of the filter ProcessFunction.



Re: event-time window cannot become earlier than the current watermark by merging

Matthias
To me, this sounds strange. I would have expected it to work with `allowedLateness` and `sideOutput` being defined. I'm pulling in David to have a look at it; maybe he has some more insights. I haven't worked that much with lateness yet.

Matthias

On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <[hidden email]> wrote:
 <<  Added the Fliter upfront  as below, the pipe has no issues. Also metrics show that no data is being pushed through the sideoutput and that data in not pulled from the a simulated sideout ( below ) 

>> Added the Fliter upfront  as below, the pipe has no issues. Also metrics show that no data is being pushed through the sideoutput and that data in now pulled from the simulated sideout , essentially the Process Function with a reverse predicate to the Filter Process Function.


On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <[hidden email]> wrote:
And when I added the filter the Exception was not thrown. So the sequence of events

* Increased lateness from 12 ( that was what it was initially running with )  to 24 hours
* the pipe ran as desired before it blew up with the Exception
* masked the issue by increasing the lateness to 48 hours.
* It blew up again but now after the added lateness, so essentially the same issue but added lateness let the pipe run for another few hours.
* Added the Fliter upfront  as below, the pipe has no issues. Also metrics show that no data is being pushed through the sideoutput and that data in not pulled from the a simulated sideout ( below ) 


public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
private static final long serialVersionUID = 1L;

long allowedLateness;
public LateEventFilter(long allowedLateness){
this.allowedLateness = allowedLateness;
}
@Override
public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
out.collect(value);
}
}
}


public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
private static final long serialVersionUID = 1L;

long allowedLateness;
public LateEventSideOutput(long allowedLateness){
this.allowedLateness = allowedLateness;
}
@Override
public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
out.collect(value);
}
}
}



 I am using RocksDB as a backend if that helps. 

On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <[hidden email]> wrote:
Yes sir. The allowedLateNess and side output always existed. 

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <[hidden email]> wrote:
You're saying that you used `allowedLateness`/`sideOutputLateData` as described in [1] but without the `LateEventFilter`/`LateEventSideOutput` being added to your pipeline when running into the UnsupportedOperationException issue previously?


On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <[hidden email]> wrote:
As in this is essentially doing what lateness should have done  And I think that is a bug. My code now is . Please look at the allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
.process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name("late_filter").uid("late_filter");
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
.process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name("late_data").uid("late_data");
SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
.filter((f) -> f.key != null && f.timedValue.getEventTime() != null).keyBy(value -> value.getKey())
.window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
.allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(lateOutputTag)
.trigger(PurgingTrigger.of(CountTrigger.of(1)))
.aggregate(new SortAggregate<KEY, VALUE>(),
new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
.name("session_aggregate").uid("session_aggregate");

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <[hidden email]> wrote:
I can do that, but I am not certain this is the right filter.  Can you please validate. That aside I already have the lateness configured for the session window ( the normal withLateNess() )  and this looks like a session window was not collected and still is alive for some reason ( a flink bug ? ) 

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
out.collect(value);
}


On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <[hidden email]> wrote:
Hi Vishal,
based on the error message and the behavior you described, introducing a filter for late events is the way to go - just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].

I hope that helps.
Matthias


On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <[hidden email]> wrote:
I saw  https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current. and this seems to suggest a straight up filter, but I am not sure how does that filter works as in would it factor is the lateness when filtering ? 

On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <[hidden email]> wrote:
Well it was not a solution after all. We now have a session window that is stuck with the same issue albeit  after the additional lateness. I had increased the lateness to 2 days and that masked the issue which again reared it's head after the 2 days ;lateness was over ( instead of the 1 day ) before. This is very disconcerting.

Caused by: java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}



On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <[hidden email]> wrote:
Hey folks,
               I had a pipe with sessionization restarts and then fail after retries with this exception. The only thing I had done was to increase the lateness by 12 hours ( to  a day )  in this pipe and restart from SP and it ran for 12 hours plus without issue. I cannot imagine that increasing the lateness created this and the way I solved this was to increase the lateness further. Could this be if there are TMs in the cluster whose time is off ( as in not synchronized )  ?

2021-04-21 11:27:58
java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
    at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
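
For anyone reading along, the numbers in the two exceptions quoted in this thread can be checked by hand. The sketch below is plain Java, not Flink code. It assumes the operator rejects a merged window when the window's max timestamp (end - 1) plus the allowed lateness is at or behind the watermark, and it assumes the lateness was 24 hours for the first exception and 48 hours for the second, as described in the messages. The class and method names are made up for illustration.

```java
import java.time.Duration;

final class MergedWindowLatenessCheck {

    // Assumed form of the check: a window counts as behind the watermark when
    // its max timestamp (end - 1) plus the allowed lateness is <= the watermark.
    static boolean isBehindWatermark(long windowEnd, long allowedLatenessMs, long watermark) {
        return (windowEnd - 1) + allowedLatenessMs <= watermark;
    }

    // Milliseconds by which the watermark has passed (window end + allowed lateness).
    static long overshootMs(long windowEnd, long allowedLatenessMs, long watermark) {
        return watermark - (windowEnd + allowedLatenessMs);
    }

    public static void main(String[] args) {
        long oneDay = Duration.ofHours(24).toMillis();
        long twoDays = Duration.ofHours(48).toMillis();

        // First exception: watermark 1618966593999, window end 1618880140466, lateness assumed 24h.
        System.out.println(isBehindWatermark(1618880140466L, oneDay, 1618966593999L));
        System.out.println(overshootMs(1618880140466L, oneDay, 1618966593999L));

        // Second exception: watermark 1619053742129, window end 1618879580402, lateness assumed 48h.
        System.out.println(isBehindWatermark(1618879580402L, twoDays, 1619053742129L));
        System.out.println(overshootMs(1618879580402L, twoDays, 1619053742129L));
    }
}
```

Under those assumptions, the watermark is about 53.5 seconds past window end plus 24 hours in the first case, and about 22.7 minutes past window end plus 48 hours in the second. That matches the observation that raising the lateness only postponed the failure.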



Re: event-time window cannot become earlier than the current watermark by merging

Matthias
After having talked to David about this issue offline, I decided to create a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the mailing list, Vishal. Hopefully, the community has the chance to look into it.


On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl <[hidden email]> wrote:
To me, it sounds strange. I would have expected it to work with `allowedLateness` and `sideOutput` being defined. I'm pulling in David to have a look at it. Maybe he has some more insights. I haven't worked that much with lateness yet.

Matthias

On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <[hidden email]> wrote:
A correction to the last point of my previous message:

<< Added the filter upfront as below, and the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is not pulled from the simulated side output (below).

>> Added the filter upfront as below, and the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is now pulled from the simulated side output, essentially the ProcessFunction with the reverse predicate of the filter ProcessFunction.


On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <[hidden email]> wrote:
And when I added the filter, the exception was not thrown. So the sequence of events was:

* Increased the lateness from 12 hours (what it was initially running with) to 24 hours.
* The pipe ran as desired before it blew up with the exception.
* Masked the issue by increasing the lateness to 48 hours.
* It blew up again, but now after the added lateness, so essentially the same issue; the added lateness just let the pipe run for another few hours.
* Added the filter upfront as below, and the pipe has no issues. Also, metrics show that no data is being pushed through the side output and that data is not pulled from the simulated side output (below).


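import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Forwards only events that are still within the allowed lateness; later events are dropped.
// KEY and VALUE are assumed to be type parameters of the enclosing class.
public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    private final long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Keep the event only while timestamp + allowed lateness is still ahead of the watermark.
        if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}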


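// Simulated side output: forwards only the events that LateEventFilter would drop.
// KEY and VALUE are assumed to be type parameters of the enclosing class.
public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    private final long allowedLateness;

    public LateEventSideOutput(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Reverse predicate of LateEventFilter: emit only events beyond the allowed lateness.
        if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}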
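
The two functions above use opposite predicates, so every record ends up in exactly one of the two streams. A minimal plain-Java sketch of that routing decision, with no Flink dependency; the class, enum, and method names are hypothetical and only there to make the boundary case explicit:

```java
final class LateEventRouting {

    enum Route { MAIN, LATE }

    // Same comparison as in the two ProcessFunctions above:
    // MAIN when timestamp + lateness is strictly ahead of the watermark, LATE otherwise.
    static Route route(long eventTimestamp, long allowedLatenessMs, long currentWatermark) {
        return eventTimestamp + allowedLatenessMs > currentWatermark ? Route.MAIN : Route.LATE;
    }

    public static void main(String[] args) {
        // An event exactly at the boundary (timestamp + lateness == watermark) is treated as late.
        System.out.println(route(100L, 10L, 109L));
        System.out.println(route(100L, 10L, 110L));
        System.out.println(route(100L, 10L, 111L));
    }
}
```

In a Flink job the same decision could probably live in a single ProcessFunction that calls out.collect(...) for the main branch and ctx.output(outputTag, value) for the late branch, so the input is only processed once. I have not tried that variant in this pipeline.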



I am using RocksDB as the state backend, if that helps.

On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <[hidden email]> wrote:
Yes sir. The allowedLateness and side output were always there.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <[hidden email]> wrote:
You're saying that you used `allowedLateness`/`sideOutputLateData` as described in [1] but without the `LateEventFilter`/`LateEventSideOutput` being added to your pipeline when running into the UnsupportedOperationException issue previously?


On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <[hidden email]> wrote:
As in, this is essentially doing what lateness should have done, and I think that is a bug. My code now is as follows; please look at the allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
        .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
        .name("late_filter").uid("late_filter");
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
        .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
        .name("late_data").uid("late_data");
SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
        .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
        .keyBy(value -> value.getKey())
        .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
        .allowedLateness(Time.minutes(lateNessInMinutes))
        .sideOutputLateData(lateOutputTag)
        .trigger(PurgingTrigger.of(CountTrigger.of(1)))
        .aggregate(new SortAggregate<KEY, VALUE>(),
                new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
        .name("session_aggregate").uid("session_aggregate");
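
For context on the "by merging" part of the exception: the stack trace shows the failure coming out of MergingWindowSet.addWindow while an element is being processed. As I understand session windows, each element first gets its own window [timestamp, timestamp + gap), and that window is then merged with any overlapping session already in state. Below is a rough plain-Java sketch of that merge step. It is not Flink's implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionMergeSketch {

    // A window treated as a closed interval for the purposes of this illustration.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window containing both this window and the other one.
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    // Adds the per-element window [timestamp, timestamp + gapMs) and merges it with
    // every existing session it overlaps. Existing sessions are assumed to be disjoint.
    static List<Window> add(List<Window> sessions, long timestamp, long gapMs) {
        Window merged = new Window(timestamp, timestamp + gapMs);
        List<Window> result = new ArrayList<>();
        for (Window w : sessions) {
            if (w.intersects(merged)) {
                merged = merged.cover(w);
            } else {
                result.add(w);
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        List<Window> sessions = new ArrayList<>();
        sessions = add(sessions, 0L, 10L);   // first element opens a session
        sessions = add(sessions, 5L, 10L);   // overlaps, so the session is extended
        sessions = add(sessions, 100L, 10L); // too far away, so a second session is opened
        for (Window w : sessions) {
            System.out.println(w.start + " .. " + w.end);
        }
    }
}
```

If an old element arrives and its window merges with a session that is still in state, the merged window can end far behind the watermark, which would be the situation the exception message describes.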

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <[hidden email]> wrote:
I can do that, but I am not certain this is the right filter. Can you please validate? That aside, I already have the lateness configured for the session window (the normal allowedLateness()), and this looks like a session window that was not collected and is still alive for some reason (a Flink bug?).

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
    out.collect(value);
}


On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <[hidden email]> wrote:
Hi Vishal,
based on the error message and the behavior you described, introducing a filter for late events is the way to go - just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].

I hope that helps.
Matthias


On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <[hidden email]> wrote:
I saw https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current and this seems to suggest a straight-up filter, but I am not sure how that filter works, as in, would it factor in the lateness when filtering?




Re: event-time window cannot become earlier than the current watermark by merging

Vishal Santoshi
Great, thanks for the update. The upfront filter does work, and has for the last 24 hours, and there is no reason why it should not.

Again, I have to note that there is no mail group that has been this responsive to issues, so thank you again.



On Fri, Apr 23, 2021 at 4:34 AM Matthias Pohl <[hidden email]> wrote:
After having talked to David about this issue offline, I decided to create a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the mailing list, Vishal. Hopefully, the community has the chance to look into it.

