package com.BatchStreamAnalytics; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.ArrayList; import java.util.List; public class LazyAlgoTrigger extends Trigger, TimeWindow> { private static final long serialVersionUID = 1L; private Logger LOG = LoggerFactory.getLogger(LazyAlgoTrigger.class); private final double threshold; private final ValueStateDescriptor> stateDesc; private LazyAlgoTrigger(double threshold) { this.threshold = threshold; this.stateDesc = new ValueStateDescriptor<>( "accumulated_error", TypeInformation.of(new TypeHint>() {}) ); } @Override public TriggerResult onElement(Tuple3 element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ValueState> current = ctx.getPartitionedState(stateDesc); double updated_sum; if (current.value() == null) { updated_sum = element.f2; } else { updated_sum = element.f2 + current.value().f2; } current.update(new Tuple3(element.f0, element.f1, updated_sum)); if (updated_sum >= threshold) { LOG.info("Purging {} at {}", current.value(), timestamp); current.clear(); return TriggerResult.FIRE_AND_PURGE; } else if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately LOG.info("Purging {} at {}", current.value(), timestamp); return TriggerResult.FIRE_AND_PURGE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); LOG.info("Adding a trigger at {}", window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { if(time==window.maxTimestamp()) { LOG.info("Purging at {}", time ); return TriggerResult.FIRE_AND_PURGE; } else{ return TriggerResult.CONTINUE; } } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { ctx.registerEventTimeTimer(window.maxTimestamp()); } @Override public String toString() { return "LazyAlgoTrigger(" + this.threshold + ")"; } public static LazyAlgoTrigger create(double threshold) { return new LazyAlgoTrigger(threshold); } public static List> getTestData() { long base = Instant.now().toEpochMilli(); // timestamp in milliseconds List> data = new ArrayList>(10); data.add(Tuple3.of(base, "a", 5.0)); data.add(Tuple3.of(base + 10000, "a", 25.0)); data.add(Tuple3.of(base + 20000, "b", 45.0)); data.add(Tuple3.of(base + 50000, "a", 30.0)); data.add(Tuple3.of(base + 75000, "b", 30.0)); data.add(Tuple3.of(base + 85000, "a", 50.0)); data.add(Tuple3.of(base + 90000, "c", 15.0)); data.add(Tuple3.of(base + 111000, "c", 5.0)); data.add(Tuple3.of(base + 121000, "c", 15.0)); data.add(Tuple3.of(base + 125000, "d", 20.0)); data.add(Tuple3.of(base + 130000, "d", 40.0)); data.add(Tuple3.of(base + 150000, "d", 60.0)); return data; } public static void main(String[] args) throws Exception { long windowLength = 100000; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.fromCollection(getTestData()) .assignTimestampsAndWatermarks(new MyTimestampAndWatermarkGenerator()) .keyBy(1) .window(TumblingEventTimeWindows.of(Time.milliseconds(windowLength))) .trigger(LazyAlgoTrigger.create(50)) .sum(2) .print(); env.execute(); } }