Stream job failed after increasing number retained checkpoints


Jose Miguel Tejedor Fernandez
Hello,

I have several stream jobs (v. 1.3.1) running in production that always fail after a fixed period of around 30 hours of execution. This is the WARN trace logged before the failure:

Association with remote system [akka.tcp://flink@...:39876] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@...:39876]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].

The main change made to the job configuration was increasing state.checkpoints.num-retained from 1 to 2880. I am using RocksDB with asynchronous snapshots to persist the state. (I attach some screenshots of the checkpoint configuration from the web UI.)

  • Could my assumption be correct that increasing state.checkpoints.num-retained is causing the problem? Is there any known issue regarding this?
  • Also, is there any way to increase the Akka handshake timeout from the current 20000 ms to a higher value? I thought it might help to increase the timeout to 1 minute.

BR



Screen Shot 2018-01-09 at 17.35.25.png (61K) Download Attachment
Screen Shot 2018-01-09 at 17.35.18.png (54K) Download Attachment
Screen Shot 2018-01-09 at 17.35.00.png (94K) Download Attachment
Re: Stream job failed after increasing number retained checkpoints

Piotr Nowojski
Hi,

Increasing Akka’s timeouts is rarely a solution to any problem: it either does not help or just masks the issue, making it less visible. But yes, it is possible to bump the limits: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka

I don’t think that state.checkpoints.num-retained was designed to handle such a large number of retained checkpoints, so maybe there are some known or unknown limitations. Stefan, do you know anything in this regard?

In parallel, as with any other Akka timeout, you should track down its root cause. This one warning line doesn’t tell much. Where does it come from? The client log? The Job Manager log? The Task Manager log? Please search on the opposite side of the timed-out connection for a possible root cause of the timeout, including:
- possible errors/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
- machine health (CPU usage, disk usage, network connections)
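
One way to look for "long unnatural gaps in the logs" is to compare the timestamps of consecutive log lines. The following is a minimal sketch, assuming the timestamp format seen in the log lines in this thread (for example "2018-01-08 22:26:37,263"); the function name find_gaps and the 30-second threshold are illustrative assumptions, not part of Flink.

```python
import re
from datetime import datetime, timedelta

# Matches a leading timestamp such as "2018-01-08 22:26:37,263".
TS = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})")


def find_gaps(lines, threshold=timedelta(seconds=30)):
    """Return (previous_ts, current_ts, gap) for consecutive timestamped
    lines that are further apart than `threshold` (an assumed value)."""
    gaps = []
    prev = None
    for line in lines:
        m = TS.match(line)
        if not m:
            continue  # stack-trace lines carry no timestamp; skip them
        ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
        ts += timedelta(milliseconds=int(m.group(2)))
        if prev is not None and ts - prev > threshold:
            gaps.append((prev, ts, ts - prev))
        prev = ts
    return gaps


if __name__ == "__main__":
    import sys

    # Usage (hypothetical file name): python find_gaps.py jobmanager.log
    if len(sys.argv) > 1:
        with open(sys.argv[1], encoding="utf-8", errors="replace") as f:
            for before, after, gap in find_gaps(f):
                print(f"{gap} of silence between {before} and {after}")
```

A gap found this way only shows that the process logged nothing for a while; it does not by itself say whether the cause was a GC pause, blocked I/O, or simply an idle period.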

Piotrek

Re: Stream job failed after increasing number retained checkpoints

Stefan Richter
Hi,

There is no known limitation in the strict sense, but you might run out of DFS space or Job Manager memory if you keep a huge number of checkpoints around. I wonder what reason you might have for wanting such a huge number of retained checkpoints? Usually keeping one checkpoint should do the job, maybe a couple more if you are very worried about corruption that goes beyond your DFS's capability to handle it. Is there a reason for that, or maybe a misconception about what increasing the number of retained checkpoints is good for?

Best,
Stefan 

Re: Stream job failed after increasing number retained checkpoints

Jose Miguel Tejedor Fernandez
Hi,

I wonder what reason you might have that you ever want such a huge number of retained checkpoints? 

The Flink jobs running on the EMR cluster require a checkpoint at midnight. (In our use case we need to sync a loaded delta to our third-party partner with the streamed data.) The delta loads the whole day's data, and that's why we wanted to have the midnight checkpoint available to start from there.
We could also make a savepoint at midnight, but it’s not as handy (we would need to build our own tooling to do it), and it can’t benefit from the smaller latency of an incremental checkpoint. Another thing is that our own savepoint tooling would be a bit hard to monitor. Besides, retaining the checkpoints created every minute would also allow us to load a delta at any time. Please let me know if there are better ways of achieving this.
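
For reference, the time span covered by the retained checkpoints follows directly from the two numbers given in this thread (2880 retained checkpoints, one checkpoint per minute). This is a minimal sketch; it assumes every checkpoint completes on schedule, and the function names are illustrative.

```python
from datetime import timedelta


def retention_window(num_retained, interval):
    """Time span covered once the retention limit is reached,
    assuming evenly spaced, successfully completed checkpoints."""
    return num_retained * interval


def retained_after(uptime, interval, num_retained):
    """Number of checkpoints held after `uptime`, capped at the limit."""
    return min(uptime // interval, num_retained)


one_minute = timedelta(minutes=1)
print(retention_window(2880, one_minute))                    # 2 days, 0:00:00
print(retained_after(timedelta(hours=30), one_minute, 2880))  # 1800
```

Under these assumptions the limit of 2880 would only be reached after 48 hours, so a job failing after about 30 hours would still be accumulating checkpoints (around 1800 of them) rather than discarding old ones.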

From where does the log trace come from?  

It comes from the TaskManager.  

Please search on the opposite side of the time outing connection for possible root cause of the timeout including:
- possible error/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
- machine health (CPU usage, disks usage, network connections)

It seems that the TaskManager disconnects from the JobManager and then cannot reach it again, and I cannot tell the reason. I think the machine health metrics mentioned above seem to be OK. Would you say the direct memory stats usage is correct? How can I check the GC pauses?
These are some traces from the TaskManager log, before and after it detached from the JobManager:

2018-01-08 22:26:37,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@...:35341]
2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@...:35341/user/jobmanager: JobManager is no longer reachable
2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager                         - Cancelling all computations and discarding all cached data.
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@...:35341/user/jobmanager: JobManager is no longer reachable
at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Sink: CounterSink (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).


José Miguel Tejedor Fernández
Server developer
[hidden email]

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com



Re: Stream job failed after increasing number retained checkpoints

Piotr Nowojski
Hi,

This Task Manager log suggests that the problem lies on the Job Manager side (there is no visible gap in the logs, and the reported GC time is cumulative: 31 seconds accumulated over 923 GC collections is a low value). Could you show the Job Manager log itself? Probably it’s the one that’s causing the Task Manager to time out.
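
To make the "cumulative" point concrete, the totals in a "Garbage collector stats" line can be turned into an average time per collection. This is a minimal sketch that parses the line format shown in the TaskManager log in this thread; the function name is an illustrative assumption.

```python
import re

# Matches e.g. "[PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923]".
GC = re.compile(r"\[([^,\]]+), GC TIME \(ms\): (\d+), GC COUNT: (\d+)\]")


def average_gc_ms(line):
    """Map collector name -> average milliseconds per collection."""
    return {
        name: int(total_ms) / int(count)
        for name, total_ms, count in GC.findall(line)
        if int(count) > 0  # avoid dividing by zero for unused collectors
    }


line = ("Garbage collector stats: "
        "[PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], "
        "[PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]")
for name, avg in average_gc_ms(line).items():
    print(f"{name}: {avg:.1f} ms per collection")
# PS Scavenge: 34.1 ms per collection
# PS MarkSweep: 305.5 ms per collection
```

An average can hide a single very long pause, so this is only a first check; per-pause detail would need the JVM's own GC logging.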

On the other hand, I see that the Task Manager max heap size is ~5 GB and I assume this is the same setting for the Job Manager. As Stefan pointed out, there is some memory overhead on the Job Manager for retaining checkpoints, and it is around a couple of hundred bytes (maybe even 1 KB) per operator instance. Doing some quick math:

2880 checkpoints * 10 task managers * 10 operators in the job * parallelism of 8 per task manager * 500 bytes = ~1 GB

The answer might be that you just need to increase the Job Manager max heap to retain 2880 checkpoints.
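
The back-of-the-envelope estimate above can be reproduced as follows. This is a minimal sketch: the cluster shape (10 task managers, 10 operators, parallelism of 8 per task manager) and the per-instance cost are the rough figures from the estimate itself, not measured values, and the function name is illustrative.

```python
def retained_checkpoint_overhead_bytes(num_retained, task_managers,
                                       operators, parallelism_per_tm,
                                       bytes_per_instance):
    """Rough Job Manager heap needed to keep checkpoint metadata,
    assuming a fixed cost per operator instance per checkpoint."""
    instances = task_managers * operators * parallelism_per_tm
    return num_retained * instances * bytes_per_instance


low = retained_checkpoint_overhead_bytes(2880, 10, 10, 8, 500)
high = retained_checkpoint_overhead_bytes(2880, 10, 10, 8, 1000)
print(f"{low / 1e9:.2f} GB")   # 1.15 GB
print(f"{high / 1e9:.2f} GB")  # 2.30 GB
```

With a single retained checkpoint the same formula gives about 0.4 MB, which shows how strongly the overhead scales with the retention count.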

Piotrek

Re: Stream job failed after increasing number retained checkpoints

Jose Miguel Tejedor Fernandez
Thanks Piotr and Stefan,

The problem was the JobManager's heap memory overhead when increasing the number of retained checkpoints. It was solved once I reverted that value to one.

BR

This is the actual error in the JobManager log, an OOM:

2018-01-08 22:27:09,293 WARN  org.jboss.netty.channel.socket.nio.AbstractNioSelector        - Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2018-01-08 22:27:15,796 ERROR akka.actor.ActorSystemImpl                                    - Uncaught error from thread [flink-akka.actor.default-dispatcher-22840] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,288 ERROR akka.actor.ActorSystemImpl                                    - Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-22839] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:1090)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1978)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,882 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Removing web dashboard root cache directory /tmp/flink-web-f75e187d-3d08-4864-ba08-1740c8586be1
2018-01-08 22:27:17,394 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Removing web dashboard jar upload directory /tmp/flink-web-2c8657f2-9b87-4964-bde4-9997ef31966d
2018-01-08 22:27:19,863 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:44378
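
A quick way to find this kind of root cause in a long log is to look for the first OutOfMemoryError and the timestamped line that precedes it. This is a minimal sketch, assuming the log format shown above; the function name is illustrative.

```python
import re

# Matches a leading timestamp such as "2018-01-08 22:27:09,293".
TS = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}")


def first_oom(lines):
    """Return the most recent timestamped line at or before the first
    OutOfMemoryError, or None if the log contains no such error."""
    last_stamped = None
    for line in lines:
        if TS.match(line):
            last_stamped = line.rstrip("\n")
        if "java.lang.OutOfMemoryError" in line:
            return last_stamped
    return None


log = [
    "2018-01-08 22:27:09,293 WARN  org.jboss.netty.channel.socket.nio.AbstractNioSelector        - Unexpected exception in the selector loop.",
    "java.lang.OutOfMemoryError: Java heap space",
]
print(first_oom(log))
```

Here the first OOM is logged at 22:27:09, about 26 seconds after the TaskManager reported the JobManager as unreachable at 22:26:42.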



Here’s a test with Abba after it has accumulated state for 21 hours.
It seems that creating a nightly savepoint won’t necessarily be scalable,
so being able to use incremental checkpoints would still seem appealing, if possible.
Or why not make a savepoint somehow by copying the required files of a checkpoint instead? But I doubt that Flink would support that.





On Wed, Jan 10, 2018 at 3:08 PM, Piotr Nowojski <[hidden email]> wrote:
Hi,

This Task Manager log is suggesting that problems lays on the Job Manager side (no visible gap in the logs, GC Time reported is accumulated and 31 seconds accumulated over 963 gc collections is low value). Could you show the Job Manager log itself? Probably it’s the own that’s causing the TaskManager to timeout.

On the other hand, I see that Task Manager max heap size is ~5GB and I assume this is the same setting for the Job manager. A Stefan pointed out, there is some memory overhead on the Job Manager for retaining the checkpoint and it is around couple of hundred bytes (maybe even 1KB) per operator instance. By doing quick math:

2880 checkpoints * 10 task managers * 10 operators in the job * 8 parallelism per task manager * 500 bytes = ~1GB

The answer might be that you just need to increase the Job Manager max heap to retain 2880 checkpoints.

Piotrek

On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez <[hidden email]> wrote:

Hi,

I wonder what reason you might have that you ever want such a huge number of retained checkpoints? 

The Flink jobs running on EMR cluster require a checkpoint at midnight. (In our use case we need to synch a loaded delta to our a third party partner with the streamed data). The delta load the whole day data and that's why we wanted to have available the midnight's checkpoint to start from there.
We could also make a savepoint at midnight, but it’s not as handy (we would need to build our own tooling to do it), and it can’t benefit from the smaller latency of an incremental checkpoint. Another thining is that implementing our own savepoint tool is a bit hard to monitor. Besides, retaining several having checkpoints created every minute is that it would also allow us to load a delta at any time. Please, if there are better ways of achieving this, let me know.

Where does the log trace come from?

It comes from the TaskManager.  

Please search on the opposite side of the timed-out connection for a possible root cause of the timeout, including:
- possible errors/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
- machine health (CPU usage, disk usage, network connections)

It seems that the TaskManager disconnects from the JobManager and then cannot reach it again, and I cannot tell the reason. I think the machine health metrics mentioned above seem to be OK. Would you say the Direct memory stats usage is correct? What is the way to check the GC pauses?
These are some traces from the TaskManager log, before and after it detached from the JobManager:

2018-01-08 22:26:37,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
2018-01-08 22:26:42,263 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB (used/committed/max)]
2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Garbage collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341]
2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable
2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager                         - Cancelling all computations and discarding all cached data.
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager: JobManager is no longer reachable
at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Sink: Discarded events (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Sink: CounterSink (async call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
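
On the GC question: the "Garbage collector stats" lines in the trace above report cumulative totals per collector, so one rough check is to divide the accumulated time by the collection count. The sketch below does that for one of the lines above; the regular expression and the function name are assumptions made for illustration, not part of Flink.

```python
import re

# Matches entries such as "[PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923]".
GC_PATTERN = re.compile(r"\[([^,\]]+), GC TIME \(ms\): (\d+), GC COUNT: (\d+)\]")


def gc_averages(line):
    """Map collector name -> (total_ms, count, mean_ms_per_collection)."""
    result = {}
    for name, total_ms, count in GC_PATTERN.findall(line):
        total_ms, count = int(total_ms), int(count)
        mean = total_ms / count if count else 0.0
        result[name] = (total_ms, count, mean)
    return result


# One of the TaskManager log lines quoted above.
sample = (
    "2018-01-08 22:26:42,264 INFO  org.apache.flink.runtime.taskmanager.TaskManager"
    "              - Garbage collector stats: "
    "[PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], "
    "[PS MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]"
)

for name, (total_ms, count, mean) in gc_averages(sample).items():
    print(f"{name}: {total_ms} ms over {count} collections, mean {mean:.1f} ms")
```

A mean can hide a single long pause, so this only rules out a generally overloaded collector. Individual pause durations would have to come from the JVM's own GC logging.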


José Miguel Tejedor Fernández
Server developer
[hidden email]

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com



On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <[hidden email]> wrote:
Hi,

there is no known limitation in the strict sense, but you might run out of DFS space or job manager memory if you keep around a huge number of checkpoints. I wonder what reason you might have that you ever want such a huge number of retained checkpoints? Usually keeping one checkpoint should do the job, maybe a couple more if you are very afraid of corruption that goes beyond your DFS's capabilities to handle it. Is there any reason for that, or maybe a misconception about what increasing the number of retained checkpoints is good for?

Best,
Stefan 


Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <[hidden email]>:

Hi,

Increasing akka’s timeouts is rarely a solution for any problem: it either does not help or just masks the issue, making it less visible. But yes, it is possible to bump the limits: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka

I don’t think that state.checkpoints.num-retained was designed to handle such a large number of retained checkpoints, so maybe there are some known/unknown limitations. Stefan, do you know something in this regard?

In parallel, as with any other akka timeout, you should track down its root cause. This one warning line doesn’t tell much. Where does it come from? Client log? Job manager log? Task manager log? Please search on the opposite side of the timed-out connection for a possible root cause of the timeout, including:
- possible errors/exceptions/warnings
- long GC pauses or other blocking operations (possibly long unnatural gaps in the logs)
- machine health (CPU usage, disk usage, network connections)
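
The "long unnatural gaps in the logs" check from the list above can be automated with a few lines of Python. This is a minimal sketch that assumes the default Flink log layout, where each entry starts with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp. The function name and the 30-second threshold are arbitrary choices for illustration, and the last sample line is made up to show a gap.

```python
from datetime import datetime, timedelta


def find_gaps(lines, threshold=timedelta(seconds=30)):
    """Yield (previous_ts, current_ts) for consecutive entries further apart than threshold."""
    previous = None
    for line in lines:
        try:
            # The first 23 characters hold the timestamp, e.g. "2018-01-08 22:26:37,263".
            current = datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")
        except ValueError:
            continue  # stack-trace lines and other continuations carry no timestamp
        if previous is not None and current - previous > threshold:
            yield previous, current
        previous = current


sample_lines = [
    "2018-01-08 22:26:37,263 INFO  ... - Garbage collector stats: ...",
    "2018-01-08 22:26:42,263 INFO  ... - Memory usage stats: ...",
    "at akka.dispatch.Mailbox.run(Mailbox.scala:220)",
    "2018-01-08 22:27:30,000 INFO  ... - hypothetical entry after a pause",
]

for before, after in find_gaps(sample_lines):
    print(f"gap of {(after - before).total_seconds():.1f} s between {before} and {after}")
```

To scan a real log, pass an open file object instead of `sample_lines`. Lower the threshold if the component normally logs every few seconds, as the TaskManager stats lines do.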

Piotrek

On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez <[hidden email]> wrote:

Hello,

I have several stream jobs running (v. 1.3.1) in production which always fail after running for a fixed period of around 30 hours. This is the WARN trace before the failure:

Association with remote system [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-10-1-51-134.cloud-internal.acme.com:39876]] Caused by: [No response from remote for outbound association. Handshake timed out after [20000 ms].

The main change made to the job configuration was to increase state.checkpoints.num-retained from 1 to 2880. I am using asynchronous RocksDB snapshots to persist the state. (I attach some screenshots with the checkpoint configuration from the web UI.)

  • Could my assumption be correct that the increase of state.checkpoints.num-retained is causing the problem? Is there any known issue regarding this?
  • Also, is there any way to increase the Akka handshake timeout from the current 20000 ms to a higher value? I thought it might be convenient to increase the timeout to 1 minute instead.

BR


<Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>