Re: Transactions (Multiple JMS Consumers) and Aggregation


Hi

The Aggregate EIP works independently of the original route/exchange.
In your example, the transacted route hands each message off to the
aggregator and then continues; since the route ends there, the
transaction commits at that point. Meanwhile the aggregator runs in
parallel, collecting messages on its own. When a batch completes, the
aggregator sends the aggregated message onward as a separate exchange,
and that routing happens outside any transaction.
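
To make the ordering concrete, here is a toy model in plain Java. It
is not Camel code: the class name, the event strings and the batch
size of 3 are made up for illustration, and the aggregator is inlined
on the calling thread so the output is deterministic (in Camel it runs
independently of the consumer).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Camel code) of a transacted route handing off to an aggregator. */
class AggregatorHandoffSketch {
    /** Ordered log of what happened, for inspection. */
    static final List<String> events = new ArrayList<>();
    /** Messages held by the aggregator that have not been sent on yet. */
    static final List<String> pending = new ArrayList<>();
    /** Hypothetical batch size, standing in for completionSize. */
    static final int COMPLETION_SIZE = 3;

    /** Models the transacted route: hand off, then commit, because the route ends there. */
    static void onMessage(String body) {
        pending.add(body);            // hand-off to the aggregator
        events.add("commit " + body); // the consumer's transaction commits here
        if (pending.size() == COMPLETION_SIZE) {
            // The aggregate is routed on only now, with no transaction around it.
            events.add("send " + String.join("+", pending) + " (no TX)");
            pending.clear();
        }
    }

    public static void main(String[] args) {
        for (String m : new String[] {"m1", "m2", "m3"}) {
            onMessage(m);
        }
        events.forEach(System.out::println);
    }
}
```

Every message is committed before the aggregate is sent, so a crash
between the last commit and the send loses whatever the aggregator was
holding in memory.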

So if your aim is to batch N JMS messages into a single aggregate
within the same transaction, this approach does not work.

For batching JMS messages in a transaction, look at the camel-sjms
component, which has such a feature (the regular camel-jms does not):
https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
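
As a rough sketch only, a batch route could look something like the
following. The queue name and target endpoint are taken from your
example; the class name and the "#myStrategy" registry reference are
placeholders, and the option names (completionSize, completionTimeout,
aggregationStrategy) are as I recall them from the documentation
linked above, so check them against your Camel version. It assumes the
sjms-batch component has already been set up with a JMS
ConnectionFactory.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch, not a tested configuration: consume and aggregate a batch
// within one transacted JMS session via sjms-batch.
public class BatchRouteSketch extends RouteBuilder {
    @Override
    public void configure() {
        // "#myStrategy" is a placeholder for an AggregationStrategy
        // bean looked up in the Camel registry.
        from("sjms-batch:myQueue"
                + "?completionSize=100"
                + "&completionTimeout=1000"
                + "&aggregationStrategy=#myStrategy")
            .to("some-other-endpoint");
    }
}
```

Here the batching is done by the consumer itself, so there is no
separate aggregate() step in the route.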

Note that there is also a camel-sjms2 component for the JMS 2.0 API.

For examples, I suggest looking at the component's own unit tests.

On Wed, Aug 1, 2018 at 2:58 AM, Rajith Muditha Attapattu
<rajith77@xxxxxxxxx> wrote:
> Hi All,
>
> I'm trying to understand how transactions would work in the following
> scenario.
> Given that transactions are scoped by sessions, and each concurrent
> consumer runs in its own session (possibly on different connections, as
> we see on the wmq connection manager), how do transactions work?
>
> When does the transaction manager commit? (my understanding is when the
> aggregation is completed and the aggregated message gets sent to the next
> endpoint).
>
> When it commits, does the transaction manager go through each session and
> commit?
>
> What happens to messages that are still being aggregated?
> (i.e  once the aggregation completes the aggregated message gets through ..
> but I'm assuming subsequent messages are in-flight and are being
> aggregated, while the previous aggregate is being sent and commit being
> called).
>
> Are there any corner cases here? I feel like we may call commit on
> in-flight messages, and there's a chance of losing messages if the
> application crashes.
>
> from("wmq:myQueue?concurrentConsumers=10")
>     .transacted()
>     // some transformation
>     .aggregate(new MyAggregationStrategy()).constant(true)
>         .completionSize(Integer.parseInt(AGG_BATCH_SIZE))
>         .completionInterval(Integer.parseInt(AGG_BATCH_TIMEOUT))
>     .to("some-other-endpoint");
>
> As an aside, we already see the performance is really bad compared to not
> using transactions. Wondering if client-ack could be used in this
> situation. Hypothetically we can keep track of the last message (for each
> session) and call acknowledge on each message.
> But not sure how to identify that.
>
> Regards,
>
> Rajith Muditha Attapattu <http://rajith.2rlabs.com/>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2


