Curling /jobs endpoint gives internal server error with Kubernetes HA

Adam Roberts-2
Hi everyone, 

We are using Flink 1.13.0 on OpenShift 4.7.11, with a modified version of https://github.com/GoogleCloudPlatform/flink-on-k8s-operator. The Kubernetes version is v1.19.0+d856161.

We are using org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.

In another operator we've developed, we use a readiness check that queries the /jobs endpoint, so we know that the REST API is ready for action.
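A readiness check of this kind could look roughly like the sketch below. It is illustrative Python, not the operator's actual code: the function names, the result labels, and the 5-second client timeout are assumptions made for the example. Only the /jobs path and the two response shapes ({"jobs": [...]} and {"errors": [...]}) come from this post.

```python
import json
import urllib.error
import urllib.request


def classify_jobs_response(status: int, body: str) -> str:
    """Classify a /jobs response as 'ready', 'error', or 'unknown'."""
    try:
        payload = json.loads(body)
    except ValueError:
        return "unknown"
    if not isinstance(payload, dict):
        return "unknown"
    # A healthy REST API answers 200 with a (possibly empty) job list.
    if status == 200 and isinstance(payload.get("jobs"), list):
        return "ready"
    # Flink reports failures as {"errors": [...]}.
    if "errors" in payload:
        return "error"
    return "unknown"


def probe_jobs(base_url: str, timeout_s: float = 5.0) -> str:
    """Query <base_url>/jobs once; hangs and connection failures are reported separately."""
    try:
        with urllib.request.urlopen(f"{base_url}/jobs", timeout=timeout_s) as resp:
            return classify_jobs_response(resp.status, resp.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # Non-2xx responses still carry a JSON body worth classifying.
        return classify_jobs_response(exc.code, exc.read().decode("utf-8", "replace"))
    except OSError:
        # Covers URLError, socket timeouts, and refused connections.
        return "timeout_or_unreachable"
```

With an explicit client timeout, the three behaviours described later in this post (error payload, request that never returns, and {"jobs":[]}) would map to "error", "timeout_or_unreachable", and "ready". For an https URL with a cluster-internal certificate, urlopen would additionally need a suitable SSL context.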

We reliably see a problem. Here's the big ol' stack trace from curling /jobs (in this example, from inside one of the JobManagers):

{"errors":["Internal server error.","<Exception on server side:\njava.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestMultipleJobDetails(org.apache.flink.api.common.time.Time) timed out.\n\tat com.sun.proxy.$Proxy34.requestMultipleJobDetails(Unknown Source)\n\tat org.apache.flink.runtime.rest.handler.job.JobIdsHandler.handleRequest(JobIdsHandler.java:61)\n\tat org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler$$Lambda$398/0x00000000d40b4a50.accept(Unknown Source)\n\tat java.base/java.util.Optional.ifPresent(Unknown Source)\n\tat org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)\n\tat org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)\n\tat org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: akka.pattern.AskTimeoutException: Recipient [Actor[akka://flink/user/rpc/dispatcher_1#578558895]] had already been terminated. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage].\n\tat akka.pattern.AskableActorRef$.recipientTerminatedException(AskSupport.scala:288)\n\tat akka.pattern.AskableActorRef$.internalAsk$extension(AskSupport.scala:330)\n\tat akka.pattern.AskSupport$class.ask(AskSupport.scala:81)\n\tat akka.pattern.package$.ask(package.scala:42)\n\tat akka.pattern.Patterns$.ask(Patterns.scala:160)\n\tat akka.pattern.Patterns.ask(Patterns.scala)\n\tat org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.ask(AkkaInvocationHandler.java:372)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.ask(FencedAkkaInvocationHandler.java:138)\n\tat org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:228)\n\tat org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:132)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:88)\n\t... 50 more\n\nEnd of exception on server side>"]}

Curling to /jars is completely fine (I'm guessing we're right to check /jobs, and something actually bad is going on!).
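Because the useful part of that response is buried in an escaped stack trace, a small helper can pull out the innermost exception class, which makes it easier to tell a plain timeout from the "recipient had already been terminated" case. This is a minimal sketch; the function name and the regular expressions are assumptions based only on the payload format shown above.

```python
import json
import re

# "Caused by: some.package.SomeException: message"
_CAUSE = re.compile(r"^Caused by: ([\w.$]+)", re.MULTILINE)
# First line of a trace: "some.package.SomeException: message"
_FIRST = re.compile(r"^([\w.$]+(?:Exception|Error)):", re.MULTILINE)


def root_cause(body: str):
    """Return the innermost exception class named in an {"errors": [...]} body, or None."""
    payload = json.loads(body)
    text = "\n".join(payload.get("errors", []))
    causes = _CAUSE.findall(text)
    if causes:
        # The last "Caused by" entry is the deepest cause in the chain.
        return causes[-1]
    match = _FIRST.search(text)
    return match.group(1) if match else None
```

Applied to the response above, this would report akka.pattern.AskTimeoutException rather than the outer java.util.concurrent.TimeoutException.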

The UI also becomes practically unusable:

[Attached screenshot: 7a5e1200-bedc-11eb-9afc-fdba1fa2141e.png]

We are aware of a few handy looking config options (around web.timeout and akka.ask.timeout), so those are the obvious things to try, but AFAIK curl has a very long default timeout and I should be able to curl this endpoint from a JobManager container without seeing the above error.

We have three JobManagers and the ConfigMaps are all provided below (there are no leftovers from previous installs).

Looking at a TaskManager we see:

03:02:02.370 [flink-akka.actor.default-dispatcher-4] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(b870874c1c590d593178811f052a42c9, RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration, Time))) sent to akka.tcp://[hidden email]:6123/user/rpc/resourcemanager_0 because the fencing token is null.


EventProcessor is the name of our custom resource, and the URI for it (which we use to do the query) is https://eventprocessor-sampl-9e2e-eve-29ee-ep-jobmanager.acme-iaf.svc:8081.

This is a Kubernetes service that looks like this:

spec:
  clusterIP: None
  ports:
  - name: rpc
    port: 6123
    protocol: TCP
    targetPort: rpc
  - name: blob
    port: 6124
    protocol: TCP
    targetPort: blob
  - name: query
    port: 6125
    protocol: TCP
    targetPort: query
  - name: ui
    port: 8080
    protocol: TCP
    targetPort: ui
  - name: proxy-client
    port: 8081
    protocol: TCP
    targetPort: proxy-client
  selector:
    app: flink


We deploy behind nginx so we can have authorisation, but the HA communication is done over port 8080.

Important factors to note:

Of the three jobmanagers, curling the URI from the first gives the above big error.
Curling from the second jobmanager never returns.
Curling from the third jobmanager returns what we want: {"jobs":[]} - which is great.

The eventprocessor-sampl-9e2e-eve-29ee-ep-dispatcher-leader ConfigMap says the dispatcher is on jobmanager-1 (the second JobManager).

The eventprocessor-sampl-9e2e-eve-29ee-ep-resourcemanager-leader ConfigMap says the resourcemanager is on jobmanager-0 (the first JobManager).

The eventprocessor-sampl-9e2e-eve-29ee-ep-restserver-leader ConfigMap says the restserver leader is jobmanager-1 (the second JobManager).
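To compare the leaders recorded in the three ConfigMaps at a glance, the addresses can be grouped by host. The sketch below assumes the leader addresses have already been read out of the ConfigMaps and look like ordinary URLs; the function name, the component labels, and the example hostnames are hypothetical. It only summarises where each leader is. It does not decide whether having leaders on different JobManagers is itself a problem.

```python
from collections import defaultdict
from typing import Dict, List
from urllib.parse import urlsplit


def leaders_by_host(leader_addresses: Dict[str, str]) -> Dict[str, List[str]]:
    """Group component names by the host part of their leader address."""
    hosts = defaultdict(list)
    for component, address in leader_addresses.items():
        # urlsplit handles both akka.tcp://user@host:port/... and http://host:port
        host = urlsplit(address).hostname or "<unparsed>"
        hosts[host].append(component)
    return {host: sorted(components) for host, components in hosts.items()}


# Hypothetical addresses mirroring the layout described above.
example = {
    "dispatcher": "akka.tcp://flink@jobmanager-1.example:6123/user/rpc/dispatcher_1",
    "resourcemanager": "akka.tcp://flink@jobmanager-0.example:6123/user/rpc/resourcemanager_0",
    "restserver": "http://jobmanager-1.example:8081",
}
summary = leaders_by_host(example)
```

Running this periodically alongside the /jobs probe would show whether the failures start at the same time as a change in any of the recorded leaders.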

Everything starts up ok and this happens after a few hours. No jobs are submitted during this time period either.

Any thoughts would be very welcome - perhaps it's something being worked on as part of Flink 1.13.1? 

Happy to raise a JIRA as well if it's a confirmed "nope...you're doing it right, this looks like a bug" type thing - I'm hoping it's a simple tweak we can do on our operator side or we can get success through config.

Thanks everyone!