Re: Transactions (Multiple JMS Consumers) and Aggregation


Thank you Claus!

The use case is as follows.
Queue ----> some validation ---> simple transformation ---> Batch/Aggregate
---> transform to SAP payload ----> SAP

With sjms, we could do
Queue --> sjms (with batching) --> iterate over list to do validation
                                                  |
                                                  +--> failed messages sent to error queue
      --> transform to SAP payload --> SAP

Since sjms supports transactions, I'm assuming that any
unexpected/unrecoverable error at any point in the route will roll back the
transaction.
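
The batch-then-commit flow described above can be modelled in plain Java. This is only a sketch of the intended semantics, not Camel or JMS API: `TransactedQueue`, `Target`, `processBatch`, the "blank message is invalid" rule, and the comma-join "transform" are all hypothetical stand-ins. It assumes the whole batch is received in one transacted session, so a failure while sending returns every message of the batch to the source queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of "consume a batch, validate, send, then commit". Not Camel/JMS API. */
final class BatchTxSketch {

    /** Hypothetical stand-in for a transacted session on the source queue. */
    static final class TransactedQueue {
        private final Deque<String> pending = new ArrayDeque<>();
        private final List<String> uncommitted = new ArrayList<>();

        void add(String msg) { pending.addLast(msg); }

        /** Receives up to max messages; they stay uncommitted until commit() or rollback(). */
        List<String> receiveBatch(int max) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < max && !pending.isEmpty()) {
                batch.add(pending.pollFirst());
            }
            uncommitted.addAll(batch);
            return batch;
        }

        void commit() { uncommitted.clear(); }

        /** Puts uncommitted messages back at the head of the queue, preserving order. */
        void rollback() {
            for (int i = uncommitted.size() - 1; i >= 0; i--) {
                pending.addFirst(uncommitted.get(i));
            }
            uncommitted.clear();
        }

        int size() { return pending.size(); }
    }

    /** Hypothetical downstream system (SAP in this thread). */
    interface Target { void send(String payload) throws Exception; }

    /** Returns true if the batch was committed, false if it was rolled back. */
    static boolean processBatch(TransactedQueue source, int batchSize,
                                Target target, List<String> errorQueue) {
        List<String> batch = source.receiveBatch(batchSize);
        List<String> valid = new ArrayList<>();
        List<String> invalid = new ArrayList<>();
        for (String msg : batch) {
            // Assumed validation rule: blank messages are invalid.
            if (msg.trim().isEmpty()) invalid.add(msg); else valid.add(msg);
        }
        try {
            if (!valid.isEmpty()) {
                target.send(String.join(",", valid)); // the "transform" is just a join here
            }
            // Error-queue writes happen only on the commit path, so a rollback leaves no trace.
            errorQueue.addAll(invalid);
            source.commit();
            return true;
        } catch (Exception e) {
            // Unexpected failure: every message of the batch returns to the source queue.
            source.rollback();
            return false;
        }
    }
}
```

In this model, validation failures are an expected outcome and are committed along with the batch, while an exception from the target rolls everything back for redelivery.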


On Wed, Aug 1, 2018 at 8:15 AM, Claus Ibsen <claus.ibsen@xxxxxxxxx> wrote:

> Hi
>
> Yes exactly, and yeah camel-sjms has a special support for this kind
> of use-case you have, which is built directly into its consumer, so it
> would be just
>
> from A (incl aggregate) to B
>
>
> On Wed, Aug 1, 2018 at 2:10 PM, Rajith Muditha Attapattu
> <rajith77@xxxxxxxxx> wrote:
> > Thank you Claus for your quick response.
> >
> > Let's say my sending endpoint is another jms queue and considering my
> > original route....
> > Are the transaction boundaries as follows?
> >
> > from(myQueueA) .transacted() ....... Aggregator ......... to(myQueueB)
> > <------------------tx------------------------->
> >                                      <------------------tx------->
> > I'm assuming this is why a persistent aggregator is recommended, to avoid
> > losing messages.
> >
> > The sjms seems like a better fit. I will look into it.
> > Thank you!
> >
> > On Wed, Aug 1, 2018 at 3:32 AM, Claus Ibsen <claus.ibsen@xxxxxxxxx> wrote:
> >
> >> Hi
> >>
> >> The aggregate EIP works independently of the original
> >> route/exchange. So in your example the transacted route hands the
> >> message off to the aggregator and then continues, but the route ends
> >> there, so the transaction commits. At the same time the aggregator
> >> runs in parallel (independently) and aggregates messages; when an
> >> aggregate completes, it is routed independently, outside any
> >> transaction.
> >>
> >> So if you aim to batch N JMS messages into a single aggregate in the
> >> same TX, then what you do there does not work.
> >>
> >> For batch JMS in a transaction, look at the camel-sjms component,
> >> which has such a feature (the regular camel-jms does not).
> >> https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
> >>
> >> And mind there is also a camel-sjms2 component for the JMS 2.0 API.
> >>
> >> For examples of this, I suggest looking at the unit tests of the
> >> component itself.
> >>
> >> On Wed, Aug 1, 2018 at 2:58 AM, Rajith Muditha Attapattu
> >> <rajith77@xxxxxxxxx> wrote:
> >> > Hi All,
> >> >
> >> > I'm trying to understand how transactions would work in the following
> >> > scenario.
> >> > Given that transactions are scoped by session, and each concurrent
> >> > consumer runs in its own session (possibly on different connections,
> >> > as we see in the wmq connection manager), how do transactions work?
> >> >
> >> > When does the transaction manager commit? (My understanding is that it
> >> > commits when the aggregation is completed and the aggregated message
> >> > is sent to the next endpoint.)
> >> >
> >> > When it commits, does the transaction manager go through each session
> >> > and commit?
> >> >
> >> > What happens to messages that are still being aggregated?
> >> > (i.e. once the aggregation completes, the aggregated message gets
> >> > through, but I'm assuming subsequent messages are in flight and being
> >> > aggregated while the previous aggregate is being sent and commit is
> >> > being called.)
> >> >
> >> > Are there any corner cases here? I feel like we may call commit on
> >> > in-flight messages, and there's a chance of losing messages if the
> >> > application crashes.
> >> >
> >> > from("wmq:myQueue?concurrentConsumers=10")
> >> >     .transacted()
> >> >     // some transformation
> >> >     .aggregate(new MyAggregationStrategy()).constant(true)
> >> >         // complete a batch after N messages or when the interval elapses
> >> >         .completionSize(Integer.parseInt(AGG_BATCH_SIZE))
> >> >         .completionInterval(Integer.parseInt(AGG_BATCH_TIMEOUT))
> >> >     .to("some-other-endpoint");
> >> >
> >> > As an aside, we already see that performance is really bad compared to
> >> > not using transactions. I'm wondering if client-ack could be used in
> >> > this situation. Hypothetically we could keep track of the last message
> >> > (for each session) and call acknowledge on each of those messages,
> >> > but I'm not sure how to identify them.
> >> >
> >> > Regards,
> >> >
> >> > Rajith Muditha Attapattu <http://rajith.2rlabs.com/>
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> http://davsclaus.com @davsclaus
> >> Camel in Action 2: https://www.manning.com/ibsen2
> >>
> >
> >
> >
> > --
> > Regards,
> >
> > Rajith Muditha Attapattu <http://rajith.2rlabs.com/>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>



-- 
Regards,

Rajith Muditha Attapattu <http://rajith.2rlabs.com/>

