Re: Help with Splitter/Aggregator-like behavior


> On September 20, 2018 at 7:30 AM Claus Ibsen <claus.ibsen@xxxxxxxxx> wrote:
> 
> 
> Hi
> 
> You can use the splitter and aggregator to do something like that.
> 
> The aggregator can group the split messages together based on that
> frame, and it can do this out of order.
> And the output of the aggregator is routed separately from the input (async).
> 
> Maybe try to build a simple use-case / sample / unit test with just
> the splitter and aggregator and see if you can build something.
> And if not then maybe post a bit here again with what you have done,
> so we can better help / understand your use-case.


Hi, Claus.

Thank you for your response.

As it turned out, I didn't need a Splitter or Aggregator at all to implement my "distributed splitter/aggregator".

All I needed was a ProducerTemplate and some simple Processor classes.

It really ended up being a thing of beauty...

I'll include the sanitized code below for anyone who is interested.

[ First I list the "before" SENDER / RECEIVER, which doesn't have data chunking,
  and then the "after" SENDER / RECEIVER which includes a "DataChunker" to do the Splitting
  and a "DataChunkProcessor" to do the Aggregating. ]

Thanks again.

Ron

p.s. And if anyone wants to help me figure out how to get @Autowired to work for ProducerTemplate, that would be great.
     Otherwise, creating and holding onto the first ProducerTemplate I need is working just fine.
     From googling, the best I can determine is that somehow my Processor @Component classes are being processed
     before the CamelContext is fully up (and so there's no default ProducerTemplate to inject).  Yes...?

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

BEFORE, without data chunking (splitting/aggregating).

SENDER:

@Component
public class Route extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        restConfiguration()
                .host("localhost").port(8080)
                .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        from("timer:autos?period={{timer.period}}")
                .streamCaching()
                .to("rest:get:auto/list")
                .to("direct:udp");

        from("direct:udp")
                .convertBodyTo(String.class, "UTF-8")
                .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}

--------------------------------------------------------------------------------

RECEIVER:

@Component
public class AdapterRouteBuilder extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat autoListJson = new ListJacksonDataFormat(Auto.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
                .to("direct:autolist");

        from("direct:autolist")
                .unmarshal(autoListJson)
                .process(new AutoListProcessor())  // creates a protobuf, sets it in the Exchange
                .to("direct:protobuf");

        from("direct:protobuf")
                .marshal().protobuf()
                .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

AFTER, with data chunking (splitting/aggregating).

SENDER:

@Component
public class Route extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat autoListJson  = new ListJacksonDataFormat(Auto.class);
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        restConfiguration()
                .host("localhost").port(8080)
                .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        from("timer:autos?period={{timer.period}}")
                .to("rest:get:auto/list")
                .to("direct:datachunker");

        from("direct:datachunker")
                .unmarshal(autoListJson)
                .process(new DataChunker());  // creates DataChunk; sends directly to "direct:udp"

        from("direct:udp")
                .marshal(dataChunkJson)
                .convertBodyTo(String.class, "UTF-8")
                .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}

--------------------------------------------------------------------------------

RECEIVER:

@Component
public class AdapterRouteBuilder extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
                .to("direct:datachunk");

        from("direct:datachunk")
                .unmarshal(dataChunkJson)
                .process(new DataChunkProcessor());  // sends directly to "direct:protobuf"

        from("direct:protobuf")
                .marshal().protobuf()
                .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

public class DataChunker implements Processor
{
//    @Autowired
//    private ProducerTemplate producerTemplate;  // XXX: can't get this to work

    private ProducerTemplate producerTemplate = null;

    private static int frame = 0;

    public void process (Exchange exchange) throws Exception
    {
        if ( producerTemplate == null ) producerTemplate = exchange.getContext().createProducerTemplate();

        List<Auto> autos = null;
        try {
            autos = exchange.getIn().getBody(List.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunker: process: " + e.getMessage());
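        }
        if ( autos == null ) return;  // body missing or not convertible; avoids a NullPointerException in the loop below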

        ++frame;

        int packet = 0;
        for ( Auto auto : autos )
        {
            DataChunk chunk = new DataChunk();
            chunk.setFrame(frame);
            chunk.setPacket(++packet);
            chunk.setTotalPackets(autos.size());
            chunk.getList().add(auto);

            producerTemplate.sendBody("direct:udp", chunk);
        }

        // XXX: That's it.  Don't need to do: exchange.getIn().setBody(...);
    }
}
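
For anyone who wants to try the chunking logic outside of Camel, here is a minimal plain-Java sketch of the frame/packet numbering that DataChunker does. The names FrameChunker, Chunk, and chunkSize are made up for illustration (they are not part of the code above), and the payload type is generic rather than Auto. With chunkSize = 1 it behaves like the one-Auto-per-chunk loop above. It also swaps the static int counter for an AtomicInteger, which is my assumption about what you would want if more than one thread ever calls it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: splits a list into numbered chunks belonging to one frame.
final class FrameChunker {

    // Stand-in for the DataChunk header fields (frame, packet, totalPackets) plus payload.
    static final class Chunk<T> {
        final int frame;         // frame this chunk belongs to
        final int packet;        // 1-based position within the frame
        final int totalPackets;  // number of chunks in the frame
        final List<T> items;     // payload slice

        Chunk(int frame, int packet, int totalPackets, List<T> items) {
            this.frame = frame;
            this.packet = packet;
            this.totalPackets = totalPackets;
            this.items = items;
        }
    }

    private final AtomicInteger frameCounter = new AtomicInteger();
    private final int chunkSize;

    FrameChunker(int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        this.chunkSize = chunkSize;
    }

    // Each call starts a new frame and returns its chunks in packet order.
    <T> List<Chunk<T>> split(List<T> items) {
        int frame = frameCounter.incrementAndGet();
        int total = (items.size() + chunkSize - 1) / chunkSize;  // ceiling division
        List<Chunk<T>> chunks = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * chunkSize;
            int to = Math.min(from + chunkSize, items.size());
            List<T> slice = new ArrayList<>(items.subList(from, to));
            chunks.add(new Chunk<>(frame, i + 1, total, slice));
        }
        return chunks;
    }
}
```

In the Camel version, each returned chunk would be handed to producerTemplate.sendBody("direct:udp", chunk) as in the loop above.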

--------------------------------------------------------------------------------

public class DataChunkProcessor implements Processor
{
    private static class DataFrame
    {
        public int frame = 0;
        public int size  = 0;  // expected # packets, from a DataChunk header

        public boolean written = false;

        public Set<Integer> packets = new HashSet<>();
        public List<Auto>   autos   = new ArrayList<>();

        // XXX: ... stuff left out
    }

//    @Autowired
//    private ProducerTemplate producerTemplate;  // XXX: can't get this to work

    private ProducerTemplate producerTemplate = null;

    private static DataFrame dataFrame = new DataFrame();

    public void process (Exchange exchange) throws Exception
    {
        DataChunk chunk = null;
        try {
            chunk = exchange.getIn().getBody(DataChunk.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunkProcessor: process: " + e.getMessage());
        }
        if ( chunk == null ) return;  // body missing or not convertible; nothing to aggregate

        // XXX: process DataChunk, store stuff in 'dataFrame', determine if it is time to write, etc.

        if ( it_is_time_to_write ) {
            buildAndWrite(exchange);
        }

        // XXX: don't have to do anything to the Exchange; the route has effectively ended in RouteBuilder
    }

    public void buildAndWrite (Exchange exchange) throws ParseException
    {
        if ( producerTemplate == null ) producerTemplate = exchange.getContext().createProducerTemplate();

        MyProtobuf proto = buildProtobuf();  // XXX: loops on dataFrame.autos and builds custom protobuf object
        producerTemplate.sendBody("direct:protobuf", proto);
        dataFrame.written = true;
    }
}
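
Since I left out the bookkeeping behind 'it_is_time_to_write', here is a rough plain-Java sketch of one way the receiving side could decide that a frame is complete. This is not the code I elided above: FrameAssembler and accept() are hypothetical names, the payload is generic rather than Auto, and unlike the single static DataFrame it keeps a map of pending frames so packets from different frames can interleave. It assumes packet numbers are 1..totalPackets and that a duplicate packet should simply be ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: collects packets per frame and releases the frame once all have arrived.
final class FrameAssembler<T> {

    // frame number -> (packet number -> payload); the TreeMap keeps packets sorted by number
    private final Map<Integer, TreeMap<Integer, List<T>>> pending = new HashMap<>();

    // Returns the reassembled payload when this packet completes its frame, otherwise empty.
    synchronized Optional<List<T>> accept(int frame, int packet, int totalPackets, List<T> items) {
        TreeMap<Integer, List<T>> packets = pending.computeIfAbsent(frame, f -> new TreeMap<>());
        packets.putIfAbsent(packet, items);  // a repeated packet number is ignored
        if (packets.size() < totalPackets) {
            return Optional.empty();         // frame still incomplete; nothing flows downstream
        }
        pending.remove(frame);               // frame complete; drop its bookkeeping
        List<T> whole = new ArrayList<>();
        for (List<T> part : packets.values()) {
            whole.addAll(part);              // concatenate in packet order
        }
        return Optional.of(whole);
    }
}
```

In the Camel version, a non-empty result is the point where buildAndWrite() would send to "direct:protobuf". Note that over UDP a lost packet would leave its frame in the pending map forever, so a real receiver would also need some timeout or eviction, which this sketch does not attempt.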


> On Thu, Sep 20, 2018 at 7:15 AM Ron Cecchini <roncecchini@xxxxxxxxxxx> wrote:
> >
> >
> > So, I have a situation where I need something like a Splitter and an Aggregator.
> > But as far as I can tell from reading and googling, maybe my situation is nonstandard?
> > From what I can tell, a Splitter and Aggregator are used together within a single route.
> > In my case, I need the Splitter and Aggregator separated into a sender and receiver, resp.
> >
> >
> > I'm just looking for someone to tell me if the following fits squarely within the Splitter
> > and Aggregator patterns - if so, I'll dig in and figure it out - or if there's another pattern
> > or something else to try.
> >
> >
> > Thank you in advance for your guidance, and sorry for being so verbose again (just trying to be clear).
> >
> >
> > -----
> >
> >
> > On the Splitter side, per usual, I need to split a big message into individual messages.
> > However, I can't just split and let each individual message continue on the route.
> > Instead, I need to "wrap" each individual message and stick some header information on it.
> >
> >
> > The situation is very much like the following, which is very UDP-like:
> >
> >
> > Big messages come in, and they get split into "packages" of a preset size.
> > All the individual "packages" can be said to belong to a "frame" of data.
> > The header of each individual message contains the Frame #, the Package #, and the Total #
> > of packages in the frame, so the receiver knows when it has received a full frame of data.
> >
> > Message: 1
> >      Frame: 1 - Package: 1 - Total: 3
> >      Frame: 1 - Package: 2 - Total: 3
> >      Frame: 1 - Package: 3 - Total: 3
> > Message: 2
> >      Frame: 2 - Package: 1 - Total: 2
> >      Frame: 2 - Package: 2 - Total: 2
> >
> >
> > Etc.
> >
> >
> > If I can't accomplish this with a split() of some kind, how could I do it with a regular Processor?
> > Having a Processor manually split and bundle the data into "packages" is trivial.
> > But how does the Processor then write the individual messages back to a "direct:processPackage" route point?
> > Can a Processor invoke (write data to) a route, at some point in the middle of that route?
> >
> >
> > -----
> >
> >
> > The Aggregator, as you would expect, needs to do the opposite of the above:
> >
> >
> > It needs to aggregate "packages" of data until it determines it has a full "frame".
> > Then it bundles all the package payloads into a single, big message.
> > When a frame is not full, data does not flow to the rest of the route.
> > When the frame is full, the data is written to some route mid-point; e.g. "direct:translateMessage".
> >
> >
> > So, can this sort of "asynchronous" aggregating be done?
> > Can an aggregating Processor basically maintain state, and decide to write or not write to a route?
> >
> >
> > Thank you again for any pointers.
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2