import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.transformations.StreamTransformation; import java.lang.reflect.Field; import java.util.List; import static org.apache.flink.runtime.jobgraph.ScheduleMode.LAZY_FROM_SOURCES; public class JobMasterFailoverTestingJob { // ************************************************************************* // PROGRAM // ************************************************************************* @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final String testname = params.get("testname"); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED); env.addSource(new FiniteSourceFunction()) .setParallelism(2) .shuffle() .addSink(new BlockingSinkFunction()) .setParallelism(2); Field field = env.getClass().getSuperclass().getDeclaredField("transformations"); field.setAccessible(true); List> transformations = (List>) field.get(env); StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); streamGraph.getCustomConfiguration().setString(ScheduleMode.class.getName(), LAZY_FROM_SOURCES.toString()); streamGraph.setJobName(testname); streamGraph.getStreamEdges(1, 3) .get(0).setResultPartitionType(ResultPartitionType.BLOCKING); env.execute(streamGraph); } // ************************************************************************* // USER FUNCTIONS // ************************************************************************* private static final class FiniteSourceFunction extends RichParallelSourceFunction { FiniteSourceFunction() { } @Override public void run(SourceContext ctx) { ctx.collect(0); } @Override public void cancel() { } } private static final class BlockingSinkFunction extends RichSinkFunction { BlockingSinkFunction() { } @Override public void invoke(Integer value, Context context) throws Exception { while (value != null) { value = 0; Thread.sleep(500L); } } } }