Hi,
I have been trying to understand how triggers work in Flink. We have a set of data that arrives via Kafka. We need to process the data in a window when either of two criteria is met:

1) The maximum number of elements in the window has been reached.
2) Some maximum time has elapsed (say 5 milliseconds in our case).

I have written the following code:

    public class WindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

            DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
                @Override
                public void run(SourceContext<Long> ctx) throws Exception {
                    LongStream.range(0, 102).forEach(ctx::collect);
                }

                @Override
                public void cancel() {
                }
            });

            source.timeWindowAll(Time.milliseconds(5))
                    .trigger(PurgingTrigger.of(CountTrigger.of(15)))
                    .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                        @Override
                        public void apply(TimeWindow timeWindow, Iterable<Long> values, Collector<Object> collector) throws Exception {
                            System.out.println("processing a window");
                            System.out.println(Joiner.on(',').join(values));
                        }
                    })
                    .print();

            env.execute("test-program");
        }
    }

Here is the output I get when I run this code:

    processing a window
    0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
    processing a window
    15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
    processing a window
    30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
    processing a window
    45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
    processing a window
    60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
    processing a window
    75,76,77,78,79,80,81,82,83,84,85,86,87,88,89

As you can see, the data from 90 to 101 is not processed. Shouldn't it be processed when the window completes after 5 ms? When I remove the trigger part, all of the data from 0 to 101 does get processed. Any idea why we see this behaviour?

--
Regards,
Harshvardhan Agrawal
Hi Harshvardhan,

By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner. For example, if you specify a CountTrigger for TumblingEventTimeWindows you will no longer get window firings based on the progress of time but only by count. Right now, you have to write your own custom trigger if you want to react based on both time and count. More details here[1].

Best,
Hequn

On Sun, Jul 22, 2018 at 11:59 PM, Harshvardhan Agrawal <[hidden email]> wrote:
Thanks for the response, Hequn. I am also seeing odd behavior with the purging trigger: it skips messages.
Here is the repro:

    public class WindowTest {

    processing a window
    0,1,2,3,4,5,6
    processing a window
    10,11,12,13,14,15,16
    processing a window
    17,18,19,20,21,22,23
    processing a window
    24,25,26,27,28,29,30
    processing a window
    31,32,33,34,35,36,37
    processing a window
    38,39,40,41,42,43,44
    processing a window
    45,46,47,48,49,50,51
    processing a window
    52,53,54,55,56,57,58
    processing a window
    59,60,61,62,63,64,65
    processing a window
    66,67,68,69,70,71,72
    processing a window
    73,74,75,76,77,78,79
    processing a window
    80,81,82,83,84,85,86
    processing a window
    87,88,89,90,91,92,93
    processing a window
    94,95,96,97,98,99,100

It has skipped numbers 7-9. Is this expected behavior?

On Sun, Jul 22, 2018 at 9:43 PM Hequn Cheng <[hidden email]> wrote:
Regards,
Harshvardhan Agrawal
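
Both observations in this thread look consistent with Hequn's point that trigger() replaces the assigner's default time-based trigger. With a count-only purging trigger, elements still buffered when a window closes are never emitted. The sketch below is a plain-Java model of that behaviour, not Flink code. The class WindowFiringModel, its simulate method, and the timestamps are hypothetical and chosen only for illustration. In particular, the second scenario assumes that elements 0-9 fell into one 5 ms window and elements 10-100 into the next, which the truncated repro does not confirm.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of tumbling-window firing. Hypothetical helper, not Flink API. */
public class WindowFiringModel {

    /**
     * Element i carries the value i and the timestamp timestamps[i]
     * (non-negative, non-decreasing, in milliseconds).
     *
     * @param fireOnWindowEnd false models a count-only purging trigger (leftovers are dropped
     *                        when the window closes); true models a count-OR-time trigger
     * @return the batches handed to the window function, in firing order
     */
    public static List<List<Long>> simulate(long[] timestamps, long windowSizeMs,
                                            int maxCount, boolean fireOnWindowEnd) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        long currentWindow = -1;
        for (int i = 0; i < timestamps.length; i++) {
            long window = timestamps[i] / windowSizeMs;
            if (window != currentWindow) {
                // The previous window has ended before this element arrived.
                closeWindow(buffer, fired, fireOnWindowEnd);
                currentWindow = window;
            }
            buffer.add((long) i);
            if (buffer.size() == maxCount) {
                // Count reached: fire, then purge the buffered elements.
                fired.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // End of input closes the last window.
        closeWindow(buffer, fired, fireOnWindowEnd);
        return fired;
    }

    private static void closeWindow(List<Long> buffer, List<List<Long>> fired,
                                    boolean fireOnWindowEnd) {
        if (fireOnWindowEnd && !buffer.isEmpty()) {
            fired.add(new ArrayList<>(buffer));
        }
        buffer.clear(); // window state is discarded either way
    }

    public static void main(String[] args) {
        // Scenario 1 (assumed timestamps): all 102 elements land in one window, count limit 15.
        long[] oneWindow = new long[102];
        System.out.println(simulate(oneWindow, 5, 15, false));
        System.out.println(simulate(oneWindow, 5, 15, true));

        // Scenario 2 (assumed timestamps): 0-9 in the first window, 10-100 in the next, limit 7.
        long[] twoWindows = new long[101];
        for (int i = 10; i < twoWindows.length; i++) {
            twoWindows[i] = 5;
        }
        System.out.println(simulate(twoWindows, 5, 7, false));
        System.out.println(simulate(twoWindows, 5, 7, true));
    }
}
```

In this model the count-only variant drops 90-101 in the first scenario and 7-9 in the second, while the count-or-time variant emits them as a final, smaller batch. In Flink itself the equivalent would be a custom Trigger subclass along the lines Hequn describes, roughly one that counts elements in onElement and also fires when the timer for the window's end goes off. Treat that mapping as an outline to check against the Flink documentation rather than a tested implementation.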