akka timeout


Steven Wu

We have set akka.ask.timeout to 60 s in the yaml file, and I confirmed the setting in the Flink UI. But I still see an akka timeout of 10 s for the metric query service. Two questions:
1) Why doesn't the metric query use the 60 s value configured in the yaml file? Does it always use the default 10 s value?
2) Could this cause heartbeat failures between the task manager and the job manager, or is this just a non-critical failure that won't affect job health?

Thanks,
Steven

2017-08-17 23:34:33,421 WARN  org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching metrics failed.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@1.2.3.4:39139/user/MetricQueryService_23cd9db754bb7d123d80e6b1c0be21d6]] after [10000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
    at java.lang.Thread.run(Thread.java:748)
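
For reference, the setting under discussion lives in flink-conf.yaml and looks roughly like this (a minimal sketch; akka.ask.timeout is the key mentioned above, and the 60 s value mirrors the one we set):

    # flink-conf.yaml -- minimal sketch, value as described above
    akka.ask.timeout: 60 s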

Re: akka timeout

Till Rohrmann
Hi Steven,

I thought that the MetricFetcher picks up the right timeout from the configuration. Which version of Flink are you using?

The timeout is not a critical problem for job health.

Cheers,
Till


Re: akka timeout

Chesnay Schepler
The MetricFetcher always uses the default akka timeout value.


Re: akka timeout

Steven Wu
Till/Chesnay, thanks for the answers. Looks like this is a result/symptom of an underlying stability issue that I am trying to track down.

It is Flink 1.2.


Re: akka timeout

Till Rohrmann-2
Hi Steven,

A quick correction for Flink 1.2: the MetricFetcher indeed does not pick up the right timeout value from the configuration; instead it uses a hardcoded 10 s timeout. This has only been changed recently and is already committed to master, so with the next release, 1.4, it will properly pick up the configured timeout.

Just out of curiosity, what's the instability issue you're observing?

Cheers,
Till
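
To make the distinction concrete, here is a minimal, illustrative Java sketch of an Akka ask with a hardcoded timeout versus one taken from configuration. This is not Flink's actual MetricFetcher code; the class and method names are made up for illustration, and only akka.pattern.Patterns.ask is a real API.

    import akka.actor.ActorRef;
    import akka.pattern.Patterns;
    import scala.concurrent.Future;

    class MetricQuerySketch {
        // Hardcoded 10 s ask, which is what produces the "after [10000 ms]" timeouts above.
        static Future<Object> askHardcoded(ActorRef metricQueryService, Object request) {
            return Patterns.ask(metricQueryService, request, 10_000L);
        }

        // Reading the timeout from configuration instead (roughly what 1.4 is said to do);
        // askTimeoutMillis stands in for the parsed akka.ask.timeout value.
        static Future<Object> askConfigured(ActorRef metricQueryService, Object request, long askTimeoutMillis) {
            return Patterns.ask(metricQueryService, request, askTimeoutMillis);
        }
    }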


Re: akka timeout

Steven Wu
Till,

Once our job is restarted for some reason (e.g. a taskmanager container got killed), it can get stuck in a continuous restart loop for hours. Right now, I suspect it is caused by GC pauses during restart; our job has very high memory allocation in steady state. High GC pauses then cause akka timeouts, which in turn cause the jobmanager to consider taskmanager containers unhealthy/dead and kill them. And the cycle repeats...

But I haven't been able to prove or disprove it yet. When I was asking the question, I was still sifting through metrics and error logs.

Thanks,
Steven
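
For anyone tuning this, the knobs that control when the jobmanager declares a taskmanager dead are the Akka death-watch settings, not akka.ask.timeout. A rough flink-conf.yaml sketch (key names as in the Flink 1.2/1.3 configuration docs; the values are purely illustrative and would need to be sized against the observed GC pauses):

    # flink-conf.yaml -- illustrative values only
    akka.ask.timeout: 60 s                 # RPC ask timeout (the setting discussed above)
    akka.watch.heartbeat.interval: 10 s    # how often death-watch heartbeats are sent
    akka.watch.heartbeat.pause: 60 s       # tolerated heartbeat pause before a TM is declared dead
    akka.watch.threshold: 12               # failure detector sensitivity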



Re: akka timeout

bowen.li
Hi Steven,
    Yes, GC is a big overhead; it may cause your CPU utilization to reach 100%, at which point every process stops making progress. We ran into this a while ago too.

    How much memory did you assign to the TaskManager? What was your CPU utilization when your taskmanager was considered 'killed'?

Bowen
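
One way to answer that is to enable GC logging on the taskmanagers and look at the stopped-time entries. A sketch using Flink's env.java.opts key with standard HotSpot (Java 8) flags; the log path is just an example:

    # flink-conf.yaml -- example only
    env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/taskmanager-gc.log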




Re: akka timeout

Steven Wu
Bowen, 

Heap size is ~50 GB. CPU was actually pretty low (under 20%) when the high GC pauses and akka timeouts were happening, so maybe memory allocation and GC weren't really the issue. I also recently learned that the JVM can pause while writing the GC log if disk I/O is slow; that is another lead I am pursuing.

Thanks,
Steven
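
If GC-log writes stalling on disk I/O turn out to be the cause, a common mitigation is to point the GC log at a memory-backed filesystem and cap it with rotation. A hypothetical env.java.opts sketch (the /dev/shm path is only an example location; the rotation flags are standard HotSpot options):

    # flink-conf.yaml -- example only
    env.java.opts: -Xloggc:/dev/shm/taskmanager-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=20M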


Re: akka timeout

rmetzger0
Hi,
are you using the RocksDB state backend already? 
Maybe writing the state to disk would actually reduce the pressure on the GC (but of course it'll also reduce throughput a bit).
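
For context, switching to the RocksDB backend is a one-line change on the execution environment. A minimal Java sketch (the checkpoint URI is a placeholder and the flink-statebackend-rocksdb dependency is assumed):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDBBackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Keyed state lives in RocksDB on local disk; checkpoints go to the given URI (placeholder path).
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            // ... define sources, operators, and sinks, then call env.execute(...) as usual ...
        }
    }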

Are there any known issues with the network? Maybe the network bursts on restart cause the timeouts?


Re: akka timeout

Steven Wu
This is a stateless job, so we don't use RocksDB.

Yeah, the network could also be a factor; I will keep it on the radar. Unfortunately, our metrics system doesn't have TCP metrics when running inside containers.


Re: akka timeout

Steven Wu
Just to close the thread: the akka death watch was triggered by high GC pauses, which were caused by a memory leak in our code during Flink job restarts.

Also noted that akka.ask.timeout isn't related to the akka death watch, which Flink has documented and linked.


Re: akka timeout

Till Rohrmann
Great to hear that you could figure things out, Steven.

You are right. The death watch is no longer linked to the akka ask timeout, because of FLINK-6495. Thanks for the feedback. I will correct the documentation.

Cheers,
Till


Re: akka timeout

Till Rohrmann
Quick question, Steven: where did you find the documentation saying that the death watch interval is linked to the akka ask timeout? It was included in the past, but I couldn't find it anymore.

Cheers,
Till


Re: akka timeout

Steven Wu
Till, sorry for the confusion. I meant that the Flink documentation has the correct info; our code was mistakenly using akka.ask.timeout for the death watch.


Re: akka timeout

Till Rohrmann
Alright. Glad to hear that things are now working :-)
