AskTimeoutException

AskTimeoutException

Alex Soto
Hello,

I am using Flink version 1.7.1. In a unit test, I create a local environment:

Configuration cfg = new Configuration();
cfg.setString(AkkaOptions.ASK_TIMEOUT, "2 min");
cfg.setString(AkkaOptions.CLIENT_TIMEOUT, "2 min");

LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(cfg);

Yet, when I run the test, I am getting the following error:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher87b320bd-c3c8-485f-82f7-113f52fb46a1#-1843625489]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)



My question is: why doesn't Flink honor the timeout values I pass when creating the local environment? I am passing 2 minutes, but the error message says it timed out after 10 seconds.


Best regards,
Alex Soto





Re: AskTimeoutException

Abdul Qadeer
Hi Alex,

The timeout shown in the exception is due to AkkaOptions.LOOKUP_TIMEOUT.

(In reply to Alex Soto's message of Fri, 12 Apr 2019 at 09:45.)





Re: AskTimeoutException

Alex Soto
Thanks Abdul for the help.  So I added this:

cfg.setString(AkkaOptions.LOOKUP_TIMEOUT, "2 min");


But I am still getting the same error:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher62bab021-4a79-4d10-8d45-7a33c493a925#-199361569]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".



Best regards,
Alex soto





(In reply to Abdul Qadeer's message of Apr 12, 2019, at 6:34 PM.)

Re: AskTimeoutException

Alex Soto
I found that the issue was a hard-coded timeout value in the MiniCluster class, which is used for standalone execution:


    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null");

        this.rpcTimeout = Time.seconds(10L);
        this.terminationFuture = CompletableFuture.completedFuture(null);
        running = false;
    }


This was fixed in later versions: https://issues.apache.org/jira/browse/FLINK-11690

So the solution is to upgrade to 1.7.3 or 1.8.0.
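
For anyone who wants to see why the configured 2 minutes never took effect, the behavior can be reproduced outside Flink. This is a minimal sketch, not Flink code: TimeoutDemo, HardCodedCluster, ConfiguredCluster, and the key "ask.timeout.seconds" are hypothetical names made up for illustration. The second class shows only one plausible way to honor a setting and is not necessarily how FLINK-11690 was implemented.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Illustration only: none of these classes or keys exist in Flink.
final class TimeoutDemo {

    /** Accepts a configuration but never reads it, like the constructor quoted above. */
    static final class HardCodedCluster {
        private final Duration rpcTimeout;

        HardCodedCluster(Map<String, String> config) {
            // The config argument is ignored; the timeout is always 10 seconds.
            this.rpcTimeout = Duration.ofSeconds(10);
        }

        Duration rpcTimeout() {
            return rpcTimeout;
        }
    }

    /** Reads the timeout from the configuration and falls back to a default. */
    static final class ConfiguredCluster {
        // Hypothetical key, chosen for this sketch; not a real Flink option.
        static final String ASK_TIMEOUT_KEY = "ask.timeout.seconds";
        static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);

        private final Duration rpcTimeout;

        ConfiguredCluster(Map<String, String> config) {
            String raw = config.get(ASK_TIMEOUT_KEY);
            this.rpcTimeout = (raw == null)
                    ? DEFAULT_TIMEOUT
                    : Duration.ofSeconds(Long.parseLong(raw.trim()));
        }

        Duration rpcTimeout() {
            return rpcTimeout;
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(ConfiguredCluster.ASK_TIMEOUT_KEY, "120"); // 2 minutes

        // Same configuration, different effective timeouts.
        System.out.println(new HardCodedCluster(config).rpcTimeout().toMillis());  // prints 10000
        System.out.println(new ConfiguredCluster(config).rpcTimeout().toMillis()); // prints 120000
    }
}
```

The first value matches the "after [10000 ms]" in the exception: the option is set correctly, but nothing on that code path reads it.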

Best regards,
Alex soto




(In reply to Alex Soto's message of Apr 24, 2019, at 1:39 PM.)