Error while using session window

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Error while using session window

Abhishek Jain
Hi,
I have a job that uses processing time session window with inactivity gap of 60ms where I intermittently run into the following exception. I'm trying to figure out what happened here. Haven't been able to reproduce this scenario. Any thoughts?

java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)

--
Warm Regards,
Abhishek Jain

Reply | Threaded
Open this post in threaded view
|

Re: Error while using session window

Piotr Nowojski-3
Hi,

Thanks for reporting the issue. I think this might be caused by System.currentTimeMillis() not being monotonic [1] and the fact Flink is accessing this function per element multiple times (at least twice: first for creating a window, second to perform the check that has failed in your case), however I’m pretty sure that this is more general problem in more places.I have created a ticket for this. [2]

I’m not sure if there is an easy hot fix for that. You would have to increase inactivity gap, switch to ingestion/even time (anyway preferable), make sure that machine’s time doesn’t change or just ignore the problem and accept some failure from time to time.

Piotrek

[1] https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls

On 14 Jun 2019, at 10:14, Abhishek Jain <[hidden email]> wrote:

Hi,
I have a job that uses processing time session window with inactivity gap of 60ms where I intermittently run into the following exception. I'm trying to figure out what happened here. Haven't been able to reproduce this scenario. Any thoughts?

java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)

--
Warm Regards,
Abhishek Jain