git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Eventual errors reading from ActiveMQ/JmsIO


Sounds like a bug. Can you please create a JIRA with instructions for reproducing the issue ?

Thanks,
Cham

On Thu, May 24, 2018 at 9:09 AM Edward Pricer <ted.pricer@xxxxxxxxxxxxxxxxxx> wrote:
Hi - hoping for a helping hand

When trivially reading from an ActiveMQ queue, I eventually get a
java.lang.NoSuchMethodException: javax.jms.Message.<init>() exception.

The queue is populated out-of-process rapidly with a text message.

Exception sometimes appears immediately, sometimes not for some time.
Faster queue writes appears to exacerbate the problem.

Beam 2.4.0, Java, DirectRunner

Apparently the internals of DirectRunner are trying to clone a
JmsCheckpointMark by
encoding and decoding with a generic AvroCoder, but that's failing
because part of the payload doesn't have a default constructor (in
fact, it's trying to instantiate an interface). Do I need to be using
JmsIO differently, is
this a limitation of the DirectRunner, or is this actually a bug?

Here's the test-case code. I don't think the publisher side is
relevant, but let me know if it is.

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// read from the queue
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");

PCollection<String> inputStrings = p.apply("Read from queue",
JmsIO.<String>readMessage() .withConnectionFactory(factory)
.withQueue("somequeue") .withCoder(StringUtf8Coder.of())
.withMessageMapper((JmsIO.MessageMapper<String>) message ->
((TextMessage) message).getText()));

// decode PCollection<String> asStrings = inputStrings.apply("Decode
Message", ParDo.of(new DoFn<String, String>() { @ProcessElement public
void processElement(ProcessContext context) {
System.out.println(context.element());
context.output(context.element()); } })); p.run();


Full stack trace:

Exception in thread "main" java.lang.RuntimeException:
java.lang.NoSuchMethodException: javax.jms.Message.<init>()
at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)

Thanks!