Processing one message at a time (and not losing any)

Posted by yuruki on 30-Sep-2010 01:37

I have a topic with messages in it and a custom service which processes messages from that topic. The service communicates with a legacy system over a socket protocol. For every message sent over the socket the service expects an acknowledgement message. The communication has strict sequence numbering and only one message is allowed at a time. Also, if the service is shutdown or killed, it should continue with the last non-ack'd message. We are using SonicESB 7.5.1.

Now, I have implemented the service with locking at service level. Service() method is synchronized and when a message is sent out through the socket it blocks until an ack has been received. It works, but doesn't handle shutdowns or restarts well. When message reaches service() call, it has already been removed from the topic and is on service's responsibility and will disappear if the service dies, right? I tried to work around this by writing the pending/blocking messages to another topic at service stop(), but the environment seems not to be available anymore for sending messages at that point (I find this strange, by the way).

Ideally (and this might not be possible), topic would somehow inform the service that it has messages, service would set up the communications with the legacy system and take one message from the topic (but not remove it), service would send the message out and wait for the ack. Once the ack has been received the message would be removed from the topic and the process would start over. This way the service would start where it left even if the Java process itself was killed unexpectedly. If there's no ack, the sending will timeout and the message will be sent again. If a negative ack is received the message is sent to a fail queue.

Is it possible to achieve something like this outside the service logic with the right configuration? Or can a service requeue messages to the front of a topic/queue? And is it not possible to send messages in the service stop() method (I get com.sonicsw.xq.XQAddressNotFoundExceptions trying to do that)?

All Replies

Posted by mnair on 30-Sep-2010 07:12

You can configure your service to realize your use case with explicitly adding synchronization logic:

1. Use a Quality of Service of Exactly Once or At Least Once on the entry endpoint of the service. With both these modes you will end up getting persistent messaging. With ALO QOS the incoming message is not acknowledged till the service method finishes processing the message. With EXO QOS the session is committed only after the service finishes processing the message, otherwise the session is rolled back and the message is not consumed.

2. If message ordering is important then configure this service with listener count set to 1.

3. If you are using this service in a process and if intracontainer messaging is enabled then this service might get invoked directly from a service that is an upstream step in the process. In that case configure the QOS and listener count for the entry endpoint of this service as well.


Thanks

-Mahesh

Posted by yuruki on 01-Oct-2010 01:00

Great!

This is exactly the information I was looking for. I have read about the QoS, but never really understood what it means on service() call level. So, messages sent to service() are acknowledged (and removed from the queue/topic) only when and if the call returns? And if I set the listener count for the service instance to 1, it guarantees that only one service() call can be active at once, right?

I believe 1 listener and EXO solves the case of unexpected failure. But how about shutdown with stop()? If the service is waiting for socket ack with Thread.wait() Sonic can't shut it down, and if I release the wait at stop(), service() will return and the message will be removed from the queue. Is there a way to exit service() without losing the message in the queue?

Posted by mnair on 04-Oct-2010 07:29

> This is exactly the information I was looking for. I have read about the QoS,

> but never really understood what it means on service() call level. So, messages

> sent to service() are acknowledged (and removed from the queue/topic) only when

> and if the call returns? And if I set the listener count for the service

> instance to 1, it guarantees that only one service() call can be active at

> once, right

Yes, for both questions.

> I believe 1 listener and EXO solves the case of unexpected failure. But how

> about shutdown with stop()? If the service is waiting for socket ack with

> Thread.wait() Sonic can't shut it down, and if I release the wait at stop(),

> service() will return and the message will be removed from the queue. Is there

> a way to exit service() without losing the message in the queue?

If you are using ESB 7.6 or 8.0, then you can make use of the abort functionality that in XQLifeCycleManager accessible from the XQInitContext and XQServiceContext. If you call abort on the service then all the messages that are currently being processed will:

- not be acknowledged in case of ALO QOS

- be rolled back in case of EXO QOS

Thanks

-Mahesh

Posted by yuruki on 05-Oct-2010 02:57

Suddenly it occurred to me that it's possible to abort the service by throwing an XQServiceException.

At stop() I set a flag indicating that the service is shutting down and release the wait(). If waiting-for-the-ack flag and service-stopped flag are on at the same time an XQServiceException is thrown stating the situation.

Posted by mnair on 05-Oct-2010 06:06

If you throw an Exception then the message gets sent to the RME and the behavior is not the same as doing an abort.

Posted by yuruki on 05-Oct-2010 06:23

AbortService would probably be the cleaner approach. But since it is not available in version 7.5.1, throwing an exception is the best I can do. The message sent to RME still stays in the incoming queue and is not lost.

This thread is closed