Flink 1.12

Flink 1.12

Boris Lublinsky
It is great that Flink 1.12 is out. Several questions:

1. The official Flink 1.12 distribution page https://flink.apache.org/downloads.html specifies Scala versions, but not Java versions. Is it Java 8?
2. I do not see any 1.12 docker images here https://hub.docker.com/_/flink. Are they somewhere else?
3. Flink 1.12 introduces Kubernetes HA support https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html, but the Flink native Kubernetes support page https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html makes no mention of HA. Are the two integrated? Do you have any examples of starting an HA cluster using Flink native Kubernetes?

  

Re: Flink 1.12

Chesnay Schepler
1) It is compiled with Java 8 but runs on Java 8 & 11.
2) Docker images are not yet published.
3) It is mentioned at the top of the Kubernetes HA Services documentation that it also works for the native Kubernetes integration:
"Kubernetes high availability services can only be used when deploying to Kubernetes. Consequently, they can be configured when using standalone Flink on Kubernetes or the native Kubernetes integration."
From what I understand you only need to configure the 3 listed options; the documentation also contains an example configuration.
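As a sketch, those options amount to a configuration fragment like the following; the cluster id and storage path are placeholders, not values from the documentation:

```yaml
# Kubernetes HA options for Flink 1.12 (placeholder values).
kubernetes.cluster-id: my-ha-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink-ha   # any durable shared storage
```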




Re: Flink 1.12

Boris Lublinsky
Thanks.
Do you have an ETA for the docker images?



Re: Flink 1.12

Chesnay Schepler
Unfortunately no; there are some discussions going on in the docker-library/official-images PR that have to be resolved first, but resolving them would currently require changes on the Flink side that we cannot make (because 1.12.0 is already released!). We are not sure yet whether we can get the PR accepted as-is and defer further changes to 1.12.1.


Re: Flink 1.12

Boris Lublinsky
Thanks Chesnay for your quick response.
I read the documentation more carefully and found the sample I was looking for:

./bin/flink run-application -p 10 -t kubernetes-application \
  -Dkubernetes.cluster-id=k8s-ha-app1 \
  -Dkubernetes.container.image=flink:k8s-ha \
  -Dkubernetes.container.image.pull-policy=Always \
  -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4 \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=oss://flink/flink-ha \
  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
  local:///opt/flink/examples/streaming/StateMachineExample.jar

A couple of questions about it:

The command ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application -t. What is -p 10?
For -Dkubernetes.container.image=flink:k8s-ha, does it require a special container build?

-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \

This is for the case where I use HDFS for savepointing, right? I can instead use PVC-based savepointing, correct?

Also, I was trying to understand how it works; from the documentation it sounds like there is one active and one or more standby JMs. Can I control the number of standby JMs?

Finally, what is the behavior on a rolling restart of the JM deployment?





Re: Flink 1.12

Yang Wang
Hi Boris,

What is -p 10?
It is the same as --parallelism 10; it sets the default parallelism to 10.

does it require a special container build?
No, the official Flink docker image can be used directly. Unfortunately, we do not have that image yet, and we are trying to sort it out.
In the meantime, you can follow the instructions below to build your own image.


git clone https://github.com/apache/flink-docker.git
cd flink-docker
git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
cd dev/flink-1.12.0-debian
docker build . -t flink:flink-1.12.0
docker push flink:flink-1.12.0

This is if I use HDFS for save pointing, right? I can instead use PVC - based save pointing, correct?
It is an example of storing the HA-related data in OSS (Alibaba Cloud Object Storage, similar to S3). Since we require a distributed storage, I am afraid you cannot use a PVC here. Instead, you could use MinIO.
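A MinIO-backed setup could look something like the following sketch, with the S3 filesystem plugin enabled the same way the OSS plugin is in the example above; the endpoint, bucket, and credentials are hypothetical placeholders:

```shell
# Hypothetical sketch: MinIO as the HA storage backend (placeholder values).
./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.cluster-id=k8s-ha-app1 \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink-ha/recovery \
  -Ds3.endpoint=http://minio.minio-ns.svc:9000 \
  -Ds3.path.style.access=true \
  -Ds3.access-key=minio -Ds3.secret-key=minio123 \
  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.0.jar \
  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.0.jar \
  local:///opt/flink/examples/streaming/StateMachineExample.jar
```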

Can I control the amount of standby JMs? 
Currently, you cannot control the number of JobManagers. This is only because we have not introduced a config option for it yet. But you could change it manually via `kubectl edit deploy <clusterID>`; that should also work.
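For example, assuming the JobManager Deployment is named after the kubernetes.cluster-id (an untested sketch with a placeholder name):

```shell
# Scale the JobManager Deployment to one active + one standby instance.
# "k8s-ha-app1" stands in for the kubernetes.cluster-id used at submission.
kubectl scale deployment/k8s-ha-app1 --replicas=2
```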

Finally, what is behavior on the rolling restart of JM deployment?
Once a JobManager terminates, it loses the leadership and a standby one takes over. So during a rolling restart of the JM deployment, you will see the leader switch multiple times and your job restart multiple times. I am not sure why you need to roll the JobManager deployment. We use a Deployment for the JobManager in Flink just because we want the JobManager to be relaunched once it crashes. Another reason for multiple JobManagers is faster recovery.


Best,
Yang
 


Re: Flink 1.12

Boris Lublinsky
Thanks guys,
The reason I am interested in rolling updates is to avoid complete restarts in the case of parameter changes (for example, parallelism).
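For parallelism changes specifically, the usual route is a stop-with-savepoint followed by a resubmit with the new -p; a sketch, where the cluster id, job id, and savepoint paths are placeholders:

```shell
# Stop the job, taking a savepoint to durable storage (placeholder paths).
./bin/flink stop -t kubernetes-application \
  -Dkubernetes.cluster-id=k8s-ha-app1 \
  --savepointPath s3://flink-savepoints/app1 \
  <jobId>

# Resubmit with the new parallelism, restoring from the savepoint.
./bin/flink run-application -t kubernetes-application -p 20 \
  -Dkubernetes.cluster-id=k8s-ha-app1 \
  -Dexecution.savepoint.path=s3://flink-savepoints/app1/<savepoint-dir> \
  local:///opt/flink/examples/streaming/StateMachineExample.jar
```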


Re: Flink 1.12

Boris Lublinsky
In reply to this post by Yang Wang
And Kubernetes native HA works, but there are 2 bugs in this implementation.

1. Task manager pods run as the default service account, which fails because it does not have access to the ConfigMaps needed to get endpoint information. I had to add permissions to the default service account to make it work. Ideally both JM and TM pods should run under the same service account.
2. When a Flink application is deleted, it clears the main ConfigMap, but not the ones used for leader election.
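Such a workaround amounts to something like the following; the namespace, role, and binding names here are hypothetical placeholders:

```shell
# Grant the default service account read access to the HA ConfigMaps.
kubectl create role flink-ha-configmap-reader \
  --verb=get,list,watch \
  --resource=configmaps \
  -n default
kubectl create rolebinding flink-ha-configmap-reader-binding \
  --role=flink-ha-configmap-reader \
  --serviceaccount=default:default \
  -n default
```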


And finally, it works fine with PVC-based storage, as long as the PVC is ReadWriteMany.



Re: Flink 1.12

Yang Wang
Hi Boris,

Thanks for your follow-up response and for trying the new KubernetesHAService.

1. It is a valid bug. We do not set the service account for the TaskManager pod. Before the KubernetesHAService was introduced, this worked fine because the TaskManager did not need to access K8s resources (e.g. ConfigMaps) directly. I have created a ticket[1] to support setting the service account for the TaskManager.
2. If you directly delete the JobManager deployment, the HA-related ConfigMaps are retained. This is by-design behavior: because the job has not reached a terminal state (SUCCEEDED, FAILED, CANCELED), the cluster needs to be able to recover in the future. Once all the jobs in the application reach a terminal state, all the HA-related ConfigMaps are cleaned up automatically. You could cancel the job and verify that. Refer here[2] for more information.
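Cancelling (rather than deleting the deployment) can be done from the CLI; a sketch, with the cluster id and job id as placeholders:

```shell
# Cancel the job so it reaches a terminal state; Flink then cleans up
# the HA ConfigMaps automatically. Cluster id and job id are placeholders.
./bin/flink cancel -t kubernetes-application \
  -Dkubernetes.cluster-id=k8s-ha-app1 \
  <jobId>
```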

For PVC-based storage, if it supports reads and writes from multiple pods, then the KubernetesHAService should work; effectively, it acts like a distributed storage.


Best,
Yang

Boris Lublinsky <[hidden email]> 于2020年12月18日周五 上午7:16写道:
And K8 native HA works,
But there are 2 bugs in this implementation.

1. Task manager pods are running as default user account, which fails because it does not have access to config maps to get endpoint’s information. I had to add permissions to default service account to make it work. Ideally both JM and TM pods should run under the same service account. 
2. When a Flink application is deleted, it clears the main config map, but not the ones used for leader election


And finally it works fine with PVC based storage, as long as it is read-write many


On Dec 15, 2020, at 8:40 PM, Yang Wang <[hidden email]> wrote:

Hi Boris,

What is -p 10?
It is same to --parallelism 10. Set the default parallelism to 10.

does it require a special container build?
No, the official flink docker image could be used directly. Unfortunately, we do not have the image now. And we are trying to figure out.
You could follow the instructions below to have your own image.


git clone https://github.com/apache/flink-docker.git

git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0 cd dev/flink-1.12.0-debian docker build . -t flink:flink-1.12.0 docker push flink:flink-1.12.0

This is if I use HDFS for save pointing, right? I can instead use PVC - based save pointing, correct?
It is an example to storing the HA related data to OSS(Alibaba Cloud Object Storage, similar to S3). Since we require a distributed storage, I am afraid you could not use a PVC here. Instead, you could using a minio.

Can I control the amount of standby JMs? 
Currently, you could not control the number of JobManagers. This is only because we have not introduce a config option for it. But you could do it manually via `kubectl edit deploy <clusterID>`. It should also work.

Finally, what is behavior on the rolling restart of JM deployment?
Once a JobManager terminated, it will lose the leadership and a standby one will take over. So on the rolling restart of JM deployment, you will find that the leader switches multiple times and your job also restarts multiple times. I am not sure why you need to roll the JobManager deployment. We are using deployment for JobManager in Flink just because we want the JobManager to be launched once it crashed. Another reason for multiple JobManagers is to get a faster recovery.


Best,
Yang
 

Boris Lublinsky <[hidden email]> 于2020年12月16日周三 上午9:09写道:
Thanks Chesney for your quick response,
More carefully and found the sample, I was looking for:

./bin/flink run-application -p 10 -t kubernetes-application -Dkubernetes.cluster-id=k8s-ha-app1 \
-Dkubernetes.container.image=flink:k8s-ha \ -Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \

A couple of questions about it:

./bin/flink run-application -p 10 -t used to be ./bin/flink run-application -t. What is -p 10?
-Dkubernetes.container.image=flink:k8s-ha does it require a special container build?

-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \

This is if I use HDFS for save pointing, right? I can instead use PVC - based save pointing, correct?

Also I was trying to understand, how it works, and from the documentation it sounds like there is one active and one or 
more standby JMs. Can I control the amount of standby JMs?

Finally, what is behavior on the rolling restart of JM deployment?




On Dec 15, 2020, at 10:42 AM, Chesnay Schepler <[hidden email]> wrote:

Unfortunately no; there are some discussions going on in the docker-library/official-images PR that have to be resolved first, but currently these would require changes on the Flink side that we cannot make (because it is already released!). We are not sure yet whether we can get the PR accepted and defer further changes to 1.12.1.

On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
Thanks.
Do you have an ETA for the Docker images?



Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12

Boris Lublinsky
Thanks Yang,


On Dec 17, 2020, at 8:49 PM, Yang Wang <[hidden email]> wrote:

Hi Boris,

Thanks for your follow-up response and for trying the new KubernetesHAService.

1. It is a valid bug. We are not setting the service account for the TaskManager pod. Before KubernetesHAService was introduced, this worked fine because the TaskManager did not need to access K8s resources (e.g. ConfigMaps) directly. I have created a ticket[1] to add support for setting the service account for the TaskManager.
2. If you directly delete the JobManager deployment, then the HA-related ConfigMaps will be retained. This is by design: because the job has not reached a terminal state (SUCCEED, FAILED, CANCELED), we need the cluster to be able to recover in the future. If all the jobs in the application reach a terminal state, all the HA-related ConfigMaps are cleaned up automatically. You can cancel the job and verify that. Refer here[2] for more information.

For PVC-based storage: if it supports multiple readers and writers, then the KubernetesHAService should work. Effectively, it acts as distributed storage.
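Until the ticket mentioned in point 1 lands, the workaround is to grant the service account used by the pods access to ConfigMaps. A rough sketch of what that RBAC manifest might look like; the role name and verb list are assumptions, not something prescribed by the Flink docs:

```yaml
# Sketch only: let the `default` service account (used by the TM pods)
# read and write the HA ConfigMaps in its namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmap-access   # illustrative name
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmap-access
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmap-access
subjects:
  - kind: ServiceAccount
    name: default
```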


Best,
Yang

Boris Lublinsky <[hidden email]> wrote on Friday, Dec 18, 2020 at 7:16 AM:
And K8s native HA works, but there are 2 bugs in this implementation.

1. TaskManager pods run under the default service account, which fails because it does not have access to the ConfigMaps needed to get endpoint information. I had to add permissions to the default service account to make it work. Ideally both JM and TM pods should run under the same service account.
2. When a Flink application is deleted, it clears the main ConfigMap, but not the ones used for leader election.


And finally, it works fine with PVC-based storage, as long as the PVC is ReadWriteMany.
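A ReadWriteMany claim of the kind described above might look like the following; the claim name, size, and the storage-class comment are illustrative:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-ha-storage   # illustrative name
spec:
  accessModes:
    - ReadWriteMany        # required: JobManager and TaskManagers all mount it
  resources:
    requests:
      storage: 10Gi
  # storageClassName must point to a backend that actually supports
  # ReadWriteMany (e.g. an NFS-backed class); block storage usually does not.
```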



Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12

Boris Lublinsky
In reply to this post by Yang Wang

This does not seem right:
To keep HA data while restarting the Flink cluster, simply delete the deployment (via kubectl delete deploy <cluster-id>). All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained because they do not set the owner reference. When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint.

The last successful checkpoint is not in the ConfigMaps, but rather on the persistent volume, so the ConfigMaps can be safely deleted. If you restart the JM, it will elect a new leader anyway, so I would suggest adding the owner reference there.



Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12

Yang Wang
The latest successful checkpoint pointer is stored in the ConfigMap, as well as the JobGraph pointer.
These let us recover the jobs that were running before you deleted the K8s deployment. If the HA ConfigMaps
are deleted, then a Flink cluster created with the same cluster-id cannot automatically recover from the latest
successful checkpoint.
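To make the "pointer" idea concrete: the HA ConfigMaps hold references into high-availability.storageDir rather than the state itself. A purely illustrative sketch; the real key names and encoding in 1.12 differ, so do not rely on this layout:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: k8s-ha-app1-<job-id>-jobmanager-leader   # <job-id> left abstract
data:
  # hypothetical keys: the current leader address plus a checkpoint pointer
  address: akka.tcp://flink@<jobmanager-host>:6123/user/rpc/jobmanager_0
  checkpointID-0000000042: <serialized pointer into high-availability.storageDir>
```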

Best,
Yang





Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12

Boris Lublinsky
Thanks Yang,
This is still confusing.
I did more experiments and see that checkpointing information is stored twice: in the ConfigMap and in high-availability.storageDir.
Do we need this duplication?
Or is just specifying

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

sufficient?
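As I understand the HA documentation, the apparent duplication is intentional: the ConfigMaps hold only small pointers for leader election and job recovery, while the actual checkpoint and JobGraph data must live in high-availability.storageDir, so both settings are needed. A minimal flink-conf.yaml fragment with the three documented options (the storage path is illustrative):

```yaml
kubernetes.cluster-id: k8s-ha-app1
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss://flink/flink-ha   # any distributed FS: s3://, hdfs://, oss://
```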



On Dec 17, 2020, at 10:09 PM, Yang Wang <[hidden email]> wrote:

The latest successful checkpoint pointer is stored in the ConfigMap, as well as the JobGraph pointer.
They could help us recover the running jobs before you delete the K8s deployment. If the HA ConfigMaps
are deleted, then when you create a Flink cluster with the same cluster-id, it could not recover from the latest
successful checkpoint automatically.

Best,
Yang




Boris Lublinsky <[hidden email]> 于2020年12月18日周五 上午11:42写道:

This does not seem right:
To keep HA data while restarting the Flink cluster, simply delete the deployment (via kubectl delete deploy <cluster-id>). All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained because they do not set the owner reference. When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint.

Last successful checkpoint is not in the config maps, but rather on persistent volume. Config map can be safely deleted. If you restart JM, it will create a new leader anyways., So I would suggest to add owner reference there


On Dec 17, 2020, at 8:49 PM, Yang Wang <[hidden email]> wrote:

Hi Boris,

Thanks for your follow up response and trying the new KubernetesHAService.

1. It is a valid bug. We are not setting the service account for TaskManager pod. Before the KubernetesHAService is introduced, it works fine because the TaskManager does not need to access the K8s resource(e.g. ConfigMap) directly. I have created a ticket[1] to support setting service account for TaskManager. 
2. If you directly delete the JobManager deployment, then the HA related ConfigMap will be retained. It is a by-design behavior. Because the job does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this cluster could recover in the future. If all the jobs in the application reach to the terminal state, all the HA related ConfigMaps will be cleaned up automatically. You could cancel the job and verify that. Refer here[2] for more information.

For the PVC based storage, if it could support multiple read-write then the KubernetesHAService should work. Actually, it feels like a distributed storage.


Best,
Yang

Boris Lublinsky <[hidden email]> 于2020年12月18日周五 上午7:16写道:
And K8 native HA works,
But there are 2 bugs in this implementation.

1. Task manager pods are running as default user account, which fails because it does not have access to config maps to get endpoint’s information. I had to add permissions to default service account to make it work. Ideally both JM and TM pods should run under the same service account. 
2. When a Flink application is deleted, it clears the main config map, but not the ones used for leader election


And finally it works fine with PVC based storage, as long as it is read-write many


On Dec 15, 2020, at 8:40 PM, Yang Wang <[hidden email]> wrote:

Hi Boris,

What is -p 10?
It is same to --parallelism 10. Set the default parallelism to 10.

does it require a special container build?
No, the official flink docker image could be used directly. Unfortunately, we do not have the image now. And we are trying to figure out.
You could follow the instructions below to have your own image.


git clone https://github.com/apache/flink-docker.git

git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0 cd dev/flink-1.12.0-debian docker build . -t flink:flink-1.12.0 docker push flink:flink-1.12.0

This is if I use HDFS for save pointing, right? I can instead use PVC - based save pointing, correct?
It is an example to storing the HA related data to OSS(Alibaba Cloud Object Storage, similar to S3). Since we require a distributed storage, I am afraid you could not use a PVC here. Instead, you could using a minio.

Can I control the amount of standby JMs? 
Currently, you could not control the number of JobManagers. This is only because we have not introduce a config option for it. But you could do it manually via `kubectl edit deploy <clusterID>`. It should also work.

Finally, what is behavior on the rolling restart of JM deployment?
Once a JobManager terminated, it will lose the leadership and a standby one will take over. So on the rolling restart of JM deployment, you will find that the leader switches multiple times and your job also restarts multiple times. I am not sure why you need to roll the JobManager deployment. We are using deployment for JobManager in Flink just because we want the JobManager to be launched once it crashed. Another reason for multiple JobManagers is to get a faster recovery.


Best,
Yang
 

Boris Lublinsky <[hidden email]> 于2020年12月16日周三 上午9:09写道:
Thanks Chesney for your quick response,
More carefully and found the sample, I was looking for:

./bin/flink run-application -p 10 -t kubernetes-application -Dkubernetes.cluster-id=k8s-ha-app1 \
-Dkubernetes.container.image=flink:k8s-ha \ -Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \

A couple of questions about it:

./bin/flink run-application -p 10 -t used to be ./bin/flink run-application -t. What is -p 10?
-Dkubernetes.container.image=flink:k8s-ha does it require a special container build?

-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \

This is if I use HDFS for save pointing, right? I can instead use PVC - based save pointing, correct?

Also, I was trying to understand how it works; from the documentation it sounds like there is one active and one or
more standby JMs. Can I control the number of standby JMs?

Finally, what is behavior on the rolling restart of JM deployment?




On Dec 15, 2020, at 10:42 AM, Chesnay Schepler <[hidden email]> wrote:

Unfortunately no; there are some discussions going on in the docker-library/official-images PR that have to be resolved first, but currently these would require changes on the Flink side that we cannot make (because it is already released!). We are not sure yet whether we can get the PR accepted and defer further changes to 1.12.1.

On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
Thanks.
Do you have ETA for docker images?



Re: Flink 1.12

Yang Wang
I am afraid only the state handle is stored in the ConfigMap. The real state data is stored in
the distributed storage configured via "high-availability.storageDir". I believe you could find
more information in this class KubernetesStateHandleStore[1].

How did you find that the checkpointing information is stored twice? That should not happen.



Best,
Yang
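
To see this for yourself, you can inspect one of the HA ConfigMaps directly (cluster-side command; the ConfigMap naming below follows the 1.12 Kubernetes HA docs and may differ on your cluster):

```
# The data section contains only small serialized pointers (state handles);
# the checkpoint bytes themselves live under high-availability.storageDir.
kubectl get configmap <cluster-id>-<jobID>-jobmanager-leader -o yaml
```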

Boris Lublinsky <[hidden email]> wrote on Sun, Dec 20, 2020 at 12:49 AM:
Thanks Yang,
This is still confusing.
I did more experiments and saw that checkpointing information is stored twice - in the ConfigMap and in high-availability.storageDir.
Do we need this duplication?
Or just specifying

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
Is sufficient?



On Dec 17, 2020, at 10:09 PM, Yang Wang <[hidden email]> wrote:

The latest successful checkpoint pointer is stored in the ConfigMap, as well as the JobGraph pointer.
They could help us recover the running jobs before you delete the K8s deployment. If the HA ConfigMaps
are deleted, then when you create a Flink cluster with the same cluster-id, it could not recover from the latest
successful checkpoint automatically.

Best,
Yang




Boris Lublinsky <[hidden email]> wrote on Fri, Dec 18, 2020 at 11:42 AM:

This does not seem right:
To keep HA data while restarting the Flink cluster, simply delete the deployment (via kubectl delete deploy <cluster-id>). All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained because they do not set the owner reference. When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint.

The last successful checkpoint is not in the ConfigMaps, but rather on the persistent volume, so the ConfigMaps can be safely deleted. If you restart the JM, it will create a new leader anyway, so I would suggest adding the owner reference there.
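
If you do want to drop the retained HA ConfigMaps manually, the 1.12 docs suggest deleting them by label selector (illustrative; verify the label names against your own cluster before deleting):

```
kubectl delete configmaps -l app=<cluster-id>,configmap-type=high-availability
```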


On Dec 17, 2020, at 8:49 PM, Yang Wang <[hidden email]> wrote:

Hi Boris,

Thanks for your follow up response and trying the new KubernetesHAService.

1. It is a valid bug. We are not setting the service account for the TaskManager pod. Before the KubernetesHAService was introduced, this worked fine because the TaskManager did not need to access K8s resources (e.g. ConfigMaps) directly. I have created a ticket[1] to support setting the service account for the TaskManager.
2. If you directly delete the JobManager deployment, the HA-related ConfigMaps will be retained. This is by-design behavior: because the job has not reached a terminal state (SUCCEEDED, FAILED, CANCELED), we need the cluster to be able to recover in the future. If all the jobs in the application reach a terminal state, all the HA-related ConfigMaps will be cleaned up automatically. You could cancel the job and verify that. Refer here[2] for more information.

For the PVC-based storage, if it supports multiple readers/writers then the KubernetesHAService should work. In effect, it acts like a distributed storage.


Best,
Yang

Boris Lublinsky <[hidden email]> wrote on Fri, Dec 18, 2020 at 7:16 AM:
And K8s native HA works, but there are 2 bugs in this implementation.

1. TaskManager pods run under the default service account, which fails because it does not have access to the ConfigMaps needed to get endpoint information. I had to add permissions to the default service account to make it work. Ideally both JM and TM pods should run under the same service account.
2. When a Flink application is deleted, it clears the main ConfigMap, but not the ones used for leader election.


And finally, it works fine with PVC-based storage, as long as the PVC is ReadWriteMany.


On Dec 15, 2020, at 8:40 PM, Yang Wang <[hidden email]> wrote:

Hi Boris,

What is -p 10?
It is the same as --parallelism 10: it sets the default parallelism to 10.

does it require a special container build?
No, the official Flink docker image could be used directly. Unfortunately, we do not have the image yet, and we are trying to figure that out.
You could follow the instructions below to have your own image.


git clone https://github.com/apache/flink-docker.git
cd flink-docker
git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
cd dev/flink-1.12.0-debian
docker build . -t flink:flink-1.12.0
docker push flink:flink-1.12.0

This is only if I use HDFS for savepointing, right? I can instead use PVC-based savepointing, correct?
It is an example of storing the HA-related data in OSS (Alibaba Cloud Object Storage, similar to S3). Since we require a distributed storage, I am afraid you could not use a PVC here. Instead, you could use MinIO.
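
For example, pointing the HA storage at a MinIO service via the S3 filesystem plugin might look like the fragment below (illustrative only: the bucket, endpoint, and credentials are placeholders, not values from this thread, and the S3 plugin must be enabled, e.g. via ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.12.0.jar):

```yaml
# flink-conf.yaml fragment (placeholder values)
high-availability.storageDir: s3://flink-ha/recovery
s3.endpoint: http://minio.minio-namespace.svc:9000
s3.path.style.access: true
s3.access-key: <access-key>
s3.secret-key: <secret-key>
```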

Can I control the amount of standby JMs? 
Currently, you cannot control the number of JobManagers. This is only because we have not introduced a config option for it yet. But you could do it manually via kubectl edit deploy <clusterID>. That should also work.
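
Concretely, Yang's suggestion amounts to the commands below (cluster-side; the cluster-id k8s-ha-app1 is taken from the example command earlier in the thread):

```
# Open the JobManager Deployment in an editor and change spec.replicas,
kubectl edit deploy k8s-ha-app1
# or set the replica count directly:
kubectl scale deployment/k8s-ha-app1 --replicas=2
```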

Finally, what is behavior on the rolling restart of JM deployment?
Once a JobManager terminated, it will lose the leadership and a standby one will take over. So on the rolling restart of JM deployment, you will find that the leader switches multiple times and your job also restarts multiple times. I am not sure why you need to roll the JobManager deployment. We are using deployment for JobManager in Flink just because we want the JobManager to be launched once it crashed. Another reason for multiple JobManagers is to get a faster recovery.


Best,
Yang
 


Re: Flink 1.12

Boris Lublinsky
I understand this.
State storage is defined by state.checkpoints.dir, for example:
state.checkpoints.dir: file:///mnt/flink/storage/checkpoints

I am talking about the checkpoint reference being defined in 2 places.
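
A minimal flink-conf.yaml sketch of the two separate options being discussed (the paths are placeholders, following Boris's PVC setup):

```yaml
# Externalized checkpoint data, written by the checkpointing mechanism:
state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
# HA metadata (JobGraphs, jars, checkpoint pointers), written by the HA services:
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///mnt/flink/storage/ha
```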



Re: Flink 1.12

Yang Wang
IIUC, "state.checkpoints.dir" specifies an external checkpoint path, which will not be cleaned up unless
the user configures it explicitly[1].

However, "high-availability.storageDir" will be cleaned up automatically when all the jobs in the application
reach a terminal state. Moreover, not only the checkpoints, but also the generated job graphs and user jars/artifacts
are stored in this storage. You could check the content of this directory.
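
An illustrative layout of high-availability.storageDir (directory and file names are approximate; check your own storage to confirm):

```
<high-availability.storageDir>/<cluster-id>/
├── blob/                     # user jars and artifacts
├── submittedJobGraph-...     # serialized JobGraphs
└── completedCheckpoint-...   # checkpoint metadata (pointers to the state data)
```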


Best,
Yang
