S3 connector Hadoop class mismatch


Paul Lam
Hi,

I’m using the StreamingFileSink from Flink 1.6.0 to write logs to S3 and have run into a classloader problem. There seem to be conflicting classes in flink-shaded-hadoop2-uber-1.6.0.jar and flink-s3-fs-hadoop-1.6.0.jar, possibly related to class loading order.

Has anyone else run into this problem? Thanks a lot!

The stack trace is below:

java.io.IOException: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:62)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:111)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2246)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.<init>(Groups.java:108)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.<init>(Groups.java:102)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:450)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:309)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:276)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:832)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:802)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:675)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:177)
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:57)
        ... 13 more
Caused by: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
        at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2240)
        ... 23 more


Best,
Paul Lam




Re: S3 connector Hadoop class mismatch

Stefan Richter
Hi,

I could not find any open Jira for the problem you describe. Could you please open one?

Best,
Stefan

> On 19.09.2018 at 09:54, Paul Lam <[hidden email]> wrote:

Re: S3 connector Hadoop class mismatch

Stephan Ewen
Hi!

A few questions to diagnose/fix this:

 Do you explicitly configure "hadoop.security.group.mapping"?

  - If not, this setting may have leaked in from a Hadoop config on the classpath. We are fixing this in Flink 1.7, to make this insensitive to such leaked settings.

  - If yes, then please try setting the config variable to the shaded class name: "hadoop.security.group.mapping: org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.LdapGroupsMapping".

Please let us know if that works!
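
To make the suggestion above concrete, here is a minimal sketch of how an unshaded Hadoop class name maps to the relocated name used inside flink-s3-fs-hadoop. The relocation prefix is taken from the stack trace in this thread; the class `ShadedNameDemo` and the helper `toShadedName` are hypothetical names for illustration, not part of Flink or Hadoop, and whether a given relocated class actually exists in the jar should be checked against the jar itself.

```java
class ShadedNameDemo {
    // Relocation prefix as it appears in the stack trace of this thread.
    static final String RELOCATION_PREFIX = "org.apache.flink.fs.s3hadoop.shaded.";

    // Hypothetical helper: returns the relocated name for an unshaded Hadoop class name.
    static String toShadedName(String className) {
        if (className.startsWith(RELOCATION_PREFIX)) {
            return className; // already relocated, leave untouched
        }
        if (className.startsWith("org.apache.hadoop.")) {
            return RELOCATION_PREFIX + className;
        }
        return className; // not a Hadoop class, nothing to relocate
    }

    public static void main(String[] args) {
        String configured = "org.apache.hadoop.security.LdapGroupsMapping";
        System.out.println("hadoop.security.group.mapping: " + toShadedName(configured));
    }
}
```

The printed line has the same shape as the setting suggested above; it only illustrates the naming, it does not verify that the setting resolves the problem.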



On Thu, Sep 20, 2018 at 1:40 PM, Stefan Richter <[hidden email]> wrote:

Re: S3 connector Hadoop class mismatch

Paul Lam
Hi Stefan, Stephan,

Yes, the `hadoop.security.group.mapping` option is explicitly set to `org.apache.hadoop.security.LdapGroupsMapping`. I guess that is why the classloader picked up an unshaded class.
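
The "class X not Y" message in the trace is what a subtype check produces when the configured class implements the unshaded interface while the caller expects the relocated one. The sketch below reproduces that situation with stand-in types only: `Unshaded` and `Shaded` are hypothetical nested holders that imitate the two copies of the Hadoop classes, and `requireSubtype` is an illustrative helper, not Hadoop's actual code.

```java
class ShadingMismatchDemo {
    // Stand-in for the classes from the unshaded Hadoop jar.
    static class Unshaded {
        interface GroupMappingServiceProvider {}
        static class LdapGroupsMapping implements GroupMappingServiceProvider {}
    }

    // Stand-in for the relocated copies inside the S3 file system jar.
    static class Shaded {
        interface GroupMappingServiceProvider {}
        static class LdapGroupsMapping implements GroupMappingServiceProvider {}
    }

    // Rejects a candidate class that does not implement the expected interface.
    static <U> Class<? extends U> requireSubtype(Class<?> candidate, Class<U> expected) {
        if (!expected.isAssignableFrom(candidate)) {
            throw new RuntimeException(candidate + " not " + expected.getName());
        }
        return candidate.asSubclass(expected);
    }

    public static void main(String[] args) {
        // Same simple names, but only the shaded class implements the shaded interface.
        requireSubtype(Shaded.LdapGroupsMapping.class, Shaded.GroupMappingServiceProvider.class);
        System.out.println("shaded implementation accepted");
        try {
            requireSubtype(Unshaded.LdapGroupsMapping.class, Shaded.GroupMappingServiceProvider.class);
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Two classes with the same simple name are unrelated types as far as the JVM is concerned, so a class name configured for one copy of Hadoop cannot satisfy an interface from the other copy.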

I don’t have permission to change the Hadoop cluster configuration, so I modified `core-default-shaded.xml` and marked the option as final, after which the class loading exceptions were gone.

But another problem came up (likely unrelated to the previous one):

With the old bucketing sink (version 1.5.3), it seems that `org.apache.hadoop.fs.s3a.S3AFileSystem` is initialized twice before the task starts running. The first initialization is triggered by `org.apache.flink.fs.s3hadoop.S3FileSystemFactory` and works well, but the second is triggered by the bucketing sink itself and fails to pick up the `s3.*` parameters such as the access key and the secret key.

The stack trace is below:

```
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
	at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
	at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1307)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:426)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:370)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
```

I haven’t figured out why the s3a filesystem needs to be initialized twice. Also, is it a bug that the bucketing sink does not use the filesystem factories to create the filesystem?

Thank you very much!

Best,
Paul Lam


On Sep 20, 2018, at 23:35, Stephan Ewen <[hidden email]> wrote:


Re: S3 connector Hadoop class mismatch

Paul Lam

Hi Stephan!

Unfortunately I'm on Hadoop 2.6, so I have to stick with the old bucketing sink. I got it working by explicitly setting the Hadoop configuration for the bucketing sink in the user code.
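
The thread does not show the actual user code, so the following is only a rough sketch of what such a workaround could look like. It builds the Hadoop S3A credential properties (`fs.s3a.access.key` and `fs.s3a.secret.key`) as a plain map; the class `BucketingSinkS3ConfSketch`, the helper `toS3aProperties`, and the placeholder values are hypothetical. In a real job the entries would be copied into a Hadoop configuration object and handed to the sink, for example through `BucketingSink#setFSConfig`, which should be checked against the Flink version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BucketingSinkS3ConfSketch {
    // Hypothetical helper: collects the S3A credential settings under Hadoop's own key names.
    static Map<String, String> toS3aProperties(String accessKey, String secretKey) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("fs.s3a.access.key", accessKey);
        props.put("fs.s3a.secret.key", secretKey);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; real credentials should come from a secure source.
        Map<String, String> props = toS3aProperties("<access-key>", "<secret-key>");
        // Print only the key names so that no secret ends up in the logs.
        props.keySet().forEach(System.out::println);
    }
}
```

The point of the sketch is that the bucketing sink creates its own Hadoop file system, so the credentials need to be present under the Hadoop key names rather than only as Flink `s3.*` options.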

Thank you very much!

Best,
Paul Lam


On Fri, Sep 21, 2018 at 6:30 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

The old bucketing sink does not work with the Flink file systems; it only works with Hadoop's direct file system support. IIRC it grabs the Flink file system (which creates s3a) to get the Hadoop config etc., and then creates the Hadoop file system (s3a again).

The new streaming file sink will use Flink's file system support, which is important for more efficient streaming fault tolerance. S3 support will be part of Flink 1.7.

Best,
Stephan


On Fri, Sep 21, 2018 at 10:41 AM Paul Lam <[hidden email]> wrote:

Re: S3 connector Hadoop class mismatch

Stephan Ewen
There is a Pull Request to enable the new streaming sink for Hadoop < 2.7, so it may become an option in the next release.

Thanks for bearing with us!

Best,
Stephan


On Sat, Sep 22, 2018 at 2:27 PM Paul Lam <[hidden email]> wrote:

Hi Stephan!

It's bad that I'm using Hadoop 2.6, so I have to stick to the old bucketing sink. I made it by explicitly setting Hadoop conf for the bucketing sink in the user code.

Thank you very much!

Best,
Paul Lam


Stephan Ewen <[hidden email]> 于2018年9月21日周五 下午6:30写道:
Hi!

The old bucketing sink does not work with the Flink file systems, it only works with Hadoop's direct file system support. IIRC it grabs the Flink File System (which creates s3a) to get the Hadoop config etc and then creates the Hadoop File System (s3a again).

The new streaming file sink will use Flink Filesystem support, which is important more efficient streaming fault tolerance. S3 support will be part of Flink 1.7

Best,
Stephan


On Fri, Sep 21, 2018 at 10:41 AM Paul Lam <[hidden email]> wrote:
Hi Stefan, Stephan,

Yes, the `hadoop.security.group.mapping` option is explicitly set to `org.apache.hadoop.security.LdapGroupsMapping`. Guess that was why the classloader found an unshaded class. 

I don’t have the permission to change the Hadoop cluster configurations so I modified the `core-default-shaded.xml` and marked the option as final to solve the problem, after which the class loading exceptions were gone. 

But anther problem came up (likely not related to the previous problem):

In case of the old bucketing sink (version 1.5.3), it seems that the `org.apache.hadoop.fs.s3a.S3AFileSystem` is initiated twice before the task starts running. The first time is called by `org.apache.flink.fs.s3hadoop.S3FileSystemFactory` and works well, but the second time is called by bucketing sink itself, and fails to leverage the `s3.*` parameters like the access key and the secret key. 

The stack traces are as below:

```
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
	at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
	at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1307)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:426)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:370)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
```

I haven’t figured out why the s3a filesystem needs to be initialized twice. Also, is it a bug that the bucketing sink does not use the filesystem factories to create the filesystem?

Thank you very much!

Best,
Paul Lam


On Sep 20, 2018, at 23:35, Stephan Ewen <[hidden email]> wrote:

Hi!

A few questions to diagnose/fix this:

 Do you explicitly configure the "hadoop.security.group.mapping"?

  - If not, this setting may have leaked in from a Hadoop config in the classpath. We are fixing this in Flink 1.7, to make this insensitive to such settings leaking in.

  - If yes, then please try setting the config variable to the shaded class name: "hadoop.security.group.mapping: org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.LdapGroupsMapping".

Please let us know if that works!



On Thu, Sep 20, 2018 at 1:40 PM, Stefan Richter <[hidden email]> wrote:
Hi,

I could not find any open Jira for the problem you describe. Could you please open one?

Best,
Stefan

> On 19.09.2018 at 09:54, Paul Lam <[hidden email]> wrote:

> Hi, 

> I’m using StreamingFileSink of 1.6.0 to write logs to S3 and encounter a classloader problem. It seems that there are conflicts in flink-shaded-hadoop2-uber-1.6.0.jar and flink-s3-fs-hadoop-1.6.0.jar, and maybe related to class loading orders. 

> Did anyone meet this problem? Thanks a lot!

> The stack traces are as below:

> java.io.IOException: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:62)
>       at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
>       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:111)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2246)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.<init>(Groups.java:108)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.<init>(Groups.java:102)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:450)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:309)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:276)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:832)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:802)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:675)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:177)
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:57)
>       ... 13 more
> Caused by: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
>       at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2240)
>       ... 23 more


> Best,
> Paul Lam