Failed checkpointing on HDFS: Flink doesn't use the right authentication


Failed checkpointing on HDFS: Flink doesn't use the right authentication

Bruno Michelin Rakotondranaivo

Hi,

With flink-1.2.0, I want to consume data from a secured Kafka 0.10 cluster over the SASL_PLAINTEXT protocol, using a login/password from a JAAS file, and store it on HDFS in a Kerberized cluster, with the user ‘hive’ as the Kerberos principal.
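For reference, the Kafka side of the setup described above might be expressed with consumer properties along these lines. This is a minimal sketch, not the poster's actual configuration: the broker address, the group id and the helper name `KafkaSaslProps.consumerProps` are hypothetical. It assumes, as in the post, that the PLAIN login/password live in a JAAS file passed to the JVM with `-Djava.security.auth.login.config=...` (a `KafkaClient` section using `org.apache.kafka.common.security.plain.PlainLoginModule`), so no credentials appear in the properties themselves.

```java
import java.util.Properties;

public class KafkaSaslProps {

    /**
     * Builds consumer properties for a Kafka 0.10 cluster secured with SASL/PLAIN
     * over an unencrypted connection (SASL_PLAINTEXT).
     * Broker list and group id are hypothetical placeholders.
     * Username/password are deliberately NOT set here: they are assumed to come
     * from the "KafkaClient" section of the JAAS file given via
     * -Djava.security.auth.login.config.
     */
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Authenticate with SASL, without TLS encryption.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Use the PLAIN (login/password) mechanism instead of the default GSSAPI (Kerberos).
        props.setProperty("sasl.mechanism", "PLAIN");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker and group id, for illustration only.
        Properties props = consumerProps("broker1:9092", "my-app");
        props.list(System.out);
    }
}
```

In a Flink 1.2 job, properties like these would typically be handed to the Kafka 0.10 connector's consumer (`FlinkKafkaConsumer010`) together with a topic and a deserialization schema.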

Checkpointing is enabled, and the state backend is the ‘filesystem’ backend on HDFS.

An error occurs when the job tries to initialize checkpoints: the app uses the JAAS authentication instead of the Kerberos one to write to HDFS.

15:32:56,300 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1) (373063aff1529c7e2ba362ad30c8b03c) switched from RUNNING to FAILED.
java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
                at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1120)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
                at java.util.concurrent.FutureTask.run(FutureTask.java:262)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
                at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1).
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:534)
                at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1111)
                ... 5 more
Caused by: java.io.IOException: The given file URI (hdfs://mynamenode:8020/user/hive/flink-apps/my-app/checkpoints) points to the HDFS NameNode at mynamenode:8020, but the File System could not be initialized with that address.
                at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:334)
                at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:288)
                at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:310)
                at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
                at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:105)
                at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:172)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:1155)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1137)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1076)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:641)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:586)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:529)
                ... 6 more
Caused by: java.lang.NullPointerException
                at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
                at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:378)
                at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:183)
                at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:568)
                at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
                at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
                at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
                at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
                at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
                at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
                at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:320)
                ... 17 more

What have I misunderstood?

How can I use JAAS (for Kafka) together with Kerberos (for HDFS)?

Thanks in advance,

MR
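The innermost `Caused by` in the trace hints at how the two authentication setups collide: while creating the HDFS client, Hadoop's `SaslRpcServer$FastSaslServerFactory` goes through the `SaslServerFactory` instances registered in the JVM and calls `getMechanismNames` on each, and Kafka's `PlainSaslServerFactory` (apparently registered JVM-wide once the Kafka PLAIN login is in use) throws a `NullPointerException`, presumably because it is handed a null property map. The following is a minimal, JDK-only diagnostic sketch built on that assumption. The class and method names (`SaslFactoryCheck`, `nullUnsafeFactories`) are hypothetical, and the code only mimics the Hadoop call; it does not reproduce Hadoop's implementation.

```java
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServerFactory;

public class SaslFactoryCheck {

    /**
     * Calls getMechanismNames(null) on every factory, mimicking (by assumption) the
     * way the Hadoop frame in the trace queries SASL server factories, and returns
     * "className -> exception" for each factory that throws instead of handling
     * a null property map.
     */
    public static List<String> nullUnsafeFactories(Enumeration<SaslServerFactory> factories) {
        List<String> offenders = new ArrayList<>();
        while (factories.hasMoreElements()) {
            SaslServerFactory factory = factories.nextElement();
            try {
                factory.getMechanismNames(null);
            } catch (RuntimeException e) {
                offenders.add(factory.getClass().getName() + " -> " + e);
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        // Inspect every SaslServerFactory registered with this JVM's security providers.
        List<String> offenders = nullUnsafeFactories(Sasl.getSaslServerFactories());
        if (offenders.isEmpty()) {
            System.out.println("All registered SaslServerFactory instances accept null props.");
        } else {
            offenders.forEach(System.out::println);
        }
    }
}
```

Calling `nullUnsafeFactories(Sasl.getSaslServerFactories())` from code running in the TaskManager JVM, after the Kafka consumer has logged in, would be expected to list a factory failing the same way `PlainSaslServerFactory` does in the trace; on a plain JDK the list is expected to be empty.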


Re: Failed checkpointing on HDFS: Flink doesn't use the right authentication

Aljoscha Krettek
+Gordon, could you please have a look at this? You probably know Kafka best by now, and you have also been working on security-related things for a while.

I’m afraid I’m not much help here, but I’m hoping Gordon can help.

Best,
Aljoscha
On 21. Apr 2017, at 12:46, Bruno Michelin Rakotondranaivo <[hidden email]> wrote:

RE: Failed checkpointing on HDFS: Flink doesn't use the right authentication

Bruno Michelin Rakotondranaivo
In reply to this post by Bruno Michelin Rakotondranaivo

Hi all,

FYI, this issue seems to be fixed in Flink 1.2.1.

Regards,

From: Bruno Michelin Rakotondranaivo [mailto:[hidden email]]
Sent: Friday, 21 April 2017 12:47
To: [hidden email]
Subject: Failed checkpointing on HDFS: Flink doesn't use the right authentication


RE: Failed checkpointing on HDFS: Flink doesn't use the right authentication

Tzu-Li (Gordon) Tai
Hi Bruno,

Thanks for reporting this! And sorry for the late response; this one slipped past my notice.

As far as I can tell, this was fixed indirectly by https://issues.apache.org/jira/browse/FLINK-5949.

Cheers,
Gordon


On 18 May 2017 at 3:15:18 PM, Bruno Michelin Rakotondranaivo ([hidden email]) wrote:
