
RabbitMQ acknowledgment


> Hello all,
> 
> We are switching our backend application from Apache ActiveMQ to RabbitMQ Broker and we are trying to re-implement the following functionality:
> 
> from("activemq:queue:endpoint?asyncConsumer=true&acknowledgementMode=4") // 4 = ActiveMQ INDIVIDUAL_ACKNOWLEDGE
>   .process(...)
>   .delay(header(DELAY)).asyncDelayed().executorService(executorService)
>   .process(...)
>   .process(exchange -> {
>       // acknowledge only after the delayed work has completed
>       ((JmsMessage) exchange.getIn()).getJmsMessage().acknowledge();
>   })
>   .end();
> 
> 
> The logic is that our route consumes the queue, schedules each message as a future async task, and acknowledges the message to the broker only AFTER the task has completed, so that the broker then removes the message from the queue (no message loss in case of errors).
> 
> Based on my investigation of the rabbitmq component and Camel's RabbitMQConsumer.java implementation, I cannot build a corresponding route in which I choose the point at which the acknowledgement is sent (feature request). The requirement seems to be partly handled by adding autoAck=false to the from URI parameters. With that setting, however, I noticed that the consumers were blocked until they had acknowledged the previous message! Is this functioning as designed?
> 
> I would like to acknowledge the consumed messages explicitly at the end of the pipeline, while the route keeps consuming new messages from the queue in the meantime.
> 
> Thanks,
> Panos
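
The behaviour described above (delivery stalling until the previous message is acknowledged) can be illustrated with a toy model that uses only the JDK. This is a sketch under stated assumptions, not Camel or RabbitMQ code: the class DeferredAckDemo, its run method, and the Semaphore standing in for the broker's window of unacknowledged messages are all hypothetical. In the RabbitMQ Java client the corresponding calls would be Channel.basicQos (window size) and Channel.basicAck (acknowledgement); whether Camel's rabbitmq component exposes an equivalent hook at the end of a route is exactly the open question of this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy model: no broker or Camel involved, JDK classes only.
class DeferredAckDemo {

    /** Outcome of one simulated run. */
    static final class Result {
        final List<Integer> acked;  // message ids in the order they were acknowledged
        final int maxInFlight;      // highest number of unacknowledged messages observed

        Result(List<Integer> acked, int maxInFlight) {
            this.acked = acked;
            this.maxInFlight = maxInFlight;
        }
    }

    /** Delivers messageCount messages, allowing at most prefetch of them to be unacknowledged. */
    static Result run(int messageCount, int prefetch, long delayMillis) {
        Semaphore window = new Semaphore(prefetch); // stands in for the unacked-message window
        ScheduledExecutorService workers = Executors.newScheduledThreadPool(4);
        List<Integer> acked = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messageCount);
        try {
            for (int id = 0; id < messageCount; id++) {
                window.acquire(); // delivery blocks while the window is full
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                final int messageId = id;
                // The delayed task models the async pipeline; the ack is its last step.
                workers.schedule(() -> {
                    acked.add(messageId);
                    inFlight.decrementAndGet();
                    window.release(); // "acknowledge": frees one slot in the window
                    done.countDown();
                }, delayMillis, TimeUnit.MILLISECONDS);
            }
            done.await(); // wait until every message has been acknowledged
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating delivery", e);
        } finally {
            workers.shutdown();
        }
        return new Result(new ArrayList<>(acked), maxInFlight.get());
    }

    public static void main(String[] args) {
        Result serial = run(3, 1, 10);    // window of 1: strictly one message at a time
        Result windowed = run(20, 5, 10); // window of 5: up to five messages in flight
        System.out.println("prefetch=1 max in flight: " + serial.maxInFlight);
        System.out.println("prefetch=5 max in flight: " + windowed.maxInFlight);
    }
}
```

With a window of 1 the model delivers the next message only after the previous one has been acknowledged, which matches the blocking observed with autoAck=false. A larger window lets delivery continue while earlier messages are still waiting for their deferred acknowledgement.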



