Re: Flink job hangs using rocksDb as backend

Posted by Nico Kruber on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-job-hangs-using-rocksDb-as-backend-tp21356p21378.html

If this is about too many timers and your application allows it, you may
also try to reduce the timer resolution and thus frequency by coalescing
them [1].
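
To make the idea concrete, here is a minimal sketch of what coalescing does, written as plain Java without any Flink dependency so it runs on its own. It assumes the round-down-to-a-resolution approach described in [1]; the class name, the helper names and the 60 s timeout are made up for illustration and are not taken from the job in this thread. Since Flink keeps only one timer per key and timestamp, fewer distinct timestamps means fewer timers and fewer onTimer() calls.

```java
import java.util.HashSet;
import java.util.Set;

public class TimerCoalescing {

    /**
     * Rounds the requested firing time down to a multiple of resolutionMs.
     * Note: the timer may then fire up to resolutionMs earlier than requested.
     */
    public static long coalesce(long targetTimestamp, long resolutionMs) {
        return Math.floorDiv(targetTimestamp, resolutionMs) * resolutionMs;
    }

    /**
     * Counts how many distinct timers one key would end up with if every
     * event registered a timer at (eventTimestamp + timeoutMs), coalesced
     * to the given resolution. A set models "one timer per key and timestamp".
     */
    public static int distinctTimers(long[] eventTimestamps, long timeoutMs, long resolutionMs) {
        Set<Long> timers = new HashSet<>();
        for (long ts : eventTimestamps) {
            timers.add(coalesce(ts + timeoutMs, resolutionMs));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        // Hypothetical load: 5000 events for one key, one per millisecond.
        long[] events = new long[5000];
        for (int i = 0; i < events.length; i++) {
            events[i] = i;
        }
        // Resolution of 1 ms means no coalescing: one timer per event.
        System.out.println(distinctTimers(events, 60_000L, 1L));     // prints 5000
        // Resolution of 1 s: all timers within the same second collapse into one.
        System.out.println(distinctTimers(events, 60_000L, 1_000L)); // prints 5
    }
}
```

Inside a ProcessFunction, the same helper would be applied when registering the timer, e.g. ctx.timerService().registerEventTimeTimer(coalesce(ctx.timestamp() + timeoutMs, 1000L)), where timeoutMs is whatever delay your application uses. Whether firing up to one second early is acceptable depends on your use case.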


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing

On 11/07/18 18:27, Stephan Ewen wrote:

> Hi shishal!
>
> I think there is an issue with cancellation when many timers fire at the
> same time. These timers have to finish before shutdown happens, and this
> seems to take a while in your case.
>
> Did the TM process actually kill itself in the end (and get restarted)?
>
>
>
> On Wed, Jul 11, 2018 at 9:29 AM, shishal <[hidden email]> wrote:
>
>     Hi,
>
>     I am using Flink 1.4.2 with RocksDB as the state backend. I am using a
>     process function with event-time timers. For checkpointing I am using HDFS.
>
>     I am doing load testing, so I am reading from Kafka from the beginning
>     (approx. 7 days of data with 50M events).
>
>     My job gets stuck after approx. 20 min with no error. Thereafter the
>     watermark does not progress and all checkpoints fail.
>
>     Also, when I try to cancel my job (using the web UI), it takes several
>     minutes to finally get cancelled. It also brings the TaskManager down.
>
>     There are no logs while my job hangs, but while cancelling I get the
>     following error.
>
>     /
>
>     2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
>     ==============================================================
>     ======================      FATAL      =======================
>     ==============================================================
>
>     A fatal error occurred, forcing the TaskManager to shut down: Task 'process (3/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>      org.rocksdb.RocksDB.get(Native Method)
>     org.rocksdb.RocksDB.get(RocksDB.java:810)
>     org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>     org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>     org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>     org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>     org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>     org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     java.lang.Thread.run(Thread.java:748)
>
>     2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
>     akka.actor.ActorKilledException: Kill
>     2018-07-11 09:10:39,407 INFO org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
>     2018-07-11 09:10:39,408 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (2/6) is already in state CANCELING
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (1/6) is already in state CANCELING
>     2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
>     2018-07-11 09:10:39,412 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
>     2018-07-11 09:10:39,431 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
>     2018-07-11 09:10:39,444 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>     2018-07-11 09:10:39,444 DEBUG org.apache.flink.runtime.io.disk.iomanager.IOManager - Shutting down I/O manager.
>     2018-07-11 09:10:39,451 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /tmp/flink-io-989505e5-ac33-4d56-add5-04b2ad3067b4
>     2018-07-11 09:10:39,461 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
>     2018-07-11 09:10:39,461 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down network connection manager
>     2018-07-11 09:10:39,462 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 1 ms).
>     2018-07-11 09:10:39,472 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 10 ms).
>     2018-07-11 09:10:39,472 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down intermediate result partition manager
>     2018-07-11 09:10:39,473 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Releasing 0 partitions because of shutdown.
>     2018-07-11 09:10:39,474 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Successful shutdown.
>     2018-07-11 09:10:39,498 INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager is completely shut down.
>     2018-07-11 09:10:39,504 ERROR org.apache.flink.runtime.taskmanager.TaskManager - Actor akka://flink/user/taskmanager#-1231617791 terminated, stopping process...
>     2018-07-11 09:10:39,563 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (2/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>      org.rocksdb.RocksDB.get(Native Method)
>     org.rocksdb.RocksDB.get(RocksDB.java:810)
>     org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>     org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>     org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>     org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>     org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>     org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     java.lang.Thread.run(Thread.java:748)
>     .
>     2018-07-11 09:10:39,575 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (1/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>
>     sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     java.lang.Class.newInstance(Class.java:442)
>     org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
>     org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
>     org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
>     org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
>     org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>     org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>     org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>     org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>     org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     java.lang.Thread.run(Thread.java:748)
>     /
>
>
>
--
Nico Kruber | Software Engineer
data Artisans


