Re: AvroInputFormat Serialisation Issue


Hi Padarn,

People usually use the AvroInputFormat with a class generated from an Avro schema. But after looking into the implementation, it should also be possible to pass the GenericRecord class as the type parameter. So if this works locally but not in distributed mode, your exception looks like a bug. What do you mean by "this is an issue that is mentioned in the documentation"? Where is this issue documented?

Regards,
Timo



On 14.05.18 at 18:53, Padarn Wilson wrote:
Hi all - sorry this seems like a silly question, but I can't figure it out. 

I'm using an AvroInputFormat in order to read an Avro file like this:
val textInputFormat = new AvroInputFormat[GenericRecord](infile, classOf[GenericRecord])
val lines = env.readFile(textInputFormat, path)
This works fine in local mode, but when submitted to a Flink cluster I get serialisation errors that look like this:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$StringSchema
Serialization trace:
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
elementType (org.apache.avro.Schema$ArraySchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
	at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
	... 7 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$StringSchema with modifiers "public"
	at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
	at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
	at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
	at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
	at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
	... 37 more


I realise this is an issue that is mentioned in the documentation, but given that it looks like the problem is some class inside the AvroInputFormat that is having trouble being serialised, I'm not sure what the best solution is.

This works fine if I specify a concrete class instead of the generic one, i.e.
val textInputFormat = new AvroInputFormat[Example](infile, classOf[Example])
val lines = env.readFile(textInputFormat, path)
However, I can't get this to run in local mode when `Example` is a nested case class, which I need because the Avro files have deeply nested fields.
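
A possible workaround, sketched here under assumptions and not verified against this job: the trace shows Kryo trying to instantiate org.apache.avro.Schema$StringSchema reflectively and failing, which suggests that no dedicated Kryo serializer is registered for Avro's Schema type. flink-avro contains a Kryo serializer for Schema (AvroSchemaSerializer, nested in AvroKryoSerializerUtils), and it could be registered by hand as a default serializer. The package path used below is an assumption for Flink 1.4 or later and should be checked against the Flink version in use. The object and method names are hypothetical helpers, not part of Flink.

```scala
import org.apache.avro.Schema
// Assumed location in flink-avro (Flink 1.4+); verify for your version.
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.AvroSchemaSerializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper object, introduced only for illustration.
object AvroKryoWorkaround {

  // Registers a default Kryo serializer for Avro's Schema. A *default*
  // serializer also applies to subclasses, which matters here because the
  // runtime types (e.g. Schema$StringSchema) are private subclasses of Schema.
  def registerAvroSchemaSerializer(env: StreamExecutionEnvironment): Unit = {
    env.getConfig.addDefaultKryoSerializer(classOf[Schema], classOf[AvroSchemaSerializer])
  }
}
```

The call AvroKryoWorkaround.registerAvroSchemaSerializer(env) would go before env.readFile(...). Whether this resolves the failure on the cluster is untested, and serialising GenericRecord through Kryo is likely to be slow compared with using generated Avro classes.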



