// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package wedo.dataflux.windowing.trigger; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import wedo.dataflux.windowing.function.WindowingMapFunction; import wedo.jaf.util.collections.keyvalue.DataParameterMap; public class EventTimeSessionSignalTrigger extends Trigger { private static final long serialVersionUID = 1L; private final long delay; private final ReducingStateDescriptor stateDesc; // -------------------------------------------------------------------------------------------- /** * Cannot instantiate */ private EventTimeSessionSignalTrigger(long delay) { this.delay = delay; this.stateDesc = new ReducingStateDescriptor<>("signalTimestamp", SignalReduceFunction.create(), LongSerializer.INSTANCE); } // -------------------------------------------------------------------------------------------- /** * * @return */ public static EventTimeSessionSignalTrigger of(long delay) { return new EventTimeSessionSignalTrigger(delay); } // -------------------------------------------------------------------------------------------- @Override public TriggerResult onElement(DataParameterMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ReducingState signal = ctx.getPartitionedState(this.stateDesc); if (element.containsKey(WindowingMapFunction.EXPRESSION_RESULT) && element.getBoolean(WindowingMapFunction.EXPRESSION_RESULT)) { long signalTimestamp = timestamp + this.delay; signal.add(signalTimestamp); ctx.registerEventTimeTimer(signalTimestamp); } else if (signal.get() == null) { ctx.registerEventTimeTimer(window.maxTimestamp()); } return TriggerResult.CONTINUE; } // -------------------------------------------------------------------------------------------- @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { ReducingState signal = ctx.getPartitionedState(this.stateDesc); Long signalTimestamp = signal.get(); return time == window.maxTimestamp() || (signalTimestamp != null && time == signalTimestamp) ? TriggerResult.FIRE : TriggerResult.CONTINUE; } // -------------------------------------------------------------------------------------------- @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } // -------------------------------------------------------------------------------------------- @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ReducingState signal = ctx.getPartitionedState(this.stateDesc); Long signalTimestamp = signal.get(); ctx.deleteEventTimeTimer(window.maxTimestamp()); if (signalTimestamp != null) { ctx.deleteEventTimeTimer(signalTimestamp); } signal.clear(); } // -------------------------------------------------------------------------------------------- @Override public boolean canMerge() { return true; } // -------------------------------------------------------------------------------------------- @Override public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception { ctx.registerEventTimeTimer(window.maxTimestamp()); ctx.mergePartitionedState(this.stateDesc); } // -------------------------------------------------------------------------------------------- @Override public String toString() { return "EventTimeSessionSignalTrigger(" + this.delay + ")"; } }