Application cluster - Job execution and cluster creation timeouts


Application cluster - Job execution and cluster creation timeouts

Tamir Sagi
Hey all,

We deploy application clusters natively on Kubernetes.

Are there any timeouts for job execution and cluster creation?

I went over the configuration page here but did not find anything relevant.

In order to get an indication about the cluster, we leverage the k8s client to watch the deployment in a namespace with a specific cluster name (https://github.com/fabric8io/kubernetes-client/blob/master/doc/CHEATSHEET.md#deployment#:~:text=Watching%20a%20Deployment%3A) and respond accordingly.
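
For reference, a minimal sketch of that watch with the Fabric8io client (namespace, deployment name and readiness check are illustrative; the Watcher signature below assumes a fabric8 5.x client):

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public class ClusterWatchSketch {
    public static void main(String[] args) {
        KubernetesClient k8s = new DefaultKubernetesClient();
        k8s.apps().deployments()
           .inNamespace("testing")              // illustrative namespace
           .withName("test-flink-app")          // Flink cluster-id == deployment name
           .watch(new Watcher<Deployment>() {
               @Override
               public void eventReceived(Action action, Deployment deployment) {
                   Integer available = deployment.getStatus() == null
                           ? null : deployment.getStatus().getAvailableReplicas();
                   if (available != null && available > 0) {
                       // the JobManager deployment has available replicas -> cluster considered up
                   }
               }

               @Override
               public void onClose(WatcherException cause) {
                   // the watch itself died (e.g. our process crashed/restarted);
                   // any timeout tracked in memory is lost at this point
               }
           });
    }
}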

We define two timeouts:
  1. For creating the application cluster (i.e. for the case where there are errors in the pods, so the k8s deployment is up but the application cluster is not running).
  2. For the application cluster resources to get cleaned up (upon completion), which protects against infinite job execution or k8s glitches.

However, this solution is not ideal because if the client library crashes, the timeouts are gone.
We would rather not manage these timeout states ourselves.

Any suggestions, or is there a better way?

Thanks,
Tamir.







Re: Application cluster - Job execution and cluster creation timeouts

Yang Wang
Hi Tamir,

Thanks for trying the native K8s integration.

1. We do not have a timeout for creating the Flink application cluster. The reason is that the job submission happens on the JobManager side,
so the Flink client does not need to wait for the JobManager to be running before it exits.

I think even if the Flink client had an internal timeout, we would still have the same problem: when the Flink client crashes, the timeout is
gone.

I want to share another solution for the timeout. In our deployer, when a new Flink application is created, the deployer periodically checks the
accessibility of the Flink REST endpoint. If it is not ready within the timeout (e.g. 120s), the deployer deletes the Flink JobManager deployment and tries to
create a new one.
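
Roughly, such a readiness probe could look like this (a sketch only, not our actual deployer code; the URL, path and intervals are illustrative):

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class RestEndpointProbe {
    // Returns true if the JobManager REST endpoint answered within the timeout,
    // e.g. restUrl = "http://test-flink-app-rest.testing:8081" (illustrative).
    static boolean waitForRestEndpoint(String restUrl, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(restUrl + "/config"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        while (System.nanoTime() < deadline) {
            try {
                HttpResponse<Void> response = http.send(request, HttpResponse.BodyHandlers.discarding());
                if (response.statusCode() == 200) {
                    return true; // REST endpoint is up, cluster creation succeeded
                }
            } catch (IOException notReachableYet) {
                // service/DNS not ready yet, keep polling
            }
            Thread.sleep(2_000L);
        }
        return false; // the caller would then delete the JobManager deployment and redeploy
    }
}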

2. Actually, the current "flink run-application" does not support a real attached mode (waiting for all the jobs in the application to finish).
I am curious why you have "infinite job execution" in your Flink application cluster. Once all the jobs in the application have finished, Flink will
deregister the application and all the K8s resources should be cleaned up.


Best,
Yang




Re: Application cluster - Job execution and cluster creation timeouts

Tamir Sagi
Hey Yang

Thank you for your response.

We run the application cluster programmatically.

I discussed it here, with an example of how to run it from Java rather than the CLI.
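
Roughly, the deployment looks like this (a minimal sketch against the Flink 1.12 client APIs; cluster-id, namespace, image, jar path and main class are illustrative):

import java.util.Collections;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

public class ApplicationClusterDeploySketch {
    public static void main(String[] args) throws Exception {
        Configuration flinkConfig = new Configuration();
        flinkConfig.set(DeploymentOptions.TARGET, "kubernetes-application");
        flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, "test-flink-app");
        flinkConfig.set(KubernetesConfigOptions.NAMESPACE, "testing");
        flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "my-registry/my-flink-job:latest");
        flinkConfig.set(PipelineOptions.JARS,
                Collections.singletonList("local:///opt/flink/usrlib/my-job.jar"));

        KubernetesClusterClientFactory factory = new KubernetesClusterClientFactory();
        try (KubernetesClusterDescriptor descriptor = factory.createClusterDescriptor(flinkConfig)) {
            ClusterSpecification spec = factory.getClusterSpecification(flinkConfig);
            ApplicationConfiguration appConfig =
                    new ApplicationConfiguration(new String[0], "com.example.MyFlinkJob");
            // deployApplicationCluster() returns once the K8s resources are created;
            // it does not wait for the job itself to run
            ClusterClient<String> client =
                    descriptor.deployApplicationCluster(spec, appConfig).getClusterClient();
            System.out.println("REST endpoint: " + client.getWebInterfaceURL());
        }
    }
}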

Following your comment:
"... the deployer will periodically check the accessibility of the Flink REST endpoint. When it is not ready within the timeout (e.g. 120s), the deployer will delete the Flink JobManager deployment and try to create a new one."
I have not actually seen that in action. I gave a non-existent image; the deployer started the k8s deployment, but the pods failed to start (as expected) and the k8s deployment kept running indefinitely.

What configuration is that? Is it possible to override it?

I delved into the flink-core and flink-kubernetes jars. Since Flink depends on Kubernetes, we both need to leverage the Kubernetes client (which Flink does internally) to manage and inspect the resources.

Regarding:
"I am curious why you have 'infinite job execution' in your Flink application cluster. If all the jobs in the application finished, Flink will deregister the application and all the K8s resources should be cleaned up."
My thought was about what happens if there is a bug and the job runs forever, or the JobManager crashes over and over again.
What happens if resources don't get cleaned up properly? We don't want to keep the cluster up and running in that case and would like to get feedback. Since Flink does not support that, we have to inspect it externally (which makes it more complex).
We could also poll the job status using the Flink client, but that becomes useless if the job runs forever.

What do you think?

Best,
Tamir.





Re: Application cluster - Job execution and cluster creation timeouts

Yang Wang
Hi Tamir,

Maybe I did not make myself clear. Here the "deployer" means our internal Flink application deployer (actually it is the Ververica platform),
not the ApplicationDeployer interface in Flink. It helps with managing the lifecycle of every Flink application, and it uses the same native
K8s integration mechanism that you have mentioned.

In my opinion, cleaning up an infinitely failing-over Flink application (e.g. a wrong image) is the responsibility of your own deployer, not the Flink
client. In such a case, the JobManager usually cannot run normally.

However, if the JobManager can be started successfully, it will clean up all the K8s resources once all the jobs have reached a
terminal status (e.g. FAILED, CANCELED, FINISHED). Even if the JobManager crashes, it can recover the jobs from the latest checkpoint
if HA [1] is enabled.
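
A hedged sketch of enabling the Kubernetes HA services in 1.12 on the same Configuration used when deploying the application cluster (the storage path is illustrative):

// org.apache.flink.configuration.Configuration, same object passed to the deployment
flinkConfig.setString("high-availability",
        "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
flinkConfig.setString("high-availability.storageDir", "s3://my-bucket/flink-ha"); // illustrative path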



Best,
Yang




Re: Application cluster - Job execution and cluster creation timeouts

Tamir Sagi
Hey Yang, Community

As discussed a few weeks ago, I'm working on the Application Cluster (native K8s) approach, running Flink 1.12.2.
We deploy application clusters programmatically, which works well.
In addition, we leverage the Kubernetes client (Fabric8io) to watch the deployment/pod status and get an indication of whether the k8s cluster is up and running.

Job details
The job processes the data and writes it back to S3.

We fetch the job list using the ClusterClient (client.listJobs()).
No savepoints/state backend are configured.
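
For reference, the job-list fetch is roughly (a sketch; the client is the ClusterClient obtained for the running application cluster, and the timeout is illustrative):

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;

public class JobListSketch {
    // "client" is the ClusterClient<String> obtained when the application cluster was deployed.
    static void logJobs(ClusterClient<String> client) throws Exception {
        Collection<JobStatusMessage> jobs = client.listJobs().get(30, TimeUnit.SECONDS);
        for (JobStatusMessage job : jobs) {
            System.out.printf("job %s (%s) is in state %s%n",
                    job.getJobId(), job.getJobName(), job.getJobState());
        }
    }
}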

I would like to raise several questions regarding some scenarios I encountered, and would like to get your feedback.
These scenarios show the behavior of a Flink application cluster (native K8s) in case of failures.


Scenario 1: An exception is thrown before env.execute() gets called.
I deploy the application cluster and an exception is thrown before env.execute() gets called.

Result: The exception is received, but nothing gets cleaned up; the JobManager pod is still running even though no jobs are running.

Question: How should we get the "real" cluster status? The JobManager pod is running, but the execution never occurred (memory leak?).

Scenario 2: Application status FAILED and no running jobs.
env.execute() gets called and an exception is thrown before the job starts.
I did not provide AWS credentials, so a com.amazonaws.AmazonClientException was thrown,
which led to a "Caused by: org.apache.flink.runtime.client.JobExecutionException" error.

The application status was FAILED, but the job list was empty (the jobs never started),
even though listJobs() is documented to list "the currently running and finished jobs on the cluster".
I can still see debug logs where Flink is aware of the exception and cleans up all the resources:

[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.e.ClusterEntrypoint]: Shutting KubernetesApplicationClusterEntrypoint down with application status FAILED. Diagnostics null.
[2021-04-25T13:59:59,569][Info] {} [o.a.f.r.j.MiniDispatcherRestEndpoint]: Shutting down rest endpoint.
[2021-04-25T13:59:59,767][Info] {} [o.a.f.r.r.a.ActiveResourceManager]: Shut down cluster because application is in FAILED, diagnostics null.
[2021-04-25T13:59:59,768][Info] {} [o.a.f.k.KubernetesResourceManagerDriver]: Deregistering Flink Kubernetes cluster, clusterId: test-flink-app-9645, diagnostics:

Result: The cluster gets destroyed; listJobs() stays empty until the client gets an "UnknownHost" exception (the cluster no longer exists).
Question: How can we get the application status outside the cluster, or catch the JobExecutionException?


Scenario 3: The job starts and throws an exception; the job status remains IN_PROGRESS.
Once the job is executed, its status changes to IN_PROGRESS. The job list is retrieved (within a few seconds), and for each job we query the job result via clusterClient.requestJobResult(jobId). However, once the job fails, the result never changes to FAILED; instead the CompletableFuture gets an exception due to the maximum number of retries.

Code snippet

try {
    // requestJobResult() keeps polling the cluster's REST endpoint until the job reaches a terminal state
    CompletableFuture<JobResult> jobResultFuture = client.requestJobResult(flinkJobId);
    jobResultFuture.thenAccept(jobResult -> handleJobResult(jobResult))
            .exceptionally(throwable -> {
                // reached when the retries are exhausted or the REST endpoint is gone
                handleJobResultWithException(flinkJobId, Optional.of(throwable));
                return null;
            });
} catch (Exception e) {
    handleGetJobResultWithException(flinkJobId, Optional.of(e));
}

Stacktrace

Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386) ~[flink-runtime_2.12-1.12.1.jar!/:1.12.1]
... 33 more
Caused by: java.util.concurrent.CompletionException: java.net.UnknownHostException: test-flink-app-24569-rest.testing
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
... 31 more
Caused by: java.net.UnknownHostException: test-flink-app-24569-rest.testing
at java.net.InetAddress$CachedAddresses.get(Unknown Source) ~[?:?]


Result: The resources get cleaned up, and then the future can no longer get the cluster status; we always end up in the exceptionally clause.
Question: Why is the job result not changed to FAILED? What am I missing?
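
A possible workaround on our side (just a sketch, reusing the names from the snippet above; orTimeout() requires Java 9+) is to bound the wait ourselves so we fail fast once the cluster disappears between polls:

client.requestJobResult(flinkJobId)
      .orTimeout(10, TimeUnit.MINUTES)   // illustrative bound: complete exceptionally if no terminal result in time
      .thenAccept(jobResult -> handleJobResult(jobResult))
      .exceptionally(throwable -> {
          handleJobResultWithException(flinkJobId, Optional.of(throwable));
          return null;
      });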


Highly appreciate your help.

Tamir.









Re: Application cluster - Job execution and cluster creation timeouts

Tamir Sagi
Hey All,

I know the application cluster mode is not widely used yet; I'm happy to be part of the Flink community, test it and share the results.

Following my previous email, I'd like to share more information and get your feedback.

Scenario 4: requestJobResult() gets out of sync.
The result is very similar to Scenario 3; this time I delved into it and understood the reason.

As I mentioned before, once the cluster is up and running, we create a RestClusterClient, fetch the job list (listJobs()) and query each job for its result using client.requestJobResult(jobId).

requestJobResult(jobId) keeps re-scheduling requests until an exception is thrown or the job is completed.

The RestClusterClient is initialized with an initial wait of 10 ms and a max wait of 2000 ms, and the interval between requests is returned from its "sleep" method,
which means that after the first 8 requests the interval is always 2 seconds:
1st Request will be sent after 10ms
2nd Request will be sent after 20ms
3rd Request will be sent after 40ms
4th Request will be sent after 80ms
5th Request will be sent after 160ms
6th Request will be sent after 320ms
7th Request will be sent after 640ms
8th Request will be sent after 1280ms
9th Request will be sent after 2000ms
10th Request will be sent after 2000ms
11th Request will be sent after 2000ms
... and so on, every 2000ms.
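
A tiny sketch that reproduces that schedule (exponential backoff starting at 10 ms, capped at 2000 ms):

long initialWait = 10L;   // ms
long maxWait = 2_000L;    // ms
for (int attempt = 1; attempt <= 11; attempt++) {
    long wait = Math.min(initialWait << (attempt - 1), maxWait);
    System.out.printf("request %d sent after %d ms%n", attempt, wait);
}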

For example:
[2021-04-28T13:44:25,030][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,045][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,107][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,128][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,144][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,185][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,213][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,294][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,302][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,465][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,517][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:25,838][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:25,904][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:26,545][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:26,924][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:28,206][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:28,241][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:30,243][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result
[2021-04-28T13:44:30,257][Debug] {} [o.a.f.r.r.RestClient]: Received response {"status":{"id":"IN_PROGRESS"}}.
[2021-04-28T13:44:32,258][Debug] {} [o.a.f.r.r.RestClient]: Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to test-flink-app-12748-rest.testing:8081/v1/jobs/e9a30c33c015a6498143162947f6b4e6/execution-result

The first few requests are sent way too fast, while the job is still IN_PROGRESS.
The issue is that the job completes and the cluster gets destroyed between two polling intervals (2s),
which means that on the next request the cluster no longer exists, the client cannot get its status, and java.net.UnknownHostException is eventually raised inside the CompletableFuture.

Questions:
  • Is it good practice to use the RestClusterClient for an application cluster? One possible solution is to send a message from the job itself upon completion/failure (e.g. to a Kafka topic, as sketched below), which would make polling for the JobResult redundant.
  • If the RestClusterClient is also suitable for application clusters (by design), why is the polling interval not configurable? A smaller polling interval might solve this.
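
A hedged sketch of that alternative from the first question, reporting the terminal status from the job itself (topic name, bootstrap servers and the surrounding job structure are illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JobStatusReporter {
    // Called from the job's main(), e.g. from a try/catch around env.execute(),
    // with status = "FINISHED" or "FAILED".
    static void report(String clusterId, String status) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // illustrative
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("flink-job-status", clusterId, status));
            producer.flush();
        }
    }
}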


Another question is related to the JobInitializationException.
I see the stack trace and the exception in the RestClient response, but it is not reflected back to the caller, neither as a failure nor as an exception.

Logs Sample
[2021-04-28T14:08:41,456][Error] {} [o.a.f.r.r.RestClient]: Received response was neither of the expected type ([simple type, class org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody]) nor an error. Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"4e56d506ca892a3109e48e6a5b804330","application-status":"FAILED","accumulator-results":{},"net-runtime":15540,"failure-cause":{"class":"org.apache.flink.runtime.client.JobInitializationException","stack-trace":"org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.\n\t at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
  at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  at java.base/java.lang.Thread.run(Unknown Source)\nCaused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (TextOutputFormat (s3://xsight-aggregated-data/6376f1b0-b406-463b-bb52-2e55c68ec9ec/86979f6c-d023-43a1-a6fc-55202e696e54/aggregations) - UTF-8)': doesBucketExist on xsight-aggregated-data: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null): No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null)\n\t at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:239)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
      ... 4 more\nCaused by: org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on xsight-aggregated-data: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null): No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.AmazonServiceException: Internal Server Error (Service: null; Status Code: 500; Error Code: null; Request ID: null; Proxy: null)\n\t at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:177)\n\t at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)\n\t at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)\n\t at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)


Thank you,
Tamir.


