http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Fw-Re-Re-About-Flink-1-7-0-HA-based-on-zookeepers-tp28491.html
Hi,All:
I found some problems about on kubernates flink of 1.6.0
mentioned by Till in "HA for 1.6.0 job cluster with docker-compose" in
the email list, but I found that Jira of flink-10291 in the email has
been shut down in 1.7.0, and I also found similar errors in on
kubernates flink of 1.7.2 at present. Could you please help me check the
Settings where I have problems? Here are my Settings:
web.log.path: /var/log/flink/flinkweb.log
taskmanager.log.pth: /var/log/flink/taskmanager/task.log
jobmanager.rpc.address: tdh2
jobmanager.rpc.port: 16223
jobstore.cache-size: 5368709120
jobstore.expiration-time: 864000
jobmanager.heap.size: 4096m
taskmanager.heap.size: 6000m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 2
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink1/ha/
high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
high-availability.jobmanager.port: 62236-62239
rest.port: 18801
io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.contexts: Client
security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
security.kerberos.login.principal: hdfs
blob.server.port: 16224
query.server.port: 16225
And the following is the new error report, the earliest error report in the forwarded email message:
apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
token not set: Ignoring message
LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
LocalRpcInvocation(requestRestAddress(Time))) sent to
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is
null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
... 14 common frames omitted
2019-07-01
17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR
o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler - Could not
retrieve the redirect address.
java.util.concurrent.CompletionException:
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
token not set: Ignoring message
LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
LocalRpcInvocation(requestRestAddress(Time))) sent to
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is
null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.actor.ActorRef.tell(ActorRef.scala:130)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendErrorIfSender(AkkaRpcActor.java:371)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:57)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused
by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:
Fencing token not set: Ignoring message
LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146,
LocalRpcInvocation(requestRestAddress(Time))) sent to
akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is
null.
-------- Forwarding messages --------
From: "Alex.Hu" <
[hidden email]>
Date: 2019-07-01 10:17:15
To: "Yang Wang" <
[hidden email]>
Cc: user <
[hidden email]>
Subject: Re:Re: About "Flink 1.7.0 HA based on zookeepers "
Hi,Wang:
Thank you very much for answering my question!
I hope to start multiple jobmanagers on kubernete, because according to my understanding in the document of jobmanager_high_availability, the standalone mode should be that the jobmanager can be replaced seamlessly when the fault node goes down through two jobmanagers nodes.
In my follow-up test last Friday, I did use the ability to enable only one jobmanager in kubernete with the jobmanager_high_availability mode turned on, and to set multiple nodes as jobmanager tags to allow kubernate to enable the failed node to automatically transition node startup. But there will still be some switching time. So I am not sure whether flink in kubernate can achieve the above setting of jobmanager seamless hot-swap?
At 2019-06-28 14:11:27, "Yang Wang" <
[hidden email]> wrote:
Hi, hu
I am not sure why do you need to start multiple jobmanagers on kubernetes. Just as the manual [1], we use a deployment of 1 to make sure kubernetes detect the crash of jobmanager and start a new one. What we should do is to add the high availability configurations [2] in flink-conf.yaml. You could use the configMap [3] to save your flink-conf.yaml and then mount into to jobmanager pod. Also you could update the flink-conf.yaml in your flink image.
HI Tan:
I have the same problem with you when running "flink-1.7.2 ON KUBERNATE HA" mode, may I ask if you have solved this problem? How? After I started the two jobmanagers normally, when I tried to kill one of them, he could not restart normally. Both jobmanagers reported this error. The specific log is as follows:
2019-06-28 09:57:57.253 [flink-akka.actor.default-dispatcher-4]
WARN akka.remote.transport.netty.NettyTransport New I/O boss #3 -
Remote connection to [null] failed with java.net.ConnectException:
Connection refused: tdh2/
192.168.208.55:565292019-06-28
09:57:57.253 [flink-akka.actor.default-dispatcher-4] WARN
akka.remote.ReliableDeliverySupervisor
flink-akka.remote.default-remote-dispatcher-14 - Association with remote
system [akka.tcp://flink@tdh2:56529] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@tdh2:56529]] Caused by: [Connection refused:
tdh2/
192.168.208.55:56529]
2019-06-28 09:57:57.253
[flink-akka.actor.default-dispatcher-4] WARN
akka.remote.ReliableDeliverySupervisor
flink-akka.remote.default-remote-dispatcher-14 - Association with remote
system [akka.tcp://flink@tdh2:56529] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@tdh2:56529]] Caused by: [Connection refused:
tdh2/
192.168.208.55:56529]
2019-06-28 09:57:57.260
[flink-rest-server-netty-worker-thread-7] ERROR
o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler - Could not
retrieve the redirect address.
java.util.concurrent.CompletionException:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after
[10000 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused
by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@tdh2:56529/user/dispatcher#299521377]] after
[10000 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.RemoteFencedMessage".
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
... 9 common frames omitted