Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "


Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "

胡逸才

Hi all,

   I came across the problems with Flink 1.6.0 on Kubernetes that Till discussed in the mailing-list thread "HA for 1.6.0 job cluster with docker-compose". The JIRA issue mentioned in that thread, FLINK-10291, was closed in 1.7.0, yet I still see similar errors with Flink 1.7.2 on Kubernetes. Could you please help me check which of my settings are wrong? Here are my settings:

web.log.path: /var/log/flink/flinkweb.log 
taskmanager.log.pth: /var/log/flink/taskmanager/task.log 

jobmanager.rpc.address: tdh2
jobmanager.rpc.port: 16223
jobstore.cache-size: 5368709120
jobstore.expiration-time: 864000
jobmanager.heap.size: 4096m

taskmanager.heap.size:  6000m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 2

high-availability: zookeeper
high-availability.storageDir: hdfs:///flink1/ha/
high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
high-availability.jobmanager.port: 62236-62239

rest.port: 18801
io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5

security.kerberos.login.use-ticket-cache: true
security.kerberos.login.contexts: Client
security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
security.kerberos.login.principal: hdfs

blob.server.port: 16224
query.server.port: 16225


   The following is the new error report. The earliest error report is in the forwarded message further below:

apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
... 14 common frames omitted
2019-07-01 17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve the redirect address.
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.actor.ActorRef.tell(ActorRef.scala:130)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendErrorIfSender(AkkaRpcActor.java:371)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:57)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.






-------- Forwarded message --------
From: "Alex.Hu" <[hidden email]>
Date: 2019-07-01 10:17:15
To: "Yang Wang" <[hidden email]>
Cc: user <[hidden email]>
Subject: Re:Re: About "Flink 1.7.0 HA based on zookeepers "
Hi Wang,
    Thank you very much for answering my question!
    I want to start multiple JobManagers on Kubernetes because, as I understand the jobmanager_high_availability documentation, standalone mode runs two JobManager nodes so that a standby can take over seamlessly when the node hosting the leader goes down.
    In my follow-up test last Friday, I did run only one JobManager on Kubernetes with high-availability mode turned on, and I labeled multiple nodes as JobManager candidates so that Kubernetes could automatically restart the failed JobManager on another node. However, there is still some switchover time. So I am not sure whether Flink on Kubernetes can achieve the seamless JobManager hot swap described above.


At 2019-06-28 14:11:27, "Yang Wang" <[hidden email]> wrote:
Hi Hu,

I am not sure why you need to start multiple JobManagers on Kubernetes. As described in the manual [1], we use a Deployment with one replica so that Kubernetes detects a JobManager crash and starts a new one. What we need to do is add the high-availability configuration [2] to flink-conf.yaml. You could store your flink-conf.yaml in a ConfigMap [3] and then mount it into the JobManager pod. Alternatively, you could update the flink-conf.yaml in your Flink image.
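
The setup described in this reply could look roughly like the following Kubernetes manifest. This is only a sketch: the resource names, labels, image tag, mount path, and ConfigMap name are assumptions for illustration and do not come from the thread.

```yaml
# Sketch only: names, labels, image tag, and ConfigMap name are assumptions.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1                      # one JobManager; Kubernetes starts a new pod if it crashes
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.7.2       # assumed image tag
          args: ["jobmanager"]
          volumeMounts:
            - name: flink-config
              mountPath: /opt/flink/conf   # assumed conf directory inside the image
      volumes:
        - name: flink-config
          configMap:
            name: flink-config     # assumed ConfigMap that contains flink-conf.yaml
```

With this layout, recovery relies on the restarted pod reading the job state referenced in ZooKeeper and `high-availability.storageDir`, not on a second standby JobManager. Mounting a ConfigMap over the whole conf directory hides the files shipped in the image, so the ConfigMap would also need to carry the logging configuration.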


胡逸才 <[hidden email]> wrote on Fri, Jun 28, 2019 at 11:09 AM:
Hi Tan,
I have the same problem as you when running Flink 1.7.2 on Kubernetes in HA mode. May I ask whether you have solved it, and if so, how? After I started the two JobManagers normally, I killed one of them, and it could not restart normally. Both JobManagers reported this error. The log is as follows:




2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote connection to [null] failed with java.net.ConnectException: Connection refused: tdh2/192.168.208.55:56529
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-14 - Association with remote system [akka.tcp://flink@tdh2:56529] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@tdh2:56529]] Caused by: [Connection refused: tdh2/192.168.208.55:56529]
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-14 - Association with remote system [akka.tcp://flink@tdh2:56529] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@tdh2:56529]] Caused by: [Connection refused: tdh2/192.168.208.55:56529]
2019-06-28 09:57:57.260 [flink-rest-server-netty-worker-thread-7] ERROR o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve the redirect address.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 common frames omitted


 



 



 


Re: Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "

Till Rohrmann
Hi,

how did you start the job masters? Could you maybe share the logs of all components? It looks as if the leader election is not working properly. One thing to make sure of is that you specify a different cluster ID for every new HA cluster via `high-availability.cluster-id: cluster_xy`. That way you separate the ZNodes in ZooKeeper so that every cluster uses its own nodes and does not interfere with other clusters. Usually this happens via the JobID, but in the case of the `StandaloneJobClusterEntrypoint` we set it to 0. More recently, this was slightly changed. See https://issues.apache.org/jira/browse/FLINK-12617 for more information.

Cheers,
Till
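
As a sketch of the suggestion above, the cluster ID can be added next to the HA settings from the original message. The value `cluster_xy` is the placeholder used in the reply and is not a recommended name. Each HA cluster that shares the same ZooKeeper quorum and root path would use its own value.

```yaml
# HA settings as posted in the first message
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink1/ha/
high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
high-availability.zookeeper.path.root: /flink
# Added per the suggestion above; "cluster_xy" is a placeholder, choose a unique value per cluster
high-availability.cluster-id: cluster_xy
```

With a distinct cluster ID, the leader-election ZNodes of one cluster should be kept apart from those of any other cluster under the same root path.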

On Mon, Jul 1, 2019 at 11:36 AM Alex.Hu <[hidden email]> wrote:


Re:Re: Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "

胡逸才
Hi Till,
    Thank you very much for answering my question!
    In the attachment I have included the logs from my latest run (with Flink logging at debug level). My Flink cluster runs in a Kubernetes-based Hadoop cluster with 3 nodes and Kerberos authentication enabled. I built a Flink 1.7.2 image based on the Docker image given in the wiki, and I wrote Flink's Kubernetes label YAML configuration based on the label files of the other components in the Hadoop cluster, in which all pods are set to use the host network ports directly. That is why I adjusted some port parameters in my Flink settings.
    I currently run only this one Flink cluster, and in this test I started only the JobManager pods. Following your reply, I added "high-availability.zookeeper.cluster-id" after the related parameters. After I started the pods on 2 nodes, Kubernetes showed that one node (tdh3) started normally while the other (tdh2) failed to start and keeps reporting the same error as in my previous email.
    I have attached the Flink configuration file flink-yaml, the masters and slaves configuration files, the Hadoop cluster's ZooKeeper configuration file, and the JobManager pod label configuration file jobmanager-yaml. Thank you very much. Could you please help me check what mistakes I have made?


Thank you,
Alex.hu
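
For readers unfamiliar with the host-network setup described above, a pod template fragment of the following shape would produce it. This is a guess at the general form, not the attached jobmanager-yaml; the node label is hypothetical.

```yaml
# Hypothetical pod template fragment; the node label is illustrative only.
spec:
  template:
    spec:
      hostNetwork: true                    # pod shares the node's network namespace and ports
      dnsPolicy: ClusterFirstWithHostNet   # keep cluster DNS resolution when hostNetwork is enabled
      nodeSelector:
        flink-role: jobmanager             # assumed label marking nodes that may run a JobManager
```

With host networking, the ports set in flink-conf.yaml (for example `rest.port` and the `high-availability.jobmanager.port` range) are bound on the node itself, so they must be free on every node that may run a JobManager.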




At 2019-07-02 21:46:35, "Till Rohrmann" <[hidden email]> wrote:
Attachment: Flink172HALogsAndConf.tar (61K)