Re: Using ActiveMQ with more than one consumer instance


Have you confirmed, either via logging or a debugger, that
message.acknowledge() is being called? An exception thrown in
handleMessage() or the JSONObject() constructor would result in the
behavior you're describing.
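
To make that failure mode concrete, here is a minimal plain-Java sketch, not the code from this thread. FakeMessage and handleMessage are hypothetical stand-ins (no JMS library is involved). The point is only that an exception thrown before acknowledge() skips the call, and that logging on both paths shows which branch actually ran.

```java
import java.util.ArrayList;
import java.util.List;

final class AckLoggingSketch {
    /** Hypothetical stand-in for a JMS message; it models only acknowledge(). */
    static final class FakeMessage {
        final String text;
        boolean acknowledged;
        FakeMessage(String text) { this.text = text; }
        void acknowledge() { acknowledged = true; }
    }

    /** Hypothetical handler: fails on anything that does not look like a JSON object. */
    static String handleMessage(String text) {
        if (!text.trim().startsWith("{")) {
            throw new IllegalArgumentException("not a JSON object: " + text);
        }
        return "handled";
    }

    /** Acknowledges only after successful handling and logs both outcomes. */
    static boolean process(FakeMessage message, List<String> log) {
        try {
            handleMessage(message.text);
            message.acknowledge();
            log.add("acknowledged: " + message.text);
            return true;
        } catch (RuntimeException e) {
            // acknowledge() was never reached, so the message stays unacknowledged.
            log.add("NOT acknowledged: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        process(new FakeMessage("{\"id\": 1}"), log);
        process(new FakeMessage("oops"), log);
        log.forEach(System.out::println);
    }
}
```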

Why are you calling consumer.receive(1000) after processing the message but
before acknowledging it?
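
For reference, here is a rough in-memory model of the two operations involved. It is a sketch under loud assumptions: a java.util.ArrayDeque stands in for the broker queue, and browse and receive are hypothetical helpers, not JMS calls. Browsing leaves every message in place, whereas receive removes the head of the queue, whichever message the caller happened to be looking at.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class BrowseVersusReceiveSketch {
    /** Models a queue browser: returns a snapshot and removes nothing. */
    static List<String> browse(Deque<String> queue) {
        return new ArrayList<>(queue);
    }

    /** Models a non-blocking receive: removes and returns the head, or null if empty. */
    static String receive(Deque<String> queue) {
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));

        // Two "containers" that browse both see every message.
        System.out.println(browse(queue));
        System.out.println(browse(queue));

        // Each receive hands a message to exactly one caller.
        System.out.println(receive(queue));
        System.out.println(receive(queue));
        System.out.println(queue);
    }
}
```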

Are you sure you want to use CLIENT_ACKNOWLEDGE mode instead of
INDIVIDUAL_ACKNOWLEDGE mode?
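
The difference between the two modes can be modelled in a few lines. This is a toy model, not broker code: ModelSession and its methods are hypothetical names. It assumes the documented semantics that in CLIENT_ACKNOWLEDGE mode acknowledging one message acknowledges every message the session has delivered so far, while ActiveMQ's INDIVIDUAL_ACKNOWLEDGE mode acknowledges only the message it is called on.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class AckModeSketch {
    enum Mode { CLIENT, INDIVIDUAL }

    /** Hypothetical model of a session tracking delivered but unacknowledged messages. */
    static final class ModelSession {
        private final Mode mode;
        private final Set<String> unacked = new LinkedHashSet<>();

        ModelSession(Mode mode) { this.mode = mode; }

        void deliver(String messageId) { unacked.add(messageId); }

        void acknowledge(String messageId) {
            if (mode == Mode.CLIENT) {
                unacked.clear();            // acknowledges everything delivered so far
            } else {
                unacked.remove(messageId);  // acknowledges this message only
            }
        }

        Set<String> unacknowledged() { return unacked; }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            ModelSession session = new ModelSession(mode);
            session.deliver("m1");
            session.deliver("m2");
            session.deliver("m3");
            session.acknowledge("m2");
            System.out.println(mode + " leaves unacknowledged: " + session.unacknowledged());
        }
    }
}
```

In this model, acknowledging m2 leaves nothing outstanding in CLIENT mode and leaves m1 and m3 outstanding in INDIVIDUAL mode.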

Tim

On Sat, Oct 13, 2018, 9:25 PM Moshe Recanati <
moshe.recanati@xxxxxxxxxxxxxxxxx> wrote:

> Hi Tim,
> Thank you. See my code below.
> Keep in mind that I want the message to remain available until it has
> been processed successfully.
>
> // Establish a connection for the consumer.
> final Connection consumerConnection = connectionFactory.createConnection();
> consumerConnection.start();
> // Create a session.
> final Session consumerSession =
>         consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
>
> final Destination consumerDestination = consumerSession.createQueue(queueName);
>
> public void recieveMessages(String queueName) throws Exception {
>     // Create a message consumer from the session to the queue.
>     final MessageConsumer consumer =
>             consumerSession.createConsumer(consumerDestination);
>     // Begin to wait for messages.
>     Queue queue = consumerSession.createQueue(queueName);
>     QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
>     Enumeration msgs = queueBrowser.getEnumeration();
>     while (msgs.hasMoreElements()) {
>         // do your things here
>         ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
>         if (message == null) continue;
>         // handle message
>         System.out.println("Message received in : " + message);
>         try {
>             String text = message.getText();
>             JSONObject messageJson = new JSONObject(text);
>             consumer.receive(1000);
>             String responseString = handleMessage(messageJson);
>             message.acknowledge();
>
> .....
>
>
> Moshe
>
> On Sun, Oct 14, 2018 at 5:52 AM Tim Bain <tbain@xxxxxxxxxxxxxxx> wrote:
>
> > The behavior you want is the default behavior for a queue, so if you're
> > seeing the behavior you describe, it probably means you're doing something
> > wrong with the code that's acknowledging the message. (That, or you're
> > using a topic, not a queue.) Can you post the code that acknowledges the
> > message?
> >
> > Alternatively, can you use AUTO_ACK mode instead of CLIENT_ACK mode, to
> > eliminate any errors relating to your acknowledgement code?
> >
> > Tim
> >
> > On Sat, Oct 13, 2018, 1:33 PM Moshe Recanati <
> > moshe.recanati@xxxxxxxxxxxxxxxxx> wrote:
> >
> > > Hi,
> > > In our architecture we have 2 or more containers for each application,
> > > for high-availability purposes.
> > > We're using ActiveMQ and I would like to implement the following
> > > behavior.
> > >
> > > 1. Push a message to a queue.
> > > 2. The message is consumed by only *one* container (the first one
> > > to read it).
> > > 3. Once the message has been processed successfully, the consumer
> > > signals that it can be acknowledged and discarded.
> > >
> > > I tried using a transaction with commit and using CLIENT_ACKNOWLEDGE;
> > > however, in both cases both consumers got the message and processed it.
> > >
> > > I want only one consumer to process each message based on availability.
> > >
> > > Please share the way to implement it.
> > >
> > > Thank you
> > > Moshe
> > >
> >
>