How to set up HA properly with a Kubernetes Standalone Application Cluster

How to set up HA properly with a Kubernetes Standalone Application Cluster

ChangZhuo Chen (陳昌倬)
Hi,

Recently, we changed our deployment to a Kubernetes Standalone Application
Cluster for reactive mode. Following [0], we use a Kubernetes Job with
`--fromSavepoint` to upgrade our application without losing state. The Job
config is identical to the one in the documentation.

However, we found that in this setup, if the JobManager fails,
Kubernetes restarts it from the original savepoint specified in
`--fromSavepoint` instead of from the latest checkpoint. This causes
problems for long-running jobs.

Any idea how to make Flink restore from the latest checkpoint after a
JobManager failure in a Kubernetes Standalone Application Cluster?


[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: How to set up HA properly with a Kubernetes Standalone Application Cluster

Fabian Paul
Hi Chen,

Can you tell us a bit more about the job you are running?
The behaviour you are looking for can only be achieved
if the Kubernetes HA services are enabled [1][2].
Otherwise the job cannot recover past checkpoints.
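
For reference, a minimal sketch of enabling them in flink-conf.yaml
(the bucket and cluster id below are placeholders):

    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: gs://some-bucket/recovery
    kubernetes.cluster-id: some-cluster-id

Note that the JobManager and TaskManager pods also need a service
account that is allowed to create and edit ConfigMaps, since that is
where these HA services store their metadata.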

Best,
Fabian

Re: How to set up HA properly with a Kubernetes Standalone Application Cluster

ChangZhuo Chen (陳昌倬)
On Fri, May 14, 2021 at 02:00:41PM +0200, Fabian Paul wrote:
> Hi Chen,
>
> Can you tell us a bit more about the job you are running?
> The behaviour you are looking for can only be achieved
> if the Kubernetes HA services are enabled [1][2].
> Otherwise the job cannot recover past checkpoints.

Hi Fabian,

Thanks for the response. Our cluster setup is as follows.

The HA settings are the same ones we previously used in the session
cluster, so HA should work. These are the settings we use for HA:

    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: gs://a-bucket/recovery
    kubernetes.cluster-id: a-cluster-id
    kubernetes.context: a-context
    kubernetes.namespace: a-namespace

The following is the Job we use for the standalone application cluster.
The content is almost the same as what we previously used in the
session cluster, except that it is a Job rather than a Deployment, and
the args are different:

    apiVersion: batch/v1
    kind: Job
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: jobmanager
              env:
                - name: FLINK_CONF_FILE
                  value: /opt/flink/conf/flink-conf.yaml
                - name: HADOOP_CLASSPATH
                  value: /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
                - name: HADOOP_CONF_DIR
                  value: /opt/flink/conf
                - name: GOOGLE_APPLICATION_CREDENTIALS
                  value: /opt/flink/conf/gcp-service-account.json
              args:
                - standalone-job
                - --fromSavepoint
                - gs://a-savepoint
                - --job-classname
                - com.example.my.application
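
(Note: for the ConfigMap-based HA services to work, the pods' service
account must be able to create and edit ConfigMaps in a-namespace. A
minimal sketch of such a binding, with placeholder names, using the
built-in edit ClusterRole:)

    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: flink-configmap-edit      # placeholder name
      namespace: a-namespace
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: edit                      # built-in role that includes ConfigMap edit
    subjects:
      - kind: ServiceAccount
        name: default                 # or a dedicated service account for Flink
        namespace: a-namespace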

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: How to set up HA properly with a Kubernetes Standalone Application Cluster

Yang Wang
Hi ChangZhuo,

IIRC, even if you have specified a savepoint at startup, the JobManager can recover from the latest checkpoint after a JobManager failure, because during recovery the DefaultCompletedCheckpointStore sorts all the checkpoints (including the savepoint) and picks the latest one.

So, could you share the JobManager logs so that we can verify the recovery behavior?
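
You could also check that the HA services are actually engaged: when
they are, you should see ConfigMaps along these lines in your namespace
(illustrative names, derived from the posted kubernetes.cluster-id; the
jobmanager-leader one holds the completed-checkpoint pointers that
recovery reads):

    a-cluster-id-dispatcher-leader
    a-cluster-id-resourcemanager-leader
    a-cluster-id-restserver-leader
    a-cluster-id-<job-id>-jobmanager-leader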

Best,
Yang
