Rocksdb - org.apache.flink.util.SerializedThrowable : bad entry in block

Deshpande, Omkar
Hello,

I am using Flink 1.9 with Beam 2.26 and the RocksDB state backend, and I am getting this exception:

org.apache.flink.util.SerializedThrowable: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:978)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:952)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.RuntimeException: Error reading state.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Error reading state.
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:494)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
Caused by: org.apache.flink.util.SerializedThrowable: Error while retrieving data from RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:471)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: bad entry in block
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 22 common frames omitted


Deleting the state and starting clean resolved the issue. What could be the root cause, and how do I debug this?

Thanks,
Omkar

Re: Rocksdb - org.apache.flink.util.SerializedThrowable : bad entry in block

Timo Walther
Hi Omkar,

sorry for the late reply. This sounds like a serious issue. It looks
like some of the RocksDB data is corrupt. Are you sure this is not a
problem with your storage layer?
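For context on the message itself: RocksDB reports "bad entry in block" when it cannot decode an entry inside a block it has read. The following is a simplified, hypothetical Java model of the LevelDB/RocksDB block entry layout (shared key bytes, non-shared key bytes and value length as varints, followed by the key suffix and the value), not RocksDB's actual code. Restart points, checksums and compression are left out, and `BlockEntryParser` and `CorruptionException` are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified, hypothetical model of a LevelDB/RocksDB-style data block.
 * Each entry is laid out as:
 *   shared (varint32) | non_shared (varint32) | value_length (varint32) | key_delta | value
 * Restart points, the block trailer, checksums and compression are ignored.
 */
class BlockEntryParser {

    /** Raised when an entry cannot be decoded; mirrors the "bad entry in block" message. */
    static class CorruptionException extends RuntimeException {
        CorruptionException(String message) {
            super(message);
        }
    }

    /** Reads an unsigned varint32 starting at pos[0] and advances pos[0]; returns -1 if malformed. */
    static long readVarint32(byte[] buf, int[] pos, int limit) {
        long result = 0;
        for (int shift = 0; shift <= 28 && pos[0] < limit; shift += 7) {
            int b = buf[pos[0]++] & 0xFF;
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        return -1; // ran past the limit or used more than 5 bytes
    }

    /** Decodes every entry in block[0, limit) and returns the reconstructed keys. */
    static List<String> parseKeys(byte[] block, int limit) {
        List<String> keys = new ArrayList<>();
        byte[] lastKey = new byte[0];
        int[] pos = {0};
        while (pos[0] < limit) {
            long shared = readVarint32(block, pos, limit);
            long nonShared = readVarint32(block, pos, limit);
            long valueLength = readVarint32(block, pos, limit);
            boolean malformed = shared < 0 || nonShared < 0 || valueLength < 0
                    || shared > lastKey.length                    // cannot share more than the previous key has
                    || nonShared + valueLength > limit - pos[0];  // entry would run past the block end
            if (malformed) {
                throw new CorruptionException("bad entry in block");
            }
            // Rebuild the key: shared prefix of the previous key plus the new suffix bytes.
            byte[] key = Arrays.copyOf(lastKey, (int) (shared + nonShared));
            System.arraycopy(block, pos[0], key, (int) shared, (int) nonShared);
            pos[0] += (int) (nonShared + valueLength); // skip key suffix and the opaque value bytes
            keys.add(new String(key, StandardCharsets.UTF_8));
            lastKey = key;
        }
        return keys;
    }

    public static void main(String[] args) {
        // Two entries: "user:1" -> "a", then "user:2" -> "b" sharing the 5-byte prefix "user:".
        byte[] block = {0, 6, 1, 'u', 's', 'e', 'r', ':', '1', 'a',
                        5, 1, 1, '2', 'b'};
        System.out.println(parseKeys(block, block.length)); // [user:1, user:2]

        block[10] = 9; // claim 9 shared bytes although the previous key has only 6
        try {
            parseKeys(block, block.length);
        } catch (CorruptionException e) {
            System.out.println(e.getMessage()); // bad entry in block
        }
    }
}
```

In this model the value bytes are skipped without being interpreted, so the check only fails when the entry framing or the key prefix lengths are impossible.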

Otherwise, I would investigate whether the serializers work correctly.
Maybe Beam put corrupt data into Flink's state?
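To follow up on the serializer question, a round-trip test of the element coder is a cheap first check: encode a few values back to back, decode them again, and confirm that the values match and that no bytes are left over or missing. This is a stdlib-only sketch; `Codec`, `EVENT_CODEC` and `SerializerRoundTripCheck` are hypothetical stand-ins for the pipeline's real Beam `Coder` (Beam's testing package also offers `CoderProperties` helpers for this kind of check).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: does a serializer round-trip values and consume exactly what it wrote? */
class SerializerRoundTripCheck {

    /** Hypothetical stand-in for a Beam Coder or a Flink TypeSerializer. */
    interface Codec<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    /** Example codec for a hypothetical (sessionId, timestamp) element. */
    static final Codec<Map.Entry<String, Long>> EVENT_CODEC = new Codec<Map.Entry<String, Long>>() {
        @Override
        public void encode(Map.Entry<String, Long> v, DataOutputStream out) throws IOException {
            out.writeUTF(v.getKey());
            out.writeLong(v.getValue());
        }

        @Override
        public Map.Entry<String, Long> decode(DataInputStream in) throws IOException {
            return new AbstractMap.SimpleEntry<>(in.readUTF(), in.readLong());
        }
    };

    /**
     * Encodes all values back to back (roughly how several bag elements end up under one key),
     * decodes them again, and checks equality plus the absence of leftover bytes.
     */
    static <T> boolean roundTrips(Codec<T> codec, List<T> values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                for (T v : values) {
                    codec.encode(v, out);
                }
            }
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            for (T expected : values) {
                if (!expected.equals(codec.decode(in))) {
                    return false; // value changed on the way through
                }
            }
            return in.available() == 0; // leftover bytes: decode reads less than encode writes
        } catch (IOException e) {
            return false; // e.g. EOFException: decode reads more than encode writes
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> sample = List.of(
                Map.entry("session-a", 1L),
                Map.entry("session-b", 2L));
        System.out.println(roundTrips(EVENT_CODEC, sample)); // true
    }
}
```

Running the same check against the actual coder used for the `SessionEnrichDoFn` bag state, with realistic sample elements, would show whether encode and decode are symmetric; a `false` result would point at the serializer rather than at RocksDB.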

Regards,
Timo


On 26.01.21 20:06, Deshpande, Omkar wrote:

> Hello,
>
> I am using flink 1.9 with beam 2.26 and rocksdb state backend. I am
> getting this exception -
>
> [...]
>
> Deleting the state and starting clean resolved the issue. What would be
> the root cause for this?
> How do I debug this?
>
> Thanks,
> Omkar