package crash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import java.util.concurrent.TimeUnit; import java.util.Properties; import java.util.Map; import java.util.List; import java.util.ArrayList; import java.io.IOException; import java.io.*; public class StreamingJob { static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); SinkFunction sink = new TestSink(); buildJobGrapth(env, sink); LOG.info("Starting Execution crashTest"); env.execute("crashTest"); } public static void buildJobGrapth (StreamExecutionEnvironment env, SinkFunction sink) throws Exception { // set up the execution environment LOG.info("Setting stateBackend to FsStateBackend"); env.setStateBackend(new FsStateBackend("file:///opt/flink/ha", true)); env.enableCheckpointing(30000); // env.setRestartStrategy(RestartStrategies.fixedDelayRestart // (3, // number of restart attempts // org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); // delay DataStream source1 = env .addSource(new Beacon(1, 999000000)) .map(x->x%100000) .map(new MakeExec()); DataStream result = source1 .keyBy(x->x.symbol) .process(new MyKeyedProcessFunction()).name("MyKeyedProcessFunction"); } private static class TestSink implements SinkFunction { public static final List values = new ArrayList<>(); @Override public synchronized void invoke(String value) throws Exception { // values.add(value); } } public static class MakeExec implements MapFunction { private static final Logger LOG = LoggerFactory.getLogger(MakeExec.class); @Override public Exec map (Long in) { return new Exec(in); } } }