Behaviour of triggers in Flink


Behaviour of triggers in Flink

Harshvardhan Agrawal
Hi,

I have been trying to understand how triggers work in Flink. We have data that arrives to us on Kafka, and we need to process a window when either of two criteria is satisfied:
1) The window has reached a maximum number of elements.
2) A maximum amount of time has elapsed (say 5 milliseconds in our case).

I have written the following code:

public class WindowTest {
    public static void main (String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 102).forEach(ctx::collect);
            }

            @Override
            public void cancel() {

            }
        });

        source.timeWindowAll(Time.milliseconds(5)).trigger(PurgingTrigger.of(CountTrigger.of(15))).apply(new AllWindowFunction<Long, Object, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Long> values, Collector<Object> collector) throws Exception {
                System.out.println("processing a window");
                System.out.println(Joiner.on(',').join(values));
            }
        }).print();

        env.execute("test-program");

    }
}

Here is the output I get when I run this code:

processing a window
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
processing a window
15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
processing a window
30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
processing a window
60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
processing a window
75,76,77,78,79,80,81,82,83,84,85,86,87,88,89

As you can see, the data from 90 to 101 is never processed. Shouldn't it be emitted when the window closes after 5 ms?

When I remove the trigger, all of the data from 0 to 101 is processed.

Any idea why we see this behaviour?
--
Regards,
Harshvardhan Agrawal
267.991.6618 | LinkedIn

Re: Behaviour of triggers in Flink

Hequn Cheng
Hi Harshvardhan,

By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner. For example, if you specify a CountTrigger for TumblingEventTimeWindows you will no longer get window firings based on the progress of time but only by count. Right now, you have to write your own custom trigger if you want to react based on both time and count.
More details here[1].
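As a rough illustration, the fire decision such a custom trigger needs could be sketched like this. This is a simplified, Flink-free sketch, not Flink's actual Trigger API: in a real Trigger the count would live in partitioned trigger state and the time condition would be a registered timer rather than a comparison inside onElement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "fire on count reached OR time elapsed, whichever
// comes first", mimicking a FIRE_AND_PURGE result by resetting state.
public class CountOrTimeSketch {
    private final int maxCount;
    private final long maxDelayMs;
    private int count = 0;
    private long firstElementTime = -1;

    public CountOrTimeSketch(int maxCount, long maxDelayMs) {
        this.maxCount = maxCount;
        this.maxDelayMs = maxDelayMs;
    }

    /** Returns true if the window should fire now; resets state when it does. */
    public boolean onElement(long nowMs) {
        if (firstElementTime < 0) {
            firstElementTime = nowMs;  // remember when this batch started
        }
        count++;
        if (count >= maxCount || nowMs - firstElementTime >= maxDelayMs) {
            count = 0;                 // purge: start a fresh batch
            firstElementTime = -1;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountOrTimeSketch trigger = new CountOrTimeSketch(3, 5);
        List<Boolean> fires = new ArrayList<>();
        // Three quick elements: fires on count.
        fires.add(trigger.onElement(0));
        fires.add(trigger.onElement(1));
        fires.add(trigger.onElement(2));   // count reaches 3 -> fire
        // Two slow elements: fires on elapsed time.
        fires.add(trigger.onElement(10));
        fires.add(trigger.onElement(16));  // 6 ms elapsed -> fire
        System.out.println(fires);         // [false, false, true, false, true]
    }
}
```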

Best, Hequn


On Sun, Jul 22, 2018 at 11:59 PM, Harshvardhan Agrawal <[hidden email]> wrote:


Re: Behaviour of triggers in Flink

Harshvardhan Agrawal
Thanks for the response, Hequn. I also see a weird behaviour with the purging trigger: it skips messages.

Here is the repro:
public class WindowTest {
    public static void main (String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 101).forEach(ctx::collect);
            }

            @Override
            public void cancel() {

            }
        });

        source.timeWindowAll(Time.milliseconds(5)).trigger(PurgingTrigger.of(CountTrigger.of(7))).apply(new AllWindowFunction<Long, Object, TimeWindow>() {
            @Override
            public void apply(TimeWindow timeWindow, Iterable<Long> values, Collector<Object> collector) throws Exception {
                System.out.println("processing a window");
                System.out.println(Joiner.on(',').join(values));
            }
        }).print();

        env.execute("test-program");

    }
}


processing a window
0,1,2,3,4,5,6
processing a window
10,11,12,13,14,15,16
processing a window
17,18,19,20,21,22,23
processing a window
24,25,26,27,28,29,30
processing a window
31,32,33,34,35,36,37
processing a window
38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51
processing a window
52,53,54,55,56,57,58
processing a window
59,60,61,62,63,64,65
processing a window
66,67,68,69,70,71,72
processing a window
73,74,75,76,77,78,79
processing a window
80,81,82,83,84,85,86
processing a window
87,88,89,90,91,92,93
processing a window
94,95,96,97,98,99,100 

It has skipped numbers 7-9. Is this expected behavior?
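One way to see how elements could get dropped is the following hypothetical, Flink-free simulation: a purging count trigger emits a batch whenever a window's count reaches the threshold, and anything still sitting in a window when it closes is discarded, because the replaced trigger never fires on time. This is an assumption about the mechanism for illustration, not Flink code, and it uses fixed element-count window boundaries; real ingestion-time windows are not aligned to element counts, which would explain why only one window left a remainder in the run above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: windows of `windowSize` consecutive elements,
// a purging count trigger firing every `countThreshold` elements, and
// leftovers silently dropped when a window closes.
public class PurgingCountDropSketch {
    public static List<List<Integer>> run(int total, int windowSize, int countThreshold) {
        List<List<Integer>> fired = new ArrayList<>();
        List<Integer> window = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            window.add(i);
            if (window.size() == countThreshold) {   // count trigger fires
                fired.add(new ArrayList<>(window));
                window.clear();                      // purging trigger purges
            }
            if ((i + 1) % windowSize == 0) {         // window closes
                window.clear();                      // leftovers dropped: no time-based firing
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 10 elements per window, fire every 7: elements 7-9 (and 17-19)
        // never reach a count of 7 before their window closes, so they
        // are dropped.
        System.out.println(run(20, 10, 7));
        // [[0, 1, 2, 3, 4, 5, 6], [10, 11, 12, 13, 14, 15, 16]]
    }
}
```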

On Sun, Jul 22, 2018 at 9:43 PM Hequn Cheng <[hidden email]> wrote:


--
Regards,
Harshvardhan Agrawal