Cannot configure akka.ask.timeout

Lukas Kircher-2
Hello,

I am having trouble setting Akka configuration parameters in Flink 1.5.0. When I run a job, I get the exception listed below, which states that Akka timed out after 10000 ms. I tried to increase the timeout by following the Flink configuration documentation. Specifically, I did the following:

1) Passed a configuration with `akka.ask.timeout` set to a higher value to the Flink execution environment. I started this in IntelliJ.
2) Passed program arguments via the run configuration in IntelliJ, e.g. `-Dakka.ask.timeout:100s`.
3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local standalone cluster via start-cluster.sh. The setting is reflected in Flink's web interface.

However, despite the explicit configuration, the default setting seems to be used: in each case the exception below states that the Akka ask timed out after 10000 ms.
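The "10000 ms" in the exception is exactly the documented "10 s" default, whereas the configured "100 s" would be 100000 ms, so the numbers themselves suggest the configured value is not being applied. The sketch below only makes that arithmetic explicit. `DurationSketch` is a hypothetical helper that assumes a simplified "<number> <unit>" grammar; it is not the parser Flink itself uses.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not Flink's own parser: converts strings such as "10 s",
// "100s" or "10000 ms" to milliseconds, assuming a simplified "<number> <unit>" grammar.
final class DurationSketch {
    // Digits, optional whitespace, then a unit made of letters.
    private static final Pattern FORMAT = Pattern.compile("\\s*(\\d+)\\s*([a-zA-Z]+)\\s*");

    static long toMillis(String text) {
        Matcher m = FORMAT.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2).toLowerCase(Locale.ROOT)) {
            case "ms":
                return amount;
            case "s":
                return TimeUnit.SECONDS.toMillis(amount);
            case "min":
                return TimeUnit.MINUTES.toMillis(amount);
            default:
                throw new IllegalArgumentException("Unsupported unit in: " + text);
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("10 s"));  // prints 10000, the value in the exception
        System.out.println(toMillis("100 s")); // prints 100000, the value that was configured
    }
}
```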

As my problem seems very basic, I have not included an SSCCE for now, but I can try to build one if it helps to figure out the issue.

------
[...]
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
[...]
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
[...]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:745)
[...]
------


Best regards and thanks for your help,
Lukas



Re: Cannot configure akka.ask.timeout

Lukas Kircher-2
Hello,

Does anybody have an idea what is going on? I have not found a solution yet.

Am I doing something wrong, or is the `akka.ask.timeout` parameter unrelated to the exception below?

Could somebody please take a look at this? More details can be found in my previous message.

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]

Best regards,
Lukas


On 13. Jul 2018, at 12:24, Lukas Kircher <[hidden email]> wrote:


Re: Cannot configure akka.ask.timeout

Yun Tang
Hi Lukas

From the description of your first two steps ("started this in IntelliJ") and the exception log, I think you are running your program locally within IntelliJ using LocalStreamEnvironment. Here is the configuration-related code from org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java:

Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);

// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);

if (!configuration.contains(RestOptions.PORT)) {
    configuration.setInteger(RestOptions.PORT, 0);
}

int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
    .setConfiguration(configuration)
    .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
    .build();

Unfortunately, judging from the code above, I don't think you can set a specific Akka timeout without changing this class (please correct me if I'm wrong). The easiest way is to change the default value of ASK_TIMEOUT in org/apache/flink/configuration/AkkaOptions.java from "10 s" to "100 s".
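To make the ordering in that snippet concrete: later `addAll` calls override keys set earlier, and the `RestOptions.PORT` guard only fills in a value when none is present. The sketch below models this with plain `LinkedHashMap`s rather than Flink's `Configuration` class (an assumption for illustration); the key names and values are hypothetical examples, and `"rest.port"` is assumed to be the key behind `RestOptions.PORT`. Under this model a user-supplied `akka.ask.timeout` does end up in the merged configuration; whether the MiniCluster actually reads it is a separate question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the merge order in the LocalStreamEnvironment snippet,
// using plain maps instead of Flink's Configuration class.
final class MergeOrderSketch {

    static Map<String, String> merge(Map<String, String> jobConfig, Map<String, String> userConfig) {
        Map<String, String> merged = new LinkedHashMap<>();
        // 1. Start from the job graph's configuration.
        merged.putAll(jobConfig);
        // 2. User-defined settings are added last, so they override earlier keys.
        merged.putAll(userConfig);
        // 3. Mirrors the "if (!configuration.contains(RestOptions.PORT))" guard.
        merged.putIfAbsent("rest.port", "0");
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> job = new LinkedHashMap<>();
        job.put("akka.ask.timeout", "10 s");
        Map<String, String> user = new LinkedHashMap<>();
        user.put("akka.ask.timeout", "100 s");
        System.out.println(merge(job, user)); // prints {akka.ask.timeout=100 s, rest.port=0}
    }
}
```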

Best
Yun


From: Lukas Kircher <[hidden email]>
Sent: Wednesday, July 18, 2018 14:47
To: user
Subject: Re: Cannot configure akka.ask.timeout

Re: Cannot configure akka.ask.timeout

Vishal Santoshi
That stumped us too. I am not sure, but could you try setting web.timeout higher than the default 10 s? We had issues with timeouts on job submission and were advised to change web.timeout, since job submission is now an RPC call. Please do let us know if that helps.

On Wed, Jul 18, 2018, 5:11 AM Yun Tang <[hidden email]> wrote:

Re: Cannot configure akka.ask.timeout

Vishal Santoshi
Scratch that; that is a different exception.

On Wed, Jul 18, 2018, 8:40 AM Vishal Santoshi <[hidden email]> wrote:

Re: Cannot configure akka.ask.timeout

Gary Yao-2
In reply to this post by Lukas Kircher-2
Hi Lukas,

It seems that when using MiniCluster, the config key akka.ask.timeout is not
respected. Instead, a hardcoded timeout of 10 s is used [1]. Since all
communication is local, it would be interesting to see in detail what your
job does that makes it exceed the timeout.
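The effect of a hardcoded timeout can be pictured with a small, hypothetical sketch (not Flink's MiniCluster code): a request helper that applies a fixed timeout and never reads the configured value, so raising the configured value changes nothing. The constant is scaled down from 10 s to 100 ms so the example finishes quickly, and `CompletableFuture.orTimeout` requires Java 9 or later.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration, not Flink's MiniCluster code: the ask helper applies a
// hardcoded timeout, so whatever value is "configured" has no effect.
final class HardcodedTimeoutSketch {
    // Stand-in for a hardcoded 10 s, scaled down so the demo finishes quickly.
    static final Duration HARDCODED = Duration.ofMillis(100);

    // Returns true if waiting for the reply failed with a timeout.
    static boolean timesOut(CompletableFuture<?> reply, Duration configured) {
        // 'configured' is deliberately never read: that is the behaviour being modelled.
        try {
            reply.orTimeout(HARDCODED.toMillis(), TimeUnit.MILLISECONDS).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        // A reply that never arrives times out after 100 ms although 100 s is "configured".
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();
        System.out.println(timesOut(neverAnswered, Duration.ofSeconds(100))); // prints true
    }
}
```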

The key akka.ask.timeout specifies the default RPC timeout. However, for
requests originating from the REST API, web.timeout overrides this value. When
submitting a job to a (standalone) cluster using the CLI, a request is issued
against the REST API. Therefore, you can try setting web.timeout: 100000 (in
milliseconds) in flink-conf.yaml, as already proposed by Vishal Santoshi.
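The precedence described above can be modelled in a few lines. This is a hypothetical sketch, not Flink code: it assumes that REST-originated requests (such as CLI job submission) consult only web.timeout, that other RPCs use akka.ask.timeout, and that both default to the 10 s mentioned in this thread. For simplicity both values are stored as milliseconds, although akka.ask.timeout is actually written as a duration string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the precedence described above, not Flink code: akka.ask.timeout
// is the default RPC timeout, and web.timeout overrides it for REST-originated requests.
// For simplicity both values are stored here as milliseconds.
final class TimeoutPrecedenceSketch {
    static final long DEFAULT_MS = 10_000L; // the 10 s default mentioned in this thread

    static long effectiveTimeoutMs(Map<String, Long> config, boolean viaRestApi) {
        if (viaRestApi) {
            // Assumption: REST requests (e.g. CLI job submission) only look at web.timeout.
            return config.getOrDefault("web.timeout", DEFAULT_MS);
        }
        // Other RPCs use akka.ask.timeout.
        return config.getOrDefault("akka.ask.timeout", DEFAULT_MS);
    }

    public static void main(String[] args) {
        Map<String, Long> config = new HashMap<>();
        config.put("akka.ask.timeout", 100_000L);
        System.out.println(effectiveTimeoutMs(config, true)); // prints 10000: web.timeout unset
        config.put("web.timeout", 100_000L);
        System.out.println(effectiveTimeoutMs(config, true)); // prints 100000
    }
}
```

Under this model, raising only akka.ask.timeout leaves a CLI submission at the 10 s default, which is why web.timeout is the key to try for that path.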

Best,
Gary

[1] https://github.com/apache/flink/blob/749dd29935f319b062051141e150eed7a1a5f298/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L185


Re: Cannot configure akka.ask.timeout

Lukas Kircher-2
Thanks for your answers.

In my use case, I am reading from a large number of individual files. Jobs are issued directly from the Java API; the results are collected in memory and partially reused in follow-up jobs.

I suspected that with a MiniCluster or local environment I would not be able to override the default for `akka.ask.timeout` without tinkering with Flink's source code; thanks for confirming. For now, I'll stick with this solution, though.

Cheers,
Lukas

On 19. Jul 2018, at 11:03, Gary Yao <[hidden email]> wrote:
