Re: Cannot configure akka.ask.timeout

Alex Vinnik
Hi there,

I ran into the same problem running a batch job with Flink 1.6.1/1.6.2.

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

akka.ask.timeout: 600s

But it looks like this setting is not honored. Any suggestions on what can be done?
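One quick way to confirm that the configured value is not the one firing is to compare the duration reported in the exception with the configured `akka.ask.timeout`. Below is a minimal plain-Java sketch. It assumes only the simple `<number><unit>` forms used in this thread (`ms`, `s`, `min`). The class and method names are hypothetical, and Flink's own duration parsing accepts more formats than this. It needs Java 9+ for `Map.of`.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TimeoutCheck {
    // Hypothetical unit table covering only the forms that appear in this thread.
    private static final Map<String, Long> UNIT_TO_MILLIS =
            Map.of("ms", 1L, "s", 1_000L, "min", 60_000L);

    // Accepts strings such as "100 s", "600s" or "10000 ms" (whitespace optional).
    private static final Pattern DURATION = Pattern.compile("\\s*(\\d+)\\s*(ms|s|min)\\s*");

    // Matches the "after [10000 ms]" fragment of an AskTimeoutException message.
    private static final Pattern ASK_TIMEOUT = Pattern.compile("after \\[(\\d+) ms\\]");

    static long toMillis(String value) {
        Matcher m = DURATION.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        return Long.parseLong(m.group(1)) * UNIT_TO_MILLIS.get(m.group(2));
    }

    static long observedMillis(String exceptionMessage) {
        Matcher m = ASK_TIMEOUT.matcher(exceptionMessage);
        if (!m.find()) {
            throw new IllegalArgumentException("No timeout found in: " + exceptionMessage);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        String configured = "600s";
        String message = "Ask timed out on [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms].";
        long expected = toMillis(configured);
        long observed = observedMillis(message);
        System.out.println("configured=" + expected + " ms, observed=" + observed + " ms");
        if (observed != expected) {
            System.out.println("The timeout that fired is not the configured akka.ask.timeout value.");
        }
    }
}
```

With the values from this thread, the configured 600 s is 600000 ms. The exception, however, reports 10000 ms, so the timeout that fired cannot be the configured one.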

Thanks

On 2018/07/13 10:24:16, Lukas Kircher <[hidden email]> wrote:

> Hello,
>
> I am having trouble setting Akka configuration parameters in Flink 1.5.0. When I run a job, I get the exception listed below, which states that Akka timed out after 10000 ms. I tried to increase the timeout by following the Flink configuration documentation. Specifically, I did the following:
>
> 1) Passed a configuration to the Flink execution environment with `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
> 2) Passed program arguments via the run configuration in IntelliJ, e.g. `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local standalone cluster via start-cluster.sh. The setting is reflected in Flink's web interface.
>
> However, despite the explicit configuration, the default setting seems to be used. In each case, the exception below states that the Akka ask timed out after 10000 ms.
>
> As my problem seems very basic, I have not included an SSCCE for now, but I can try to build one if it helps figure out the issue.
>
> ------
> [...]
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> [...]
> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:745)
> [...]
> ------
>
> Best regards and thanks for your help,
> Lukas

qi luo
Hi Alex and Lukas,

This error is controlled by a different RPC timeout, which is hard-coded and not affected by “akka.ask.timeout”. Could you open a JIRA issue so I can propose a fix?
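The failure mode described here can be reproduced in miniature with plain `CompletableFuture`s. A request is answered only after slow work, and the caller waits using a hard-coded constant instead of the configured value, so raising the configured value changes nothing. This is a hypothetical sketch: the class, constant and method names are invented, and the durations are scaled down. It is not Flink's actual submission code. It needs Java 9+ for `orTimeout`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class HardCodedTimeoutDemo {
    // Stand-in for a timeout baked into the code path (hypothetical, scaled down from 10 s).
    static final Duration HARD_CODED_TIMEOUT = Duration.ofMillis(100);

    // Simulates the receiver doing slow work (e.g. listing many files) before it replies.
    static CompletableFuture<String> slowRequest(long workMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "job submitted";
        });
    }

    // Buggy pattern: the configured timeout is accepted but ignored.
    static String submitIgnoringConfig(Duration configuredTimeout, long workMillis) {
        return slowRequest(workMillis)
                .orTimeout(HARD_CODED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
                .join();
    }

    // Fixed pattern: the caller's wait honors the configured timeout.
    static String submitHonoringConfig(Duration configuredTimeout, long workMillis) {
        return slowRequest(workMillis)
                .orTimeout(configuredTimeout.toMillis(), TimeUnit.MILLISECONDS)
                .join();
    }

    // Returns true if the call failed because the wait timed out.
    static boolean timesOut(Runnable call) {
        try {
            call.run();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        Duration configured = Duration.ofSeconds(5); // e.g. a raised ask timeout
        long work = 500;                             // longer than the hard-coded limit
        System.out.println("ignoring config times out: "
                + timesOut(() -> submitIgnoringConfig(configured, work)));
        System.out.println("honoring config times out: "
                + timesOut(() -> submitHonoringConfig(configured, work)));
    }
}
```

The first call fails even though the configured timeout is 5 s, because the wait never reads it. The second call completes. The fix for this pattern is to thread the configured value through to the place that waits, not to raise the configured value further.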

Cheers,
Qi

On Dec 12, 2018, at 7:07 AM, Alex Vinnik <[hidden email]> wrote:

Alex Vinnik
Hi Qi,

Thanks for looking into this. Here is the ticket: https://issues.apache.org/jira/browse/FLINK-11143

Best,
-Alex

On Tue, Dec 11, 2018 at 8:47 PM qi luo <[hidden email]> wrote:

Alex Vinnik
Qi,

The job submission timeout is caused by listing too many files in S3 during the env.readFile call that creates the input DataSet. Is there a way NOT to list S3 files during job submission? It seems like that would help mitigate the timeout problem.

Which hard-coded value were you referring to?

Best,
-Alex

On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <[hidden email]> wrote:

qi luo
Hi Alex,

The hard-coded values I’ve found are at [1] and [2].

We encountered a similar issue (listing a lot of HDFS files). We ended up with a newer version of HDFSFileInput that lists files concurrently. Another hack we used was to list the files on the client side and pass them to the JobManager via serialization (not recommended, though, as it doesn’t follow Flink’s framework mechanisms).
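The client-side listing hack boils down to producing a serializable list of paths before submission, so that the receiving side never has to touch the file system. Below is a minimal plain-Java sketch of that idea. It assumes a local directory tree stands in for HDFS/S3 and uses Java serialization to stand in for however the list is actually shipped with the job. All names are hypothetical. As noted, this bypasses Flink's own mechanisms. It needs Java 11+ for `Files.writeString`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ClientSideListing {
    // Client side: walk the tree once and keep only regular files, as sorted path strings.
    static ArrayList<String> listFiles(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                    .map(Path::toString)
                    .sorted()
                    .collect(Collectors.toCollection(ArrayList::new));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serialize the listing into bytes that could be shipped along with the job.
    static byte[] serialize(ArrayList<String> files) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(files);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Receiving side: restore the listing without touching the file system again.
    @SuppressWarnings("unchecked")
    static List<String> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (List<String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Builds a small local tree that stands in for the remote storage.
    static Path createSampleTree() {
        try {
            Path root = Files.createTempDirectory("listing-demo");
            Path day = Files.createDirectories(root.resolve("2018").resolve("12").resolve("13"));
            Files.writeString(day.resolve("part-0.json"), "{}\n");
            Files.writeString(day.resolve("part-1.json"), "{}\n");
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = createSampleTree();
        List<String> received = deserialize(serialize(listFiles(root)));
        System.out.println(received.size() + " files received");
    }
}
```

The catch with this approach is that the listing now runs in the client and its result has to fit into whatever is shipped with the job. That is one reason it is described as a hack rather than a fix.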

You can also try listing S3 files concurrently, or paste your sample code here.

[2] https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117

On Dec 13, 2018, at 1:09 AM, Alex Vinnik <[hidden email]> wrote:

Alex Vinnik
Qi,

Thanks for the references! How do I enable concurrent S3 file listing? Here is the code:

// Consume the JSON files
Configuration configuration = new Configuration(GlobalConfiguration.loadConfiguration());
configuration.setBoolean(JsonLinesInputFormat.ENUMERATE_NESTED_FILES_FLAG, true);

JsonLinesInputFormat jsonInputFormat = new JsonLinesInputFormat(new Path(inputPath), configuration);
jsonInputFormat.setFilesFilter(new BucketingSinkFilter());

DataSet<ObjectNode> input = env.readFile(jsonInputFormat, inputPath).withParameters(configuration);

On Wed, Dec 12, 2018 at 8:53 PM qi luo <[hidden email]> wrote:

qi luo
Hi Alex,

I’m not very familiar with JsonLinesInputFormat; is that your own implementation? You could look into its `createInputSplits()` method, which should be doing the listing work, and rewrite it to list files concurrently.
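The core of a concurrent rewrite, independent of Flink, is to fan out one listing call per directory to a thread pool instead of listing directories one after another. Below is a minimal sketch using plain `java.nio.file` and an `ExecutorService`. It lists the tree breadth-first, with all directories on one level listed in parallel. It assumes a local tree stands in for the S3 prefix and that per-call latency, not bandwidth, dominates the listing time. The class and method names are hypothetical. The underscore filter is only an illustration, not the behavior of the `BucketingSinkFilter` above. Wiring this into a custom input format's `createInputSplits()` is left out. It needs Java 11+ for `Files.writeString`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcurrentLister {
    // Lists the tree breadth-first; all directories on one level are listed in parallel.
    static List<Path> listFiles(Path root, Predicate<Path> acceptFile, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Path> files = new ArrayList<>();
            List<Path> level = List.of(root);
            while (!level.isEmpty()) {
                // Submit one listing task per directory on the current level.
                List<Future<List<Path>>> pending = new ArrayList<>();
                for (Path dir : level) {
                    pending.add(pool.submit(() -> listOneDirectory(dir)));
                }
                // Subdirectories form the next level; accepted files are kept.
                List<Path> nextLevel = new ArrayList<>();
                for (Future<List<Path>> result : pending) {
                    for (Path child : result.get()) {
                        if (Files.isDirectory(child)) {
                            nextLevel.add(child);
                        } else if (acceptFile.test(child)) {
                            files.add(child);
                        }
                    }
                }
                level = nextLevel;
            }
            files.sort(null); // natural Path ordering, so the result is deterministic
            return files;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    // One listing call; against S3 this would correspond to one (slow) LIST request.
    private static List<Path> listOneDirectory(Path dir) {
        try (Stream<Path> children = Files.list(dir)) {
            return children.collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a local stand-in for a bucket with one directory per hour.
    static Path createSampleTree(int hours) {
        try {
            Path root = Files.createTempDirectory("concurrent-listing");
            for (int hour = 0; hour < hours; hour++) {
                Path dir = Files.createDirectories(
                        root.resolve("2018-12-13").resolve(String.format("%02d", hour)));
                Files.writeString(dir.resolve("part-0.json"), "{}\n");
                Files.writeString(dir.resolve("_part-1.json.in-progress"), "");
            }
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = createSampleTree(24);
        // Illustrative filter (hypothetical): skip files whose names start with "_".
        Predicate<Path> skipUnderscore = p -> !p.getFileName().toString().startsWith("_");
        List<Path> files = listFiles(root, skipUnderscore, 8);
        System.out.println(files.size() + " files");
    }
}
```

One design note: against S3, each `Files.isDirectory` call here would be an extra request. A real implementation would take the file/directory flag from the listing result itself, as Flink's `FileSystem.listStatus` does via `FileStatus.isDir()`.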

On Dec 13, 2018, at 11:56 PM, Alex Vinnik <[hidden email]> wrote:
