package crash; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.util.List; import java.util.ArrayList; public class Beacon extends RichParallelSourceFunction { private Long offset = 0L; public long interval; public long maxTicks; /** flag for job cancellation */ private volatile boolean isRunning = true; public Beacon (long intervalIn, long maxTicksIn) { interval = intervalIn; maxTicks = maxTicksIn; } @Override public void run(SourceContext ctx) { final Object lock = ctx.getCheckpointLock(); while (isRunning) { // output and state update are atomic synchronized (lock) { ctx.collect(offset); offset += 1; if (offset >= maxTicks) isRunning = false; } try { Thread.sleep(interval); } catch (java.lang.InterruptedException e) {} } } @Override public void cancel() { isRunning = false; } }