Hello,
I am using flink 1.9 with beam 2.26 and rocksdb state backend. I am getting this exception -
org.apache.flink.util.SerializedThrowable: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:978)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:952)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error reading state.
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.RuntimeException: Error reading state.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Error reading state.
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:494)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
Caused by: org.apache.flink.util.SerializedThrowable: Error while retrieving data from RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:471)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
at com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: bad entry in block
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 22 common frames omitted
Deleting the state and starting clean resolved the issue. What would be the root cause for this?
How do I debug this?
Thanks,
Omkar
|
Hi Omkar,
sorry for the late reply. This sounds like a serious issue. It looks like some of the RocksDB data is corrupt. Are you sure this is not a problem of you storage layer? Otherwise I would investigate whether the serializers work correctly. Maybe Beam did put a corrupt data into Flink's state? Regards, Timo On 26.01.21 20:06, Deshpande, Omkar wrote: > Hello, > > I am using flink 1.9 with beam 2.26 and rocksdb state backend. I am > getting this exception - > > org.apache.flink.util.SerializedThrowable: Caught exception while > processing timer. > at > org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:978) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:952) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.SerializedThrowable: > org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: > Error reading state. > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284) > ... 7 common frames omitted > Caused by: org.apache.flink.util.SerializedThrowable: > java.lang.RuntimeException: Error reading state. > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) > at > com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown > Source) > at > org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235) > at > org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226) > at > org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79) > at > org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) > ... 7 common frames omitted > Caused by: org.apache.flink.util.SerializedThrowable: Error reading state. > at > org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:494) > at > com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92) > Caused by: org.apache.flink.util.SerializedThrowable: Error while > retrieving data from RocksDB > at > org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121) > at > org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111) > at > org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:471) > at > com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92) > at > com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown > Source) > at > org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235) > at > org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226) > at > org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79) > at > org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.SerializedThrowable: bad entry in block > at org.rocksdb.RocksDB.get(Native Method) > at org.rocksdb.RocksDB.get(RocksDB.java:810) > at > org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118) > ... 22 common frames omitted > > > Deleting the state and starting clean resolved the issue. What would be > the root cause for this? > How do I debug this? > > Thanks, > Omkar |
Free forum by Nabble | Edit this page |