Flink plugin File System for GCS

Flink plugin File System for GCS

Alexander Filipchik
Hello,
I'm trying to implement a Flink-native FileSystem for GCS that can be used with the streaming file sink. I used the S3 one as a reference and made it work locally. However, it fails to load when I deploy it to the cluster. If I put Hadoop in the fat jar, I get:

Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/core/plugin/PluginLoader$PluginClassLoader) previously initiated loading for a different type with name "org/apache/hadoop/conf/Configuration"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.core.plugin.PluginLoader$PluginClassLoader.loadClass(PluginLoader.java:149)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.css.flink.fs.gcs.FlinkGcsFileSystemFactory.createInitializedGcsFS(FlinkGcsFileSystemFactory.java:63)

If I remove Hadoop from the fat jar but add the Hadoop uber jar to the lib folder, I get:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

If I remove Hadoop from the fat jar and put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into lib, I get: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem

I think it is a classloader issue, but I'm not sure what exactly causes it. It looks like some classes are loaded from lib and some are not. Any advice?

Thank you,
Alex

Re: Flink plugin File System for GCS

Yang Wang
Hi Alex,

Building a fat jar is good practice for a Flink filesystem plugin, just like flink-s3-fs-hadoop,
flink-s3-fs-presto, flink-azure-fs-hadoop and flink-oss-fs-hadoop. All of the provided filesystem
plugins are self-contained, which means you need to bundle Hadoop in your fat jar.

The reason your fat jar does not work is probably that you do not shade in the adapter classes under
"org.apache.flink.runtime.fs.hdfs" and "org.apache.flink.runtime.util".[1]
Shading them in forces the plugin classloader to always load them from your fat jar, not from flink-dist-*.jar.

Could you add the forced shading and try again?
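
For reference, a minimal sketch of what that forced shading could look like with the maven-shade-plugin (assuming a Maven build; the include coordinates are illustrative, based on flink-hadoop-fs being the Flink module that ships those adapter packages and on the Google Cloud Storage Hadoop connector):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <id>shade-gcs-fs</id>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <includes>
            <!-- bundle the adapter classes (org.apache.flink.runtime.fs.hdfs,
                 org.apache.flink.runtime.util) so the plugin classloader loads
                 them from this fat jar instead of from flink-dist-*.jar -->
            <include>org.apache.flink:flink-hadoop-fs</include>
            <!-- bundle Hadoop and the GCS connector instead of relying on lib/ -->
            <include>org.apache.hadoop:*</include>
            <include>com.google.cloud.bigdataoss:gcs-connector</include>
          </includes>
        </artifactSet>
      </configuration>
    </execution>
  </executions>
</plugin>

With something along these lines, nothing Hadoop-related needs to sit in lib/, and the whole filesystem is resolved inside the plugin's own classloader.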



Best,
Yang
