Hi,
Recently, we changed our deployment to a Kubernetes Standalone Application Cluster for reactive mode. Following [0], we use a Kubernetes Job with --fromSavepoint to upgrade our application without losing state. The Job config is identical to the one in the documentation.

However, we found that in this setup, if the jobmanager fails, Kubernetes restarts it with the original savepoint specified in `--fromSavepoint` instead of the latest checkpoint. This causes problems for a long-running job.

Any idea how to make Flink restore from the latest checkpoint after a jobmanager failure in a Kubernetes Standalone Application Cluster?

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
Hi Chen,
Can you tell us a bit more about the job you are using? The behaviour you are looking for can only be achieved if the Kubernetes HA Services are enabled [1][2]. Otherwise the job cannot recall past checkpoints.

Best,
Fabian
On Fri, May 14, 2021 at 02:00:41PM +0200, Fabian Paul wrote:
> Hi Chen,
>
> Can you tell us a bit more about the job you are using?
> The intended behaviour you are seeking can only be achieved
> If the Kubernetes HA Services are enabled [1][2].
> Otherwise the job cannot recall past checkpoints.

Hi Fabian,

Thanks for the response. The following is our cluster setup.

The HA setting is the same one we used in our session cluster, so HA should work. The following is the setting we use for HA:

    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: gs://a-bucket/recovery
    kubernetes.cluster-id: a-cluster-id
    kubernetes.context: a-context
    kubernetes.namespace: a-namespace

The following is the Job we use for the standalone application cluster. The content is almost the same as what we used in the session cluster, except that it is a Job, not a Deployment, and the args are different:

    apiVersion: batch/v1
    kind: Job
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: jobmanager
              env:
                - name: FLINK_CONF_FILE
                  value: /opt/flink/conf/flink-conf.yaml
                - name: HADOOP_CLASSPATH
                  value: /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
                - name: HADOOP_CONF_DIR
                  value: /opt/flink/conf
                - name: GOOGLE_APPLICATION_CREDENTIALS
                  value: /opt/flink/conf/gcp-service-account.json
              args:
                - standalone-job
                - --fromSavepoint
                - gs://a-savepoint
                - --job-classname
                - com.example.my.application

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
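For anyone double-checking a similar setup, here is a minimal sketch of a sanity check for the HA-related keys in a flat `key: value` Flink config. The helper names (`parse_flink_conf`, `missing_ha_keys`) are hypothetical, and the list of required keys is an assumption based on the settings quoted in this thread; it is not an official validation tool and does not prove that HA actually works at runtime.

```python
# Assumption: these are the keys that must be set for Kubernetes HA,
# taken from the settings quoted in this thread.
REQUIRED_HA_KEYS = (
    "high-availability",
    "high-availability.storageDir",
    "kubernetes.cluster-id",
)


def parse_flink_conf(text):
    """Parse flat 'key: value' lines; skip blank lines and '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first ':' only, so values like 'gs://...' stay intact.
        key, sep, value = line.partition(":")
        if sep:
            conf[key.strip()] = value.strip()
    return conf


def missing_ha_keys(conf):
    """Return the required HA keys that are absent or empty."""
    return [key for key in REQUIRED_HA_KEYS if not conf.get(key)]


SAMPLE = """
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: gs://a-bucket/recovery
kubernetes.cluster-id: a-cluster-id
"""

if __name__ == "__main__":
    print(missing_ha_keys(parse_flink_conf(SAMPLE)))
```

An empty result only means the keys are present. Whether the jobmanager actually picks up the file depends on how the config is mounted into the container.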
Hi ChangZhuo,

IIRC, even if you have specified a savepoint at startup, the JobManager can still recover from the latest checkpoint after a JobManager failure. When recovering, DefaultCompletedCheckpointStore sorts all the checkpoints (including the savepoint) and picks the latest one.

So, could you share the JobManager logs so that we can verify the recovery behavior?

Best,
Yang

ChangZhuo Chen (陳昌倬) <[hidden email]> wrote on Fri, May 14, 2021 at 10:03 PM:
> On Fri, May 14, 2021 at 02:00:41PM +0200, Fabian Paul wrote:
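To illustrate the selection logic described in this reply, here is a minimal sketch in Python. It is not Flink's actual implementation: the `CompletedCheckpoint` record and `pick_restore_point` function are hypothetical, and it assumes that checkpoint IDs increase over time and that the savepoint passed via `--fromSavepoint` is kept in the same store as the retained checkpoints. The checkpoint paths and IDs are made up for the example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class CompletedCheckpoint:
    """Hypothetical stand-in for an entry in the completed checkpoint store."""
    checkpoint_id: int
    external_path: str
    is_savepoint: bool = False


def pick_restore_point(
    candidates: List[CompletedCheckpoint],
) -> Optional[CompletedCheckpoint]:
    """Pick the entry with the highest ID, treating savepoints like checkpoints."""
    if not candidates:
        return None
    return max(candidates, key=lambda c: c.checkpoint_id)


if __name__ == "__main__":
    store = [
        # Initial savepoint from --fromSavepoint (ID is illustrative).
        CompletedCheckpoint(10, "gs://a-savepoint", is_savepoint=True),
        # Checkpoints taken after the job started (paths are made up).
        CompletedCheckpoint(11, "gs://a-bucket/checkpoints/chk-11"),
        CompletedCheckpoint(12, "gs://a-bucket/checkpoints/chk-12"),
    ]
    print(pick_restore_point(store).external_path)
```

Under these assumptions the savepoint is only chosen when no newer checkpoint is known to the store, which is why the JobManager logs (which restore point was actually picked) are the thing to check.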