Hi Yuvraj,
This looks like a race condition to me. Would it be OK for you to switch to either event time or ingestion time [1]?
I have also cc'ed @Aljosha, who might be able to give you more insight.
Best,
Dawid
This is my code:
DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream = filteredGeohashDataStream
        .keyBy(FilteredGeoHashes::getCity)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
        .process(new ProcessWindowFunction<FilteredGeoHashes, CityWithGeoHashes, String, TimeWindow>() {
            @Override
            public void process(String city, Context context, Iterable<FilteredGeoHashes> iterable,
                    Collector<CityWithGeoHashes> collector) throws Exception {
                Map<String, List<FilteredGeoHashes>> geoHashesPerCategory =
                        StreamSupport.stream(iterable.spliterator(), false)
                                .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
                collector.collect(new CityWithGeoHashes(city, geoHashesPerCategory));
            }
        })
        .name("city-geohashes-processor")
        .uid("city-geohashes-processor");
On Mon, Sep 24, 2018 at 4:56 PM yuvraj singh <[hidden email]> wrote:
Hi all,
I am stuck with this error, please help me.
I am using a session window.
2018-09-23 07:15:08,097 INFO org.apache.flink.runtime.taskmanager.Task - city-geohashes-processor (24/48) (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1537667108063 window: TimeWindow{start=1537667100054, end=1537667107237}
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
Thanks
Yuvraj Singh
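
For anyone reading the exception message above, here is a small plain-Java sketch of the arithmetic behind it, using only the three timestamps from the log. The class and method names are hypothetical, and the comparison is an assumption based on the wording of the message (the merged window's last valid millisecond is not later than the current processing time). It is not Flink's actual WindowOperator code.

```java
public class ProcessingTimeMergeCheck {

    /** Last millisecond that still belongs to a window whose end (exclusive) is endMillis. */
    public static long maxTimestamp(long endMillis) {
        return endMillis - 1;
    }

    /**
     * Assumed form of the check described by the exception message: the merged
     * window is rejected when it already lies entirely in the past.
     */
    public static boolean mergedWindowIsInThePast(long windowEndMillis, long currentProcessingTimeMillis) {
        return maxTimestamp(windowEndMillis) <= currentProcessingTimeMillis;
    }

    public static void main(String[] args) {
        // Values copied from the log line in the original post.
        long windowStart = 1537667100054L;
        long windowEnd = 1537667107237L;
        long currentProcessingTime = 1537667108063L;

        // Length of the merged session window: 7183 ms.
        System.out.println("window length (ms): " + (windowEnd - windowStart));
        // How far the clock had already moved past the window end: 826 ms.
        System.out.println("ms past window end: " + (currentProcessingTime - windowEnd));
        // true: this is the situation the exception reports.
        System.out.println("rejected: " + mergedWindowIsInThePast(windowEnd, currentProcessingTime));
    }
}
```

With the values from the log, the merged window ended 826 ms before the reported current processing time, which is why the operator refused the merge.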