Yarn Kerberos issue


Juan Gentile

Hello,

 

I'm trying to submit a job (a batch wordcount) to a Yarn cluster using delegation tokens, and I'm getting the following error:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215)
    at org.apache.hadoop.ipc.Client.call(Client.java:1472)
    at org.apache.hadoop.ipc.Client.call(Client.java:1409)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy18.getDelegationToken(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy19.getDelegationToken(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1082)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1499)
    at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)
    at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)
    at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
    at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
    at org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)

 

The Kerberos configuration in this case is the default one. I then tried setting the option 'security.kerberos.login.use-ticket-cache' to false, but I get the same error.

I was able to solve the problem by issuing a ticket (with kinit), but I'd like to know whether it's possible to make Flink work with delegation tokens and, if so, what the right configuration is.

 

Thank you,

Juan

Re: Yarn Kerberos issue

Chesnay Schepler
From what I understand from the documentation, if you want to use delegation tokens you always have to issue a ticket first using kinit; so you did everything correctly?

Re: Yarn Kerberos issue

Rong Rong
Hi Juan,

Chesnay is right. If you are using the CLI to launch your session cluster as described in the documentation [1], then following the instructions to run kinit first [2] is one of the right ways to go.
Another approach is to set up the Kerberos settings in the flink-conf.yaml file [3]; FlinkYarnSessionCli will then be able to pick up your keytab file and run the CLI securely.

As far as I know, the option `security.kerberos.login.use-ticket-cache` doesn't actually change the behavior of the authentication process; it is more of a hint about whether to use the ticket cache populated by `kinit`. If you disable the ticket cache, you will have to use the keytab/principal approach - this doc [4] explains it in more detail.

Thanks,
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only
[3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only



Re: Yarn Kerberos issue

Juan Gentile

Hello Rong, Chesnay,

 

Thank you for your answers. The way we are trying to launch the job is through a scheduler (similar to Oozie): we have a keytab for the scheduler user, and with that keytab we obtain delegation tokens impersonating the right user (the owner of the job). But the only way I was able to make this work is by getting a ticket (through kinit).

For comparison, if I launch a Spark job with just the delegation tokens (without doing kinit), it works fine, so I guess Spark does something extra.

This is as far as I could get; at this point I'm not sure whether this is simply not supported by Flink or whether I'm doing something wrong.
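
For reference, the token-fetching step in our scheduler looks roughly like this (a simplified sketch; the principal, keytab path, and renewer name below are placeholders, not our real values):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class FetchTokensForJobOwner {
    public static Credentials fetchTokens(String jobOwner) throws Exception {
        // The scheduler authenticates with its own keytab...
        UserGroupInformation.loginUserFromKeytab(
                "scheduler@EXAMPLE.COM", "/path/to/scheduler.keytab");
        // ...impersonates the job owner (requires Hadoop proxy-user rules)...
        UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
                jobOwner, UserGroupInformation.getLoginUser());
        // ...and obtains HDFS delegation tokens on the owner's behalf.
        Credentials credentials = new Credentials();
        proxyUser.doAs((PrivilegedExceptionAction<Void>) () -> {
            FileSystem fs = FileSystem.get(new Configuration());
            fs.addDelegationTokens("yarn", credentials);
            return null;
        });
        return credentials;
    }
}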

 

Thank you,

Juan

 

Re: Yarn Kerberos issue

Yang Wang
I guess you have set some Kerberos-related configuration in your Spark jobs. For Flink, you need
to do this too, via the following configs. The keytab file must also exist on the Flink client; in your
environment, that means the scheduler (Oozie) must be able to access the keytab file.

security.kerberos.login.keytab
security.kerberos.login.principal
security.kerberos.login.contexts
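
For example, a minimal flink-conf.yaml sketch (the keytab path, principal, and contexts below are placeholders for your own values):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/scheduler.keytab
security.kerberos.login.principal: scheduler@EXAMPLE.COM
security.kerberos.login.contexts: Client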



Best,
Yang

Re: Yarn Kerberos issue

Aljoscha Krettek
Hi Juan,

to summarize and clarify various emails: currently, you can only use
Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant
bit of code is in the Hadoop security module: [1]. Here you can see that
we either use keytab.

I think we should be able to extend this to also work with delegation
tokens (DTs). In Spark, how do you pass the DTs to the system as a user?

Best,
Aljoscha

[1]
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

Re: Yarn Kerberos issue

Aljoscha Krettek
Hi,

It seems I hit send too early; my mail was missing a small part. This is
the full mail again:

to summarize and clarify various emails: currently, you can only use
Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant
bit of code is in the Hadoop security module [1]. Here you can see that
we either use the keytab or try to log in as a user.

I think we should be able to extend this to also work with delegation
tokens (DTs). We would need to modify the SecurityUtils [2] to create a
UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then
addCreds() or addToken(). In Spark, how do you pass the DTs to the
system as a user?

Best,
Aljoscha

[1]
https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
[2]
https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89
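
Roughly, a sketch of what I have in mind (untested, and not what Flink currently does; it assumes the tokens were written with Credentials.writeTokenStorageFile() and that HADOOP_TOKEN_FILE_LOCATION points at that file, which is the standard Hadoop convention):

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class TokenLogin {
    public static UserGroupInformation createUgiFromTokens(String user) throws Exception {
        // Locate the pre-obtained delegation tokens (Spark honors this variable too).
        File tokenFile = new File(System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        // Create a UGI without Kerberos credentials...
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
        // ...and attach the delegation tokens read from the token file.
        Credentials credentials =
                Credentials.readTokenStorageFile(tokenFile, new Configuration());
        ugi.addCredentials(credentials);
        return ugi;
    }
}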

On 10.01.20 15:02, Aljoscha Krettek wrote:

> Hi Juan,
>
> to summarize and clarify various emails: currently, you can only use
> Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant
> bit of code is in the Hadoop security module: [1]. Here you can see that
> we either use keytab.
>
> I think we should be able to extend this to also work with delegation
> tokens (DTs). In Spark, how do you pass the DTs to the system as a user?
>
> Best,
> Aljoscha
>
> [1]
> https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68 
>
>
> On 06.01.20 09:52, Yang Wang wrote:
>> I guess you have set some kerberos related configuration in spark
>> jobs. For
>> Flink, you need
>> to do this too by the following configs. And the keytab file should
>> existed
>> on Flink client. In your
>> environment, it means the scheduler(oozie) could access the keytab file.
>>
>> security.kerberos.login.keytab
>> security.kerberos.login.principal
>> security.kerberos.login.contexts
>>
>>
>>
>> Best,
>> Yang
>>
>> Juan Gentile <[hidden email]> 于2020年1月6日周一 下午3:55写道:
>>
>>> Hello Rong, Chesnay,
>>>
>>>
>>>
>>> Thank you for your answer, the way we are trying to launch the job is
>>> through a scheduler (similar to oozie) where we have a keytab for the
>>> scheduler user and with that keytab we get delegation tokens
>>> impersonating
>>> the right user (owner of the job). But the only way I was able to
>>> make this
>>> work is by getting a ticket (through kinit).
>>>
>>> As a comparison, if I launch a spark job (without doing kinit) just with
>>> the delegation tokens, it works okay. So I guess Spark does something
>>> extra.
>>>
>>> This is as far as I could go but at this point I’m not sure if this is
>>> something just not supported by Flink or I’m doing something wrong.
>>>
>>>
>>>
>>> Thank you,
>>>
>>> Juan
>>>
>>>
>>>
>>> *From: *Rong Rong <[hidden email]>
>>> *Date: *Saturday, January 4, 2020 at 6:06 PM
>>> *To: *Chesnay Schepler <[hidden email]>
>>> *Cc: *Juan Gentile <[hidden email]>, "[hidden email]" <
>>> [hidden email]>, Oleksandr Nitavskyi <[hidden email]>
>>> *Subject: *Re: Yarn Kerberos issue
>>>
>>>
>>>
>>> Hi Juan,
>>>
>>>
>>>
>>> Chesnay was right. If you are using CLI to launch your session cluster
>>> based on the document [1], you following the instruction to use kinit
>>> [2]
>>> first seems to be one of the right way to go.
>>>
>>> Another way of approaching it is to setup the kerberos settings in the
>>> flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to pick up
>>> your
>>> keytab files and run the CLI securely.
>>>
>>>
>>>
>>> As far as I know the option `security.kerberos.login.use-ticket-cache`
>>> doesn't actually change the behavior of the authentication process,
>>> it is
>>> more of a hint whether to use the ticket cache instantiated by
>>> `kinit`. If
>>> you disable using the ticket cache, you will have to use the
>>> "keytab/principle" approach - this doc [4] might be helpful to explain
>>> better.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Rong
>>>
>>>
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session 
>>>
>>> <https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fdeployment%2Fyarn_setup.html%23start-flink-session&data=02%7C01%7Cj.gentile%40criteo.com%7C62d8034f25d94e52ccb008d791387e31%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637137544137797845&sdata=d6QcPdLPDolW0Nv4jo469HyxP99E5mEGBOUjVw219a0%3D&reserved=0>
>>>
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only 
>>>
>>> <https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fsecurity-kerberos.html%23using-kinit-yarn-only&data=02%7C01%7Cj.gentile%40criteo.com%7C62d8034f25d94e52ccb008d791387e31%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637137544137807833&sdata=pPCm4%2BNzJ6oQ0tA5%2B7uSLR3BuGAxbJiCI8xs1nc355Y%3D&reserved=0>
>>>
>>>
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode 
>>>
>>> <https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fsecurity-kerberos.html%23yarnmesos-mode&data=02%7C01%7Cj.gentile%40criteo.com%7C62d8034f25d94e52ccb008d791387e31%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637137544137807833&sdata=rsVxD%2B3QteiNPaIRQriF3lTKV22Rxk7TyU0hbCDr9pk%3D&reserved=0>
>>>
>>>
>>> [4]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only 
>>>
>>> <https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.9%2Fdev%2Fconnectors%2Fkafka.html%23enabling-kerberos-authentication-for-versions-09-and-above-only&data=02%7C01%7Cj.gentile%40criteo.com%7C62d8034f25d94e52ccb008d791387e31%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637137544137807833&sdata=urJNklA%2Bz9IVV7k0H%2Fp8k5NWm526wHcXiw2qY4tiI7g%3D&reserved=0>
>>>
>>>
>>>
>>>
>>> On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler <[hidden email]>
>>> wrote:
>>>
>>>  From what I understand from the documentation, if you want to use
>>> delegation tokens you always first have to issue a ticket using
>>> kinit; so
>>> you did everything correctly?
>>>
>>>
>>>
>>> On 02/01/2020 13:00, Juan Gentile wrote:
>>>
>>> Hello,
>>>
>>>
>>>
>>> Im trying to submit a job (batch worcount) to a Yarn cluster. I’m trying
>>> to use delegation tokens and I’m getting the following error:
>>>
>>> *org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
>>> deploy Yarn session cluster*
>>>
>>> *               at
>>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)*
>>>
>>>
>>> *               at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)*
>>>
>>>
>>> *               at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)*
>>>
>>> *               at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)*
>>>
>>>
>>> *               at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)*
>>>
>>>
>>> [... remainder of the stack trace trimmed; the full trace appears in the original message at the top of the thread ...]
>>>
>>> The Kerberos configuration in this case is the default one. I then tried
>>> setting the option 'security.kerberos.login.use-ticket-cache' to false,
>>> but I get the same error.
>>>
>>> I was able to solve the problem by issuing a ticket (with kinit), but I'd
>>> like to know whether it's possible to make Flink work with delegation
>>> tokens alone and, if so, what the right configuration is.
>>>
>>> Thank you,
>>>
>>> Juan
>>>
>>

Re: Yarn Kerberos issue

Juan Gentile

Hello Aljoscha!

 

The way we send the DTs to Spark is by setting an environment variable (HADOOP_TOKEN_FILE_LOCATION). Interestingly, we have to unset it when we run Flink, because even if we do kinit, that variable somehow affects Flink and it doesn't work.
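
For concreteness, this is roughly what the scheduler does today (the token-file path and principal are illustrative):

    # Spark: point Hadoop at the serialized delegation tokens
    export HADOOP_TOKEN_FILE_LOCATION=/path/to/container_tokens
    spark-submit ...

    # Flink: we currently have to unset it and rely on a kinit ticket instead
    unset HADOOP_TOKEN_FILE_LOCATION
    kinit user@YOUR-REALM
    flink run ...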

I'm not an expert, but what you describe (modifying the SecurityUtils [2] to create a UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then addCredentials() or addToken()) makes sense.
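
A minimal sketch of what that could look like, assuming Hadoop's standard Credentials API (nothing like this exists in Flink today; the class and method names are made up):

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    class TokenLogin {
        // Build a UGI from a delegation-token file instead of a keytab/ticket.
        // 'tokenFile' would typically be whatever HADOOP_TOKEN_FILE_LOCATION points at.
        static UserGroupInformation loginFromTokens(String user, File tokenFile) throws Exception {
            Credentials creds = Credentials.readTokenStorageFile(tokenFile, new Configuration());
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
            ugi.addCredentials(creds); // attaches every token in the file to the UGI
            return ugi;
        }
    }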

If you are able to do this, it would be great and would help us a lot!

 

Thank you,

Juan

 

On 1/10/20, 3:13 PM, "Aljoscha Krettek" <[hidden email]> wrote:

 

    Hi,

    it seems I hit send too early; my mail was missing a small part. This is
    the full mail again:

    To summarize and clarify various emails: currently, you can only use
    Kerberos authentication via tickets (i.e. kinit) or a keytab. The relevant
    bit of code is in the Hadoop security module [1]. Here you can see that
    we either use the keytab or try to log in as a user.

    I think we should be able to extend this to also work with delegation
    tokens (DTs). We would need to modify the SecurityUtils [2] to create a
    UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then
    addCredentials() or addToken(). In Spark, how do you pass the DTs to the
    system as a user?

    Best,
    Aljoscha

    [1] https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68

    [2] https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89

   

    On 10.01.20 15:02, Aljoscha Krettek wrote:

    > [... earlier, incomplete version of the same mail trimmed ...]
    >

    > On 06.01.20 09:52, Yang Wang wrote:

    >> I guess you have set some Kerberos-related configuration in your Spark
    >> jobs. For Flink, you need to do the same via the following configs. The
    >> keytab file must also exist on the Flink client; in your environment,
    >> that means the scheduler (Oozie) must be able to access the keytab file.

    >>

    >> security.kerberos.login.keytab

    >> security.kerberos.login.principal

    >> security.kerberos.login.contexts
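    >>
    >> For example, in flink-conf.yaml (the keytab path and principal below are
    >> purely illustrative):
    >>
    >>   security.kerberos.login.keytab: /path/to/scheduler.keytab
    >>   security.kerberos.login.principal: scheduler@YOUR-REALM
    >>   security.kerberos.login.contexts: Client,KafkaClient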

    >>

    >>

    >>

    >> Best,

    >> Yang

    >>

    >> Juan Gentile <[hidden email]> wrote on Monday, January 6, 2020 at 3:55 PM:

    >>

    >>> Hello Rong, Chesnay,

    >>>

    >>>

    >>>

    >>> Thank you for your answer. The way we are trying to launch the job is
    >>> through a scheduler (similar to Oozie), where we have a keytab for the
    >>> scheduler user, and with that keytab we obtain delegation tokens
    >>> impersonating the right user (the owner of the job). But the only way I
    >>> was able to make this work is by getting a ticket (through kinit).

    >>>

    >>> As a comparison, if I launch a Spark job (without doing kinit) with just
    >>> the delegation tokens, it works fine. So I guess Spark does something
    >>> extra.

    >>>

    >>> This is as far as I could get, but at this point I'm not sure whether
    >>> this is simply not supported by Flink or I'm doing something wrong.

    >>>

    >>>

    >>>

    >>> Thank you,

    >>>

    >>> Juan

    >>>

    >>>

    >>>

    >>> From: Rong Rong <[hidden email]>
    >>> Date: Saturday, January 4, 2020 at 6:06 PM
    >>> To: Chesnay Schepler <[hidden email]>
    >>> Cc: Juan Gentile <[hidden email]>, "[hidden email]" <[hidden email]>,
    >>> Oleksandr Nitavskyi <[hidden email]>
    >>> Subject: Re: Yarn Kerberos issue

    >>>

    >>>

    >>>

    >>> Hi Juan,

    >>>

    >>>

    >>>

    >>> Chesnay was right. If you are using the CLI to launch your session
    >>> cluster based on the document [1], following the instructions to use
    >>> kinit [2] first seems to be one of the right ways to go.
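    >>>
    >>> For example (the keytab path and principal are illustrative):
    >>>
    >>>   kinit -kt /path/to/user.keytab user@YOUR-REALM
    >>>   ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar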

    >>>

    >>> Another way of approaching it is to set up the Kerberos settings in the
    >>> flink-conf.yaml file [3]. FlinkYarnSessionCli will then be able to pick
    >>> up your keytab files and run the CLI securely.

    >>>

    >>>

    >>>

    >>> As far as I know, the option `security.kerberos.login.use-ticket-cache`
    >>> doesn't actually change the behavior of the authentication process; it
    >>> is more of a hint about whether to use the ticket cache instantiated by
    >>> `kinit`. If you disable using the ticket cache, you will have to use the
    >>> keytab/principal approach - this doc [4] might explain it better.

    >>>

    >>>

    >>>

    >>> Thanks,

    >>>

    >>> Rong

    >>>

    >>>

    >>>

    >>>

    >>>

    >>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session
    >>>
    >>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only
    >>>
    >>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode
    >>>
    >>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only

    >>>

    >>>

    >>>

    >>>

    >>> On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler <[hidden email]>

    >>> wrote:

    >>>

    >>> From what I understand from the documentation, if you want to use
    >>> delegation tokens you always first have to issue a ticket using kinit;
    >>> so you did everything correctly?

    >>>

    >>>

    >>>

    >>> On 02/01/2020 13:00, Juan Gentile wrote:
    >>>
    >>> [... original message and stack trace trimmed; see the top of the thread ...]

   


Re: Yarn Kerberos issue

Aljoscha Krettek
Hi,

Interesting! What problem are you seeing when you don't unset that
environment variable? From reading UserGroupInformation.java, our code
should almost work when that environment variable is set.

Best,
Aljoscha

On 10.01.20 15:23, Juan Gentile wrote:

> Hello Aljoscha!
>
> The way we send the DTs to Spark is by setting an environment variable
> (HADOOP_TOKEN_FILE_LOCATION). Interestingly, we have to unset it when we
> run Flink, because even if we do kinit, that variable somehow affects
> Flink and it doesn't work.
>
> I'm not an expert, but what you describe (modifying the SecurityUtils [2]
> to create a UGI from tokens, i.e. UserGroupInformation.createRemoteUser()
> and then addCredentials() or addToken()) makes sense.
>
> If you are able to do this, it would be great and would help us a lot!
>
> Thank you,
>
> Juan
>
>
> [... earlier quoted messages trimmed ...]

Re: Yarn Kerberos issue

Juan Gentile
The error we get is the following:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:490)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)
        ... 9 more
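
That "login user does not have Kerberos credentials" message comes from a sanity check in AbstractYarnClusterDescriptor.deployInternal(). Roughly (paraphrased, not the exact Flink source):

    UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
    if (UserGroupInformation.isSecurityEnabled() && !loginUser.hasKerberosCredentials()) {
        // A UGI that only carries delegation tokens (no TGT or keytab) fails this
        // check, which would explain why token-only submission is rejected up front.
        throw new RuntimeException("Hadoop security with Kerberos is enabled but the "
            + "login user does not have Kerberos credentials");
    }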

On 1/10/20, 3:36 PM, "Aljoscha Krettek" <[hidden email]> wrote:

    Hi,
   
    Interesting! What problem are you seeing when you don't unset that
    environment variable? From reading UserGroupInformation.java, our code
    should almost work when that environment variable is set.
   
    Best,
    Aljoscha
   
    On 10.01.20 15:23, Juan Gentile wrote:
    > [... earlier messages trimmed ...]
    >
    >      >>> org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)*
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> *               at
    >
    >      >>> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)*
    >
    >      >>>
    >
    >      >>>
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> The kerberos configuration in this case is the default one. Then I tried
    >
    >      >>> with this option set to false ‘security.kerberos.login.use-ticket-cache‘
    >
    >      >>> but I get the same error.
    >
    >      >>>
    >
    >      >>> I was able to solve the problem by issuing a ticket (with kinit) but I’d
    >
    >      >>> like to know if it’s possible to make flink work with delegation
    >
    >      >>> tokens and
    >
    >      >>> if so what is the right config.
    >
    >      >>>
    >
    >      >>>
    >
    >      >>>
    >
    >      >>> Thank you,
    >
    >      >>>
    >
    >      >>> Juan
    >
    >      >>>
    >
    >      >>>
    >
    >      >>>
    >
    >      >>>
    >
    >      >>
    >
    >
   


Re: Yarn Kerberos issue

Rong Rong
Hi Juan,

I had some time to dig deeper into the code. It seems that HADOOP_TOKEN_FILE_LOCATION is a static environment variable field that UserGroupInformation reads.
Interestingly, Flink's Hadoop security module treats it differently depending on whether this variable is set; see [1]. This is the major difference from how UserGroupInformation handles credentials [2].

We may just need to spawn a thread to periodically renew the delegation token. I already filed a JIRA ticket (FLINK-15561) for tracking this [3].
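
For illustration, such a renewal loop might look roughly like this - just a sketch of the idea, not the actual fix tracked in the ticket, and it assumes the delegation tokens are already attached to the login UGI (hadoopConf being the cluster's Hadoop Configuration):

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    static void startTokenRenewal(Configuration hadoopConf) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        // renew each token well within the typical 24h renewal interval
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            for (Token<?> token : ugi.getCredentials().getAllTokens()) {
                try {
                    token.renew(hadoopConf); // extends the token's lifetime
                } catch (Exception e) {
                    // renewal fails once the token's maximum lifetime is hit
                }
            }
        }, 1, 1, TimeUnit.HOURS);
    }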

--
Thanks,
Rong



On Fri, Jan 10, 2020 at 8:19 AM Juan Gentile <[hidden email]> wrote:
The error we get is the following:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:490)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)
        ... 9 more

On 1/10/20, 3:36 PM, "Aljoscha Krettek" <[hidden email]> wrote:

    Hi,

    Interesting! What problem are you seeing when you don't unset that
    environment variable? From reading UserGroupInformation.java our code
    should almost work when that environment variable is set.

    Best,
    Aljoscha

    On 10.01.20 15:23, Juan Gentile wrote:
    > Hello Aljoscha!
    >
    >
    >
    > The way we send the DTs to Spark is by setting an env variable (HADOOP_TOKEN_FILE_LOCATION), which, interestingly, we have to unset when we run Flink, because even if we do kinit that variable somehow affects Flink and it doesn’t work.
    >
    > I’m not an expert, but what you describe (we would need to modify the SecurityUtils [2] to create a UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then addCreds() or addToken()) makes sense.
    >
    > If you are able to do this it would be great and help us a lot!
    >
    >
    >
    > Thank you,
    >
    > Juan
    >
    >
    >
    > On 1/10/20, 3:13 PM, "Aljoscha Krettek" <[hidden email]> wrote:
    >
    >      Hi,
    >
    >      it seems I hit send too early; my mail was missing a small part. This is
    >      the full mail again:
    >
    >      to summarize and clarify various emails: currently, you can only use
    >      Kerberos authentication via tickets (i.e. kinit) or a keytab. The relevant
    >      bit of code is in the Hadoop security module [1]. Here you can see that
    >      we either use the keytab or try to log in as a user.
    >
    >      I think we should be able to extend this to also work with delegation
    >      tokens (DTs). We would need to modify the SecurityUtils [2] to create a
    >      UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then
    >      addCreds() or addToken(). In Spark, how do you pass the DTs to the
    >      system as a user?
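    >      For illustration, that could be a sketch along these lines (untested;
    >      userName is just a placeholder for whatever user the job should run
    >      as, and the tokens are assumed to sit in a Hadoop credentials file
    >      such as the one HADOOP_TOKEN_FILE_LOCATION points to):
    >
    >          // read the serialized delegation tokens from the token file
    >          Credentials creds = Credentials.readTokenStorageFile(
    >              new File(System.getenv("HADOOP_TOKEN_FILE_LOCATION")),
    >              new Configuration());
    >          // create a UGI without Kerberos credentials and attach the tokens
    >          UserGroupInformation ugi =
    >              UserGroupInformation.createRemoteUser(userName);
    >          ugi.addCredentials(creds);
    >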
    >      Best,
    >      Aljoscha
    >
    >      [1] https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
    >      [2] https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89
    >
    >      On 10.01.20 15:02, Aljoscha Krettek wrote:
    >      > [the earlier, incomplete version of the mail above; trimmed]
    >
    >      > On 06.01.20 09:52, Yang Wang wrote:
    >      >> I guess you have set some kerberos-related configuration in your Spark
    >      >> jobs. For Flink, you need to do this too via the following configs. And
    >      >> the keytab file should exist on the Flink client. In your environment,
    >      >> it means the scheduler (oozie) should be able to access the keytab file.
    >      >>
    >      >> security.kerberos.login.keytab
    >      >> security.kerberos.login.principal
    >      >> security.kerberos.login.contexts
    >
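    >      >> For illustration, in flink-conf.yaml these might look roughly like the
    >      >> following (the keytab path and principal are placeholders, not values
    >      >> from this thread):
    >      >>
    >      >>     security.kerberos.login.keytab: /path/to/scheduler.keytab
    >      >>     security.kerberos.login.principal: scheduler@EXAMPLE.COM
    >      >>     security.kerberos.login.contexts: Client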
    >      >>
    >      >> Best,
    >      >> Yang
    >      >>
    >      >> Juan Gentile <[hidden email]> wrote on Mon, Jan 6, 2020 at 3:55 PM:
    >      >>
    >      >>> Hello Rong, Chesnay,
    >      >>>
    >      >>> Thank you for your answer. The way we are trying to launch the job is
    >      >>> through a scheduler (similar to oozie) where we have a keytab for the
    >      >>> scheduler user, and with that keytab we get delegation tokens
    >      >>> impersonating the right user (the owner of the job). But the only way I
    >      >>> was able to make this work is by getting a ticket (through kinit).
    >      >>>
    >      >>> As a comparison, if I launch a Spark job (without doing kinit) just with
    >      >>> the delegation tokens, it works okay. So I guess Spark does something
    >      >>> extra.
    >      >>>
    >      >>> This is as far as I could go, but at this point I’m not sure if this is
    >      >>> something just not supported by Flink or if I’m doing something wrong.
    >      >>>
    >      >>> Thank you,
    >      >>>
    >      >>> Juan
    >      >>>
    >      >>> From: Rong Rong <[hidden email]>
    >      >>> Date: Saturday, January 4, 2020 at 6:06 PM
    >      >>> To: Chesnay Schepler <[hidden email]>
    >      >>> Cc: Juan Gentile <[hidden email]>, "[hidden email]" <[hidden email]>, Oleksandr Nitavskyi <[hidden email]>
    >      >>> Subject: Re: Yarn Kerberos issue
    >      >>>
    >      >>> Hi Juan,
    >      >>>
    >      >>> Chesnay was right. If you are using the CLI to launch your session
    >      >>> cluster based on the document [1], following the instruction to use
    >      >>> kinit [2] first seems to be one of the right ways to go.
    >      >>>
    >      >>> Another way of approaching it is to set up the kerberos settings in the
    >      >>> flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to pick up
    >      >>> your keytab files and run the CLI securely.
    >      >>>
    >      >>> As far as I know, the option `security.kerberos.login.use-ticket-cache`
    >      >>> doesn't actually change the behavior of the authentication process; it
    >      >>> is more of a hint whether to use the ticket cache instantiated by
    >      >>> `kinit`. If you disable using the ticket cache, you will have to use the
    >      >>> "keytab/principal" approach - this doc [4] might be helpful to explain
    >      >>> it better.
    >      >>>
    >      >>> Thanks,
    >      >>>
    >      >>> Rong
    >      >>>
    >      >>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session
    >      >>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only
    >      >>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode
    >      >>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only
    >      >>>
    >      >>> On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler <[hidden email]> wrote:
    >      >>>
    >      >>> From what I understand from the documentation, if you want to use
    >      >>> delegation tokens you always first have to issue a ticket using kinit;
    >      >>> so you did everything correctly?
    >      >>>
    >      >>> On 02/01/2020 13:00, Juan Gentile wrote:
    >      >>>
    >      >>> [original message with the full stack trace trimmed; see the first
    >      >>> message in this thread]
    >



Re: Yarn Kerberos issue

Juan Gentile

Thank you Rong

 

We believe that the job (or scheduler) launching Flink should be the one responsible for renewing the DT. Here is some documentation that could be useful regarding Spark: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/security/README.md#dt-renewal-renewers-and-yarn

 

If you think you need more information about our issue, we can organize a call to discuss it.

 

Regards,

Juan

 

From: Rong Rong <[hidden email]>
Date: Sunday, January 12, 2020 at 6:13 PM
To: Juan Gentile <[hidden email]>
Cc: Aljoscha Krettek <[hidden email]>, "[hidden email]" <[hidden email]>, Arnaud Dufranne <[hidden email]>, Oleksandr Nitavskyi <[hidden email]>
Subject: Re: Yarn Kerberos issue

 

[quoted message trimmed; it repeats Rong's reply of January 12 and the earlier thread in full - see above]


Re: Yarn Kerberos issue

Rong Rong
Hi Juan,

Sorry, I think I hit the send button too soon as well :-) There's a 2nd part of the analysis, which was already captured in FLINK-15561 but not sent:
It seems that the delegation token check is imposed in YarnClusterDescriptor [1], but not in HadoopModule [2].
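
For reference, that check seems to boil down to something like the following (my rough reading, reconstructed from the RuntimeException Juan posted, not a verbatim copy of the Flink code):

    if (UserGroupInformation.isSecurityEnabled()) {
        UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
        if (!loginUser.hasKerberosCredentials()) {
            throw new RuntimeException("Hadoop security with Kerberos is enabled "
                + "but the login user does not have Kerberos credentials");
        }
    }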

in regards to your comment, you are probably right, I am no expert in how DT works, just merely pointing out the difference. It might not be related. However, I think the security HadoopModule is both used on the client side as well as the job manager / task manager side: see: [3,4] so I think they go down the same code path. 

Regarding the DT renewal: you are right on both ends: there is less reason to run a renewal thread on the client side, and in most cases renewal doesn't matter, since in Spark's scenario a DT is valid for 7 days (maximum) and renewal is only required every 24 hours.

I will try to follow up on the ticket and see if I can find more DT-related issues.
If you have more comments or information, could you please add them on the JIRA ticket as well?


Thanks,
Rong


On Mon, Jan 13, 2020 at 1:24 AM Juan Gentile <[hidden email]> wrote:

Thank you, Rong.

 

We believe that the job (or scheduler) launching Flink should be the one responsible for renewing the DT. Here is some documentation on how Spark handles this that could be useful: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/security/README.md#dt-renewal-renewers-and-yarn

 

If you think you need more information about our issue, we can organize a call and discuss it.

 

Regards,

Juan

 

From: Rong Rong <[hidden email]>
Date: Sunday, January 12, 2020 at 6:13 PM
To: Juan Gentile <[hidden email]>
Cc: Aljoscha Krettek <[hidden email]>, "[hidden email]" <[hidden email]>, Arnaud Dufranne <[hidden email]>, Oleksandr Nitavskyi <[hidden email]>
Subject: Re: Yarn Kerberos issue

 

Hi Juan,

 

I had some time to dig deeper into the code. It seems like HADOOP_TOKEN_FILE_LOCATION is actually a static environment variable field that UserGroupInformation reads.

Interestingly, Flink's Hadoop security module actually treats it differently depending on whether this variable is set, see [1]. This is the major difference from how UserGroupInformation handles credentials [2].
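As a rough illustration (a sketch using Hadoop's Credentials API, not Flink's actual code), the token file that variable points to can be deserialized like this:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.Credentials;

    public final class TokenFileSketch {
        // Reads the delegation tokens that a scheduler wrote for this job.
        // UserGroupInformation consults this variable once, statically, at login.
        public static Credentials readTokens(Configuration conf) throws IOException {
            String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
            if (tokenFile == null) {
                return new Credentials(); // no delegation tokens were provided
            }
            return Credentials.readTokenStorageFile(new File(tokenFile), conf);
        }
    }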

 

We may just need to spawn a thread to periodically renew the delegation token. I have already filed a JIRA ticket to track this [3].
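A minimal sketch of what such a renewal thread could look like (the fixed one-hour interval and the error handling below are placeholder choices, not a proposed design):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public final class RenewalThreadSketch {
        // Periodically asks each token's issuing service to renew it. Real code
        // would schedule renewals relative to each token's expiration time and
        // stop once a token reaches its maximum lifetime.
        public static ScheduledExecutorService startRenewer(Credentials creds, Configuration conf) {
            ScheduledExecutorService renewer = Executors.newSingleThreadScheduledExecutor();
            renewer.scheduleAtFixedRate(() -> {
                for (Token<?> token : creds.getAllTokens()) {
                    try {
                        token.renew(conf); // returns the new expiration time
                    } catch (Exception e) {
                        // e.g. the token passed its maximum lifetime (7 days by default)
                    }
                }
            }, 1, 1, TimeUnit.HOURS);
            return renewer;
        }
    }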

 

--

Thanks,

Rong

 

 

 

On Fri, Jan 10, 2020 at 8:19 AM Juan Gentile <[hidden email]> wrote:

The error we get is the following:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:490)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)
        ... 9 more

On 1/10/20, 3:36 PM, "Aljoscha Krettek" <[hidden email]> wrote:

    Hi,

    Interesting! What problem are you seeing when you don't unset that
    environment variable? From reading UserGroupInformation.java our code
    should almost work when that environment variable is set.

    Best,
    Aljoscha

    On 10.01.20 15:23, Juan Gentile wrote:
    > Hello Aljoscha!
    >
    > The way we send the DTs to Spark is by setting an env variable
    > (HADOOP_TOKEN_FILE_LOCATION), which interestingly we have to unset when
    > we run Flink, because even if we do kinit, that variable somehow affects
    > Flink and it doesn’t work.
    >
    > I’m not an expert, but what you describe (we would need to modify the
    > SecurityUtils [2] to create a UGI from tokens, i.e.
    > UserGroupInformation.createRemoteUser() and then addCreds() or
    > addToken()) makes sense.
    >
    > If you are able to do this it would be great and would help us a lot!
    >
    > Thank you,
    > Juan
    >
    >
    >      On 1/10/20, 3:13 PM, "Aljoscha Krettek" <[hidden email]> wrote:
    >
    >      Hi,
    >
    >      it seems I hit send too early; my mail was missing a small part. This is
    >      the full mail again:
    >
    >      to summarize and clarify various emails: currently, you can only use
    >      Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant
    >      bit of code is in the Hadoop security module [1]. Here you can see that
    >      we either use keytab or try to login as a user.
    >
    >      I think we should be able to extend this to also work with delegation
    >      tokens (DTs). We would need to modify the SecurityUtils [2] to create a
    >      UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and then
    >      addCreds() or addToken(). In Spark, how do you pass the DTs to the
    >      system as a user?
    >
    >      Best,
    >      Aljoscha
    >
    >      [1] https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
    >      [2] https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89
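A minimal sketch of what that change could look like (assuming Hadoop's UGI API; the user name and the source of the credentials are placeholders, and this is not the actual SecurityUtils code):

    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class UgiFromTokensSketch {
        // Builds a UGI for a user that has no Kerberos TGT or keytab, only
        // delegation tokens (e.g. read from HADOOP_TOKEN_FILE_LOCATION).
        public static UserGroupInformation ugiFromTokens(String userName, Credentials creds) {
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(userName);
            ugi.addCredentials(creds); // attaches the delegation tokens to the UGI
            return ugi;
        }
    }

The resulting UGI could then be handed to Flink's HadoopSecurityContext, whose runSecured() wraps the client code in ugi.doAs(), as the stack traces in this thread show.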
    >
    >      On 10.01.20 15:02, Aljoscha Krettek wrote:
    >      > Hi Juan,
    >      >
    >      > to summarize and clarify various emails: currently, you can only use
    >      > Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant
    >      > bit of code is in the Hadoop security module: [1]. Here you can see that
    >      > we either use keytab.
    >      >
    >      > I think we should be able to extend this to also work with delegation
    >      > tokens (DTs). In Spark, how do you pass the DTs to the system as a user?
    >      >
    >      > Best,
    >      > Aljoscha
    >      >
    >      > [1] https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
    >
    >      > On 06.01.20 09:52, Yang Wang wrote:
    >      >> I guess you have set some kerberos-related configuration in your Spark
    >      >> jobs. For Flink, you need to do this too, via the following configs. The
    >      >> keytab file needs to exist on the Flink client; in your environment,
    >      >> that means the scheduler (oozie) must be able to access the keytab file.
    >      >>
    >      >> security.kerberos.login.keytab
    >      >> security.kerberos.login.principal
    >      >> security.kerberos.login.contexts
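(For concreteness, these keys go into flink-conf.yaml; the keytab path and principal below are placeholders, and the contexts value is only a common example:)

    security.kerberos.login.keytab: /path/to/scheduler.keytab
    security.kerberos.login.principal: scheduler@EXAMPLE.COM
    security.kerberos.login.contexts: Client,KafkaClient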
    >
    >      >>
    >      >> Best,
    >      >> Yang
    >      >>
    >      >> On Mon, Jan 6, 2020 at 3:55 PM, Juan Gentile <[hidden email]> wrote:
    >      >>
    >
    >      >>> Hello Rong, Chesnay,
    >      >>>
    >      >>> Thank you for your answers. The way we are trying to launch the job is
    >      >>> through a scheduler (similar to oozie), where we have a keytab for the
    >      >>> scheduler user, and with that keytab we get delegation tokens
    >      >>> impersonating the right user (the owner of the job). But the only way I
    >      >>> was able to make this work is by getting a ticket (through kinit).
    >      >>>
    >      >>> As a comparison, if I launch a Spark job (without doing kinit), just
    >      >>> with the delegation tokens, it works okay. So I guess Spark does
    >      >>> something extra.
    >      >>>
    >      >>> This is as far as I could get, but at this point I'm not sure whether
    >      >>> this is simply not supported by Flink or I'm doing something wrong.
    >      >>>
    >      >>> Thank you,
    >      >>> Juan
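As an aside, the impersonation flow described here can be sketched with Hadoop's generic proxy-user API (the job-owner name and renewer are placeholders, the scheduler principal must be allowed to impersonate via the cluster's hadoop.proxyuser.* settings, and this is neither Flink nor Spark code):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class ImpersonationSketch {
        // The scheduler logs in from its own keytab, impersonates the job owner,
        // and collects HDFS delegation tokens on the owner's behalf.
        public static Credentials obtainTokensFor(String jobOwner, Configuration conf)
                throws Exception {
            UserGroupInformation scheduler = UserGroupInformation.getLoginUser();
            UserGroupInformation proxy =
                UserGroupInformation.createProxyUser(jobOwner, scheduler);
            Credentials creds = new Credentials();
            proxy.doAs((PrivilegedExceptionAction<Void>) () -> {
                FileSystem fs = FileSystem.get(conf);
                // the NameNode issues tokens for jobOwner, renewable by the scheduler
                fs.addDelegationTokens(scheduler.getShortUserName(), creds);
                return null;
            });
            return creds;
        }
    }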
    >      >>>
    >      >>> From: Rong Rong <[hidden email]>
    >      >>> Date: Saturday, January 4, 2020 at 6:06 PM
    >      >>> To: Chesnay Schepler <[hidden email]>
    >      >>> Cc: Juan Gentile <[hidden email]>, "[hidden email]" <[hidden email]>, Oleksandr Nitavskyi <[hidden email]>
    >      >>> Subject: Re: Yarn Kerberos issue
    >      >>>
    >
    >      >>> Hi Juan,
    >      >>>
    >      >>> Chesnay was right. If you are using the CLI to launch your session
    >      >>> cluster based on the document [1], following the instructions to use
    >      >>> kinit [2] first seems to be one of the right ways to go.
    >      >>>
    >      >>> Another way of approaching it is to set up the kerberos settings in the
    >      >>> flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to pick up
    >      >>> your keytab files and run the CLI securely.
    >      >>>
    >      >>> As far as I know, the option `security.kerberos.login.use-ticket-cache`
    >      >>> doesn't actually change the behavior of the authentication process; it
    >      >>> is more of a hint about whether to use the ticket cache instantiated by
    >      >>> `kinit`. If you disable using the ticket cache, you will have to use
    >      >>> the keytab/principal approach - this doc [4] might explain it better.
    >      >>>
    >      >>> Thanks,
    >      >>> Rong
    >      >>>
    >
    >      >>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session
    >      >>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only
    >      >>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode
    >      >>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only
    >      >>>
    >
    >      >>> On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler <[hidden email]> wrote:
    >      >>>
    >      >>> From what I understand from the documentation, if you want to use
    >      >>> delegation tokens you always first have to issue a ticket using kinit;
    >      >>> so you did everything correctly?
    >      >>>
    >
    >      >>> On 02/01/2020 13:00, Juan Gentile wrote:
    >      >>>
    >      >>> Hello,
    >      >>>
    >      >>> I’m trying to submit a job (batch wordcount) to a Yarn cluster. I’m
    >      >>> trying to use delegation tokens and I’m getting the following error:
    >      >>>
    >      >>> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    >      >>>                at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
    >      >>>                at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
    >      >>>                at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
    >      >>>                at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
    >      >>>                at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
    >      >>>                at java.security.AccessController.doPrivileged(Native Method)
    >      >>>                at javax.security.auth.Subject.doAs(Subject.java:422)
    >      >>>                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    >      >>>                at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    >      >>>                at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
    >      >>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
    >      >>>                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)
    >      >>>                at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)
    >      >>>                at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)
    >      >>>                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)
    >      >>>                at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    >      >>>                at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    >      >>>                at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
    >      >>>                at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)
    >      >>>                at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    >      >>>                at java.security.AccessController.doPrivileged(Native Method)
    >      >>>                at javax.security.auth.Subject.doAs(Subject.java:422)
    >      >>>                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
    >      >>>                at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215)
    >      >>>                at org.apache.hadoop.ipc.Client.call(Client.java:1472)
    >      >>>                at org.apache.hadoop.ipc.Client.call(Client.java:1409)
    >      >>>                at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    >      >>>                at com.sun.proxy.$Proxy18.getDelegationToken(Unknown Source)
    >      >>>                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)
    >      >>>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    >      >>>                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    >      >>>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    >      >>>                at java.lang.reflect.Method.invoke(Method.java:498)
    >      >>>                at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
    >      >>>                at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    >      >>>                at com.sun.proxy.$Proxy19.getDelegationToken(Unknown Source)
    >      >>>                at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1082)
    >      >>>                at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1499)
    >      >>>                at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)
    >      >>>                at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)
    >      >>>                at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)
    >      >>>                at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)
    >      >>>                at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
    >      >>>                at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
    >      >>>                at org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)
    >      >>>                at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)
    >      >>>                at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
    >      >>>                at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)
    >      >>>
    >      >>> The kerberos configuration in this case is the default one. Then I tried
    >      >>> with this option set to false ‘security.kerberos.login.use-ticket-cache’
    >      >>> but I get the same error.
    >      >>>
    >      >>> I was able to solve the problem by issuing a ticket (with kinit) but I’d
    >      >>> like to know if it’s possible to make flink work with delegation tokens
    >      >>> and if so what is the right config.
    >      >>>
    >      >>> Thank you,
    >      >>> Juan