OK, I have a minimal reproducible example. I'm attaching a tar file of the job that crashed.
The crash has nothing to do with the number of state variables. Instead, it seems to be caused by using, as the type of the state variable, a (non-static) inner class nested in the KeyedProcessFunction.
I reduced the job to a single state variable. Its type was a class (ExecQueue) defined inside the class implementing KeyedProcessFunction. Moving the ExecQueue definition to its own file fixed the problem.
The attached example always crashes the TaskManager within 30 seconds to 5 minutes.
MyKeyedProcessFunction is in the tar file and is also pasted here:
package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {
    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);
    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;
    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
        leftState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                // First element for this key: create the queue and start the timer loop.
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        }
        catch (Exception e) {
            LOG.error("Exception in processElement. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            // Read the state, then re-register the timer every emitFrequencyMs (1 ms).
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        }
        catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    // ExecQueue (and RingBufferExec inside it) are non-static inner classes.
    // Moving ExecQueue to its own file made the crash go away.
    public class ExecQueue {
        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public class RingBufferExec {
            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}
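One way to see the structural difference between the crashing layout and the working one, without Flink on the classpath, is plain JDK reflection. As far as I understand Flink's POJO rules, a state type is only handled by the POJO serializer if it is public, is either top-level or a static nested class, and has a public no-argument constructor; otherwise Flink falls back to its generic (Kryo) serializer. The sketch below (NestedClassCheck, InnerExecQueue, StaticExecQueue and the two helper methods are hypothetical names) checks those two properties for an inner class shaped like ExecQueue and for a static nested variant. It does not reproduce the segfault, and it only illustrates the difference; it does not prove the serializer fallback is the root cause.

```java
import java.lang.reflect.Modifier;

public class NestedClassCheck {
    // Same shape as ExecQueue in the pasted job: a non-static inner class.
    public class InnerExecQueue {
        public Integer size;
        public InnerExecQueue() {}
    }

    // Shape after the fix: a public static nested class
    // (a top-level class in its own file behaves the same way for these checks).
    public static class StaticExecQueue {
        public Integer size;
        public StaticExecQueue() {}
    }

    // True for top-level classes and for static nested classes.
    public static boolean isTopLevelOrStatic(Class<?> c) {
        return c.getEnclosingClass() == null || Modifier.isStatic(c.getModifiers());
    }

    // True only if a public constructor with zero parameters exists.
    // A non-static inner class never has one: javac adds the enclosing
    // instance as a hidden first constructor parameter.
    public static boolean hasPublicNoArgConstructor(Class<?> c) {
        try {
            c.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (Class<?> c : new Class<?>[] {InnerExecQueue.class, StaticExecQueue.class}) {
            System.out.println(c.getSimpleName()
                + " static/top-level=" + isTopLevelOrStatic(c)
                + " public no-arg ctor=" + hasPublicNoArgConstructor(c));
        }
    }
}
```

Running main should report false for both checks on InnerExecQueue and true for both on StaticExecQueue. Declaring ExecQueue and RingBufferExec as public static nested classes is, I believe, an alternative to moving them into separate files, though only the separate-file version was tested here.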
From: Dawid Wysakowicz <[hidden email]>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <[hidden email]>; [hidden email]
Subject: Re: state access causing segmentation fault
Hi,
It should be absolutely fine to use multiple state objects; I am not aware of any limit on that. A minimal reproducible example would definitely be helpful. For that kind of exception, I'd look into the serializers you use. Other than that, I cannot think of an obvious reason for that kind of exception.
Best,
Dawid
On 08/10/2020 12:12, Colletta, Edward wrote:
I'm using Flink 1.9.2 (Java) with the FsStateBackend, running a session cluster on EC2 instances.
I have a KeyedProcessFunction that causes a segmentation fault, crashing the Flink TaskManager. The crash seems to be caused by using 3 state variables in the operator. It happens consistently after some load has been processed.
This is the second time I have encountered this. The first time I had 3 ValueState variables; this time I had 2 ValueState variables and a MapState variable. Both times, removing one of the state variables alleviated the error.
This time I replaced the 2 ValueState variables with a single Tuple2 of the individual variables' types. I can try to put together a minimal example, but I was wondering whether anyone has encountered this problem.
Are there any documented limits on the number of state variables one operator can use?
For background: I use multiple state variables because the operator processes 2 types of input, Left and Right. When a Left element is received, I put it into a PriorityQueue. When a Right element is received, I put it into a ring buffer.
I replaced the PriorityQueue with a queue of IDs plus a MapState that holds the elements. So Left elements are stored in a queue ValueState variable and a MapState variable, and Right elements are stored in the ring-buffer ValueState variable.
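The Left/Right storage layout described above can be sketched as a plain in-memory analogue: an ArrayDeque of IDs plus a HashMap stand in for the queue ValueState and the MapState, and a fixed-capacity array stands in for the ring-buffer ValueState. This is a hypothetical sketch, not the actual operator code: LeftRightBuffers, RingBuffer and LeftStore are made-up names, Left elements are assumed to be consumed in FIFO order, and the ring buffer is assumed to overwrite its oldest Right element when full.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

public class LeftRightBuffers {
    // Analogue of the "ring buffer ValueState" holding Right elements.
    public static class RingBuffer<T> {
        private final Object[] slots;
        private int head = 0;  // index of the oldest element
        private int count = 0; // number of stored elements

        public RingBuffer(int capacity) {
            slots = new Object[capacity];
        }

        // Appends an element; when full, overwrites the oldest one (assumed policy).
        public void add(T item) {
            int tail = (head + count) % slots.length;
            slots[tail] = item;
            if (count < slots.length) {
                count++;
            } else {
                head = (head + 1) % slots.length;
            }
        }

        // Returns the i-th element, where 0 is the oldest.
        @SuppressWarnings("unchecked")
        public T get(int i) {
            if (i < 0 || i >= count) {
                throw new IndexOutOfBoundsException("index " + i);
            }
            return (T) slots[(head + i) % slots.length];
        }

        public int size() {
            return count;
        }
    }

    // Analogue of "queue of IDs in a ValueState + elements in a MapState" for Left.
    public static class LeftStore<T> {
        private final ArrayDeque<Long> ids = new ArrayDeque<>(); // like ValueState<queue of IDs>
        private final Map<Long, T> elements = new HashMap<>();    // like MapState<Long, T>

        public void put(long id, T element) {
            ids.addLast(id);
            elements.put(id, element);
        }

        // Removes and returns the oldest Left element, or null if there is none.
        public T pollOldest() {
            Long id = ids.pollFirst();
            return id == null ? null : elements.remove(id);
        }

        public int size() {
            return ids.size();
        }
    }

    public static void main(String[] args) {
        RingBuffer<String> right = new RingBuffer<>(3);
        for (String s : new String[] {"r1", "r2", "r3", "r4"}) {
            right.add(s);
        }
        System.out.println(right.size() + " " + right.get(0)); // prints "3 r2"

        LeftStore<String> left = new LeftStore<>();
        left.put(1L, "l1");
        left.put(2L, "l2");
        System.out.println(left.pollOldest() + " " + left.size()); // prints "l1 1"
    }
}
```

Keeping each Left element as its own map entry, rather than inside one large queue object, means adding or removing an element touches only that entry plus the small ID queue.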