git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: throttling/backpressure on a route


Hi,
I would suggest
1- Take a look at throttle-eip. (Make sure you understand split, aggregate
and mongodb3 producer adocs around mongodb aggregation. Try and see your
case unless you can explain or create a unit test to pinpoint your issue
and also check mongodb3 components test case for better examples.)
2- All in memory. Depending on your route design, they are either in
memoryaggregationrepository or flowing through downstream enpoints / routes.
3- For your example, take a look at split, aggreagte, mongodb3 aggregate
functionality and possibly, throttle or delay eips.


On Tue, Oct 16, 2018 at 10:18 AM Peter Nagy (Jr) <pnagy@xxxxxxxxxx> wrote:

> Newbie question incoming.
>
> I have a route that looks like
>
> ...
> .setHeader("CamelMongoDbBatchSize", 128)
> .to("mongodb3:mymongo?...&operation=aggregate&outputType=MongoIterable")
> .split(constant(true))
> .streaming()
> ...
> .aggregate(constant(true), new GroupedBodyAggregationStrategy())
> .completionSize(128)
> .to("mongodb3:myothermongo?...&operation=bulkWrite")
>
> Pardon if there's some typos, I wrote this by hand since I'm using Camel
> from clojure and the clojure code looks a bit different.
>
> This works as expected, the aggregation is retrieved in batches and sent
> downstream one-by-one, being processed with various Processors and
> finally written in batches.
>
> I had a bug before where I had
>
> .aggregate().body().completionSize(128).completionTimeout(30000)
>
> which resulted in the aggregation waiting for the timeout. However in
> the meantime the mongo aggregation query finished processing everything.
>
> This raised some questions for me.
>
> 1. My process needs to be memory-friendly. Even after fixing the bug I
> need to be sure the aggregation query won't fill the RAM when downstream
> can't keep up. How can I apply some backpressure? How can I tell the
> route "don't process more until someone downstream says you can"? E.g.
> saying if there's more than N exchanges pending, wait.
>
> 2. Who was buffering the exchanges? I had ~2k entries flowing through,
> where did they end up queued?
>
> 3. Did I miss a doc page where these questions are explained? I spent a
> considerable amount of time searching for an answer thinking "This must
> be a common requirement, surely there's an example somewhere".
>
> --
> To reach a goal one has to enjoy the journey.
>