Hello,
I have problems setting configuration parameters for Akka in Flink 1.5.0. When I run a job, I get the exception listed below, which states that the Akka ask timed out after 10000 ms. I tried to increase the timeout by following the Flink configuration documentation. Specifically, I did the following:

1) Passed a configuration to the Flink execution environment with `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
2) Passed program arguments via the run configuration in IntelliJ, e.g. `-Dakka.ask.timeout:100s`.
3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local standalone cluster via start-cluster.sh. The setting is reflected in Flink's web interface.

However, despite the explicit configuration, the default setting seems to be used. In each case, the exception below states that the Akka ask timed out after 10000 ms.

As my problem seems very basic, I do not include an SSCCE for now, but I can try to build one if this helps to figure out the issue.

------
[...]
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
[...]
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
[...]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:745)
[...]
------

Best regards and thanks for your help,
Lukas
|
Hello,
Does anybody have an idea what is going on? I have not yet found a solution. Am I doing something wrong? Or is the `akka.ask.timeout` parameter not related to the exception stated below? Could somebody please take a look at this? More details can be found in my previous message.

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]

Best regards,
Lukas
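For context on what the exception itself means: an Akka "ask" sends a message to an actor and returns a future that fails with `AskTimeoutException` if no reply arrives within the given timeout, regardless of what the receiver does later. The sketch below mimics that behaviour with plain `java.util.concurrent.CompletableFuture` (Java 9+ for `orTimeout`) instead of Akka; the `ask` helper and the simulated slow dispatcher are hypothetical and only illustrate why the message reports a fixed duration such as `[10000 ms]`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class AskTimeoutSketch {

    /**
     * Hypothetical stand-in for an Akka ask (not Akka's API): run the "actor"
     * asynchronously and fail the returned future if no reply arrives in time.
     */
    static <T> CompletableFuture<T> ask(Supplier<T> actor, long timeoutMillis) {
        return CompletableFuture.supplyAsync(actor)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Simulated receiver that needs replyMillis before it answers. */
    static Supplier<String> slowDispatcher(long replyMillis) {
        return () -> {
            try {
                Thread.sleep(replyMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "JobResult";
        };
    }

    /** Returns the reply, or a description of the timeout if the ask expired. */
    public static String askAndDescribe(long replyMillis, long timeoutMillis) {
        try {
            return ask(slowDispatcher(replyMillis), timeoutMillis).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException set by orTimeout in a CompletionException.
            if (e.getCause() instanceof TimeoutException) {
                return "Ask timed out after [" + timeoutMillis + " ms]";
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        // Reply arrives well within the timeout: the caller gets the result.
        System.out.println(askAndDescribe(10, 1_000));
        // Reply is slower than the timeout: the future fails first.
        System.out.println(askAndDescribe(500, 100));
    }
}
```

The only knob on the caller's side is the timeout value passed to the ask, which is why the question in this thread is which configuration key (if any) actually reaches that value.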
|
Hi Lukas
From the description of your first two steps ("started this in IntelliJ") and the exception log, I think you run your program locally within IntelliJ with LocalStreamEnvironment. The configuration-related code in org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java is shown below:

Configuration configuration = new Configuration();

Unfortunately, judging from the code above, I don't think you can set a specific Akka timeout without changing this class (if I'm wrong, please correct me). The easiest way is to change the default value of ASK_TIMEOUT in org/apache/flink/configuration/AkkaOptions.java from "10 s" to "100 s".
Best
Yun
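Yun's reasoning can be modelled without Flink: if a component reads an option from a configuration object it creates itself, only the option's default is visible, so the user's value never applies and the only remaining lever is the default. The sketch below is a hypothetical, simplified model; the `Option` class and helper methods are not Flink's `ConfigOption`/`Configuration` API, and only the key `akka.ask.timeout` and the values "10 s"/"100 s" come from this thread. Value parsing is reduced to whole seconds.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class ConfigDefaultSketch {

    /** Hypothetical, simplified option: a key plus a default (not Flink's ConfigOption). */
    public static final class Option {
        final String key;
        final String defaultValue;

        public Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        /** An explicitly configured value wins; otherwise the default is used. */
        String get(Map<String, String> conf) {
            return conf.getOrDefault(key, defaultValue);
        }
    }

    /** Parses "10 s", "100 s" or "100s" into a Duration (whole seconds only, for this sketch). */
    static Duration parseSeconds(String value) {
        String number = value.trim();
        if (number.endsWith("s")) {
            number = number.substring(0, number.length() - 1).trim();
        }
        return Duration.ofSeconds(Long.parseLong(number));
    }

    /** A component that reads the configuration the user hands to it. */
    public static Duration timeoutFromUserConfig(Option option, Map<String, String> userConf) {
        return parseSeconds(option.get(userConf));
    }

    /** A component that creates its own empty configuration, so only the default is visible. */
    public static Duration timeoutFromFreshConfig(Option option) {
        Map<String, String> fresh = new HashMap<>();
        return parseSeconds(option.get(fresh));
    }

    public static void main(String[] args) {
        Map<String, String> userConf = Map.of("akka.ask.timeout", "100 s");

        Option stock = new Option("akka.ask.timeout", "10 s");
        System.out.println(timeoutFromUserConfig(stock, userConf).toMillis()); // 100000
        System.out.println(timeoutFromFreshConfig(stock).toMillis());          // 10000

        // Yun's workaround, modelled: raise the default itself to "100 s".
        Option patched = new Option("akka.ask.timeout", "100 s");
        System.out.println(timeoutFromFreshConfig(patched).toMillis());        // 100000
    }
}
```

In this model, patching the default is the only change that affects the fresh-configuration path, which matches why the suggested workaround edits the source rather than the user configuration.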
From: Lukas Kircher <[hidden email]>
Sent: Wednesday, July 18, 2018 14:47
To: user
Subject: Re: Cannot configure akka.ask.timeout
|
That stumped us too. I am not sure, but could you set web.timeout higher than the default 10 s? We had issues with timeouts on job submission and were advised to change web.timeout, since job submission is now an RPC call. Please do let us know if that helps.

On Wed, Jul 18, 2018, 5:11 AM Yun Tang <[hidden email]> wrote:
|
Scratch that... that is a different exception.

On Wed, Jul 18, 2018, 8:40 AM Vishal Santoshi <[hidden email]> wrote:
|
Hi Lukas,

It seems that when using MiniCluster, the config key akka.ask.timeout is not respected. Instead, a hardcoded timeout of 10 s is used [1]. Since all communication is local, it would be interesting to see in detail what your job does that makes it exceed the timeout.

The key akka.ask.timeout specifies the default RPC timeout. However, for requests originating from the REST API, web.timeout overrides this value. When submitting a job using the CLI to a (standalone) cluster, a request is issued against the REST API. Therefore, you can try setting web.timeout: 100000 (the value is in milliseconds) in flink-conf.yaml, as already proposed by Vishal Santoshi.

Best,
Gary

[1] https://github.com/apache/flink/blob/749dd29935f319b062051141e150eed7a1a5f298/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L185

On Fri, Jul 13, 2018 at 12:24 PM, Lukas Kircher <[hidden email]> wrote:
|
Thanks for your answers.
In my use case, I am reading from a large number of individual files. Jobs are issued directly from the Java API; the results are collected (in memory) and partially re-used in follow-up jobs. I feared that with a MiniCluster or local environment I would not be able to override the default for `akka.ask.timeout` without tinkering with Flink's source code. Thanks for the confirmation. For now, I'll stick with this solution, though.

Cheers,
Lukas
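The timeout precedence described in this thread (a hardcoded 10 s in MiniCluster, `akka.ask.timeout` as the default RPC timeout, and `web.timeout` overriding it for REST requests) can be summarised as a small decision table. The sketch below is a simplified, hypothetical model of that precedence, not Flink's actual code: the `Origin` enum and method names are invented for illustration, only the keys `akka.ask.timeout` and `web.timeout` and the 10 s defaults come from the thread, and parsing is reduced to whole seconds (`akka.ask.timeout`) and milliseconds (`web.timeout`).

```java
import java.time.Duration;
import java.util.Map;

public class TimeoutResolutionSketch {

    /** Hypothetical classification of where a request comes from. */
    public enum Origin { MINI_CLUSTER, CLUSTER_RPC, REST_API }

    // 10 s values taken from this thread; everything else is a simplification.
    static final Duration MINI_CLUSTER_HARDCODED = Duration.ofSeconds(10);
    static final Duration ASK_TIMEOUT_DEFAULT = Duration.ofSeconds(10);
    static final Duration WEB_TIMEOUT_DEFAULT = Duration.ofMillis(10_000);

    /** akka.ask.timeout, parsed as whole seconds ("100 s" or "100s") in this sketch. */
    static Duration askTimeout(Map<String, String> conf) {
        String value = conf.get("akka.ask.timeout");
        if (value == null) {
            return ASK_TIMEOUT_DEFAULT;
        }
        String number = value.trim();
        if (number.endsWith("s")) {
            number = number.substring(0, number.length() - 1).trim();
        }
        return Duration.ofSeconds(Long.parseLong(number));
    }

    /** web.timeout, given in milliseconds. */
    static Duration webTimeout(Map<String, String> conf) {
        String value = conf.get("web.timeout");
        return value == null ? WEB_TIMEOUT_DEFAULT : Duration.ofMillis(Long.parseLong(value.trim()));
    }

    public static Duration effectiveTimeout(Origin origin, Map<String, String> conf) {
        switch (origin) {
            case MINI_CLUSTER:
                return MINI_CLUSTER_HARDCODED; // configuration is not consulted at all
            case REST_API:
                return webTimeout(conf);       // web.timeout overrides akka.ask.timeout
            default:
                return askTimeout(conf);       // default RPC timeout
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("akka.ask.timeout", "100 s");
        for (Origin origin : Origin.values()) {
            System.out.println(origin + " -> " + effectiveTimeout(origin, conf).toMillis() + " ms");
        }
    }
}
```

With `akka.ask.timeout: 100 s`, only the CLUSTER_RPC path picks up 100000 ms in this model; the MINI_CLUSTER path stays at 10000 ms, matching the `[10000 ms]` in the exception, and the REST_API path changes only once `web.timeout` is set (for example to 100000).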