package test; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; public class CustomTrigger extends Trigger { private static final long serialVersionUID = 1L; private final long maxCount; private final ReducingStateDescriptor stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); private CustomTrigger(long maxCount) { this.maxCount = maxCount; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { ReducingState count = ctx.getPartitionedState(stateDesc); count.add(1L); if (count.get() >= maxCount) { count.clear(); return TriggerResult.FIRE_AND_PURGE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.FIRE_AND_PURGE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); } @Override public String toString() { return "CountTrigger(" + maxCount + ")"; } /** * Creates a trigger that fires once the number of elements in a pane reaches * the given count. * * @param maxCount The count of elements at which to fire. * @param The type of {@link Window Windows} on which this trigger can * operate. */ public static CustomTrigger of(long maxCount) { return new CustomTrigger<>(maxCount); } private static class Sum implements ReduceFunction { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }