Re: Eventual errors reading from ActiveMQ/JmsIO


Done. For future reference - https://issues.apache.org/jira/browse/BEAM-4409
On Thu, May 24, 2018 at 8:41 PM Chamikara Jayalath <chamikara@xxxxxxxxxx>
wrote:

> Sounds like a bug. Can you please create a JIRA with instructions for
> reproducing the issue?

> Thanks,
> Cham

> On Thu, May 24, 2018 at 9:09 AM Edward Pricer <ted.pricer@xxxxxxxxxxxxxxxxxx> wrote:

>> Hi - hoping for a helping hand

>> When trivially reading from an ActiveMQ queue, I eventually get a
>> java.lang.NoSuchMethodException: javax.jms.Message.<init>() exception.

>> The queue is populated out-of-process rapidly with a text message.

>> The exception sometimes appears immediately, sometimes not for some time.
>> Faster queue writes appear to exacerbate the problem.

>> Beam 2.4.0, Java, DirectRunner

>> Apparently the internals of DirectRunner are trying to clone a
>> JmsCheckpointMark by
>> encoding and decoding with a generic AvroCoder, but that's failing
>> because part of the payload doesn't have a default constructor (in
>> fact, it's trying to instantiate an interface). Do I need to be using
>> JmsIO differently, is
>> this a limitation of the DirectRunner, or is this actually a bug?

>> Here's the test-case code. I don't think the publisher side is
>> relevant, but let me know if it is.

>> Pipeline p = Pipeline.create(
>>     PipelineOptionsFactory.fromArgs(args).withValidation().create());

>> // read from the queue
>> ConnectionFactory factory = new
>> ActiveMQConnectionFactory("tcp://localhost:61616");

>> PCollection<String> inputStrings = p.apply("Read from queue",
>>     JmsIO.<String>readMessage()
>>         .withConnectionFactory(factory)
>>         .withQueue("somequeue")
>>         .withCoder(StringUtf8Coder.of())
>>         .withMessageMapper((JmsIO.MessageMapper<String>) message ->
>>             ((TextMessage) message).getText()));

>> // decode
>> PCollection<String> asStrings = inputStrings.apply("Decode Message",
>>     ParDo.of(new DoFn<String, String>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext context) {
>>         System.out.println(context.element());
>>         context.output(context.element());
>>       }
>>     }));

>> p.run();


>> Full stack trace:

>> Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
>>     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
>>     at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
>>     at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>     at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
>>     at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
>>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>     at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
>>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
>>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
>>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
>>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
>>     at java.lang.Class.getConstructor0(Class.java:3082)
>>     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
>>     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

>> Thanks!
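
The root cause described in the report (a reflection-based decoder trying to instantiate an interface) can be reproduced without Beam, Avro, or JMS. What follows is a minimal sketch, not the Avro code path itself. `Message` and `TextMessageImpl` are hypothetical stand-ins for javax.jms.Message and a concrete message class, and `tryInstantiate` is an illustrative helper. The only assumption taken from the thread is that the decoder looks up a no-argument constructor through Class.getDeclaredConstructor(), as the "Caused by" frames of the stack trace indicate.

```java
import java.lang.reflect.Constructor;

public class NoDefaultConstructorDemo {

    // Hypothetical stand-in for javax.jms.Message: an interface has no constructors.
    public interface Message {}

    // Hypothetical concrete implementation with an implicit no-argument constructor.
    public static class TextMessageImpl implements Message {}

    // Mimics what a reflection-based decoder does before filling in fields:
    // look up the no-argument constructor of the declared type and call it.
    public static String tryInstantiate(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            Object instance = ctor.newInstance();
            return "ok: " + instance.getClass().getSimpleName();
        } catch (NoSuchMethodException e) {
            // Reached for interfaces and for classes without a no-arg constructor.
            return "NoSuchMethodException: " + e.getMessage();
        } catch (ReflectiveOperationException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        // Declared type is the interface: the constructor lookup fails.
        System.out.println(tryInstantiate(Message.class));
        // Declared type is a concrete class: instantiation succeeds.
        System.out.println(tryInstantiate(TextMessageImpl.class));
    }
}
```

With the interface as the declared type, the lookup fails before any instance is created, which matches the javax.jms.Message.<init>() error in the trace. With the concrete class it succeeds. This suggests the problem lies in how the checkpoint mark's payload type is declared to the coder rather than in the pipeline code from the report.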