Change Akka Ask Timeout for Job Submission Only


Abdul Qadeer
Hi!

I am using Flink 1.8.3 and facing an issue where job submission through RestClusterClient times out on Akka (default value 10s). In previous Flink versions there was an option to set a separate timeout just for the submission client (the ClusterClient config), but it no longer seems to be honored: job submission from the client no longer goes through Akka, so it uses the same value as the Dispatcher. How can I increase this timeout for job submission only, without affecting the other Akka threads in the TaskManager/JobManager? Any other solution to the problem is also welcome.

The relevant stack trace is pasted below:

Caused by: org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:745)

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389) ~[flink-runtime_2.11-1.8.2.jar:1.8.2]
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373) ~[flink-runtime_2.11-1.8.2.jar:1.8.2]

Re: Change Akka Ask Timeout for Job Submission Only

tison
Previous versions had an "akka.client.timeout" option, but it was only used to time out the future on the client side, so I don't think it changes the Akka ask timeout.

Best,
tison.
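tison's distinction between a client-side future timeout and the server-side ask timeout can be sketched as a minimal, self-contained Java model. It is not Flink code: the class and method names are hypothetical, the "dispatcher" is just a delayed future, and durations are scaled down to milliseconds. The client wraps the whole request in its own timeout, while the server-side handler applies a separate ask timeout to the dispatcher and turns a failed ask into an error response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the REST submission path: client -> REST handler -> dispatcher.
class TimeoutScopes {

    // The "dispatcher" answers "ack" after dispatcherMillis; the server-side ask fails
    // with a TimeoutException if no answer arrives within serverAskTimeoutMillis.
    static CompletableFuture<String> askDispatcher(long dispatcherMillis, long serverAskTimeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> "ack",
                        CompletableFuture.delayedExecutor(dispatcherMillis, TimeUnit.MILLISECONDS))
                .orTimeout(serverAskTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    // The REST handler turns a failed ask into an error response for the client.
    static CompletableFuture<String> restHandler(long dispatcherMillis, long serverAskTimeoutMillis) {
        return askDispatcher(dispatcherMillis, serverAskTimeoutMillis)
                .exceptionally(error -> "Internal server error: " + rootCause(error).getClass().getSimpleName());
    }

    // Client side: its own timeout only bounds how long it waits for any response at all.
    static String submit(long dispatcherMillis, long serverAskTimeoutMillis, long clientTimeoutMillis) {
        try {
            return restHandler(dispatcherMillis, serverAskTimeoutMillis)
                    .orTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS)
                    .join();
        } catch (CompletionException e) {
            return "Client gave up: " + rootCause(e).getClass().getSimpleName();
        }
    }

    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        // Dispatcher needs 500 ms, server-side ask allows 100 ms, client waits up to 2000 ms.
        System.out.println(submit(500, 100, 2000));  // Internal server error: TimeoutException
        // Raising the server-side ask timeout is what lets the same submission succeed.
        System.out.println(submit(500, 2000, 3000)); // ack
        // A short client timeout only makes the client stop waiting earlier.
        System.out.println(submit(500, 2000, 100));  // Client gave up: TimeoutException
    }
}
```

In this model only the server-side value decides whether the slow dispatcher call succeeds; the client-side value only decides how long the client keeps waiting, which matches tison's reading of "akka.client.timeout".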


On Fri, Dec 20, 2019 at 10:44 AM Abdul Qadeer <[hidden email]> wrote:

Re: Change Akka Ask Timeout for Job Submission Only

Abdul Qadeer
The relevant config here is "akka.ask.timeout".

On Thu, Dec 19, 2019 at 6:51 PM tison <[hidden email]> wrote:

Re: Change Akka Ask Timeout for Job Submission Only

Yang Wang
This does not seem to be caused by the REST client timeout; it is a server-side Akka timeout exception.
Could you share the JobManager logs?

Best,
Yang
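Yang's diagnosis can be read from the client-side trace itself: the RestClientException message carries the server's exception between "<Exception on server side:" and "End of exception on server side>". A hypothetical helper (not a Flink API) that walks the cause chain and extracts that part might look like this; plain RuntimeExceptions stand in for the Flink exception types, and the two marker strings are assumed to appear exactly as in the trace posted in the original question.

```java
import java.util.Optional;

// Hypothetical helper: extracts the server-side exception text that the REST client
// embeds in its error message, searching the cause chain from the outermost exception.
class ServerSideCause {
    static final String START = "<Exception on server side:";
    static final String END = "End of exception on server side>";

    static Optional<String> serverSideException(Throwable error) {
        for (Throwable cur = error; cur != null; cur = cur.getCause()) {
            String msg = cur.getMessage();
            int start = msg == null ? -1 : msg.indexOf(START);
            if (start < 0) {
                continue; // no server-side marker at this level; try the cause
            }
            int end = msg.indexOf(END, start);
            String body = end < 0
                    ? msg.substring(start + START.length())
                    : msg.substring(start + START.length(), end);
            return Optional.of(body.trim());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-ins, innermost first: RestClientException, JobSubmissionException, ProgramInvocationException.
        Throwable rest = new RuntimeException("[Internal server error., <Exception on server side:\n"
                + "akka.pattern.AskTimeoutException: Ask timed out on "
                + "[Actor[akka://flink/user/dispatcher#1457923918]] after [10000 ms].\n"
                + "\nEnd of exception on server side>]");
        Throwable submission = new RuntimeException("Failed to submit JobGraph.", rest);
        Throwable program = new RuntimeException("Could not submit job", submission);
        // Prints the AskTimeoutException line, i.e. the part that happened on the server.
        System.out.println(serverSideException(program).orElse("no server-side exception found"));
    }
}
```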

On Fri, Dec 20, 2019 at 10:59 AM Abdul Qadeer <[hidden email]> wrote:

Re: Change Akka Ask Timeout for Job Submission Only

tison
IIRC this issue can be caused by limited resources or by some intermittent condition. I once heard of someone who upgraded their Java version and the issue vanished.

As for "akka.ask.timeout", it is used as the timeout for all Akka ask requests. And I agree with Yang that this timeout is unrelated to the client-server connection.

Best,
tison.
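Because one global value applies to every ask, a single slow request can also make unrelated requests to the same actor time out. Below is a minimal Java model of this, assuming (as in the actor model) that the Dispatcher handles one message at a time: a single-thread executor stands in for its mailbox, every ask shares one timeout, and the message names are hypothetical. This is not Flink's implementation, but it is consistent with the JobManager logs posted later in the thread, where JobsOverviewHandler and ClusterOverviewHandler hit the same [10000 ms] timeout as JobSubmitHandler.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: one thread plays the Dispatcher's mailbox (messages are handled
// in order), and every ask shares a single timeout, like akka.ask.timeout.
class SharedMailbox implements AutoCloseable {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final long askTimeoutMillis;

    SharedMailbox(long askTimeoutMillis) {
        this.askTimeoutMillis = askTimeoutMillis;
    }

    // Enqueue a message whose handling takes workMillis; the reply is replaced by the
    // exception name if it does not arrive within the shared ask timeout.
    CompletableFuture<String> ask(String message, long workMillis) {
        return CompletableFuture
                .supplyAsync(() -> {
                    sleep(workMillis);
                    return message + ": ok";
                }, mailbox)
                .orTimeout(askTimeoutMillis, TimeUnit.MILLISECONDS)
                .handle((reply, error) -> error == null ? reply : message + ": " + error.getClass().getSimpleName());
    }

    @Override
    public void close() {
        mailbox.shutdownNow(); // interrupts a handler that is still "working"
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // One slow submission enqueued first, then two cheap overview requests behind it.
    static List<String> scenario(long askTimeoutMillis) {
        try (SharedMailbox dispatcher = new SharedMailbox(askTimeoutMillis)) {
            CompletableFuture<String> submit = dispatcher.ask("submitJob", 400);
            CompletableFuture<String> jobs = dispatcher.ask("jobsOverview", 10);
            CompletableFuture<String> cluster = dispatcher.ask("clusterOverview", 10);
            return List.of(submit.join(), jobs.join(), cluster.join());
        }
    }

    public static void main(String[] args) {
        System.out.println(scenario(100));  // all three requests time out
        System.out.println(scenario(1000)); // all three requests succeed
    }
}
```

In the model, raising the shared timeout lets every queued request through, but it also lengthens the time any genuinely stuck request needs before it fails, for every caller, which is the trade-off behind asking for a submission-only timeout.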


On Fri, Dec 20, 2019 at 11:02 AM Yang Wang <[hidden email]> wrote:

Re: Change Akka Ask Timeout for Job Submission Only

tison
Forwarding to the user list.

Best,
tison.


On Fri, Dec 20, 2019 at 12:57 PM Abdul Qadeer <[hidden email]> wrote:
Logs from the JobManager around submission time:

1576764854245 INFO [flink-akka.actor.default-dispatcher-1016] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a (sample).
1576764854247 INFO [flink-akka.actor.default-dispatcher-1016] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 714829e8f6c8cd0daaed335c1b8c588a (sample).
1576764856119 INFO [flink-akka.actor.default-dispatcher-1036] akka.actor.RepointableActorRef - Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
1576764877732 INFO [flink-akka.actor.default-dispatcher-1039] akka.actor.RepointableActorRef - Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
1576764877732 ERROR [flink-scheduler-1] org.apache.flink.runtime.rest.handler.job.JobSubmitHandler - Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) [akka-actor_2.11-2.4.20.jar:?]
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) [scala-library-2.11.12.jar:?]
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) [scala-library-2.11.12.jar:?]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) [scala-library-2.11.12.jar:?]
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) [akka-actor_2.11-2.4.20.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
1576764877809 INFO [flink-akka.actor.default-dispatcher-1016] org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_5 .
1576764877810 INFO [flink-akka.actor.default-dispatcher-1016] org.apache.flink.runtime.jobmaster.JobMaster - Initializing job sample (714829e8f6c8cd0daaed335c1b8c588a).
1576764877809 ERROR [flink-scheduler-1] org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler - Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) [akka-actor_2.11-2.4.20.jar:?]
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) [scala-library-2.11.12.jar:?]
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) [scala-library-2.11.12.jar:?]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) [scala-library-2.11.12.jar:?]
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) [akka-actor_2.11-2.4.20.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
1576764877812 ERROR [flink-scheduler-1] org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler - Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) [akka-actor_2.11-2.4.20.jar:?]
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) [scala-library-2.11.12.jar:?]
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) [scala-library-2.11.12.jar:?]
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) [scala-library-2.11.12.jar:?]
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) [akka-actor_2.11-2.4.20.jar:?]
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) [akka-actor_2.11-2.4.20.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
1576764877815 INFO [flink-akka.actor.default-dispatcher-1038] akka.actor.RepointableActorRef - Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [89] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
1576764877816 INFO [flink-akka.actor.default-dispatcher-1038] akka.actor.RepointableActorRef - Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [90] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
1576764877816 INFO [flink-akka.actor.default-dispatcher-1038] akka.actor.RepointableActorRef - Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [91] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
{"timeMillis":1576764877830,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=10000) for sample (714829e8f6c8cd0daaed335c1b8c588a).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877834,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.executiongraph.ExecutionGraph","message":"Job recovers via failover strategy: full graph restart","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877837,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Running initialization on master for job sample (714829e8f6c8cd0daaed335c1b8c588a).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877837,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Successfully ran initialization on master in 0 ms.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877876,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.util.ZooKeeperUtils","message":"Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/714829e8f6c8cd0daaed335c1b8c588a'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877878,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"State backend is set to heap memory (checkpoints to filesystem \"file:/var/data/ndp/_no_backup_/flink/pipeline-checkpoint/fs\")","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878554,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","message":"Recovering checkpoints from ZooKeeper.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878579,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","message":"Found 0 checkpoints in ZooKeeper.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878579,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","message":"Trying to fetch 0 checkpoints from storage.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878579,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService","message":"Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/714829e8f6c8cd0daaed335c1b8c588a/job_manager_lock'}.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878584,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef","message":"Message [org.apache.flink.runtime.messages.Acknowledge] from Actor[akka://flink/deadLetters] to Actor[akka://flink/deadLetters] was not delivered. [92] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}^M
{"timeMillis":1576764878623,"thread":"main-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager runner for job sample (714829e8f6c8cd0daaed335c1b8c588a) was granted leadership with session id 69063d3f-7610-4c42-8f25-cf654d87a7e1 at akka.tcp://flink@169.254.35.76:43723/user/jobmanager_5.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":58,"threadPriority":5}^M
{"timeMillis":1576764878653,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService","message":"Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}^M
{"timeMillis":1576764878654,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Starting execution of job sample (714829e8f6c8cd0daaed335c1b8c588a) under job master id 8f25cf654d87a7e169063d3f76104c42.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}^M
{"timeMillis":1576764878654,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"org.apache.flink.runtime.executiongraph.ExecutionGraph","message":"Job sample (714829e8f6c8cd0daaed335c1b8c588a) switched from state CREATED to RUNNING.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}^M

On Thu, Dec 19, 2019 at 7:20 PM tison <[hidden email]> wrote:
IIRC, this issue may be caused by limited resources or by other occasional factors. I once heard of someone upgrading their Java version and the issue went away.

As for "akka.ask.timeout", it is used as the timeout for all Akka ask requests. And I agree with Yang that the timeout is unrelated to the client-server connection.

Best,
tison.


On Fri, Dec 20, 2019 at 11:02 AM Yang Wang <[hidden email]> wrote:
It does not seem to be caused by the REST client's timeout; it is a server-side Akka timeout exception.
Could you share the JobManager logs?

Best,
Yang

On Fri, Dec 20, 2019 at 10:59 AM Abdul Qadeer <[hidden email]> wrote:
The relevant config here is "akka.ask.timeout".

On Thu, Dec 19, 2019 at 6:51 PM tison <[hidden email]> wrote:
In previous versions there was an "akka.client.timeout" option, but it was only used to time out the future on the client side, so I don't think it changes the Akka-scoped timeout.
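To illustrate the point about a client-side timeout, here is a minimal stdlib-only Java sketch (not Flink code; the class and method names are hypothetical). A timeout applied to a future on the client only bounds how long the caller waits; the work on the other side keeps running regardless, so such a setting cannot prevent a timeout that happens on the server.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ClientSideTimeout {

    // Hypothetical stand-in for the server: the work takes serverMillis
    // no matter how long any client decides to wait for it.
    static CompletableFuture<String> submit(long serverMillis) {
        return CompletableFuture.supplyAsync(() -> {
            sleepQuietly(serverMillis);
            return "submitted";
        });
    }

    // Client-side timeout: only bounds how long this caller waits on the future.
    static String awaitWithClientTimeout(CompletableFuture<String> result, long clientMillis) {
        try {
            return result.get(clientMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "client gave up";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = submit(300);
        System.out.println(awaitWithClientTimeout(result, 50)); // client gave up
        System.out.println(result.join());                      // submitted
    }
}
```

The client giving up after 50 ms does not cancel or shorten the 300 ms of work; the future still completes afterwards.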

Best,
tison.


On Fri, Dec 20, 2019 at 10:44 AM Abdul Qadeer <[hidden email]> wrote:
Hi!

I am using Flink 1.8.3 and am facing an issue where job submission through RestClusterClient times out on Akka (default value 10s). In previous Flink versions there was an option to set a different timeout value just for the submission client (ClusterClient config), but it looks like it is no longer honored, since job submission from the client no longer goes through Akka and the same value as the Dispatcher's is used instead. I would like to know how to increase this timeout just for job submission without affecting other Akka threads in the TaskManager/JobManager, or whether there is any other solution to the problem.

The relevant stack trace is pasted below:

"cause":{"commonElementCount":8,"localizedMessage":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed to submit JobGraph.","message":"Failed to submit JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}

Re: Change Akka Ask Timeout for Job Submission Only

Chesnay Schepler
There are 3 communication layers involved here:

1) client <=> server (REST API)

This goes through REST and does not use timeouts AFAIK. We wait until a response comes or the connection terminates.

2) server (REST API) <=> processes (JM, Dispatcher)

This goes through akka, with "web.timeout" being used for the timeout.

3) processes <=> processes

Also akka, with "akka.ask.timeout" being used.
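The three layers above can be restated as a small lookup, sketched here in plain Java. The enum and its names are hypothetical and only summarize the post; the configuration keys are the ones named above, so it is worth double-checking them against the documentation for your Flink version.

```java
import java.util.Optional;

public class FlinkTimeoutLayers {

    // Hypothetical summary of the three communication layers described in the post.
    enum Layer {
        CLIENT_TO_REST("client <=> server (REST API)", null),
        REST_TO_PROCESSES("server (REST API) <=> processes (JM, Dispatcher)", "web.timeout"),
        PROCESS_TO_PROCESS("processes <=> processes", "akka.ask.timeout");

        final String description;
        // null means no timeout: the caller waits for a response or for the connection to terminate
        private final String key;

        Layer(String description, String key) {
            this.description = description;
            this.key = key;
        }

        Optional<String> timeoutKey() {
            return Optional.ofNullable(key);
        }
    }

    public static void main(String[] args) {
        for (Layer layer : Layer.values()) {
            System.out.println(layer.description + " -> " + layer.timeoutKey().orElse("(no timeout)"));
        }
    }
}
```

With this mapping, a timeout raised by a REST handler such as JobSubmitHandler while asking the Dispatcher falls under layer 2).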


The timeout in question occurs on layer 2) because the JM is incredibly busy, possibly due to some heavyweight computation in the job setup.
In any case, you can try increasing web.timeout, which may resolve this issue.
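A simplified model of why a busy JM produces this kind of timeout, assuming the Dispatcher handles its messages one at a time like an actor mailbox: below, a single-threaded executor stands in for the Dispatcher. A request queued behind long-running work times out when its ask timeout is shorter than that backlog, and succeeds once the timeout is raised. This is a plain-Java illustration with scaled-down millisecond values, not Flink's implementation; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BusyEndpointAsk {

    // Models an ask against a single-threaded endpoint that is still busy with
    // busyMillis of earlier work. Returns true if the reply arrives within askTimeoutMillis.
    static boolean askWhileBusy(long busyMillis, long askTimeoutMillis) {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            mailbox.submit(() -> sleepQuietly(busyMillis));     // e.g. heavy job setup
            Future<String> reply = mailbox.submit(() -> "ack"); // queued behind it
            reply.get(askTimeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // analogous to the AskTimeoutException in the logs
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            mailbox.shutdownNow(); // interrupts any work still running
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(askWhileBusy(300, 50));   // false: timeout shorter than the backlog
        System.out.println(askWhileBusy(300, 2000)); // true: raised timeout outlasts the backlog
    }
}
```

In this model, a larger timeout only lets the request wait out the backlog; the work queued ahead of it still takes as long as it takes.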


On 20/12/2019 06:13, tison wrote:
Forwarding to the user list.

Best,
tison.


On Fri, Dec 20, 2019 at 12:57 PM Abdul Qadeer <[hidden email]> wrote:
JobManager logs from around the submission time:

{"timeMillis":1576764854245,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Received JobGraph submission 714829e8f6c8cd0daaed335c1b8c588a (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
{"timeMillis":1576764854247,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Submitting job 714829e8f6c8cd0daaed335c1b8c588a (sample).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
{"timeMillis":1576764856119,"thread":"flink-akka.actor.default-dispatcher-1036","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [87] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1150,"threadPriority":5}
{"timeMillis":1576764877732,"thread":"flink-akka.actor.default-dispatcher-1039","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [88] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1155,"threadPriority":5}
{"timeMillis":1576764877732,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobSubmitHandler","message":"Unhandled exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"run","file":"LightArrayRevolverScheduler.scala","line":236,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_66"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":41,"threadPriority":5}
{"timeMillis":1576764877809,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.rpc.akka.AkkaRpcService","message":"Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_5 .","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
{"timeMillis":1576764877810,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Initializing job sample (714829e8f6c8cd0daaed335c1b8c588a).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
{"timeMillis":1576764877809,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler","message":"Unhandled exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"run","file":"LightArrayRevolverScheduler.scala","line":236,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_66"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":41,"threadPriority":5}
{"timeMillis":1576764877812,"thread":"flink-scheduler-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler","message":"Unhandled exception.","thrown":{"commonElementCount":0,"localizedMessage":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","message":"Ask timed out on [Actor[akka://flink/user/dispatcher#1899316777]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".","name":"akka.pattern.AskTimeoutException","extendedStackTrace":[{"class":"akka.pattern.PromiseActorRef$$anonfun$1","method":"apply$mcV$sp","file":"AskSupport.scala","line":604,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.Scheduler$$anon$4","method":"run","file":"Scheduler.scala","line":126,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"unbatchedExecute","file":"Future.scala","line":601,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.BatchingExecutor$class","method":"execute","file":"BatchingExecutor.scala","line":109,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"scala.concurrent.Future$InternalCallbackExecutor$","method":"execute","file":"Future.scala","line":599,"exact":true,"location":"scala-library-2.11.12.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$TaskHolder","method":"executeTask","file":"LightArrayRevolverScheduler.scala","line":329,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"executeBucket$1","file":"LightArrayRevolverScheduler.scala","line":280,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.acto
r.LightArrayRevolverScheduler$$anon$4","method":"nextTick","file":"LightArrayRevolverScheduler.scala","line":284,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"akka.actor.LightArrayRevolverScheduler$$anon$4","method":"run","file":"LightArrayRevolverScheduler.scala","line":236,"exact":true,"location":"akka-actor_2.11-2.4.20.jar","version":"?"},{"class":"java.lang.Thread","method":"run","file":"Thread.java","line":745,"exact":true,"location":"?","version":"1.8.0_66"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":41,"threadPriority":5}^M
{"timeMillis":1576764877815,"thread":"flink-akka.actor.default-dispatcher-1038","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [89] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1152,"threadPriority":5}^M
{"timeMillis":1576764877816,"thread":"flink-akka.actor.default-dispatcher-1038","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [90] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1152,"threadPriority":5}^M
{"timeMillis":1576764877816,"thread":"flink-akka.actor.default-dispatcher-1038","level":"INFO","loggerName":"akka.actor.RepointableActorRef","message":"Message [org.apache.flink.runtime.rpc.messages.LocalFencedMessage] from Actor[akka://flink/deadLetters] to Actor[akka://flink/user/jobmanager_4#-2122695705] was not delivered. [91] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1152,"threadPriority":5}^M
{"timeMillis":1576764877830,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=10000) for sample (714829e8f6c8cd0daaed335c1b8c588a).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877834,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.executiongraph.ExecutionGraph","message":"Job recovers via failover strategy: full graph restart","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877837,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Running initialization on master for job sample (714829e8f6c8cd0daaed335c1b8c588a).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877837,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Successfully ran initialization on master in 0 ms.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877876,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.util.ZooKeeperUtils","message":"Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/714829e8f6c8cd0daaed335c1b8c588a'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764877878,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"State backend is set to heap memory (checkpoints to filesystem \<a class="moz-txt-link-rfc2396E" href="file:/var/data/ndp/_no_backup_/flink/pipeline-checkpoint/fs\">"file:/var/data/ndp/_no_backup_/flink/pipeline-checkpoint/fs\")","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878554,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","message":"Recovering checkpoints from ZooKeeper.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878579,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","message":"Found 0 checkpoints in ZooKeeper.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878579,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","message":"Trying to fetch 0 checkpoints from storage.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}^M
{"timeMillis":1576764878579,"thread":"flink-akka.actor.default-dispatcher-1016","level":"INFO","loggerName":"org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService","message":"Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/714829e8f6c8cd0daaed335c1b8c588a/job_manager_lock'}.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1126,"threadPriority":5}
{"timeMillis":1576764878584,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef","message":"Message [org.apache.flink.runtime.messages.Acknowledge] from Actor[akka://flink/deadLetters] to Actor[akka://flink/deadLetters] was not delivered. [92] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}
{"timeMillis":1576764878623,"thread":"main-EventThread","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobManagerRunner","message":"JobManager runner for job sample (714829e8f6c8cd0daaed335c1b8c588a) was granted leadership with session id 69063d3f-7610-4c42-8f25-cf654d87a7e1 at akka.tcp://flink@169.254.35.76:43723/user/jobmanager_5.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":58,"threadPriority":5}
{"timeMillis":1576764878653,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService","message":"Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}
{"timeMillis":1576764878654,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Starting execution of job sample (714829e8f6c8cd0daaed335c1b8c588a) under job master id 8f25cf654d87a7e169063d3f76104c42.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}
{"timeMillis":1576764878654,"thread":"flink-akka.actor.default-dispatcher-1027","level":"INFO","loggerName":"org.apache.flink.runtime.executiongraph.ExecutionGraph","message":"Job sample (714829e8f6c8cd0daaed335c1b8c588a) switched from state CREATED to RUNNING.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":1137,"threadPriority":5}

On Thu, Dec 19, 2019 at 7:20 PM tison <[hidden email]> wrote:
IIRC, this issue may be caused by limited resources or by other occasional factors. I once heard of someone who upgraded their Java version and the issue vanished.

"akka.ask.timeout" is used as the timeout for all Akka ask requests. And I agree with Yang that this timeout is unrelated to the client-server connection.
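To illustrate why a single shared ask timeout cannot be raised for one request type alone, here is a minimal sketch in plain Java (9+). It assumes the behaviour described above (one configured timeout applied to every ask). `CompletableFuture.orTimeout` stands in for Akka's ask pattern, and the `SharedAskTimeout` class with its `ask` method is hypothetical, not Flink or Akka API. A fast request (think heartbeat) and a slow one (think job submission) go through the same gateway, so any timeout value applies to both.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical stand-in for an RPC gateway; NOT Flink's or Akka's API.
// Every "ask" uses the same configured timeout, analogous to akka.ask.timeout.
public class SharedAskTimeout {
    private final Duration askTimeout;

    SharedAskTimeout(Duration askTimeout) {
        this.askTimeout = askTimeout;
    }

    // Runs the handler asynchronously and fails the returned future with a
    // TimeoutException if it does not complete within askTimeout.
    <T> CompletableFuture<T> ask(Supplier<T> handler) {
        return CompletableFuture.supplyAsync(handler)
                .orTimeout(askTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Helper that simulates slow work on the "server" side.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Scaled down from the 10 s default so the demo runs quickly.
        SharedAskTimeout gateway = new SharedAskTimeout(Duration.ofMillis(100));

        // A fast request finishes within the shared timeout.
        String heartbeat = gateway.ask(() -> "ack").join();
        System.out.println("heartbeat: " + heartbeat);

        // A slow request exceeds the very same timeout and fails.
        try {
            gateway.ask(() -> { sleep(500); return "submitted"; }).join();
        } catch (CompletionException e) {
            System.out.println("submission timed out: "
                    + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

Because the timeout lives in the gateway rather than in each request, the only lever in this model is the global value, which is the trade-off discussed in this thread.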

Best,
tison.


On Fri, Dec 20, 2019 at 11:02 AM Yang Wang <[hidden email]> wrote:
It does not seem to be caused by the REST client's timeout; this is a server-side Akka timeout exception.
Could you share the JobManager logs?

Best,
Yang

On Fri, Dec 20, 2019 at 10:59 AM Abdul Qadeer <[hidden email]> wrote:
The relevant config here is "akka.ask.timeout".

On Thu, Dec 19, 2019 at 6:51 PM tison <[hidden email]> wrote:
In previous versions there was an "akka.client.timeout" option, but it was only used to time out the future on the client side, so I don't think it changes the Akka-side timeout.
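A minimal sketch of that distinction in plain Java, assuming a client that simply waits on a response future with its own deadline. `ClientSideTimeout`, `submitToServer` and `awaitWithClientTimeout` are hypothetical names, not Flink API, and the server side is simulated with a sleep. When the client's deadline passes it stops waiting, but the server-side work (and therefore any server-side Akka ask timeout) is unaffected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration, NOT Flink's API: a client-side timeout only bounds how
// long the caller waits on its future; it does not change how long the server works.
public class ClientSideTimeout {

    // Simulated server-side work that takes `millis` to complete.
    static CompletableFuture<String> submitToServer(long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        });
    }

    // Waits at most clientTimeoutMillis; returns null if the client gives up.
    static String awaitWithClientTimeout(CompletableFuture<String> response,
                                         long clientTimeoutMillis)
            throws InterruptedException, ExecutionException {
        try {
            return response.get(clientTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Only the client stops waiting; the server-side future keeps running.
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> response = submitToServer(300);
        String result = awaitWithClientTimeout(response, 50);
        System.out.println("client result: " + result);
        // The server-side work still completes after the client gave up.
        System.out.println("server result: " + response.join());
    }
}
```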

Best,
tison.


On Fri, Dec 20, 2019 at 10:44 AM Abdul Qadeer <[hidden email]> wrote:
Hi!

I am using Flink 1.8.3 and facing an issue where job submission through RestClusterClient times out on Akka (default value 10s). In previous Flink versions there was an option to set a different timeout value just for the submission client (ClusterClient config), but it looks like it is no longer honored: job submission from the client no longer goes through Akka, so it uses the same value as the Dispatcher. I would like to know how to increase this timeout just for job submission without affecting other Akka threads in the TaskManager/JobManager, or any other solution to the problem.

The relevant stack trace is pasted below:

"cause":{"commonElementCount":8,"localizedMessage":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","message":"Could not submit job (JobID: 26940c17ae3130fb8be1323cce1036e4)","name":"org.apache.flink.client.program.ProgramInvocationException","cause":{"commonElementCount":3,"localizedMessage":"Failed to submit JobGraph.","message":"Failed to submit JobGraph.","name":"org.apache.flink.runtime.client.JobSubmissionException","cause":{"commonElementCount":3,"localizedMessage":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [10000 ms]. Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","message":"[Internal server error., <Exception on server side:\nakka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#1457923918]] after [10000 ms]. 
Sender[null] sent message of type \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".\n\tat akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\tat akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)\n\tat scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)\n\tat scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)\n\tat akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)\n\tat akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)\n\tat java.lang.Thread.run(Thread.java:745)\n\nEnd of exception on server side>]","name":"org.apache.flink.runtime.rest.util.RestClientException","extendedStackTrace":[{"class":"org.apache.flink.runtime.rest.RestClient","method":"parseResponse","file":"RestClient.java","line":389,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rest.RestClient","method":"lambda$submitRequest$3","file":"RestClient.java","line":373,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}