Hello all,
We are running Flink 1.5.3 on Kubernetes with RocksDB as statebackend. When performing some load testing we got an /OutOfMemoryError: native memory exhausted/, causing the job to fail and be restarted. After the Taskmanager is restarted, the job is recovered from a Checkpoint, but it seems that there is a problem when trying to access the state. We got the error from the *onTimer* function of a *onProcessingTime*. It would be possible that the OOM error could have caused to checkpoint a corrupted state? We get Exceptions like: TimerException{java.lang.RuntimeException: Error while retrieving data from RocksDB.} at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522) at java.util.concurrent.FutureTask.run(FutureTask.java:277) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:191) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.lang.Thread.run(Thread.java:811) Caused by: java.lang.RuntimeException: Error while retrieving data from RocksDB. at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89) at com.xxx.ProcessFunction.*onTimer*(ProcessFunction.java:279) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.*onProcessingTime*(KeyedProcessOperator.java:78) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.*onProcessingTime*(HeapInternalTimerService.java:266) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285) ... 7 more Caused by: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:208) at java.io.DataInputStream.readUTF(DataInputStream.java:618) at java.io.DataInputStream.readUTF(DataInputStream.java:573) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381) at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:87) ... 12 more Thanks in advance for any help -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Hi Edward, From this log: Caused by: java.io.EOFException, it seems that the state metadata file has been corrupted. But I can't confirm it, maybe Stefan knows more details, Ping him for you. Thanks, vino. Edward Rojas <[hidden email]> 于2018年9月7日周五 上午1:22写道: Hello all, |
Hi,
what I can say is that any failures like OOMs should not corrupt checkpoint files, because only successfully completed checkpoints are used for recovery by the job manager. Just to get a bit more info, are you using full or incremental checkpoints? Unfortunately, it is a bit hard to say from the given information what the cause of the problem is. Typically, these problems have been observed when something was wrong with a serializer or a stateful serializer was used from multiple threads. Best, Stefan
|
Hi Stefan, Vino, Thanks for your answers. We are using full checkpointing, not incremental. We are using custom serializers for the operators state classes, The serializers perform encryption before writing and decrypt when reading. The serializer is stateless. We register the Serializers by using env.getConfig() .registerTypeWithKryoSerializer(ProcessState.class, ProcessStateSerializer.class); In normal cases the Serialization works correctly, even after recovering from a failure. We get this error only when taskmnager fails due to memory problems. Thanks again for your help, Edward El vie., 7 sept. 2018 a las 11:51, Stefan Richter (<[hidden email]>) escribió:
Edward Alexander Rojas Clavijo Software Engineer Hybrid Cloud IBM France |
Free forum by Nabble | Edit this page |