windows and triggers


Marco Villalobos-2
I wrote this simple test:

.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.trigger(PurgingTrigger.of(CountTrigger.of(5)))

I expected that if I sent 2 elements, the window would emit them after a minute.
But that doesn't seem to be happening.

Is my understanding of how windows and triggers work correct?

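import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * To test this job, first start a simple server in a terminal:
 *
 *   nc -l 8889
 *
 * Then start this job from the command line or in an IDE, and type lines of text into the terminal.
 * Each line of text (excluding the newline) will be picked up by this job.
 */
public class TriggerTestJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        streamEnv.socketTextStream("localhost", 8889)
            // Put every line under the same key so all input lands in one keyed window.
            .map(value -> new Tuple2<String, String>("test", value)).returns(new TypeHint<Tuple2<String, String>>(){})
            .keyBy((KeySelector<Tuple2<String, String>, String>) value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            // An explicit trigger replaces the window assigner's default trigger.
            .trigger(PurgingTrigger.of(CountTrigger.of(5)))
            .process(new ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<Tuple2<String, String>> elements, Collector<Tuple2<String, String>> out) throws Exception {
                    // Forward every element of the fired window unchanged.
                    for (Tuple2<String, String> element : elements) {
                        out.collect(element);
                    }
                }
            }).name("trigger")
            .print();

        streamEnv.execute("trigger test");
    }
}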

Re: windows and triggers

Marco Villalobos-2
Ah, this works as expected. The Flink documentation states:

By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner. For example, if you specify a CountTrigger for TumblingEventTimeWindows you will no longer get window firings based on the progress of time but only by count. Right now, you have to write your own custom trigger if you want to react based on both time and count.
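
For anyone who finds this thread later, here is a rough plain-Java model of the behavior described in that paragraph. It is not Flink code and does not use the Flink API. The class TriggerModel, the method firings and its parameters are names made up for illustration. The model also assumes that elements which never fired are dropped when their window closes. It only shows why 2 elements never come out when a count-of-5 trigger replaces the default time-based one, and what a combined count-or-time rule would emit instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (hypothetical, not Flink code) of when a tumbling window emits. */
public class TriggerModel {

    /**
     * Returns the size of each emitted batch for a single key.
     *
     * @param arrivalsMs      arrival times in milliseconds, in ascending order
     * @param windowMs        tumbling window length in milliseconds
     * @param maxCount        fire (and purge) as soon as this many elements are buffered
     * @param fireAtWindowEnd if true, also fire whatever is buffered when the window closes;
     *                        if false, the model behaves like a count-only trigger
     */
    public static List<Integer> firings(long[] arrivalsMs, long windowMs, int maxCount, boolean fireAtWindowEnd) {
        List<Integer> batches = new ArrayList<>();
        long currentWindow = -1; // index of the window currently being filled
        int buffered = 0;        // elements waiting in the current window

        for (long t : arrivalsMs) {
            long window = t / windowMs;
            if (window != currentWindow) {
                // The previous window has closed. Assumption of this model:
                // its leftover elements are discarded unless a time-based firing emits them.
                if (buffered > 0 && fireAtWindowEnd) {
                    batches.add(buffered);
                }
                buffered = 0;
                currentWindow = window;
            }
            buffered++;
            if (buffered == maxCount) {
                // Count threshold reached: emit and purge, like a purging count trigger.
                batches.add(buffered);
                buffered = 0;
            }
        }
        // End of input: treat it as the close of the last window.
        if (buffered > 0 && fireAtWindowEnd) {
            batches.add(buffered);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] twoElements = {1_000L, 2_000L};
        // Count-only rule: 2 elements never reach the threshold of 5.
        System.out.println(firings(twoElements, 60_000L, 5, false)); // prints []
        // Count-or-time rule: the 2 elements come out when the minute ends.
        System.out.println(firings(twoElements, 60_000L, 5, true));  // prints [2]
    }
}
```

In a real job the count-or-time rule would have to live in a custom Trigger implementation, as the documentation says. The model above only mirrors the firing decision.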



On Tue, Jan 26, 2021 at 10:44 AM Marco Villalobos <[hidden email]> wrote:
I wrote this simple test:

.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.trigger(PurgingTrigger.of(CountTrigger.of(5)))

I expected that if I sent 2 elements, the window would emit them after a minute.
But that doesn't seem to be happening.

Is my understanding of how windows and triggers work correct?
