Hey all,
We deploy application clusters natively on Kubernetes.
Are there any timeouts for job execution and cluster creation?
To get an indication of the cluster status, we leverage the k8s client to watch the deployment in a namespace with a specific cluster name and respond accordingly (see "Watching a Deployment" in https://github.com/fabric8io/kubernetes-client/blob/master/doc/CHEATSHEET.md#deployment).
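Roughly, the watch looks like the following sketch (assuming the fabric8 kubernetes-client 5.x API; the namespace and deployment name are illustrative):

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public class DeploymentWatch {
    public static void main(String[] args) {
        KubernetesClient client = new DefaultKubernetesClient();
        client.apps().deployments()
                .inNamespace("flink-jobs")      // illustrative namespace
                .withName("test-flink-app")     // illustrative cluster name
                .watch(new Watcher<Deployment>() {
                    @Override
                    public void eventReceived(Action action, Deployment deployment) {
                        // React to ADDED / MODIFIED / DELETED events, e.g. check
                        // readyReplicas to decide whether the cluster is up.
                        System.out.println(action + ": " + deployment.getStatus());
                    }

                    @Override
                    public void onClose(WatcherException cause) {
                        // The watch ended (e.g. the client crashed or disconnected);
                        // this is exactly the case where our timeouts get lost.
                    }
                });
    }
}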
Around this watch we define two timeouts: one for cluster creation and one for job execution.
However, this solution is not ideal: if the client library crashes, the timeouts are lost.
We don't want to manage these timeout states ourselves.
Any suggestions or a better way?
Thanks,
Tamir.
From: Yang Wang <[hidden email]>
Sent: Tuesday, April 6, 2021 10:36 AM
Subject: Re: Application cluster - Job execution and cluster creation timeouts

Hi Tamir,

Thanks for trying the native K8s integration.

1. We do not have a timeout for creating the Flink application cluster. The reason is that the job submission happens on the JobManager side, so the Flink client does not need to wait for the JobManager to be running before it exits. Even if the Flink client had an internal timeout, we would still have the same problem: when the client crashes, the timeout is gone.

Let me share another approach to the timeout. In our deployer, when a new Flink application is created, the deployer periodically checks the accessibility of the Flink REST endpoint. If the endpoint is not ready within the timeout (e.g. 120s), the deployer deletes the Flink JobManager deployment and tries to create a new one.

2. Actually, the current "flink run-application" does not support a real attached mode (waiting for all the jobs in the application to finish). I am curious why you have "infinite job execution" in your Flink application cluster. Once all the jobs in the application have finished, Flink deregisters the application, and all the K8s resources should be cleaned up.
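The readiness check from point 1 could look roughly like this sketch (our deployer's actual code differs; the endpoint path, port, and intervals are illustrative):

import java.net.HttpURLConnection;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;

public class RestReadinessProbe {
    // Polls the JobManager REST endpoint until it answers or the deadline passes.
    // Returns false if the deadline passed; the caller would then delete the
    // JobManager deployment and create a new one.
    static boolean waitForRestEndpoint(String baseUrl, Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            try {
                HttpURLConnection conn =
                        (HttpURLConnection) new URL(baseUrl + "/config").openConnection();
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                if (conn.getResponseCode() == 200) {
                    return true; // REST endpoint is up
                }
            } catch (Exception ignored) {
                // Not reachable yet; retry below.
            }
            Thread.sleep(5000); // poll every 5s (illustrative)
        }
        return false;
    }
}

Usage would be something like waitForRestEndpoint("http://test-flink-app-rest:8081", Duration.ofSeconds(120)).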
Best,
Yang
Hey Yang,
Thank you for your response.
We run the application cluster programmatically.
I discussed it here with an example of how to run it from Java rather than the CLI.
Following your comment: I have not actually seen that cleanup in action. I provided a non-existent image; the deployer started the k8s deployment, but the pods failed to start (as expected), and the k8s deployment kept running indefinitely.
What configuration is that? Is it possible to override it?
I delved into the flink-core and flink-kubernetes jars. Since Flink depends on Kubernetes, we both need to leverage the Kubernetes client (which Flink does internally) to manage and inspect the resources.
My thought was: what happens if there is a bug and the job runs indefinitely, or the JobManager crashes over and over again?
What happens if resources don't get cleaned up properly? We don't want to keep the cluster up and running in that case, and we would like to get feedback about it.
Since Flink does not support that, we have to inspect it externally (which makes it more complex).
We could also poll the job status using the Flink client, but that becomes useless if the job runs indefinitely.
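For concreteness, the kind of polling I mean (a sketch; the cluster id and configuration are illustrative):

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;

public class JobStatusPoller {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // would carry rest.address, rest.port, etc.
        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "test-flink-app")) {
            // One-shot status poll; useless as a completion signal if the job never ends.
            Collection<JobStatusMessage> jobs = client.listJobs().get(30, TimeUnit.SECONDS);
            for (JobStatusMessage job : jobs) {
                System.out.println(job.getJobId() + " -> " + job.getJobState());
            }
        }
    }
}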
What do you think?
Best,
Tamir.
From: Yang Wang <[hidden email]>
Sent: Wednesday, April 7, 2021 6:24 AM
Subject: Re: Application cluster - Job execution and cluster creation timeouts

Hi Tamir,

Maybe I did not make myself clear. Here the "deployer" means our internal Flink application deployer (it is actually the Ververica Platform), not the ApplicationDeployer interface in Flink. It helps manage the lifecycle of every Flink application, and it uses the same native K8s integration mechanism you have mentioned.

In my opinion, cleaning up a Flink application that fails over indefinitely (e.g. due to a wrong image) is the responsibility of your own deployer, not the Flink client. In such a case, the JobManager usually cannot run normally.

However, if the JobManager starts successfully, it will clean up all the K8s resources once all the jobs reach a terminal status (e.g. FAILED, CANCELED, FINISHED). Even if the JobManager crashes, it can recover the jobs from the latest checkpoint if HA [1] is enabled.
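Programmatically, enabling the Kubernetes HA services could look like the following sketch (assuming Flink 1.12's KubernetesHaServicesFactory; the storage path is illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

public class HaConfigExample {
    static Configuration withKubernetesHa(Configuration conf) {
        // Use the Kubernetes-based HA services introduced in Flink 1.12.
        conf.set(HighAvailabilityOptions.HA_MODE,
                "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
        // Durable storage for checkpoint/JobGraph metadata (illustrative path).
        conf.set(HighAvailabilityOptions.HA_STORAGE_PATH, "s3://example-bucket/flink-ha");
        return conf;
    }
}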
Best,
Yang
Hey Yang, Community,
As discussed a few weeks ago, I'm working on the Application Cluster - Native K8s approach, running Flink 1.12.2.
We deploy application clusters programmatically which works well.
In addition, we leverage the Kubernetes client (fabric8io) to watch the deployment/pod status and get an indication of whether the k8s cluster is up and running.
Job details:
- We read a file from S3 using the Hadoop S3 plugin (https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins), process the data, and write it back to S3 (a minimal job sketch follows below).
- We fetch the job list using the ClusterClient (client.listJobs()).
- No savepoints or state backend are configured.
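For context, such a job might look roughly like this (a sketch; the bucket names and the transformation are illustrative, not our actual code):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3BatchJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Illustrative S3 paths; the real bucket and keys differ.
        DataSet<String> input = env.readTextFile("s3://example-bucket/input");
        DataSet<String> processed = input.map(String::trim); // placeholder transformation
        processed.writeAsText("s3://example-bucket/output");
        env.execute("s3-batch-job");
    }
}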
I would like to raise several questions regarding scenarios I encountered and get your feedback.
These scenarios show how a Flink application cluster (native K8s) behaves in case of failures.
Scenario 1: An exception is thrown before env.execute() gets called.
I deploy an application cluster, and an exception is thrown before env.execute() gets called.
Result: the exception is received, but nothing gets cleaned up - the JobManager pod keeps running even though no jobs are running.
Question: How should we get the "real" cluster status when the JobManager pod is running but the execution never occurred? (Memory leak?)
Scenario 2: Application state FAILED and no running jobs.
env.execute() gets called, and an exception is thrown before the job starts: I did not provide AWS credentials, so com.amazonaws.AmazonClientException was thrown,
which led to a "Caused by: org.apache.flink.runtime.client.JobExecutionException" error.
The application state was FAILED, but listJobs() (https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/client/program/ClusterClient.html#listJobs--) returned an empty list (the jobs never started).
I do see debug logs showing that Flink is aware of the exception and cleans up all the resources:
[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.e.ClusterEntrypoint]: Shutting KubernetesApplicationClusterEntrypoint down with application status FAILED. Diagnostics null.
[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.j.MiniDispatcherRestEndpoint]: Shutting down rest endpoint.
[2021-04-25T13:59:59,767][Info] {} [o.a.f.r.r.a.ActiveResourceManager]: Shut down cluster because application is in FAILED, diagnostics null.
[2021-04-25T13:59:59,768][Info] {} [o.a.f.k.KubernetesResourceManagerDriver]: Deregistering Flink Kubernetes cluster, clusterId: test-flink-app-9645, diagnostics:
Result: the cluster gets destroyed; listJobs() is empty until the client gets an "UnknownHost" exception (the cluster no longer exists).
Question: How can we get the application state from outside the cluster, or catch the JobExecutionException?
Scenario 3: The job starts and throws an exception, but the job status remains IN_PROGRESS.
Once the job is executed, its status changes to IN_PROGRESS. The job list is retrieved (within a few seconds), and for each job we query the job status via clusterClient.requestJobResult(jobId). However, once the job fails, the result never changes to FAILED; instead, the CompletableFuture completes exceptionally because the maximum number of retries is exhausted.
Code snippet:

try {
    // Completes once the job reaches a globally terminal state.
    CompletableFuture<JobResult> jobResultFuture = client.requestJobResult(flinkJobId);
    jobResultFuture
        .thenAccept(jobResult -> handleJobResult(jobResult))
        .exceptionally(throwable -> {
            // Reached when the request fails, e.g. after retries are exhausted.
            handleJobResultWithException(flinkJobId, Optional.of(throwable));
            return null;
        });
} catch (Exception e) {
    handleGetJobResultWithException(flinkJobId, Optional.of(e));
}
Stacktrace
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386) ~[flink-runtime_2.12-1.12.1.jar!/:1.12.1]
... 33 more
Caused by: java.util.concurrent.CompletionException: java.net.UnknownHostException: test-flink-app-24569-rest.testing
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
... 31 more
Caused by: java.net.UnknownHostException: test-flink-app-24569-rest.testing
at java.net.InetAddress$CachedAddresses.get(Unknown Source) ~[?:?]
Result: the resources get cleaned up, and then the future can no longer get the cluster status; we always end up in the exceptionally clause.
Question: Why does the job result never change to FAILED? What am I missing?
Highly appreciate your help.
Tamir.
Hey All,
I know the application cluster mode is not widely used yet; I'm happy to be part of the Flink community, test it, and share the results.
Following my previous email, I'd like to share more information and get your feedback.
Scenario 4: requestJobResult() gets out of sync.
The result is very similar to Scenario 3; this time I delved into it and understood the reason.
As I mentioned before, once the cluster is up and running, we create a RestClusterClient, fetch listJobs(), and query each job for its result using client.requestJobResult(jobId).
requestJobResult(jobId) keeps re-scheduling requests until an exception is thrown or the job completes. The polling interval is defined by ExponentialWaitStrategy; the RestClusterClient initializes it with initialTime = 10L and maxWait = 2000L. The interval is returned from its "sleep" method, which means that after 8 requests the interval will always be 2 seconds:
1st request: after 10 ms
2nd request: after 20 ms
3rd request: after 40 ms
4th request: after 80 ms
5th request: after 160 ms
6th request: after 320 ms
7th request: after 640 ms
8th request: after 1280 ms
9th request: after 2000 ms
10th request: after 2000 ms
11th request: after 2000 ms
... and so on.
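For illustration, this minimal standalone sketch (our own code, not Flink's ExponentialWaitStrategy source) reproduces those intervals:

public class BackoffDemo {
    public static void main(String[] args) {
        long initialTime = 10L; // same defaults the RestClusterClient uses
        long maxWait = 2000L;
        for (int attempt = 0; attempt < 11; attempt++) {
            // The interval doubles each attempt until capped at maxWait.
            long interval = Math.min(initialTime << attempt, maxWait);
            System.out.println("Request " + (attempt + 1) + ": sent after " + interval + " ms");
        }
    }
}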
For example:
[2021-04-28T13:44:25,030][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,045][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,107][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,128][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,144][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,185][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,213][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,294][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,302][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,465][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,517][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,838][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,904][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:26,545][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:26,924][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:28,206][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:28,241][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:30,243][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:30,257][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:32,258][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
The first few requests are sent very quickly, while the job is still IN_PROGRESS. The issue is that the job completes and the cluster gets destroyed between two polling intervals (2s), which means that by the next request the cluster no longer exists, the client cannot get its status, and a java.net.UnknownHostException is eventually raised within the CompletableFuture.
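One way to recognize this case in the exceptionally clause is to unwrap the cause chain (a sketch; the class and method names are ours, not a Flink API):

import java.net.UnknownHostException;

final class ClusterGoneDetector {
    // Walks the cause chain of the failed future's throwable and reports whether
    // the root problem is the REST host no longer resolving, i.e. the cluster
    // was already deregistered and destroyed.
    static boolean isClusterGone(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof UnknownHostException) {
                return true;
            }
        }
        return false;
    }
}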
Question:
Another question relates to the JobInitializationException. I see the stack trace and the exception in the RestClient response, but it is not reflected in the result - neither as a failure nor as an exception.
Logs sample:
[2021-04-28T14:08:41,456][Error] {} [o.a.f.r.r.RestClient]: Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor an error. Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"4e56d506ca892a3109e48e6a5b804330","application-status":"FAILED","accumulator-results":{},"net-runtime":15540,"failure-cause":{"class":"org.apache.flink.runtime.client.JobInitializationException","stack-trace":"org.apache.flink.runtime.client.JobInitializationException:
Could not instantiate JobManager.\n\t at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (TextOutputFormat (s3://xsight-aggregated-data/6376f1b0-b406-463b-bb52-2e55c68ec9ec/86979f6c-d023-43a1-a6fc-55202e696e54/aggregations)
- UTF-8)': doesBucketExist on xsight-aggregated-data: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException:
Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null): No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException:
Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null)\n\t at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:239)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
... 4 more\nCaused by: org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on xsight-aggregated-data: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider
: com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null): No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider
: com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null)\n\t at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)\n\t at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)\n\t
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)\n\t at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
Thank you,
Tamir.