error with session window


yuvraj singh


Hi all,

I am stuck with the error below. Please help me.

I am using a session window.


2018-09-23 07:15:08,097 INFO  org.apache.flink.runtime.taskmanager.Task                     - city-geohashes-processor (24/48) (26aed9a769743191c7cb0257087e490a) switched from RUNNING to FAILED.
java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1537667108063 window: TimeWindow{start=1537667100054, end=1537667107237}
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
        at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)




Thanks 

Yuvraj Singh 
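
The numbers in the exception message can be checked by hand. The sketch below is not Flink's code. It is a minimal plain-Java illustration that assumes the operator rejects a merged processing-time window once the window's last valid timestamp (end - 1) is at or before the current processing time. The class and method names are hypothetical; only the two timestamps come from the log above.

```java
// Hypothetical illustration of the check described by the exception message.
// This is NOT the WindowOperator source, only a sketch of the assumed rule.
final class ProcessingTimeMergeCheck {

    // A window [start, end) covers timestamps up to end - 1 inclusive.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Assumed rule: the merged window is rejected when its last timestamp
    // is at or before the current processing time.
    static boolean isAlreadyExpired(long windowEnd, long currentProcessingTime) {
        return maxTimestamp(windowEnd) <= currentProcessingTime;
    }

    public static void main(String[] args) {
        long windowEnd = 1537667107237L; // "end" from the exception message
        long now = 1537667108063L;       // "Current processing time" from the message

        System.out.println("expired: " + isAlreadyExpired(windowEnd, now)); // expired: true
        System.out.println("late by ms: " + (now - windowEnd));             // late by ms: 826
    }
}
```

With the logged values, the merged window ended 826 ms before the operator's clock reading, which is the condition the message complains about.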

Re: error with session window

yuvraj singh
This is my code:


DataStream<CityWithGeoHashes> cityWithGeoHashesDataStream =
        filteredGeohashDataStream.keyBy(FilteredGeoHashes::getCity).window(
                ProcessingTimeSessionWindows.withGap(Time.seconds(4)))
                .process(new ProcessWindowFunction<FilteredGeoHashes, CityWithGeoHashes, String, TimeWindow>() {
                    @Override
                    public void process(String city, Context context, Iterable<FilteredGeoHashes> iterable, Collector<CityWithGeoHashes> collector)
                            throws Exception {
                        Map<String, List<FilteredGeoHashes>> geoHashesPerCategory = StreamSupport.stream(iterable.spliterator(), false)
                                .collect(Collectors.groupingBy(FilteredGeoHashes::getCategory));
                        collector.collect(new CityWithGeoHashes(city, geoHashesPerCategory));
                    }
                }).name("city-geohashes-processor")
                .uid("city-geohashes-processor");


Re: error with session window

Dawid Wysakowicz-2

Hi Yuvraj,

This looks like a race condition to me. Would it be OK for you to switch to either event time or ingestion time [1]?

I have also cc'ed @Aljosha, who might be able to give you more insight.

Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html#event-time--processing-time--ingestion-time
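
To illustrate what the switch to event time changes: with event-time session windows, the window bounds come from timestamps carried by the records rather than from the machine's clock at the moment an element is processed. The plain-Java sketch below is not Flink code and simplifies the merging rule. `EventTimeSessionSketch` and `sessionize` are hypothetical names, the sample timestamps are made up, and the 4-second gap mirrors the gap in the posted job. In Flink itself the corresponding assigner is `EventTimeSessionWindows.withGap(...)`, which also requires timestamps and watermarks to be assigned to the stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of gap-based sessionization on event timestamps.
final class EventTimeSessionSketch {

    // Groups ascending event timestamps into sessions [start, end),
    // where end = (last timestamp in the session) + gap.
    static List<long[]> sessionize(long[] sortedTimestamps, long gapMillis) {
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && ts < last[1]) {
                // The element falls inside the open session: extend its end.
                last[1] = ts + gapMillis;
            } else {
                // The gap has elapsed (or there is no session yet): start a new one.
                sessions.add(new long[] {ts, ts + gapMillis});
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] eventTimes = {0L, 1_000L, 3_500L, 9_000L, 10_000L}; // made-up sample data
        for (long[] s : sessionize(eventTimes, 4_000L)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")");
        }
        // prints:
        // session [0, 7500)
        // session [9000, 14000)
    }
}
```

Because the result depends only on the timestamps in the data, replaying the same records gives the same sessions regardless of how fast or slow the operator runs.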



