Hi,

I need some clarity on the behaviour of windows for my use case. I have defined my stream as follows:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<String> live = env.addSource(new JsonTestSource());
    DataStream<FlatObject> jsonToTuple = live.flatMap(new Splitter());

    KeyedStream<FlatObject, String> keyStream = jsonToTuple.keyBy(new KeySelector<FlatObject, String>() {
        public String getKey(FlatObject value) throws Exception {
            return value.getIntersectionName();
        }
    });

    DataStream<FlatObject> flatStream = keyStream.window(GlobalWindows.create())
        .trigger(new WindowCustomTrigger())
        .apply(new TrafficWindow());

    flatStream.print();

For a given set of JSON objects, the ideal case is:

    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because the CurrentTimeInCycle of the current event is less than that of the previous event)
    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because the CurrentTimeInCycle of the current event is less than that of the previous event)

In my current program, the output is as follows (all the objects from the previous window are also part of the next window, and they keep accumulating from one window to the next):

    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because the CurrentTimeInCycle of the current event is less than that of the previous event)
    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because the CurrentTimeInCycle of the current event is less than that of the previous event)

I am not sure whether this is the expected behaviour of the windows. Is there anything I can do to get my program to behave as in the ideal case mentioned above?

Thanks in anticipation!
The documentation says
So you can choose between Fire, Purge and Fire&Purge. It seems you selected Fire but meant to choose Fire&Purge. From the look of it, you want a PurgingTrigger.

You also didn't state which version of Flink you are using :)

-- Jonas
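To make the difference concrete, here is a minimal plain-Java sketch that simulates the two behaviours without any Flink dependency. The class `WindowPurgeDemo` and its `simulate` method are hypothetical names made up for this illustration, not Flink API. It assumes, as the output in the question suggests, that the element is added to the window before the trigger decides, and it reuses the "fire when the cycle time drops" rule from the question.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (not Flink code) of firing with and without purging. */
final class WindowPurgeDemo {

    /**
     * Feeds cycle times into a single never-ending window and fires whenever
     * the value drops below the previous one. Returns the window contents
     * emitted at each firing. With purgeOnFire=false the buffer is kept
     * (like FIRE); with purgeOnFire=true it is emptied (like FIRE_AND_PURGE).
     */
    static List<List<Integer>> simulate(int[] cycleTimes, boolean purgeOnFire) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        long lastSeen = 0;
        for (int t : cycleTimes) {
            buffer.add(t); // assumption: the element joins the window first
            if (lastSeen > t) {
                emitted.add(new ArrayList<>(buffer)); // snapshot of what the window function would see
                if (purgeOnFire) {
                    buffer.clear();
                }
                lastSeen = 0;
            } else {
                lastSeen = t;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] input = {20, 30, 40, 60, 10, 20, 30, 40, 60, 5};
        System.out.println("fire only:      " + simulate(input, false));
        System.out.println("fire and purge: " + simulate(input, true));
    }
}
```

With the ten values from the question, the second firing contains all ten elements when the buffer is kept and only the last five when it is purged, which matches the "current" and "ideal" outputs described in the question.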
Thank you Jonas,

I am using version 1.2-SNAPSHOT of Apache Flink to leverage the advanced Evictor class. However, while trying to use FIRE_AND_PURGE I am getting the following error:

    java.lang.UnsupportedOperationException: Not supported yet.
        at de.traffic.ui.streaming.WindowCustomTrigger.clear(WindowCustomTrigger.java:51)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:643)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.cleanup(WindowOperator.java:421)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:321)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
        at java.lang.Thread.run(Thread.java:745)

Do we have support for this functionality in 1.2-SNAPSHOT?

Thanks.

On Mon, Jan 23, 2017 at 10:57 AM, Jonas <[hidden email]> wrote:
The documentation says

Thanks & Regards,
Abdul Salam Shaikh
This is my definition of the trigger, for more clarity on the issue I am running into:

    @Override
    public TriggerResult onElement(FlatObject t, long l, Window w, TriggerContext tc) throws Exception {
        long currentTimeInCycle = t.getCurrentTimeInCycle();
        // A drop in the cycle time marks the start of a new cycle: emit and empty the window.
        if (lastKnownCurrentTimeInCycle > currentTimeInCycle) {
            lastKnownCurrentTimeInCycle = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Otherwise remember the latest cycle time and keep collecting.
        lastKnownCurrentTimeInCycle = currentTimeInCycle;
        return TriggerResult.CONTINUE;
    }

On Mon, Jan 23, 2017 at 10:02 PM, Abdul Salam Shaikh <[hidden email]> wrote:
Thanks & Regards, Abdul Salam Shaikh |
The exception occurred because I had not implemented the clear() function in my CustomTrigger. It works as expected now.

Thanks for all the help :)

On Tue, Jan 24, 2017 at 12:23 AM, Abdul Salam Shaikh <[hidden email]> wrote:
Thanks & Regards, Abdul Salam Shaikh |
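For later readers, the cause can be sketched in plain Java. Everything below is a hypothetical stand-in rather than Flink API: `CycleTrigger` models only the two trigger methods that matter here, and `ClearDemo.run` is a toy operator that calls `clear()` whenever the window is purged. This mirrors the stack trace above, where `WindowOperator.cleanup` ends up in `WindowCustomTrigger.clear`. The stubbed variant assumes an IDE-generated method body that throws "Not supported yet.", as the exception message suggests.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy operator (not Flink): purging the window also calls the trigger's clear(). */
final class ClearDemo {

    /** Runs the trigger over the cycle times and returns how many times it fired. */
    static int run(CycleTrigger trigger, long[] cycleTimes) {
        List<Long> buffer = new ArrayList<>();
        int firings = 0;
        for (long t : cycleTimes) {
            buffer.add(t);
            if (trigger.onElement(t)) {
                firings++;
                buffer.clear();  // purge the window contents
                trigger.clear(); // purge also cleans up the trigger
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] input = {20, 30, 40, 60, 10};
        System.out.println("fixed trigger firings: " + run(new FixedTrigger(), input));
        try {
            run(new StubbedTrigger(), input);
        } catch (UnsupportedOperationException e) {
            System.out.println("stubbed trigger failed: " + e.getMessage());
        }
    }
}

/** Hypothetical stand-in for the relevant part of a trigger. */
abstract class CycleTrigger {
    protected long lastKnownCurrentTimeInCycle = 0;

    /** Returns true ("fire and purge") when the cycle time drops. */
    boolean onElement(long currentTimeInCycle) {
        if (lastKnownCurrentTimeInCycle > currentTimeInCycle) {
            lastKnownCurrentTimeInCycle = 0;
            return true;
        }
        lastKnownCurrentTimeInCycle = currentTimeInCycle;
        return false;
    }

    abstract void clear();
}

/** clear() left as a generated stub: fails as soon as a purge happens. */
final class StubbedTrigger extends CycleTrigger {
    @Override
    void clear() {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

/** clear() implemented: resets the trigger's own bookkeeping. */
final class FixedTrigger extends CycleTrigger {
    @Override
    void clear() {
        lastKnownCurrentTimeInCycle = 0;
    }
}
```

This is also why the problem only appeared after switching to FIRE_AND_PURGE: in this sketch, as in the stack trace, `clear()` is reached only on the purge path.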