FileSource may cause akka.pattern.AskTimeoutException, and akka.ask.timeout not workable

oscar.chen
Hello all,

Our team encounters akka.pattern.AskTimeoutException when starting the JobManager. Based on the error message, we tried setting akka.ask.timeout and web.timeout to 360s, but neither of them works.

We suspect the issue may be caused by FileSource.forRecordFileFormat. The application loads files in batch mode to rebuild our historical data. The job runs normally on a small batch, but it fails when run over a large number of files (around 30000 files distributed across 1500 folders).
 
The Flink application runs on Kubernetes in application mode, and the files are stored in Google Cloud Storage.

Our questions are:
1. How do we increase akka.ask.timeout correctly in our case?
2. Is it caused by FileSource? If yes, could you provide some suggestions to prevent it?


The following are our settings.
```
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.container.image, *****/****:**.*.**
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.hdfs.hadoopconfig, /opt/flink/conf
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.rest-service.exposed.type, ClusterIP
2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.jobmanager.port, 6123
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: akka.ask.timeout, 360s
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.memory.write-buffer-ratio, 0.7
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.storage.fs.memory-threshold, 1048576
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.unaligned, true
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: web.timeout, 1000000
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.target, kubernetes-application
2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 2147483647
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 8g
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.rpc.port, 6122
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: akka.framesize, 104857600b
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.HADOOP_CLASSPATH, /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.attached, true
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.externalized-checkpoint-retention, DELETE_ON_CANCELLATION
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.shutdown-on-attached-exit, false
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: pipeline.jars, local:///opt/flink/usrlib/*****-assembly.jar
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.min-pause, 20min
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: restart-strategy, fixed-delay
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: env.java.opts.taskmanager, -XX:+UseG1GC
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.secrets, ******
2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.dir, gs://**********/
```

The following are the error messages.
```
2021-06-10 03:45:54,040 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at org.apache.flink.runtime.rpc.akka.$Proxy47.requestJobStatus(Unknown Source) ~[?:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$getJobResult$0(JobStatusPollingUtils.java:57) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:87) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:69) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.getJobResult(JobStatusPollingUtils.java:56) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.EmbeddedJobClient.getJobExecutionResult(EmbeddedJobClient.java:128) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:102) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#248799098]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-06-10 03:45:54,048 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: Application failed unexpectedly.
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAndShutdownClusterAsync$0(ApplicationDispatcherBootstrap.java:170) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) ~[?:?]
... 13 more
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at org.apache.flink.runtime.rpc.akka.$Proxy47.requestJobStatus(Unknown Source) ~[?:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$getJobResult$0(JobStatusPollingUtils.java:57) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:87) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:69) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.JobStatusPollingUtils.getJobResult(JobStatusPollingUtils.java:56) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.EmbeddedJobClient.getJobExecutionResult(EmbeddedJobClient.java:128) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:102) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 10 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#248799098]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
```

Thanks,
Oscar
Re: FileSource may cause akka.pattern.AskTimeoutException, and akka.ask.timeout not workable

Roman Khachatryan
Hi,

I think you need to increase client.timeout [1].
Regarding the FileSource, it's difficult to say whether it is the
reason. The logs you provided are from the client; JobManager logs
would be helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#client-timeout
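
One way to see why raising akka.ask.timeout appears to have no effect is to compare the timeout that actually fired with the values that were loaded. The sketch below is only an illustration, not part of Flink: the helper names (`loaded_properties`, `fired_timeouts_ms`, `to_ms`) are hypothetical, the duration parsing is a simplification that covers only the spellings seen in this thread, and the sample lines are shortened copies of the log lines posted above.

```python
import re

# Line shapes taken from the log excerpts in this thread.
PROPERTY_RE = re.compile(r"Loading configuration property: ([^,]+), (.*)$")
ASK_TIMEOUT_RE = re.compile(r"Ask timed out .* after \[(\d+) ms\]")
DURATION_RE = re.compile(r"^(\d+)\s*(ms|s|min)?$")
UNIT_TO_MS = {"ms": 1, "s": 1_000, "min": 60_000}


def loaded_properties(log_lines):
    """Collect key/value pairs from 'Loading configuration property' lines."""
    props = {}
    for line in log_lines:
        match = PROPERTY_RE.search(line)
        if match:
            props[match.group(1).strip()] = match.group(2).strip()
    return props


def fired_timeouts_ms(log_lines):
    """Return the durations (in ms) reported by AskTimeoutException messages."""
    return [int(m.group(1)) for line in log_lines
            for m in ASK_TIMEOUT_RE.finditer(line)]


def to_ms(value):
    """Simplified duration parsing: a bare number is treated as milliseconds."""
    match = DURATION_RE.match(value.strip())
    if not match:
        raise ValueError(f"unsupported duration: {value!r}")
    return int(match.group(1)) * UNIT_TO_MS[match.group(2) or "ms"]


SAMPLE_LOG = [
    "INFO  GlobalConfiguration [] - Loading configuration property: akka.ask.timeout, 360s",
    "INFO  GlobalConfiguration [] - Loading configuration property: web.timeout, 1000000",
    "Caused by: akka.pattern.AskTimeoutException: Ask timed out on "
    "[Actor[akka://flink/user/rpc/dispatcher_1#248799098]] after [60000 ms]. "
    "Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage].",
]

if __name__ == "__main__":
    props = loaded_properties(SAMPLE_LOG)
    for fired in fired_timeouts_ms(SAMPLE_LOG):
        print(f"timeout that fired: {fired} ms")
        for key in ("akka.ask.timeout", "web.timeout", "client.timeout"):
            if key in props:
                print(f"  {key} = {props[key]} ({to_ms(props[key])} ms)")
            else:
                print(f"  {key} is not set in the loaded configuration")
```

On the posted logs, the exception reports 60000 ms while akka.ask.timeout and web.timeout were loaded with larger values and client.timeout does not appear at all, which is consistent with a different, unset option governing this call. Assuming the documented 60 s default of client.timeout applies here, adding a line such as `client.timeout: 360s` to flink-conf.yaml would be the setting to try.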

Regards,
Roman

On Thu, Jun 10, 2021 at 6:34 AM 陳樺威 <[hidden email]> wrote:

>
> Hello all,
>
> Our team encounters akka.pattern.AskTimeoutException when starting the JobManager. Based on the error message, we tried setting akka.ask.timeout and web.timeout to 360s, but neither of them works.
>
> We suspect the issue may be caused by FileSource.forRecordFileFormat. The application loads files in batch mode to rebuild our historical data. The job runs normally on a small batch, but it fails when run over a large number of files (around 30000 files distributed across 1500 folders).
>
> The Flink application runs on Kubernetes in application mode, and the files are stored in Google Cloud Storage.
>
> Our questions are:
> 1. How do we increase akka.ask.timeout correctly in our case?
> 2. Is it caused by FileSource? If yes, could you provide some suggestions to prevent it?
>
>
> The following are our settings.
> ```
> 2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.container.image, *****/****:**.*.**
> 2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: fs.hdfs.hadoopconfig, /opt/flink/conf
> 2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
> 2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.rest-service.exposed.type, ClusterIP
> 2021-06-10 03:44:14,317 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.jobmanager.port, 6123
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: akka.ask.timeout, 360s
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.memory.write-buffer-ratio, 0.7
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.storage.fs.memory-threshold, 1048576
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.unaligned, true
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: web.timeout, 1000000
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.target, kubernetes-application
> 2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 2147483647
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 8g
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.rpc.port, 6122
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: akka.framesize, 104857600b
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.HADOOP_CLASSPATH, /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.attached, true
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: internal.cluster.execution-mode, NORMAL
> 2021-06-10 03:44:14,319 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.externalized-checkpoint-retention, DELETE_ON_CANCELLATION
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.shutdown-on-attached-exit, false
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: pipeline.jars, local:///opt/flink/usrlib/*****-assembly.jar
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.checkpointing.min-pause, 20min
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: restart-strategy, fixed-delay
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: env.java.opts.taskmanager, -XX:+UseG1GC
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.secrets, ******
> 2021-06-10 03:44:14,320 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.dir, gs://**********/
> ```
>
> The error messages are as follows.
> ```
> 2021-06-10 03:45:54,040 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> ... 10 more
> Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
> at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
> at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> ... 10 more
> Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
> at org.apache.flink.runtime.rpc.akka.$Proxy47.requestJobStatus(Unknown Source) ~[?:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$getJobResult$0(JobStatusPollingUtils.java:57) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:87) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:69) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.getJobResult(JobStatusPollingUtils.java:56) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.EmbeddedJobClient.getJobExecutionResult(EmbeddedJobClient.java:128) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:102) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
> at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> ... 10 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#248799098]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at java.lang.Thread.run(Thread.java:834) ~[?:?]
> 2021-06-10 03:45:54,048 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: Application failed unexpectedly.
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAndShutdownClusterAsync$0(ApplicationDispatcherBootstrap.java:170) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063) ~[?:?]
> ... 13 more
> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
> ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> ... 10 more
> Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
> at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
> at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> ... 10 more
> Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
> at org.apache.flink.runtime.rpc.akka.$Proxy47.requestJobStatus(Unknown Source) ~[?:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$getJobResult$0(JobStatusPollingUtils.java:57) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:87) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.pollJobResultAsync(JobStatusPollingUtils.java:69) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.JobStatusPollingUtils.getJobResult(JobStatusPollingUtils.java:56) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.EmbeddedJobClient.getJobExecutionResult(EmbeddedJobClient.java:128) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:102) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at com.appier.rt.rt_match.LookbackService$.main(LookbackService.scala:125) ~[?:?]
> at com.appier.rt.rt_match.LookbackService.main(LookbackService.scala) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> ... 10 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#248799098]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
> at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at java.lang.Thread.run(Thread.java:834) ~[?:?]
> ```
>
> Thanks,
> Oscar
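
One detail is visible in the quoted logs: the startup lines load akka.ask.timeout as 360s, yet the AskTimeoutException reports a limit of 60000 ms. The failing requestJobStatus call therefore appears to be bounded by some other timeout; which setting controls it is not established by these logs. Below is a minimal Python sketch for running the same comparison on a full JobManager log. The helper names `to_millis` and `compare_timeouts` are hypothetical (they are not part of Flink), and the regular expressions assume the log format shown above.

```python
import re

# Matches Flink's "Loading configuration property: key, value" startup lines.
CONFIG_RE = re.compile(r"Loading configuration property: ([^,\s]+), (.+)$")
# Matches the limit reported inside an Akka AskTimeoutException message.
ASK_RE = re.compile(r"AskTimeoutException: Ask timed out on \[.*?\] after \[(\d+) ms\]")


def to_millis(value):
    """Convert a duration such as '360s', '20min' or '60000 ms' to milliseconds."""
    m = re.fullmatch(r"\s*(\d+)\s*(ms|s|min|h)\s*", value)
    if m is None:
        raise ValueError(f"unsupported duration: {value!r}")
    factor = {"ms": 1, "s": 1_000, "min": 60_000, "h": 3_600_000}[m.group(2)]
    return int(m.group(1)) * factor


def compare_timeouts(log_lines, key="akka.ask.timeout"):
    """Return (configured_ms, sorted observed ask-timeout limits in ms)."""
    configured = None
    observed = set()
    for line in log_lines:
        cfg = CONFIG_RE.search(line)
        if cfg and cfg.group(1) == key:
            configured = to_millis(cfg.group(2))
        ask = ASK_RE.search(line)
        if ask:
            observed.add(int(ask.group(1)))
    return configured, sorted(observed)


# Two lines copied from the logs in this thread.
sample = [
    "2021-06-10 03:44:14,318 INFO  org.apache.flink.configuration.GlobalConfiguration"
    "           [] - Loading configuration property: akka.ask.timeout, 360s",
    "Caused by: akka.pattern.AskTimeoutException: Ask timed out on "
    "[Actor[akka://flink/user/rpc/dispatcher_1#248799098]] after [60000 ms].",
]

configured, observed = compare_timeouts(sample)
print(configured, observed)  # 360000 [60000]
```

If the observed limit differs from the configured value, as it does here, raising akka.ask.timeout further is unlikely to change the outcome of this particular call.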