Hi all,
I would like to force a job to run in LAZY_FROM_SOURCES schedule mode with every ResultPartitionType set to BLOCKING.
But I cannot find an option to configure that in StreamExecutionEnvironment, so I am using the workaround below, which is quite tricky.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new InfiniteSourceFunction())
    .setParallelism(2)
    .shuffle()
    .addSink(new DiscardingSink<>())
    .setParallelism(2);
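// Read the private "transformations" list from the environment's superclass via reflection.
Field field = env.getClass().getSuperclass().getDeclaredField("transformations");
field.setAccessible(true);
List<StreamTransformation<?>> transformations = (List<StreamTransformation<?>>) field.get(env);
// Build the StreamGraph by hand so it can be modified before submission.
StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
// Request the LAZY_FROM_SOURCES schedule mode through the custom configuration.
streamGraph.getCustomConfiguration().setString(ScheduleMode.class.getName(), ScheduleMode.LAZY_FROM_SOURCES.toString());
streamGraph.setJobName(testname);
// Mark the edge between stream nodes 1 and 3 as BLOCKING.
streamGraph.getStreamEdges(1, 3)
    .get(0).setResultPartitionType(ResultPartitionType.BLOCKING);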
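
For anyone not familiar with the reflection part of this workaround, here is a minimal sketch of the same trick without any Flink dependency. BaseEnv, ChildEnv and PrivateFieldAccess are hypothetical stand-ins that I made up for illustration, they are not Flink classes. One difference from my snippet above: instead of hard-coding a single getSuperclass() call, it walks up the class hierarchy until it finds the field, which should keep working if the environment is not a direct subclass of the class that declares it.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class PrivateFieldAccess {

    // Hypothetical stand-in for a base class that keeps a private list.
    public static class BaseEnv {
        private final List<String> transformations = new ArrayList<>();

        public void add(String name) {
            transformations.add(name);
        }
    }

    // Hypothetical stand-in for the concrete subclass you actually hold.
    public static class ChildEnv extends BaseEnv {
    }

    // Walks up the class hierarchy until a field with the given name is found.
    static Field findField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException e) {
                // Not declared on this class, try its superclass next.
            }
        }
        throw new IllegalStateException("No field '" + name + "' on " + type);
    }

    // Reads the private "transformations" list from any BaseEnv subclass.
    @SuppressWarnings("unchecked")
    public static List<String> readTransformations(BaseEnv env) {
        Field field = findField(env.getClass(), "transformations");
        field.setAccessible(true);
        try {
            return (List<String>) field.get(env);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ChildEnv env = new ChildEnv();
        env.add("source");
        env.add("sink");
        System.out.println(readTransformations(env)); // prints [source, sink]
    }
}
```

The returned list is the live internal list, not a copy, so it reflects whatever has been added to the environment up to that point. It still depends on a private field name, which is why I would prefer a proper configuration option.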