Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error


Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

LINZ, Arnaud

Hello,

 

My application reads and writes HDFS files both in the jobs and in the driver application.

It works in local cluster mode, but when I submit it to a YARN client and try to use a HadoopInputFormat (built from an HCatalog request), I get the following error: "Delegation Token can be issued only with kerberos or web authentication" (full stack trace below).

 

The code which I believe causes the error (it's not clear from the stack trace, as the nearest point in my code is "execEnv.execute()"):

 

public synchronized DataSet<T> readTable(String dbName, String tableName, String filter,
        ExecutionEnvironment cluster, final HiveBeanFactory<T> factory) throws IOException {

    // Log in to Kerberos if needed (via UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());)
    HdfsTools.getFileSystem();

    // Create the M/R job and configure it
    final Job job = Job.getInstance();
    job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

    // Create the source
    @SuppressWarnings({ "unchecked", "rawtypes" })
    final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
        new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
            NullWritable.class,
            DefaultHCatRecord.class,
            job);

    final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
    @SuppressWarnings("serial")
    final DataSet<T> dataSet = cluster
        // Read the table
        .createInput(inputFormat)
        // Map to bean (the key is useless)
        .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
            @Override
            public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception {
                final T record = factory.fromHive(value.f1, inputSchema);
                if (record != null) {
                    out.collect(record);
                }
            }
        }).returns(beanClass);

    return dataSet;
}

 

Maybe I need to explicitly get a token on each node in the initialization of HadoopInputFormat() (overriding configure())? That would be difficult since the keytab file is on the driver's local drive…
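Something like the following is what I have in mind (a rough sketch only: the principal and keytab path are placeholders, and it assumes the keytab could somehow be made available on every node):

import java.io.IOException;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical sketch: subclass Flink's wrapper so that each task logs in
// to Kerberos before the format is configured on the worker side.
public class KerberosHadoopInputFormat<K, V> extends HadoopInputFormat<K, V> {

    public KerberosHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key, Class<V> value, Job job) {
        super(mapreduceInputFormat, key, value, job);
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
        try {
            // Placeholder principal and path; the keytab would have to exist on every node.
            UserGroupInformation.loginUserFromKeytab("alinz@SOME.REALM", "/path/on/every/node/alinz.keytab");
        } catch (IOException excep) {
            throw new RuntimeException("Kerberos login failed", excep);
        }
        super.configure(parameters);
    }
}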

 

Stack trace:

 

Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties
Using JobManager address from YARN properties bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494
Secure Hadoop environment setup detected. Running in secure context.
2015:08:20 15:04:17 (main) - INFO - com.bouygtel.kuberasdk.main.Application.mainMethod - Début traitement
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
15:04:20,139 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Error : Execution Kubera KO : java.lang.IllegalStateException: Error while executing Flink application
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:80)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
org.apache.hadoop.ipc.Client.call(Client.java:1468)
org.apache.hadoop.ipc.Client.call(Client.java:1399)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:909)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
com.sun.proxy.$Proxy15.getDelegationToken(Unknown Source)
org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1029)
org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1355)
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:529)
org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:507)
org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2041)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205)
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat.getSplits(HCatBaseInputFormat.java:157)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51)
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

 

 

Do you have any clue?

 

Best regards,

Arnaud

 

 





Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

rmetzger0
Hi Arnaud,

I suspect the "HdfsTools" are something internal from your company?
Are they doing any Kerberos-related operations?

Is the local cluster mode also reading files from the secured HDFS cluster?

Flink takes care of sending the authentication tokens from the client to the JobManager and to the TaskManagers.
For HDFS, Flink should also use these user settings.
I don't know whether the HCatalog code / Hadoop compatibility code is also doing some Kerberos operations which interfere with our efforts.

From the logs, you can see:
Secure Hadoop environment setup detected. Running in secure context.
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab


Is the user "alinz" authorized to access the files in HDFS?


I have to admit that I haven't seen this issue before.

If possible, can you privately send me the full log of the application, using "yarn logs -applicationId <ID>"?





RE: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

LINZ, Arnaud

Hi Robert,

 

Yes, it's an internal tool which gets an HDFS FileSystem instance. It does some Kerberos-related operations, needed because I manipulate some HDFS files before executing the application.

The local cluster mode works fine with the same code, and it does some HCat reading / HDFS writing.

 

What HdfsTools does, in a nutshell:

final Configuration cfg = new Configuration();
cfg.addResource(new Path("/home/hadoop/conf/core-site.xml"));
cfg.addResource(new Path("/home/hadoop/conf/hdfs-site.xml"));
cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/core-site.xml"));
cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
// Kerberos handling
if (isKerberosActive()) {
    loginKerberos(cfg);
}
filesys = FileSystem.get(cfg);

 

And the straightforward Kerberos stuff:

public static synchronized void loginKerberos(Configuration cfg) {
    UserGroupInformation.setConfiguration(cfg);
    if (!loggedIn) {
        try {
            UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
            loggedIn = true;
            JournalUDF.logLocalFS("User " + UserGroupInformation.getLoginUser() + " : Kerberos login succeeded ");
        }
        catch (IOException excep) {
            throw new GaneshRuntimeException("Unable to log (kerberos) : " + excep.toString(), excep);
        }
    }
}

loggedIn is static to the class, and alinz has all the proper rights.

 

From what I've seen on Google, Spark and Hive/Oozie ran into the same error and fixed it in some way, but I don't know if it's really the same problem.

I'm sending you the full trace in a private email.

 

Arnaud

 

De : Robert Metzger [mailto:[hidden email]]
Envoyé : jeudi 20 août 2015 16:42
À : [hidden email]
Objet : Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

 

Hi Arnaud,

 

I suspect the "HdfsTools" are something internal from your company?

Are they doing any kerberos-related operations?

 

Is the local cluster mode also reading files from the secured HDFS cluster?

 

Flink is taking care of sending the authentication tokens from the client to the jobManager and to the TaskManagers.

For HDFS Flink should also use these user settings.

I don't know whether the HCatalog code / Hadoop compatbililty code is also doing some kerberos operations which are interfering with our efforts.

 

From the logs, you can see:

Secure Hadoop environment setup detected. Running in secure context.
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab

 

Is the user "alinz" authorized to access the files in HDFS?

 

I have to admit that I didn't see this issue before.

If possible, can you privately send the the full log of the application, using "yarn logs -applicationId <ID>" ?

 

 

On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud <[hidden email]> wrote:

Hello,

 

My application handles as input and output some HDFS files in the jobs and in the driver application.

It works in local cluster mode, but when I’m trying to submit it to a yarn client, when I try to use a HadoopInputFormat (that comes from a HCatalog request), I have the following error: Delegation Token can be issued only with kerberos or web authentication (full stack trace below).

 

Code which I believe causes the error (It’s not clear in the stack trace, as the nearest point in my code is “execEnv.execute()”) :

 

public synchronized DataSet<T> readTable(String dbName, String tableName, String filter, ExecutionEnvironment cluster,

        final HiveBeanFactory<T> factory) throws IOException {

 

        // login kerberos if needed (via UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());)

        HdfsTools.getFileSystem();

 

        // Create M/R job and configure it

        final Job job = Job.getInstance();

        job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

 

        // Crée la source

        @SuppressWarnings({ "unchecked", "rawtypes" })

        final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat<NullWritable, // CHECKSTYLE:OFF

        DefaultHCatRecord>(// CHECKSTYLE:ON

            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //

            NullWritable.class, //

            DefaultHCatRecord.class, //

            job);

 

        final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());

        @SuppressWarnings("serial")

        final DataSet<T> dataSet = cluster

        // Read the table

            .createInput(inputFormat)

            // map bean (key is useless)

            .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {

                @Override

                public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception {  // NOPMD

                    final T record = factory.fromHive(value.f1, inputSchema);

                    if (record != null) {

                        out.collect(record);

                    }

                }

            }).returns(beanClass);

 

        return dataSet;

    }

 

Maybe I need to explicitely get a token on each node in the initialization of HadoopInputFormat() (overriding configure()) ? That would be difficult since the keyfile is on the driver’s local drive…

 

StackTrace :

 

Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties

Using JobManager address from YARN properties bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494

Secure Hadoop environment setup detected. Running in secure context.

2015:08:20 15:04:17 (main) - INFO - com.bouygtel.kuberasdk.main.Application.mainMethod - Dï¿œbut traitement

15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab

15:04:20,139 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

Error : Execution Kubera KO : java.lang.IllegalStateException: Error while executing Flink application

com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84)

com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)

com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)

com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)

com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)

sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

java.lang.reflect.Method.invoke(Method.java:606)

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)

org.apache.flink.client.program.Client.run(Client.java:315)

org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)

org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)

org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)

org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)

org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)

java.security.AccessController.doPrivileged(Native Method)

javax.security.auth.Subject.doAs(Subject.java:415)

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)

org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)

org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

 

Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)

org.apache.flink.client.program.Client.run(Client.java:413)

org.apache.flink.client.program.Client.run(Client.java:356)

org.apache.flink.client.program.Client.run(Client.java:349)

org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)

com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:80)

com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)

com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)

com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)

com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)

sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

java.lang.reflect.Method.invoke(Method.java:606)

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)

org.apache.flink.client.program.Client.run(Client.java:315)

org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)

org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)

org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)

org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)

org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)

java.security.AccessController.doPrivileged(Native Method)

javax.security.auth.Subject.doAs(Subject.java:415)

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)

org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)

org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

 

Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)

org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)

org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)

org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)

scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)

org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)

org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)

scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)

org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)

akka.actor.Actor$class.aroundReceive(Actor.scala:465)

org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)

akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

akka.actor.ActorCell.invoke(ActorCell.scala:487)

akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)

akka.dispatch.Mailbox.run(Mailbox.scala:221)

akka.dispatch.Mailbox.exec(Mailbox.scala:231)

scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Delegation Token can be issued only with kerberos or web authentication

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)

        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)

        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)

        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:415)

        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

 

org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)

org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)

org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)

org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)

org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)

scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)

org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)

org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)

scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)

org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)

akka.actor.Actor$class.aroundReceive(Actor.scala:465)

org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)

akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

akka.actor.ActorCell.invoke(ActorCell.scala:487)

akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)

akka.dispatch.Mailbox.run(Mailbox.scala:221)

akka.dispatch.Mailbox.exec(Mailbox.scala:231)

scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)

        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)

        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)

        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:415)

        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

 

org.apache.hadoop.ipc.Client.call(Client.java:1468)

org.apache.hadoop.ipc.Client.call(Client.java:1399)

org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)

com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)

org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:909)

sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

java.lang.reflect.Method.invoke(Method.java:606)

org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)

org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

com.sun.proxy.$Proxy15.getDelegationToken(Unknown Source)

org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1029)

org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1355)

org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:529)

org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:507)

org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2041)

org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)

org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)

org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205)

org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)

org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat.getSplits(HCatBaseInputFormat.java:157)

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140)

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51)

org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)

org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)

org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)

org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)

scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)

org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)

scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)

org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)

org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)

scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)

org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)

akka.actor.Actor$class.aroundReceive(Actor.scala:465)

org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)

akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

akka.actor.ActorCell.invoke(ActorCell.scala:487)

akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)

akka.dispatch.Mailbox.run(Mailbox.scala:221)

akka.dispatch.Mailbox.exec(Mailbox.scala:231)

scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

 

 

Do you have any clue?

 

Best regards,

Arnaud

 

 

 



The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

 

Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

rmetzger0
I was able to reproduce the issue. This is the JIRA: https://issues.apache.org/jira/browse/FLINK-2555
I've already opened a pull request with the fix.

The problem was that our HadoopInputFormat wrapper was not correctly passing the security credentials from the Job object to the cluster.

Consider this code posted by Arnaud in the initial message:

final Job job = Job.getInstance();
job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

// Create the source
@SuppressWarnings({ "unchecked", "rawtypes" })
final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
    new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
        (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
        NullWritable.class,
        DefaultHCatRecord.class,
        job);


in the "Job.getInstance()" call, the current authentication credentials of the user are stored.

They are later passed to the HadoopInputFormat class (last line), but Flink was not properly making the Credentials available again on the cluster.
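To make that mechanism concrete, here is a minimal, hypothetical sketch of such an approach (class and method names are mine, not the actual FLINK-2555 patch): snapshot the Job's Credentials on the client, ship them along with the Java-serialized wrapper, and merge them back into the current user's security context on the cluster, so that TokenCache finds existing tokens instead of asking the NameNode for new ones.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class CredentialsShippingSketch implements Serializable {

    // Credentials is a Hadoop Writable, not Serializable, so we handle it manually.
    private transient Credentials credentials;

    public CredentialsShippingSketch(Job job) {
        // Snapshot the tokens/secrets the client attached to the Job
        // (e.g. by the keytab login done before the job was built).
        this.credentials = job.getCredentials();
    }

    // Ship the credentials with the serialized input format.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        credentials.write(out); // Credentials implements Writable
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        credentials = new Credentials();
        credentials.readFields(in);
    }

    // Called on the cluster side (e.g. from configure()) before input splits are created.
    public void restoreOnCluster() throws IOException {
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
    }
}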


The pull request should resolve the issue (I've verified it on a secured CDH 5.3 setup).


Thank you for reporting the bug!



On Thu, Aug 20, 2015 at 5:21 PM, LINZ, Arnaud <[hidden email]> wrote:

Hi Robert,

 

Yes, it's an internal tool which gets an HDFS FileSystem instance. I do some Kerberos-related operations, needed because I manipulate some HDFS files before executing the application.

The local cluster mode is working fine with the same code, and it does some HCat reading / HDFS writing.

 

What HdfsTools does, in a nutshell:

    final Configuration cfg = new Configuration();
    cfg.addResource(new Path("/home/hadoop/conf/core-site.xml"));
    cfg.addResource(new Path("/home/hadoop/conf/hdfs-site.xml"));
    cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/core-site.xml"));
    cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
    // Kerberos handling
    if (isKerberosActive()) {
        loginKerberos(cfg);
    }
    filesys = FileSystem.get(cfg);

 

And the straightforward Kerberos stuff:

public static synchronized void loginKerberos(Configuration cfg) {

        UserGroupInformation.setConfiguration(cfg);

        if (!loggedIn) {

            try {

                UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());

                loggedIn = true;

                JournalUDF.logLocalFS("User " + UserGroupInformation.getLoginUser() + " : Kerberos login succeeded ");

            }

            catch (IOException excep) {

                throw new GaneshRuntimeException("Unable to log (kerberos) : " + excep.toString(), excep);

            }

        }

    }

loggedIn being static to the class, and alinz having all the proper rights.

 

From what I've seen on Google, Spark and Hive/Oozie ran into the same error and fixed it somehow, but I don't know whether it's really the same problem.

I'm sending you the full trace in a private mail.

 

Arnaud

 

From: Robert Metzger [mailto:[hidden email]]
Sent: Thursday, August 20, 2015 4:42 PM
To: [hidden email]
Subject: Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

 

Hi Arnaud,

 

I suspect the "HdfsTools" class is something internal to your company?

Is it doing any Kerberos-related operations?

 

Is the local cluster mode also reading files from the secured HDFS cluster?

 

Flink takes care of sending the authentication tokens from the client to the JobManager and to the TaskManagers.

For HDFS, Flink should also use these user settings.

I don't know whether the HCatalog / Hadoop compatibility code is also doing some Kerberos operations which interfere with our efforts.

 

From the logs, you can see:

Secure Hadoop environment setup detected. Running in secure context.
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab

 

Is the user "alinz" authorized to access the files in HDFS?

 

I have to admit that I didn't see this issue before.

If possible, can you privately send the full log of the application, using "yarn logs -applicationId <ID>"?

 

 

Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Stephan Ewen
I think we need to extend our own FileInputFormats as well to pass the credentials...
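
For reference, the standard Hadoop client-side mechanism such a fix would presumably build on is obtaining delegation tokens up front, while the Kerberos login is still valid, and collecting them in a Credentials object to ship to the cluster. A small sketch under that assumption (the helper name and path are hypothetical; TokenCache.obtainTokensForNamenodes is the same call that fails in the trace above when no Kerberos context is present):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;

// Runs on the Kerberos-authenticated client, where the keytab login is valid.
static Credentials fetchHdfsTokens(Configuration conf, Path... paths) throws IOException {
    Credentials credentials = new Credentials();
    // Asks each path's NameNode for a delegation token and stores it
    // in the Credentials object, ready to be shipped to the cluster.
    TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
    return credentials;
}

// Hypothetical usage:
// Credentials creds = fetchHdfsTokens(conf, new Path("hdfs:///user/alinz/input"));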
