Hi all,
I encountered a problem when I upgrade flink from 1.9.1 to 1.10.0. At first, my job is running on flink stably which JM and TM is flink 1.9.1. And then I try to upgrade to 1.10.0. I stop the JM progress and start another JM progress. At this time, the JM is 1.10.0 and the TM is 1.9.1, but the JM had not start successfully. I check the JM log and find the JM trying to recover the running jobs, but is fail. the error log as the follow. And I was wondering, is there have any way to upgrate the JM and TM but not cancel the running job? 2020-03-18 18:21:11.623 [main] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at 0.0.0.0:8880 2020-03-18 18:21:11.623 [main] INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}. 2020-03-18 18:21:11.724 [main] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://0.0.0.0:8880. 2020-03-18 18:21:11.769 [main] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager . 2020-03-18 18:21:11.836 [main] INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}. 2020-03-18 18:21:11.859 [main] INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. 2020-03-18 18:21:11.859 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}. 2020-03-18 18:21:11.859 [main] INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock. 2020-03-18 18:21:28.015 [main-EventThread] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://0.0.0.0:8880 was granted leadership with leaderSessionID=2120c8d2-db56-447e-8643-9c5739f26349 2020-03-18 18:21:28.016 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - ResourceManager akka.tcp://flink@192.168.100.162:6123/user/resourcemanager was granted leadership with fencing token 813537a026af401a06441c182b8c453f 2020-03-18 18:21:28.020 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager. 2020-03-18 18:21:28.086 [main-EventThread] INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Start SessionDispatcherLeaderProcess. 2020-03-18 18:21:28.128 [cluster-io-thread-1] INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Recover all persisted job graphs. 2020-03-18 18:21:28.151 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID bccd9536c1483eff42f9e31c9106cf52 (akka.tcp://flink@127.0.0.1:34120/user/taskmanager_0) at ResourceManager 2020-03-18 18:21:28.158 [cluster-io-thread-1] INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Trying to recover job with job id b6f7fd9c542e7570ba4c19e040ab946d. 2020-03-18 18:21:28.301 [cluster-io-thread-1] INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess. 2020-03-18 18:21:28.305 [cluster-io-thread-1] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint. java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id b6f7fd9c542e7570ba4c19e040ab946d. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id b6f7fd9c542e7570ba4c19e040ab946d. at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149) at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125) at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:184) at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ... 3 common frames omitted Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /b6f7fd9c542e7570ba4c19e040ab946d. This indicates that the retrieved state handle is broken. Try cleaning the state handle store. at org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:191) at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146) ... 7 common frames omitted Caused by: java.io.InvalidClassException: org.apache.flink.api.common.operators.ResourceSpec; incompatible types for field cpuCores at java.io.ObjectStreamClass.matchFields(ObjectStreamClass.java:2399) at java.io.ObjectStreamClass.getReflector(ObjectStreamClass.java:2293) at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:741) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) at java.util.ArrayList.readObject(ArrayList.java:797) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) at java.util.ArrayList.readObject(ArrayList.java:797) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) at java.util.ArrayList.readObject(ArrayList.java:797) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) at java.util.ArrayList.readObject(ArrayList.java:797) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) at java.util.HashMap.readObject(HashMap.java:1409) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555) at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58) at org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:185) ... 8 common frames omitted 2020-03-18 18:21:28.322 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:45287 |
Hi Reo, I do not think this is always guaranteed by Flink API. The usual supported way is to: - take a savepoint - upgrade the cluster (JM and TM) - maybe rebuild the job against the new Flink version - start the job from the savepoint [1] The externalised checkpoints also do not have to be always compatible between the Flink versions. In your particular case, one of the fields in ResourceSpec class has indeed changed its type and its deserialisation has failed upon job recovery. This mechanism assumes that nothing has changed: neither job nor Flink version. It is for recovery from a failure. Best, Andrey On Wed, Mar 18, 2020 at 4:27 PM Reo Lei <[hidden email]> wrote:
|
Hey Andrey, Thanks for your answer. I know use savepoint to upgrade the flink cluster is the available, but that mean when I upgrading my flink cluster I need to cancel all jobs from JM. And the stream process will be stopped, that will have an impact for the production system which is time sensitive, such as monitor system. That will make the system unavaliable in a short time, and if the number of tasks keeps rising, the time that the system stops will also keep rising. That is not acceptable for the system. So I am finding a way to upgrade my flink cluster smoothly and make the impact less. BR, Reo Andrey Zagrebin <[hidden email]> 于2020年3月18日周三 下午9:57写道:
|
Hi Reo, if you want to reduce downtime, the usual approach is the following: - Let your job run in 1.9 cluster for a while - Start a job in 1.10 where you migrate state, but dump output to /dev/null - As soon as 1.10 job catches up, stop old job and start writing output into the actual storage. It requires some tooling around it, but will actually do what you want. On Thu, Mar 19, 2020 at 3:51 AM Reo Lei <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |