GlobalWindow with custom trigger doesn't work correctly

Posted by Daniel Krenn on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/GlobalWindow-with-custom-tigger-doesn-t-work-correctly-tp25674.html

Hello people!

I have a DataStream whose events carry a running number that identifies the production cycle they belong to. In essence, this is what the data looks like:

value, production cycle
12.0, 2000
12.3, 2000 one production cycle
12.2, 2000

0.0, 2001
0.4, 2002 another production cycle
1.1, 2002

55.0, 2003
60.0, 2003 another production cycle
70.0, 2003

I have to do some calculations over the events of each production cycle. I want to use Flink's window API for that. This is how I'm doing it right now:

DataStream<String> test = streamExecEnv.readTextFile("C:/Projects/Python/testdata.txt")
    .map(new ImaginePaperDataConverterTest()) // convert data to POJO
    .assignTimestampsAndWatermarks(new ImaginePaperAssigner()) // Assign timestamps for event time
    .keyBy((ImaginePaperData event) -> event.lunum) //<- the production cycle number
    .window(GlobalWindows.create()) // create global window
    .trigger(new LunumTrigger()) // "split" the window with a custom trigger
    .process(new ImaginePaperWindowReportFunction()); // apply a function over the aggregated events

I'm reading the DataStream from a text file purely for testing purposes. The problem is that this pipeline aggregates only a single event per production cycle. Why is that? I thought keying the stream by the production cycle number already partitions the stream anyway. The trigger is meant to work like this: when the production cycle number changes, the events of the current window are aggregated and a new global window is started. What am I missing here?
Just to be safe, here is my implementation of the custom trigger:

public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

    private static final long serialVersionUID = 1L;

    public LunumTrigger() {}

    private final ValueStateDescriptor<Integer> prevLunum = new ValueStateDescriptor<>("lunum", Integer.class);

    @Override
    public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

        ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);

        if (lunumState.value() == null || !(element.lunum.equals(lunumState.value()))) {
            System.out.println("LUNUM BEFORE: " + lunumState.value() + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
            lunumState.update(element.lunum);
            return TriggerResult.FIRE_AND_PURGE;
        }

        System.out.println("LUNUM BEFORE: " + lunumState.value() + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
        lunumState.update(element.lunum);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(prevLunum).clear();
    }
}
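
The behaviour described above can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink code. It assumes that the state obtained through getPartitionedState is kept separately for each key of the keyed stream, and that FIRE_AND_PURGE empties the window contents but leaves the trigger state in place. The class TriggerSimulation and its simulate method are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone simulation; it only mimics the onElement logic
// of LunumTrigger under the assumption that trigger state is scoped per key.
class TriggerSimulation {

    /** Replays the onElement decision for a sequence of production cycle numbers. */
    static List<String> simulate(int[] lunums) {
        Map<Integer, Integer> prevLunumPerKey = new HashMap<>(); // trigger state, one entry per key
        Map<Integer, Integer> bufferedPerKey = new HashMap<>();  // elements currently held in each key's window
        List<String> log = new ArrayList<>();

        for (int lunum : lunums) {
            int key = lunum; // mirrors keyBy(event -> event.lunum)
            int buffered = bufferedPerKey.merge(key, 1, Integer::sum);
            Integer prev = prevLunumPerKey.get(key);

            if (prev == null || !prev.equals(lunum)) {
                // Same branch as in LunumTrigger: fire and drop the window contents.
                log.add("key " + key + ": FIRE_AND_PURGE with " + buffered + " element(s)");
                bufferedPerKey.put(key, 0);
            } else {
                log.add("key " + key + ": CONTINUE, " + buffered + " element(s) buffered");
            }
            prevLunumPerKey.put(key, lunum);
        }
        return log;
    }

    public static void main(String[] args) {
        // Cycle numbers taken from the sample data in this post.
        int[] lunums = {2000, 2000, 2000, 2001, 2002, 2002, 2003, 2003, 2003};
        simulate(lunums).forEach(System.out::println);
    }
}
```

Under these assumptions, the stored number within one key can never differ from the incoming one, so only the first event of each key takes the firing branch, and the window function sees exactly one event per production cycle.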

I'm very grateful for your help.

Regards,

Daniel