/**
 * Trigger for {@link GlobalWindow}s that fires and purges whenever the {@code lunum} of the
 * incoming element differs from the {@code lunum} remembered from the previous element.
 *
 * <p>The last seen {@code lunum} is kept in partitioned state obtained from the
 * {@link TriggerContext}. The very first element (no remembered value yet) also fires.
 * This trigger registers no timers; the time callbacks are no-ops.
 *
 * <p>NOTE(review): the element that causes the fire has presumably already been added to the
 * window when {@code onElement} runs, so it would be emitted together with the preceding
 * run of elements - confirm that this is the intended grouping.
 */
public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    /** Descriptor of the partitioned state that stores the {@code lunum} of the last element. */
    private final ValueStateDescriptor<Integer> prevLunum =
            new ValueStateDescriptor<>("lunum", Integer.class);

    public LunumTrigger() {}

    /**
     * Compares the element's {@code lunum} with the remembered one and decides whether to fire.
     *
     * @param element the incoming element; its {@code lunum} is dereferenced whenever a
     *     previous value is stored, so it must be non-null in that case
     * @param timestamp unused
     * @param window unused
     * @param ctx context used to access the partitioned state
     * @return {@link TriggerResult#FIRE_AND_PURGE} if no value was remembered yet or the
     *     {@code lunum} changed, otherwise {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onElement(
            ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);

        // Read the state exactly once; every value() call is a state-backend access.
        final Integer previous = lunumState.value();
        final Integer current = element.lunum;

        if (previous == null || !current.equals(previous)) {
            System.out.println("LUNUM BEFORE: " + previous + " NEW LUNUM: " + current + " ==> FIRE!");
            lunumState.update(current);
            return TriggerResult.FIRE_AND_PURGE;
        }

        // The stored value already equals the current lunum, so no state write is needed.
        System.out.println("LUNUM BEFORE: " + previous + " NEW LUNUM: " + current + " ==> HOLD!");
        return TriggerResult.CONTINUE;
    }

    /** No-op: always returns {@link TriggerResult#CONTINUE}. */
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** No-op: always returns {@link TriggerResult#CONTINUE}. */
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Clears the remembered {@code lunum} from the partitioned state. */
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(prevLunum).clear();
    }
}