How to implement an aggregator in ESB?

Posted by Admin on 10-Sep-2009 16:27

Hello,

I would like to implement an aggregator in ESB. The main motivation is performance: we often get bursts of many small messages that take a long time to process. Processing time would improve greatly if the data scattered across these messages were processed as one message.

So I plan to create a service that works like this: after receiving a message, it waits N seconds (N is somewhere between 1 and 15 for now) and collects the data from all incoming messages into one. When that time has elapsed, it releases the "aggregate" message. All messages have the same itinerary, so there should be no problem there.
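For illustration, the time-window part of this plan could look roughly like the following Java sketch. It assumes String payloads, a hypothetical `WindowAggregator` class, and an injected clock (a `LongSupplier`, e.g. `System::currentTimeMillis`) so the window can be exercised without sleeping; none of this is ESB API. It keeps pending data purely in memory, so it has exactly the crash-safety gap discussed next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch: collect payloads for windowMillis after the first one, then release them as one batch.
class WindowAggregator {
    private final long windowMillis;
    private final LongSupplier clock;          // e.g. System::currentTimeMillis
    private final Consumer<List<String>> sink; // receives each aggregate
    private final List<String> pending = new ArrayList<>();
    private long windowStart = -1;             // -1 means "no window open"

    WindowAggregator(long windowMillis, LongSupplier clock, Consumer<List<String>> sink) {
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.sink = sink;
    }

    /** Called for every incoming message; the first message opens the window. */
    public synchronized void onMessage(String payload) {
        if (windowStart < 0) {
            windowStart = clock.getAsLong();
        }
        pending.add(payload);
        flushIfDue();
    }

    /** Releases the aggregate once the window has elapsed; also meant to be called from a timer. */
    public synchronized void flushIfDue() {
        if (windowStart >= 0 && clock.getAsLong() - windowStart >= windowMillis) {
            sink.accept(new ArrayList<>(pending)); // hand out a copy, then reset
            pending.clear();
            windowStart = -1;
        }
    }
}
```

In a real service `flushIfDue()` would also have to be driven by a timer; otherwise a burst that ends inside the window is only released when the next message arrives.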

The only issue is that I want to ensure that no messages are lost (AtLeastOnce delivery semantics). If I kept the messages' data in memory and the machine or process crashed during the aggregation phase, all messages received so far would be lost. Working with files and/or databases seems like unnecessary overhead. What I plan to do instead is delay the JMS acknowledgement until the aggregate has been sent.

How can I implement that? Creating a custom JMS session with a custom JMS consumer could be one way, but it seems too low-level to me. Is there any way to do this with the standard ESB API (dealing with endpoints and XQMessages instead of topics/queues and JMS Messages)?

Thank you,

Pavol Mederly

All Replies

Posted by gliptak on 11-Sep-2009 11:57

You might want to read this thread: http://communities.progress.com/pcom/message/22489 (and possibly search for earlier comments on this topic).

Maybe you could take an approach where a "sink" process acknowledges each incoming message and collects it into a queue (ensuring the messages are transacted and recoverable), while an aggregator process that wakes up every N minutes reads that queue and sends out the "aggregate" message.
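A rough Java sketch of this sink/aggregator split, assuming String payloads and a hypothetical `SinkAndAggregator` class. The `LinkedBlockingQueue` is only an in-memory stand-in for the transacted, recoverable queue suggested above; with a real queue, draining the batch and sending the aggregate would have to happen in one transaction, or a crash between the two steps would lose the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: a "sink" side that parks messages and an aggregator side that drains them periodically.
class SinkAndAggregator {
    // Stand-in for a persistent, transacted queue; in-memory here only for illustration.
    private final BlockingQueue<String> holdingQueue = new LinkedBlockingQueue<>();
    private final Consumer<List<String>> downstream;

    SinkAndAggregator(Consumer<List<String>> downstream) {
        this.downstream = downstream;
    }

    /** Sink side: accept one message and park it in the holding queue. */
    void sink(String payload) {
        holdingQueue.add(payload);
    }

    /** Aggregator side: meant to run on a schedule; sends whatever has accumulated as one aggregate. */
    int runOnce() {
        List<String> batch = new ArrayList<>();
        holdingQueue.drainTo(batch); // removes everything currently queued, in FIFO order
        if (!batch.isEmpty()) {
            downstream.accept(batch);
        }
        return batch.size();
    }
}
```

`runOnce()` could be driven by a `ScheduledExecutorService`, for example `scheduleAtFixedRate(agg::runOnce, n, n, TimeUnit.MINUTES)`.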

Posted by Admin on 12-Sep-2009 07:16

Thanks, Gabor. I actually searched PSDN before posting my question and found the thread you mention. David Millman proposes a couple of solutions there, but they are based on the assumption that some "starting message" triggers processing in several services and I want to aggregate the results, something that can easily be implemented using e.g. the Split & Join (Parallel) Service.

I am not sure I understand your proposal completely: you suggest sending all the messages into a queue, then reading N of them, sending the aggregate out, and acknowledging them. Yes, but is this possible to do in ESB without having to work directly with JMS? In ESB, if you hang a service on an entry point (the queue) and start processing messages, then after returning from the service() call the individual (aggregatee) message is gone and cannot be recovered if the container crashes a second later.

I have an idea for a solution: read incoming messages in multiple threads, send the aggregate out, and only then return from the service() call in all these threads (which causes the messages to be acknowledged). This has problems as well, e.g. (i) such a solution may not preserve the original order of messages, and (ii) aggregating N messages requires N threads. I also have some technical difficulties implementing it. But if I succeed, I will post the code here :-)
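The multi-threaded idea might be sketched as follows, with a hypothetical `BatchAggregator` class and String payloads. Each call to `service()` stands in for one ESB service() invocation: it blocks until `batchSize` calls have arrived, the last arriving thread sends the aggregate, and only then do all of them return (which, in the container, is what would acknowledge their messages). If sending fails, every caller in that batch throws, so none of those messages would be acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: N blocked service() calls are released only after their aggregate is sent.
class BatchAggregator {
    private static final class Batch {
        final List<String> items = new ArrayList<>();
        boolean done;   // aggregate send attempted
        boolean failed; // send threw, so callers must not "return normally"
    }

    private final int batchSize;
    private final Consumer<List<String>> downstream;
    private Batch current = new Batch();

    BatchAggregator(int batchSize, Consumer<List<String>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    /** Body of one service() call; returning normally is what would trigger the ack. */
    synchronized void service(String payload) throws InterruptedException {
        Batch b = current;
        b.items.add(payload);
        if (b.items.size() == batchSize) {
            current = new Batch(); // later callers start the next batch
            try {
                downstream.accept(List.copyOf(b.items)); // send the aggregate first
            } catch (RuntimeException e) {
                b.failed = true;
                throw e;
            } finally {
                b.done = true;
                notifyAll(); // wake the other threads of this batch
            }
        } else {
            while (!b.done) {
                wait(); // releases the monitor so other messages can be added
            }
        }
        if (b.failed) {
            throw new IllegalStateException("aggregate not sent; message must not be acknowledged");
        }
    }
}
```

The sketch also shows both drawbacks mentioned: every pending message occupies a blocked thread, and order within a batch is simply the order in which threads acquire the lock. It never releases a partial batch either; a real version would need a timeout.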

Any other ideas ?

Best regards,

Pavol Mederly

Posted by Bill Wood on 01-Oct-2009 08:43

mederly wrote:

... Yes, but: is this possible to do in ESB without having to work directly with JMS? Because in ESB if you hang a service on an entry point (the queue), and start processing messages, after returning from the service() call the individual (aggregatee) message is gone and can not be recovered if a container crashes a second later.

You are correct that the service() method manages the acknowledgement. However, you can have a service that reuses the ESB's JMS connection and write your own message listener as a JMS client (or the service could make its own JMS connection). Because what you want to write is effectively a special JMS onramp to the ESB, you should do it in JMS:

  • create your listener with CLIENT_ACKNOWLEDGE.
  • read messages until you think you are done.
  • when the last one arrives:
    • create an XQMessage with the combined result and dispatch it to the process or step that you want to go to.
  • ack the last message.

(If the messages in the queue are not all for the same aggregator, then you might want to use SINGLE_MESSAGE_ACKNOWLEDGE).
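To make the acknowledgement semantics concrete, here is a small in-memory model in Java. `ClientAckQueue` and `AckingAggregator` are hypothetical stand-ins, not JMS or Sonic classes; `receiveNoWait()` and `acknowledge()` only mimic `MessageConsumer.receiveNoWait()` and `Message.acknowledge()` on a session created with `Session.CLIENT_ACKNOWLEDGE`, where acknowledging one consumed message acknowledges every message the session has consumed so far. That is why acking the last message is enough. `crash()` simulates the container dying before the ack, after which unacknowledged messages become available again. SINGLE_MESSAGE_ACKNOWLEDGE is not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a queue consumed through a CLIENT_ACKNOWLEDGE session.
class ClientAckQueue {
    private final Deque<String> available = new ArrayDeque<>();
    private final List<String> unacked = new ArrayList<>();

    void send(String m) { available.addLast(m); }

    /** Mimics receiveNoWait(): returns null when nothing is waiting. */
    String receiveNoWait() {
        String m = available.pollFirst();
        if (m != null) unacked.add(m);
        return m;
    }

    /** Mimics acknowledge() under CLIENT_ACKNOWLEDGE: acks everything consumed so far. */
    void acknowledge() { unacked.clear(); }

    /** Simulates a crash: unacknowledged messages go back to the front, in original order. */
    void crash() {
        for (int i = unacked.size() - 1; i >= 0; i--) available.addFirst(unacked.get(i));
        unacked.clear();
    }

    int pending() { return available.size() + unacked.size(); }
}

class AckingAggregator {
    /** Reads up to maxBatch messages, dispatches one aggregate, and only then acknowledges. */
    static int aggregateOnce(ClientAckQueue q, int maxBatch, Consumer<String> dispatch) {
        List<String> batch = new ArrayList<>();
        String m;
        while (batch.size() < maxBatch && (m = q.receiveNoWait()) != null) {
            batch.add(m);
        }
        if (batch.isEmpty()) return 0;
        dispatch.accept(String.join(",", batch)); // if this throws, nothing is acknowledged
        q.acknowledge();                          // ack only after the aggregate is out
        return batch.size();
    }
}
```

In a real listener the loop would more likely use `receive(timeout)` with the time window from the original question, and the `dispatch` callback would build the XQMessage. The ordering (dispatch first, acknowledge second) is what gives at-least-once delivery, at the price of possible duplicates if the crash falls between the two steps.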
