Expected behaviour of windows

Abdul Salam Shaikh
Hi, 

I needed some clarity on the behaviour of the windows for my use case. 
I have defined my stream as follows: 

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStream<String> live = env.addSource(new JsonTestSource());
        DataStream<FlatObject> jsonToTuple = live.flatMap(new Splitter());
        KeyedStream<FlatObject, String> keyStream = jsonToTuple.keyBy(new KeySelector<FlatObject, String>() {
            @Override
            public String getKey(FlatObject value) throws Exception {
                return value.getIntersectionName();
            }
        });

        DataStream<FlatObject> flatStream = keyStream.window(GlobalWindows.create())
                .trigger(new WindowCustomTrigger())
                .apply(new TrafficWindow());
        flatStream.print();

For a given set of JSON objects, the ideal output would be:

{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
------------------------------------------------------------ Trigger (Because the CurrentTimeInCycle is less for the current event than the previous event) 
{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
------------------------------------------------------------ Trigger (Because the CurrentTimeInCycle is less for the current event than the previous event) 

In my current program, the output is as follows (all the objects from the previous window become part of the next window, and they keep accumulating):

{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
------------------------------------------------------------ Trigger (Because the CurrentTimeInCycle is less for the current event than the previous event) 
{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
------------------------------------------------------------ Trigger (Because the CurrentTimeInCycle is less for the current event than the previous event) 


I am not sure whether this is the expected behaviour of the windows. Is there anything I can do to make my program produce the ideal case described above?

Thanks in anticipation!


Re: Expected behaviour of windows

Jonas Gröger
The documentation (https://flink.apache.org/news/2015/12/04/Introducing-windows.html) says:

> On each event, a trigger can decide to fire (i.e., evaluate), purge (remove the window and discard its content), or fire and then purge the window. A trigger that just fires evaluates the window and keeps it as it is, i.e., all elements remain in the window and are evaluated again when the triggers fires the next time.

So you can choose between Fire, Purge and Fire&Purge. It seems you selected Fire but meant to choose Fire&Purge.
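
The difference between firing and firing-then-purging can be reproduced outside Flink. The following is only a plain-Java simulation of a single window buffer, not Flink code: the class and method names are made up for illustration, and the firing rule (fire when CurrentTimeInCycle drops below the previous value) is taken from the description in the original question.

```java
import java.util.ArrayList;
import java.util.List;

public class FireVsPurgeDemo {

    /**
     * Simulates one window buffer (hypothetical helper, not a Flink API).
     * Returns the number of buffered elements at each firing.
     */
    static List<Integer> firedSizes(int[] cycleTimes, boolean purgeOnFire) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> sizes = new ArrayList<>();
        int last = 0;
        for (int t : cycleTimes) {
            buffer.add(t);                 // the element is added before the trigger decides
            if (last > t) {                // cycle time dropped: fire
                sizes.add(buffer.size());
                if (purgeOnFire) {
                    buffer.clear();        // Fire&Purge: discard the window content
                }
                last = 0;
            } else {
                last = t;
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        int[] input = {20, 30, 40, 60, 10, 20, 30, 40, 60, 5};
        System.out.println("Fire only:  " + firedSizes(input, false)); // prints Fire only:  [5, 10]
        System.out.println("Fire&Purge: " + firedSizes(input, true));  // prints Fire&Purge: [5, 5]
    }
}
```

With Fire only, the second evaluation sees all 10 elements, which matches the accumulating output reported in the question; with Fire&Purge, each evaluation sees 5.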

It looks like you want a PurgingTrigger. You also didn't state which version of Flink you are using :)

-- Jonas
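
For readers unfamiliar with the idea behind a purging trigger: Flink's PurgingTrigger wraps another trigger and turns its fire decisions into fire-and-purge decisions. The sketch below shows only that wrapping pattern in plain Java. SimpleTrigger, CycleResetTrigger, Purging and the Result enum are simplified, hypothetical stand-ins, not Flink's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;

public class PurgingWrapperSketch {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Hypothetical, simplified trigger: one decision per incoming element. */
    interface SimpleTrigger {
        Result onElement(int cycleTime);
    }

    /** Fires (without purging) when the cycle time drops below the previous one. */
    static class CycleResetTrigger implements SimpleTrigger {
        private int last = 0;

        @Override
        public Result onElement(int cycleTime) {
            if (last > cycleTime) {
                last = 0;
                return Result.FIRE;
            }
            last = cycleTime;
            return Result.CONTINUE;
        }
    }

    /** Decorator: upgrades every FIRE of the nested trigger to FIRE_AND_PURGE. */
    static class Purging implements SimpleTrigger {
        private final SimpleTrigger nested;

        Purging(SimpleTrigger nested) {
            this.nested = nested;
        }

        @Override
        public Result onElement(int cycleTime) {
            Result r = nested.onElement(cycleTime);
            return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
        }
    }

    /** Collects the trigger's decision for each element, in order. */
    static List<Result> decisions(SimpleTrigger trigger, int[] cycleTimes) {
        List<Result> out = new ArrayList<>();
        for (int t : cycleTimes) {
            out.add(trigger.onElement(t));
        }
        return out;
    }

    public static void main(String[] args) {
        int[] input = {60, 10, 20};
        System.out.println(decisions(new CycleResetTrigger(), input));              // [CONTINUE, FIRE, CONTINUE]
        System.out.println(decisions(new Purging(new CycleResetTrigger()), input)); // [CONTINUE, FIRE_AND_PURGE, CONTINUE]
    }
}
```

The alternative, which the thread ends up using, is to return the fire-and-purge result directly from the custom trigger.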
Re: Expected behaviour of windows

Abdul Salam Shaikh
Thank you Jonas, I am using version 1.2-SNAPSHOT of Apache Flink to leverage the advanced Evictor class. 

However, while trying to use FIRE_AND_PURGE I am getting the following error: 

java.lang.UnsupportedOperationException: Not supported yet.
at de.traffic.ui.streaming.WindowCustomTrigger.clear(WindowCustomTrigger.java:51)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:643)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.cleanup(WindowOperator.java:421)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:321)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
at java.lang.Thread.run(Thread.java:745)

Do we have support for this functionality in 1.2-SNAPSHOT?

Thanks.

On Mon, Jan 23, 2017 at 10:57 AM, Jonas <[hidden email]> wrote:
The documentation says

https://flink.apache.org/news/2015/12/04/Introducing-windows.html wrote
> On each event, a trigger can decide to fire (i.e., evaluate), purge
> (remove the window and discard its content), or fire and then purge the
> window. A trigger that just fires evaluates the window and keeps it as it
> is, i.e., all elements remain in the window and are evaluated again when
> the triggers fires the next time.

So you can choose between *Fire*, *Purge* and *Fire&Purge*. It seems you
selected *Fire* but meant to choose *Fire&Purge*.

From what it seems you want a PurgingTrigger. You also didn't state what
version of Flink you are using :)

-- Jonas



--
Thanks & Regards,

Abdul Salam Shaikh

Re: Expected behaviour of windows

Abdul Salam Shaikh
This is my definition of the trigger, for more clarity on the issue I am running into:

    @Override
    public TriggerResult onElement(FlatObject t, long l, Window w, TriggerContext tc) throws Exception {
        long currentTimeInCycle = t.getCurrentTimeInCycle();
        // A drop in CurrentTimeInCycle marks the start of a new cycle:
        // emit the window contents and discard them.
        if (lastKnownCurrentTimeInCycle > currentTimeInCycle) {
            lastKnownCurrentTimeInCycle = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Otherwise remember this value and keep collecting.
        lastKnownCurrentTimeInCycle = currentTimeInCycle;
        return TriggerResult.CONTINUE;
    }

On Mon, Jan 23, 2017 at 10:02 PM, Abdul Salam Shaikh <[hidden email]> wrote:
Thank you Jonas, I am using version 1.2-SNAPSHOT of Apache Flink to leverage the advanced Evictor class. 

However, while trying to use FIRE_AND_PURGE I am getting the following error: 

java.lang.UnsupportedOperationException: Not supported yet.
at de.traffic.ui.streaming.WindowCustomTrigger.clear(WindowCustomTrigger.java:51)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:643)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.cleanup(WindowOperator.java:421)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:321)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
at java.lang.Thread.run(Thread.java:745)

Do we have support for this functionality in 1.2-SNAPSHOT?

Thanks.

Re: Expected behaviour of windows

Abdul Salam Shaikh
The exception was thrown because I had not implemented the clear() method in my custom trigger (WindowCustomTrigger).

It works as expected now. Thanks for all the help :)  
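
The failure mode described here can be sketched in plain Java. As the stack trace earlier in the thread suggests, the window operator calls the trigger's clear() while cleaning up the window, so a stub that throws breaks the job as soon as a purge happens. The types below (SimpleTrigger, StubbedTrigger, FixedTrigger, run) are hypothetical simplifications for illustration and do not reproduce Flink's Trigger API; the stub body is assumed to be an auto-generated placeholder, based on its "Not supported yet." message.

```java
import java.util.ArrayList;
import java.util.List;

public class ClearOnPurgeSketch {

    /** Hypothetical, simplified trigger contract (not Flink's actual interface). */
    interface SimpleTrigger {
        boolean shouldFireAndPurge(int cycleTime);

        void clear(); // called by the operator loop when the window is purged
    }

    /** Shared firing rule: fire when the cycle time drops below the previous value. */
    abstract static class CycleResetTrigger implements SimpleTrigger {
        protected int last = 0;

        @Override
        public boolean shouldFireAndPurge(int cycleTime) {
            if (last > cycleTime) {
                last = 0;
                return true;
            }
            last = cycleTime;
            return false;
        }
    }

    /** clear() left as a placeholder: compiles, but fails on the first purge. */
    static class StubbedTrigger extends CycleResetTrigger {
        @Override
        public void clear() {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    /** clear() implemented: resets the trigger's own bookkeeping. */
    static class FixedTrigger extends CycleResetTrigger {
        @Override
        public void clear() {
            last = 0;
        }
    }

    /** Minimal operator loop; returns the number of windows emitted. */
    static int run(SimpleTrigger trigger, int[] cycleTimes) {
        List<Integer> buffer = new ArrayList<>();
        int emitted = 0;
        for (int t : cycleTimes) {
            buffer.add(t);
            if (trigger.shouldFireAndPurge(t)) {
                emitted++;
                buffer.clear();   // purge the window content
                trigger.clear();  // let the trigger clean up after itself
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] input = {20, 30, 40, 60, 10, 20, 30, 40, 60, 5};
        System.out.println("windows emitted: " + run(new FixedTrigger(), input)); // windows emitted: 2
        try {
            run(new StubbedTrigger(), input);
        } catch (UnsupportedOperationException e) {
            System.out.println("stub failed: " + e.getMessage()); // stub failed: Not supported yet.
        }
    }
}
```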

On Tue, Jan 24, 2017 at 12:23 AM, Abdul Salam Shaikh <[hidden email]> wrote:
This is my definition of the trigger, for more clarity on the issue I am running into:

@Override
    public TriggerResult onElement(FlatObject t, long l, Window w, TriggerContext tc) throws Exception {
        long currentTimeInCycle = t.getCurrentTimeInCycle();
        if (lastKnownCurrentTimeInCycle > currentTimeInCycle) {
            lastKnownCurrentTimeInCycle = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        lastKnownCurrentTimeInCycle = currentTimeInCycle;
        return TriggerResult.CONTINUE;
    }
