
Re: BucketingSink to S3: Missing class com/amazonaws/AmazonClientException

Hi Julio,

What's the Flink version for this setup?


On Wed, Oct 3, 2018 at 4:22 PM Andrey Zagrebin <andrey@xxxxxxxxxxxxxxxxx> wrote:
Hi Julio,

This looks like a dependency problem.
Have you followed the recommended S3 configuration guide [1]?
Is it correct that your job has already created checkpoints/savepoints on S3?

I think that if you create the file system manually with FileSystem.get(path), it should be configured the same way as for the bucketing sink and checkpoints.


On 2 Oct 2018, at 15:21, Julio Biason <julio.biason@xxxxxxxxx> wrote:

Hey guys,

I've set up a BucketingSink as a dead letter queue that writes into our Ceph cluster over S3, but when I start the job, I get this error:

java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull
    at org.apache.hadoop.conf.Configuration.getClassByName
    at org.apache.hadoop.conf.Configuration.getClass
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
    at ...
    at ...
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
    at ...
    at java.lang.ClassLoader.loadClass
    at sun.misc.Launcher$AppClassLoader.loadClass
    at java.lang.ClassLoader.loadClass
    ... 17 more
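A quick way to confirm that this is a classpath problem rather than an S3 configuration problem is to check, using the same jars the TaskManager sees, whether the classes involved can actually be loaded. The helper below is a minimal, hypothetical sketch (ClasspathCheck is not part of Flink or Hadoop). The first default class name comes from the stack trace; the second assumes the s3:// scheme is mapped to Hadoop's S3A implementation, which may not match your setup.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Flink or Hadoop.
public class ClasspathCheck {

    /**
     * Returns the location className was loaded from, a marker string for classes
     * without a code source (e.g. JDK classes), "BROKEN: ..." if the class exists
     * but fails to link or initialize, or null if it is not on the classpath at all.
     */
    static String locate(String className) {
        try {
            // initialize=true forces linking and static initialization, so some
            // missing transitive dependencies surface here as LinkageErrors
            Class<?> cls = Class.forName(className, true, ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return (location == null) ? "no code source (e.g. a JDK class)" : location.toString();
        } catch (ClassNotFoundException e) {
            return null; // class is simply absent
        } catch (LinkageError e) {
            // e.g. NoClassDefFoundError when a class it depends on is missing
            return "BROKEN: " + e;
        }
    }

    public static void main(String[] args) {
        String[] classes = args.length > 0 ? args : new String[] {
            "com.amazonaws.AmazonClientException",   // class named in the stack trace
            "org.apache.hadoop.fs.s3a.S3AFileSystem" // assumed S3 implementation
        };
        for (String name : classes) {
            String where = locate(name);
            System.out.println(name + " -> " + (where == null ? "NOT FOUND" : where));
        }
    }
}
```

Run it with the same jars on the classpath as the TaskManager (for example, the contents of Flink's lib/ directory plus the job jar). A "NOT FOUND" for the AWS class would match the error above; the output says nothing about whether the S3 credentials or endpoint are correct.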
I find this odd because I've already set up checkpoints (and savepoints) to use S3, and I assumed that if it works for checkpoints, it should work here too.

(I suppose I could add the AWS client as a dependency of my build but, again, I assumed that once S3 works for checkpoints, it would work everywhere.)

Somewhat related: can I assume that using the FileSystem class to create FSOutputStreams will follow the Flink configuration? I have another type of dead letter queue that won't work with BucketingSink, and I was thinking about using FileSystem directly to create files inside that Ceph/S3 storage.

Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554