Hi there,
I ran into the same problem running a batch job with Flink 1.6.1/1.6.2:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

My configuration has akka.ask.timeout: 600s, but it looks like it is not honored. Any suggestions on what can be done?

Thanks

On 2018/07/13 10:24:16, Lukas Kircher <[hidden email]> wrote:
> Hello,
>
> I have problems setting configuration parameters for Akka in Flink 1.5.0. When I run a job I get the exception listed below, which states that Akka timed out after 10000 ms. I tried to increase the timeout by following the Flink configuration documentation. Specifically, I did the following:
>
> 1) Passed a configuration to the Flink execution environment with `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
> 2) Passed program arguments via the run configuration in IntelliJ, e.g. `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local standalone cluster via start-cluster.sh. The setting is reflected in Flink's web interface.
>
> However, despite the explicit configuration, the default setting seems to be used. The exception below states in each case that the Akka ask timed out after 10000 ms.
>
> As my problem seems very basic I do not include an SSCCE for now, but I can try to build one if this helps in figuring out the issue.
>
> ------
> [...]
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> [...]
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:745)
> [...]
> ------
>
> Best regards and thanks for your help,
> Lukas
|
Hi Alex and Lukas,
This error is controlled by another RPC timeout, which is hard-coded and not affected by “akka.ask.timeout”. Could you open a JIRA issue so I can propose a fix for that?

Cheers,
Qi
|
Hi Qi,

Thanks for looking into this. Here is the ticket: https://issues.apache.org/jira/browse/FLINK-11143

Best,
-Alex

On Tue, Dec 11, 2018 at 8:47 PM qi luo <[hidden email]> wrote:
|
Qi,

The job submission timeout is caused by listing too many files in S3 during the env.readFile call that creates the input DataSet. Is there a way NOT to list S3 files during job submission? It seems like that would help mitigate the timeout problem. Which hard-coded value were you referring to?

Best,
-Alex

On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <[hidden email]> wrote:
|
Hi Alex,
The hard-coded values I found are [1] and [2]. We encountered an issue similar to yours (listing a lot of HDFS files). We ended up with a newer version of HDFSFileInput that lists files concurrently. Another hack we used was to list the files on the client side and pass them to the JobManager via serialization (not recommended, though, as it doesn’t follow Flink’s framework mechanisms). You can also try listing the S3 files concurrently, or paste your sample code here.

[2] https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117
|
Qi,

Thanks for the references! How do I enable concurrent S3 file listing? Here is the code.

// Consume the JSON files

On Wed, Dec 12, 2018 at 8:53 PM qi luo <[hidden email]> wrote:
|
Hi Alex,
I’m not very familiar with JsonLinesInputFormat; is that your own implementation? You could look into its `createInputSplits()` method, which should be doing the listing work, and rewrite it to list the files concurrently.
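
As a rough illustration of the concurrent-listing idea, here is a minimal sketch in plain Java. It is not Flink code and not the HDFSFileInput variant mentioned earlier in the thread: the class name `ConcurrentLister` and its methods are hypothetical, and the local file system (`java.nio.file`) stands in for S3 or HDFS. The point is only the shape: submit one listing task per directory to a thread pool, then merge the results in the original directory order.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; the local file system is a stand-in for S3/HDFS.
public class ConcurrentLister {

    /** Lists the regular files directly under one directory (non-recursive), sorted by path. */
    static List<Path> listOne(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile)
                          .sorted()
                          .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Lists several directories in parallel and concatenates the results in input order. */
    static List<Path> listConcurrently(List<Path> dirs, int parallelism)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            // Submit one listing task per directory so slow listings overlap.
            List<Future<List<Path>>> pending = new ArrayList<>();
            for (Path dir : dirs) {
                pending.add(pool.submit(() -> listOne(dir)));
            }
            // Collect in submission order to keep the output deterministic.
            List<Path> all = new ArrayList<>();
            for (Future<List<Path>> future : pending) {
                all.addAll(future.get());
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }
}
```

In a custom input format, the same pattern would presumably go inside the listing step of `createInputSplits()`, with the file-system calls swapped for whatever the format already uses. Whether this shortens job submission enough to avoid the timeout depends on how the listing time is distributed across directories.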
|