After OutOfMemoryError State can not be readed

After OutOfMemoryError State can not be readed

Edward Rojas
Hello all,

We are running Flink 1.5.3 on Kubernetes with RocksDB as the state backend.
When performing some load testing we got an /OutOfMemoryError: native memory
exhausted/, causing the job to fail and be restarted.

After the TaskManager is restarted, the job is recovered from a checkpoint,
but there seems to be a problem when trying to access the state. The error is
thrown from the *onTimer* function, invoked through *onProcessingTime*.

Is it possible that the OOM error caused a corrupted state to be
checkpointed?

We get Exceptions like:

TimerException{java.lang.RuntimeException: Error while retrieving data from RocksDB.}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
        at java.util.concurrent.FutureTask.run(FutureTask.java:277)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(Thread.java:811)
Caused by: java.lang.RuntimeException: Error while retrieving data from RocksDB.
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
        at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
        ... 7 more
Caused by: java.io.EOFException
        at java.io.DataInputStream.readFully(DataInputStream.java:208)
        at java.io.DataInputStream.readUTF(DataInputStream.java:618)
        at java.io.DataInputStream.readUTF(DataInputStream.java:573)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87)
        ... 12 more


Thanks in advance for any help




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: After OutOfMemoryError State can not be readed

vino yang
Hi Edward,

Judging from the line "Caused by: java.io.EOFException" in this log, it seems that the state metadata file has been corrupted.
I can't confirm that, though. Stefan may know more details, so I am pinging him for you.

Thanks, vino.

(In reply to the message from Edward Rojas <[hidden email]> of Fri, Sep 7, 2018, 1:22 AM.)

Re: After OutOfMemoryError State can not be readed

Stefan Richter
Hi,

What I can say is that failures like OOMs should not corrupt checkpoint files, because the job manager only uses successfully completed checkpoints for recovery. Just to get a bit more info, are you using full or incremental checkpoints? Unfortunately, it is hard to say from the given information what the cause of the problem is. Typically, these problems have been observed when something was wrong with a serializer, or when a stateful serializer was used from multiple threads.
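To illustrate the bottom of the stack trace (readUTF -> readFully -> EOFException), here is a minimal sketch using only java.io, not Flink. It assumes nothing about the actual job: the class name TruncatedUtfDemo and the payload string are made up for the example. It only shows that readUTF fails with an EOFException whenever the bytes it is handed are shorter than the length prefix announces, which is what one might expect to see if the stored bytes were written in a different format than the one the reading serializer expects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo class, not part of Flink or of the job discussed here.
class TruncatedUtfDemo {

    // writeUTF emits a 2-byte length prefix followed by the encoded characters.
    static byte[] write(String value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the decoded string, or the marker "EOFException" when the
    // payload ends before the number of bytes announced by the prefix.
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        } catch (EOFException e) {
            return "EOFException";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] complete = write("ProcessState");
        // Drop the last 4 bytes so the prefix promises more than is present.
        byte[] truncated = Arrays.copyOf(complete, complete.length - 4);
        System.out.println(read(complete));   // prints "ProcessState"
        System.out.println(read(truncated));  // prints "EOFException"
    }
}
```

The same exception would also result from intact bytes that are interpreted with the wrong layout, so the EOFException by itself does not tell whether the stored data or the reading side is at fault.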

Best,
Stefan 

(In reply to the message from vino yang <[hidden email]> of Sep 7, 2018, 05:04.)


Re: After OutOfMemoryError State can not be readed

Edward Rojas
Hi Stefan, Vino,
Thanks for your answers. 

We are using full checkpointing, not incremental. We are using custom serializers for the operators' state classes. The serializers encrypt the data before writing and decrypt it when reading. The serializers are stateless.
We register the serializers using
env.getConfig()
      .registerTypeWithKryoSerializer(ProcessState.class, ProcessStateSerializer.class);

In normal cases the serialization works correctly, even after recovering from a failure. We get this error only when the TaskManager fails due to memory problems.
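For readers wondering what "stateless" means for an encrypting serializer, the following is a rough sketch only, not the actual ProcessStateSerializer and not a Kryo serializer. The class EncryptingCodecSketch, the String payload, the AES-GCM choice and the all-zero demo key are all assumptions made for illustration. The point it shows is that no Cipher or buffer is kept in a field: each call creates its own, so two threads calling the same instance cannot interleave their bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical sketch; names and cipher choice are assumptions.
class EncryptingCodecSketch {
    private static final int IV_BYTES = 12;   // usual nonce size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length

    // The only field is the key, which is never modified after construction.
    private final SecretKeySpec key;

    EncryptingCodecSketch(byte[] rawKey) {
        this.key = new SecretKeySpec(rawKey.clone(), "AES");
    }

    // Output layout: 12-byte IV followed by ciphertext plus tag.
    byte[] serialize(String value) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv);
            // A Cipher is stateful, so a fresh one is created per call.
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] encrypted = cipher.doFinal(value.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.allocate(iv.length + encrypted.length)
                    .put(iv).put(encrypted).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    String deserialize(byte[] data) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, data, 0, IV_BYTES));
            byte[] plain = cipher.doFinal(data, IV_BYTES, data.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed", e);
        }
    }

    public static void main(String[] args) {
        // All-zero key for the demo only; never use such a key in practice.
        EncryptingCodecSketch codec = new EncryptingCodecSketch(new byte[16]);
        byte[] bytes = codec.serialize("state for key 42");
        System.out.println(codec.deserialize(bytes)); // prints "state for key 42"
    }
}
```

A variant that cached the Cipher or a reusable byte buffer in a field would be stateful in the sense Stefan describes, and would need one instance per thread.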

Thanks again for your help,
Edward

(In reply to the message from Stefan Richter <[hidden email]> of Fri, Sep 7, 2018, 11:51.)

--
Edward Alexander Rojas Clavijo

Software Engineer
Hybrid Cloud
IBM France