package crash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context; import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext; import org.apache.flink.util.Collector; public class MyKeyedProcessFunction extends KeyedProcessFunction { private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class); public TypeInformation leftTypeInfo; public transient ValueState leftState; public int initQueueSize; public long emitFrequencyMs; public MyKeyedProcessFunction() { initQueueSize = 10; emitFrequencyMs = 1; } @Override public void open(Configuration conf) { leftTypeInfo = TypeInformation.of(new TypeHint(){}); leftState = getRuntimeContext().getState( new ValueStateDescriptor<>("left", leftTypeInfo, null)); } @Override public void processElement(Exec leftIn, Context ctx, Collector out) { try { ExecQueue eq = leftState.value(); if (eq == null) { eq = new ExecQueue(10); ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + emitFrequencyMs); } leftState.update(eq); } catch (Exception e) { LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = " ); for (java.lang.StackTraceElement s:e.getStackTrace()) LOG.error(s.toString()); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) { try { ExecQueue eq = leftState.value(); ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + emitFrequencyMs); } catch ( Exception e) { LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = " ); for (java.lang.StackTraceElement s:e.getStackTrace()) LOG.error(s.toString()); } } public class ExecQueue { public RingBufferExec queue; public ExecQueue (){} public ExecQueue (int initSize) { queue = new RingBufferExec(initSize); } public class RingBufferExec { public Integer size; public Integer count; public RingBufferExec(){ } public RingBufferExec(int sizeIn){ size = sizeIn; count = 0; } } } }