BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

Julio Biason
Hey guys,

I've set up a BucketingSink as a dead letter queue into our Ceph cluster using S3, but when I start the job, I get this error:

java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 17 more

I find it weird 'cause I've already set up checkpoints (and savepoints) to use S3 as the protocol, and I just assumed that, if it works for checkpoints, it should work here.

(I suppose I could add the aws client as a dependency of my build but, again, I assumed that once S3 works for checkpoints, it should work everywhere.)

And kinda related, can I assume that using the FileSystem class to create FSOutputStreams will follow Flink configuration? I have another type of dead letter queue that won't work with BucketingSink and I was thinking about using it directly to create files inside that Ceph/S3.
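
For the NoClassDefFoundError in the stack trace above, it can help to confirm which class loader can actually see the AWS SDK class. The following is a minimal sketch and not part of the original post: the class name ClasspathCheck and its methods are hypothetical helpers, and it uses only the JDK. Since the trace shows the lookup going through sun.misc.Launcher$AppClassLoader, the sketch checks the system class loader as well as the thread context class loader.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports whether a class can be loaded and where it came from. */
class ClasspathCheck {

    /** Returns true if the named class is visible to the given class loader. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate and load the class, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised while resolving the class
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, or a marker for JDK classes. */
    static String origin(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return src.getLocation().toString();
    }

    public static void main(String[] args) {
        // Default to the class named in the stack trace; another name can be passed as an argument
        String name = args.length > 0 ? args[0] : "com.amazonaws.AmazonClientException";

        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();

        System.out.println(name + " visible to system class loader:  " + isLoadable(name, system));
        System.out.println(name + " visible to context class loader: " + isLoadable(name, context));
    }
}
```

Running the same check from inside the job (for example in an operator's open() method) would show what the TaskManager's class loaders can see, which may differ from the classpath of the machine that submits the job.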

--
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

Andrey Zagrebin
Hi Julio,

Looks like some problem with dependencies.
Have you followed the recommended S3 configuration guide [1]?
Is it correct that your job has already created checkpoints/savepoints on S3 before?

I think if you manually create the file system using FileSystem.get(path), it should be configured the same way as for the bucketing sink and checkpoints.

Best,
Andrey


(In reply to Julio Biason's post of 2 Oct 2018, 15:21.)

Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

Amit Jain
Hi Julio,

What's the Flink version for this setup?

--
Thanks,
Amit

(In reply to Andrey Zagrebin's post of Wed, Oct 3, 2018, 4:22 PM.)

Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

Julio Biason
In reply to this post by Andrey Zagrebin
Hi Andrey,

Yes, we followed the guide. Our checkpoints/savepoints are already being saved on S3/Ceph, using the ShadedHadoop/S3AFileSystem (because it's the one that let us completely override the AWS address to point to our Ceph cluster).

I suppose I can add the package with the AmazonClientException to my project, but I still wonder why it works fine for Flink but fails for my project; in theory, both are using the same dependencies, right?


Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

Aljoscha Krettek
Hi,

They are actually using different interfaces and dependencies. Checkpointing uses the Flink FileSystem, and the shaded Hadoop file system is a special implementation of it, based on the Hadoop S3 FileSystem, that has all dependencies bundled in. The BucketingSink uses the HDFS/Hadoop FileSystem, so it needs to have the correct dependency setup of its own.

Flink 1.6 introduced the new StreamingFileSink, which is a replacement for BucketingSink. With Flink 1.7, it will also support the bundled S3 file systems.

Best,
Aljoscha

(In reply to Julio Biason's post of 3 Oct 2018, 17:55.)