Hello and happy new year to all flink users,
I have a streaming application (flink v1.7.0) on a Kerberized cluster, using a flink configuration file where the following parameters are set : security.kerberos.login.use-ticket-cache: false security.kerberos.login.keytab: XXXXX security.kerberos.login.principal: XXXXX As it is not sufficient, I also log to Kerberos the "open()" method of sources/sinks using hdfs or hiveserver2/impala servers using UserGroupInformation.loginUserFromKeytab(). And as it is even not sufficient in some case (namely the HiveServer2/Impala connection), I also attach a Jaas object to the TaskManager setting java.security.auth.login.config property dynamically. And as it is in some rare cases not even sufficient, I do run kinit as an external process from the task manager to create a local ticket cache... With all that stuff, everything works fine for several days when the streaming app does not experience any problem. However, when a problem occurs, when restoring from the checkpoint (hdfs backend), I get the following exception if it occurs after 24h from the initial application launch (24h is the Kerberos ticket validation time): java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "xxxxx"; destination host is: "xxxxx; org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772) org.apache.hadoop.ipc.Client.call(Client.java:1474) org.apache.hadoop.ipc.Client.call(Client.java:1401) org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) com.sun.proxy.$Proxy9.mkdirs(Unknown Source) org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539) sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) com.sun.proxy.$Proxy10.mkdirs(Unknown Source) org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742) org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713) org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870) org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866) org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866) org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859) org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1819) org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170) org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112) org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:83) org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58) org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257) org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) java.lang.Thread.run(Thread.java:748) Caused by : java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:682) java.security.AccessController.doPrivileged(Native Method) javax.security.auth.Subject.doAs(Subject.java:422) (...) Any idea about how I can circumvent this? For instance, can I "hook" the restoring process before the mkdir to relog to Kerberos by hand? Best regards, Arnaud ________________________________ L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur. The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender. |
Hi,
I've managed to correct this by implementing my own FsStateBackend based on the original one with proper Kerberos relogin in createCheckpointStorage(). Regards, Arnaud -----Message d'origine----- De : LINZ, Arnaud Envoyé : vendredi 4 janvier 2019 11:32 À : user <[hidden email]> Objet : Kerberos error when restoring from HDFS backend after 24 hours Hello and happy new year to all flink users, I have a streaming application (flink v1.7.0) on a Kerberized cluster, using a flink configuration file where the following parameters are set : security.kerberos.login.use-ticket-cache: false security.kerberos.login.keytab: XXXXX security.kerberos.login.principal: XXXXX As it is not sufficient, I also log to Kerberos the "open()" method of sources/sinks using hdfs or hiveserver2/impala servers using UserGroupInformation.loginUserFromKeytab(). And as it is even not sufficient in some case (namely the HiveServer2/Impala connection), I also attach a Jaas object to the TaskManager setting java.security.auth.login.config property dynamically. And as it is in some rare cases not even sufficient, I do run kinit as an external process from the task manager to create a local ticket cache... With all that stuff, everything works fine for several days when the streaming app does not experience any problem. However, when a problem occurs, when restoring from the checkpoint (hdfs backend), I get the following exception if it occurs after 24h from the initial application launch (24h is the Kerberos ticket validation time): java.io.IOException: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "xxxxx"; destination host is: "xxxxx; org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:772) org.apache.hadoop.ipc.Client.call(Client.java:1474) org.apache.hadoop.ipc.Client.call(Client.java:1401) org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) com.sun.proxy.$Proxy9.mkdirs(Unknown Source) org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539) sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) com.sun.proxy.$Proxy10.mkdirs(Unknown Source) org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2742) org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2713) org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870) org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866) org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866) org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859) org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1819) org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170) org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112) org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:83) org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58) org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257) org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) java.lang.Thread.run(Thread.java:748) Caused by : java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:682) java.security.AccessController.doPrivileged(Native Method) javax.security.auth.Subject.doAs(Subject.java:422) (...) Any idea about how I can circumvent this? For instance, can I "hook" the restoring process before the mkdir to relog to Kerberos by hand? Best regards, Arnaud ________________________________ L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur. The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender. |
Free forum by Nabble | Edit this page |