standalone flink savepoint restoration

standalone flink savepoint restoration

Matt Anger
Hello everyone,
I am running a Flink job in k8s as a standalone HA job. I recently updated my job with some additional sinks, which I guess has made the checkpoints incompatible with the newer version; Flink now crashes on startup with the following:
 Caused by: java.lang.IllegalStateException: There is no operator for the state c9b81dfc309f1368ac7efb5864e7b693

So I rolled back the deployment, logged into the pod, triggered a savepoint, and then modified my args to add

--allowNonRestoredState 
and
-s <savepoint-dir>

but it doesn't look like the standalone cluster is respecting those arguments. I've searched around but haven't found a solution. The Docker image I have runs docker-entrypoint.sh, and the full arg list is below, copy-pasted out of my k8s YAML file:

        - job-cluster
        - -Djobmanager.rpc.address=$(SERVICE_NAME)
        - -Djobmanager.rpc.port=6123
        - -Dresourcemanager.rpc.port=6123
        - -Dparallelism.default=$(NUM_WORKERS)
        - -Dblob.server.port=6124
        - -Dqueryable-state.server.ports=6125
        - -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
        - -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
        - -Dhigh-availability=zookeeper
        - -Dhigh-availability.jobmanager.port=50010
        - -Dhigh-availability.storageDir=$(S3_HA)
        - -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
        - -Dstate.backend=filesystem
        - -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
        - -Dstate.savepoints.dir=$(S3_SAVEPOINT)
        - --allowNonRestoredState
        - -s $(S3_SAVEPOINT)

I originally didn't have the last 2 args; I added them based on various emails I saw on this list and other Google search results, to no avail.
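One thing I'm now second-guessing (just a guess on my part, not confirmed): in a k8s args list, `- -s $(S3_SAVEPOINT)` is handed to the entrypoint as a single argument containing a space, so the flag and its value may never get split. Separating them into two list items, and pointing -s at one concrete savepoint path instead of the savepoints base directory, would look something like:

```yaml
# Hypothetical fragment: S3_SAVEPOINT_PATH would hold the full path of one
# specific savepoint (e.g. .../savepoints/savepoint-abc123), not the base dir
        - --allowNonRestoredState
        - -s
        - $(S3_SAVEPOINT_PATH)
```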

Thanks
-Matt

Re: standalone flink savepoint restoration

Yun Tang
Hi Matt

Have you configured `high-availability.cluster-id`? If not, a standalone Flink job will first try to recover from the high-availability checkpoint store under the default cluster-id '/default'. If a checkpoint exists there, Flink will always restore from that checkpoint with 'allowNonRestoredState' effectively disabled [1] (it always passes 'false'). Please consider setting `high-availability.cluster-id` to a different value so that you can resume the job while dropping some operators.
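Concretely, that would mean adding something like the following to the job-cluster arguments (the value 'my-job-v2' is only an example name; pick a new id for each incompatible job version):

```yaml
# Example only: a fresh cluster-id keeps Flink from recovering the old
# '/default' HA checkpoint store and ignoring the -s savepoint argument
        - -Dhigh-availability.cluster-id=my-job-v2
```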



Best
Yun Tang

From: Matt Anger <[hidden email]>
Sent: Thursday, October 17, 2019 5:46
To: [hidden email] <[hidden email]>
Subject: standalone flink savepoint restoration
 

Re: standalone flink savepoint restoration

Congxian Qiu
Hi
Do you specify the operator ID for all of your operators? [1][2] I'm asking because, judging from the exception you gave, if you only added new operators and all of the old operators have explicit operator IDs, such an exception should not be thrown.
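For example, assigning stable IDs with `uid(...)` looks like this (a minimal sketch only; the operator names and the job itself are made-up examples, not your job):

```java
// Sketch: stable operator IDs let a savepoint's state be matched back to
// operators even after the job graph changes. Names here are illustrative.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .map(String::toUpperCase)
           .uid("uppercase-map")   // stable ID for this operator's state
           .print()
           .uid("print-sink");     // newly added sinks get their own uid too

        env.execute("uid example");
    }
}
```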


Yun Tang <[hidden email]> wrote on Thu, Oct 17, 2019 at 12:31 PM: