RE: state access causing segmentation fault

Posted by Colletta, Edward on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/state-access-causing-segmentation-fault-tp38562p38622.html

OK, I now have a minimal reproducible example. I'm attaching a tar file of the job that crashed.

The crash has nothing to do with the number of state variables. It does, however, seem to be caused by using a class nested inside the KeyedProcessFunction as the type of a state variable.

I reduced the job to a single state variable. Its type was a class (ExecQueue) defined inside the class that implements KeyedProcessFunction. Moving the ExecQueue definition to its own file fixed the problem.

The attached example always crashes the TaskManager within 30 seconds to 5 minutes.

MyKeyedProcessFunction is in the tar file and is also pasted here:

package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;

    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        } catch (Exception e) {
            LOG.error("Exception in processElement. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        } catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    // ExecQueue and RingBufferExec are non-static inner classes: each instance
    // needs an enclosing instance, and even their "no-arg" constructors
    // implicitly take that enclosing instance as a parameter.
    public class ExecQueue {
        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public class RingBufferExec {
            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}
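ExecQueue and RingBufferExec in the code above are non-static inner classes. The plain-Java sketch below (no Flink needed; Outer, Inner and Nested are hypothetical names) shows the Java-level difference that moving the class to its own file makes: a non-static inner class has no true no-argument constructor, because javac passes the enclosing instance as a hidden constructor parameter, while a static nested or top-level class does have one. Flink's documented POJO rules require a public no-argument constructor and a class that is not a non-static inner class, so presumably the inner ExecQueue was handled by the generic (Kryo) serializer rather than the POJO serializer. The sketch does not explain why the TaskManager segfaulted.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class InnerVsNested {

    // Hypothetical stand-in for MyKeyedProcessFunction.
    public static class Outer {
        // Non-static inner class, laid out like ExecQueue in the repro.
        public class Inner {
            public Integer size;
            public Inner() { }
        }

        // Static nested class: for reflection purposes it behaves like a class in its own file.
        public static class Nested {
            public Integer size;
            public Nested() { }
        }
    }

    // Returns true if the class declares a constructor that takes no parameters at all.
    public static boolean hasNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> inner = Outer.Inner.class;
        Class<?> nested = Outer.Nested.class;

        System.out.println("Inner static?  " + Modifier.isStatic(inner.getModifiers()));
        System.out.println("Nested static? " + Modifier.isStatic(nested.getModifiers()));
        System.out.println("Inner has no-arg constructor?  " + hasNoArgConstructor(inner));
        System.out.println("Nested has no-arg constructor? " + hasNoArgConstructor(nested));

        // The source-level "no-arg" constructor of Inner really takes the Outer instance.
        Constructor<?> c = inner.getDeclaredConstructor(Outer.class);
        System.out.println("Inner constructor parameters: " + c.getParameterCount());
    }
}
```

Running main prints false, true, false, true for the four checks and reports 1 parameter for Inner's constructor.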

 

 

From: Dawid Wysakowicz <[hidden email]>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <[hidden email]>; [hidden email]
Subject: Re: state access causing segmentation fault

Hi,

It should be absolutely fine to use multiple state objects; I am not aware of any limits on that. A minimal, reproducible example would definitely be helpful. For that kind of exception, I'd look into the serializers you use. Other than that, I cannot think of an obvious reason for it.
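One way to follow the serializer hint is to check each state type against the POJO rules in the Flink documentation: the class is public and standalone (not a non-static inner class), has a public no-argument constructor, and every non-static, non-transient field (including superclass fields) is public and non-final or has a public getter and setter. Types that fail these rules are serialized as generic types via Kryo. The `looksLikePojo` helper below is a hypothetical, simplified approximation of those rules in plain Java, not a Flink API; it ignores getters/setters and field types, so it can report false negatives for bean-style classes. Flink also offers `ExecutionConfig#disableGenericTypes()`, which makes Flink throw an exception whenever it encounters a type that would be serialized via Kryo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {

    // Hypothetical helper approximating Flink's documented POJO rules.
    // Simplification: fields must be public and non-final; getter/setter pairs are not checked.
    public static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return false;
        }
        // Rule: standalone or static nested, never a non-static inner (or local) class.
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return false;
        }
        // Rule: a public constructor without arguments.
        try {
            if (!Modifier.isPublic(clazz.getDeclaredConstructor().getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule: every non-static, non-transient field, including superclass fields, is public and non-final.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || f.isSynthetic()) {
                    continue;
                }
                if (!Modifier.isPublic(fm) || Modifier.isFinal(fm)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Hypothetical example types mirroring the two layouts of ExecQueue.
    public static class StaticQueue {
        public Integer size;
        public Integer count;
        public StaticQueue() { }
    }

    public class InnerQueue {
        public Integer size;
        public Integer count;
        public InnerQueue() { }
    }

    public static void main(String[] args) {
        System.out.println("StaticQueue: " + looksLikePojo(StaticQueue.class));
        System.out.println("InnerQueue:  " + looksLikePojo(InnerQueue.class));
    }
}
```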

Best,

Dawid

On 08/10/2020 12:12, Colletta, Edward wrote:

Using Flink 1.9.2, Java, and the FsStateBackend, running a session cluster on EC2 instances.

I have a KeyedProcessFunction that is causing a segmentation fault, which crashes the Flink TaskManager. It seems to be caused by using 3 state variables in the operator. The crash happens consistently after some load has been processed.

This is the second time I have encountered this. The first time, I had 3 ValueState variables; this time, I had 2 ValueState variables and a MapState variable. Both times, the error was alleviated by removing one of the state variables.

This time I replaced the 2 ValueState variables with a Tuple2 of the individual variables' types. I can try to put together a minimal example, but I was wondering whether anyone has encountered this problem.

Are there any documented limits on the number of state variables one operator can use?

For background, the reason I use multiple state variables is that the operator processes 2 types of input, Left and Right. When a Left is received, it is put into a PriorityQueue. When a Right is received, I put it into a ring buffer.
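For illustration, a fixed-capacity ring buffer of the kind described for Right elements might look like the minimal sketch below. The class name RingBuffer, its methods, and the overwrite-oldest policy when the buffer is full are assumptions; the RingBufferExec in the repro only tracks size and count.

```java
import java.util.ArrayList;
import java.util.List;

public class RingBuffer<T> {
    private final Object[] items;
    private int head = 0;   // index of the oldest element
    private int count = 0;  // number of stored elements

    public RingBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        items = new Object[capacity];
    }

    // Appends an element; when the buffer is full, the oldest element is overwritten (assumed policy).
    public void add(T item) {
        int tail = (head + count) % items.length;
        items[tail] = item;
        if (count < items.length) {
            count++;
        } else {
            head = (head + 1) % items.length;
        }
    }

    public int size() {
        return count;
    }

    // Returns the stored elements from oldest to newest.
    @SuppressWarnings("unchecked")
    public List<T> toList() {
        List<T> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            out.add((T) items[(head + i) % items.length]);
        }
        return out;
    }

    public static void main(String[] args) {
        RingBuffer<String> rb = new RingBuffer<>(3);
        for (String s : new String[] {"r1", "r2", "r3", "r4"}) {
            rb.add(s);
        }
        System.out.println(rb.toList()); // [r2, r3, r4]
    }
}
```

With capacity 3, adding a fourth element overwrites r1, so the oldest remaining element is r2.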

I replaced the PriorityQueue with a queue of IDs plus a MapState holding the elements. So Left is stored in a queue (a ValueState variable) together with a MapState variable, and Right is stored in the ring buffer (a ValueState variable).
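The queue-of-IDs-plus-map layout can be sketched in plain Java as below. This is a hypothetical illustration: an ArrayDeque of IDs stands in for the queue kept in a ValueState, a HashMap stands in for the MapState, and the names LeftStore, add and pollOldest are invented. It assumes elements are consumed in arrival order and that IDs are unique; if priority ordering is still needed, a java.util.PriorityQueue of IDs could replace the deque. In the real operator, the queue would be read with value() and written back with update(), and elements would go through MapState's put() and remove().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class LeftStore<E> {
    // Stand-in for the ValueState holding the queue of IDs (FIFO order assumed).
    private final Deque<Long> ids = new ArrayDeque<>();
    // Stand-in for the MapState<Long, E> holding the elements themselves.
    private final Map<Long, E> elements = new HashMap<>();

    // Assumes each ID is unique; a repeated ID would overwrite the stored element.
    public void add(long id, E element) {
        ids.addLast(id);
        elements.put(id, element);
    }

    // Removes and returns the oldest element, or null if the store is empty.
    public E pollOldest() {
        Long id = ids.pollFirst();
        if (id == null) {
            return null;
        }
        return elements.remove(id);
    }

    public int size() {
        return ids.size();
    }

    public static void main(String[] args) {
        LeftStore<String> store = new LeftStore<>();
        store.add(7L, "left-7");
        store.add(3L, "left-3");
        System.out.println(store.pollOldest()); // left-7
        System.out.println(store.size());       // 1
    }
}
```

Keeping only small IDs in the ValueState means each update() rewrites the ID queue rather than every element, while the elements themselves live in the MapState.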

 

 


crash.tar.gz (1009K)