Internal TLS on K8s

Enrique
Hi All,

I am trying to configure internal TLS between the TaskManagers (TMs) and JobManagers (JMs) on Kubernetes. I have a session cluster deployed as a StatefulSet, into which I mount client certificates and a CA certificate; I then bundle these into a keystore and a truststore, respectively. I can see that the Flink properties pointing at these stores, along with their passwords, are set correctly.

When I access the UI, I can see that the JM has found the TMs. I have also verified the certificates using OpenSSL and curl, and they look good.

The only thing I have noticed is the following warning message, which is logged repeatedly:
```
2021-06-24 12:54:56,170 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/10.254.20.1:49694] failed with javax.net.ssl.SSLException: Cannot kickstart, the connection is broken or closed
```

Enabling DEBUG logging doesn't tell me much more, only that the connection has been closed and that the resulting exception is swallowed rather than raised:
```
2021-06-24 12:54:56,170 DEBUG org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler [] - Swallowing an exception raised while writing non-app data
java.nio.channels.ClosedChannelException: null
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.write(Channels.java:725) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.write(Channels.java:686) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler.wrapNonAppData(SslHandler.java:1111) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1253) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler.unwrapNonAppData(SslHandler.java:1167) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1039) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.ssl.SslHandler.handleDownstream(SslHandler.java:557) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.write(Channels.java:725) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.write(Channels.java:704) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.Channels.write(Channels.java:671) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:347) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.netty.NettyTransport$.gracefulClose(NettyTransport.scala:290) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.netty.TcpAssociationHandle.disassociate(TcpSupport.scala:103) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.AssociationHandle$class.disassociate(Transport.scala:282) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.netty.TcpAssociationHandle.disassociate(TcpSupport.scala:87) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.ProtocolStateActor$$anonfun$5.applyOrElse(AkkaProtocolTransport.scala:595) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.ProtocolStateActor$$anonfun$5.applyOrElse(AkkaProtocolTransport.scala:556) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.FSM$class.terminate(FSM.scala:758) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.FSM$class.applyState(FSM.scala:697) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.ProtocolStateActor.applyState(AkkaProtocolTransport.scala:286) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.FSM$class.processEvent(FSM.scala:689) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
```

I'm not sure whether this is caused by the readiness and liveness probes in K8s. Any ideas would be appreciated!
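
One way to test the probe theory would be to mimic what a `tcpSocket` probe does: open a plain TCP connection to the RPC port and close it again without ever starting a TLS handshake. The sketch below is only an illustration under assumptions: the function name `tcp_probe` and the host name `flink-jobmanager` are hypothetical, and port 6123 is Flink's default `jobmanager.rpc.port`, which may differ in a given deployment.

```python
import socket


def tcp_probe(host: str, port: int, timeout: float = 1.0) -> bool:
    """Open a plain TCP connection and close it immediately.

    No TLS handshake is attempted, so a TLS-enabled server only sees a
    peer that connects and then disconnects.
    Returns True if the connection was accepted, False otherwise.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Hypothetical service name and assumed default RPC port; adjust both.
    print(tcp_probe("flink-jobmanager", 6123))
```

If the same warning shows up in the JM log each time this runs from another pod, the probes are a plausible source of the messages; if it does not, the cause is probably elsewhere.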

Thanks,
Enrique