
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException


They are actually using different interfaces and dependencies. Checkpointing uses Flink's FileSystem abstraction, and the shaded Hadoop file system is a special implementation of it, based on the Hadoop S3A FileSystem, with all dependencies bundled in. The BucketingSink uses the HDFS/Hadoop FileSystem directly, so it needs the correct dependency setup itself.
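For the BucketingSink path, the NoClassDefFoundError usually means the AWS SDK (and hadoop-aws, which wires S3A into Hadoop) are missing from the job's classpath. A minimal sketch of the extra dependencies, assuming a Maven build; both version numbers are placeholders and should be aligned with the Hadoop version your Flink distribution was built against:

```xml
<!-- Hypothetical versions; match them to your Hadoop/Flink setup. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.8.5</version>
</dependency>
<!-- hadoop-aws already pulls in the AWS SDK (com.amazonaws.AmazonClientException
     lives in aws-java-sdk-core), but it can also be declared explicitly: -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.11.271</version>
</dependency>
```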

Flink 1.6 released the new StreamingFileSink, which is a replacement for the BucketingSink. With Flink 1.7 it will also support the bundled S3 file systems.
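For reference, a minimal sketch of the StreamingFileSink in row format, writing each record as a UTF-8 line; the bucket path and the `stream` variable are made up for illustration:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// The s3:// scheme is resolved through Flink's own FileSystem abstraction,
// so from 1.7 on the bundled flink-s3-fs-hadoop jar can serve it directly,
// without putting the AWS SDK on the user classpath.
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3://my-bucket/dead-letters"),
                  new SimpleStringEncoder<String>("UTF-8"))
    .build();

stream.addSink(sink);
```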


On 3. Oct 2018, at 17:55, Julio Biason <julio.biason@xxxxxxxxx> wrote:

Hi Andrey,

Yes, we followed the guide. Our checkpoints/savepoints are already being saved on S3/Ceph using the shaded Hadoop S3AFileSystem (it's the one for which we managed to override the AWS endpoint so it points at our Ceph cluster).

I suppose I can add the package with the AmazonClientException to my project, but I still wonder why it works fine for Flink but fails for my project; in theory, both are using the same dependencies, right?

On Wed, Oct 3, 2018 at 7:51 AM, Andrey Zagrebin <andrey@xxxxxxxxxxxxxxxxx> wrote:
Hi Julio,

Looks like some problem with dependencies.
Have you followed the recommended s3 configuration guide [1]?
Is it correct that your job already created checkpoints/savepoints on s3 before?

I think if you manually create a file system using FileSystem.get(path), it should be configured the same way as for the bucketing sink and checkpoints.
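A minimal sketch of that approach, assuming Flink's own FileSystem API; the bucket and object names are hypothetical:

```java
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Flink resolves the URI scheme against the file systems registered from
// flink-conf.yaml, so an s3:// path should go through the same (shaded)
// implementation that checkpoints already use.
Path path = new Path("s3://my-bucket/dead-letters/record-1");
FileSystem fs = path.getFileSystem();
try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
    out.write("payload".getBytes("UTF-8"));
}
```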


On 2 Oct 2018, at 15:21, Julio Biason <julio.biason@xxxxxxxxx> wrote:

Hey guys,

I've set up a BucketingSink as a dead letter queue into our Ceph cluster using S3, but when I start the job, I get this error:

java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(
    at org.apache.hadoop.conf.Configuration.getClassByName(
    at org.apache.hadoop.conf.Configuration.getClass(
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
    at
    at
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
    at
    at java.lang.ClassLoader.loadClass(
    at sun.misc.Launcher$AppClassLoader.loadClass(
    at java.lang.ClassLoader.loadClass(
    ... 17 more
I find it weird because I've already set up checkpoints (and savepoints) to use the S3 protocol, and I just assumed that, if it works for checkpoints, it should work here.

(I suppose I could add the aws client as a dependency of my build but, again, I assumed that once S3 works for checkpoints, it should work everywhere.)

And somewhat related: can I assume that using the FileSystem class to create FSOutputStreams will follow Flink's configuration? I have another type of dead letter queue that won't work with the BucketingSink, and I was thinking about using the FileSystem directly to create files inside that Ceph/S3.

Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
