I have a sonic service which is listening to a endpoint.I want the service not to pick each & every message, but pick messages based on some criteria.The incoming message is an xml message.I want to base the selection on a particular element value of that message(destinationid).If that is not possible I can add a custom header to that incoming message( it is my other component of the application only which is sending the message) which could be used as a message selector.How to do that?
Moreover, I don't want to consume the message the moment it arrives.I want 5 minutes to pass or 50 messages to accumulate( whichever happens earlier), then only I want to consume the message from the queue so that all the messages could be batched. You have such a queue browser functionality (QueueSession.createBrowser(Queue) etc) for a client application. I want same functionality within a sonic service. Is it possible?
Regards,
Subhendu
For this:
Selector == this is available in SMC. The UI for Sonic Endpoint creations has a "Selector" field.
QueueBrowser == I would not recommend doing this with JMS, personally, as you are trying to turn Messaging into a Database. Better to really use a database. With respect to Sonic ESB, you can't do 'consuming' from a QueueBrowers this way. You can have your Sonic Service take the JMS connection and make your our queuebrowser. If you want to do it as 'JMS programming', then the prescribed mechanism is to actally use JMS programming for this.
Hello,
there are two possible ways.
1. and also the 'proper' way is to create a service type in ESB, implement your custom service. And when you deploy the service and create an instance of it, you assing entry endpoint. When you create an endpoint in sonic, you can define message selector there. So, you create as many instances of custom services you need and assign different endpoints to different instances with different message selectors.
2. The second, a bit more harcore (if for some unknown reason you need to control the message selector on the fly without a reconfiguration and restart) - create custom service, deploy, create instance with entry endpoint that points to a topic that never gets messages posted to. And then create separate JMS connection, implement MessageListener and off you go.
You can also create Dynamic Endponts for this, or reuse the JMS Connection from Sonic ESB (so you don't need to create your our Connection)
Thanks both of you for replying.But I still do not get a handle on this.
Can you please clarify more.
Regards,
Subhendu
smahanta wrote:
... From this connection you have to get session, from the session you have to create QueueBrowser ( for topic also it is queue-browser, right?).Though XQServiceContext also have getCurrentJMSSession() if called createBrowser on it will throw java.lang.UnsupportedOperationException.I think from the connection you have to create a new session....
Two things here.
1. Topics don't support a QueueBrowser. javax.jms.QueueBrowser is a JMS construct that only works for topics.
2. You need a QueueSession to create a QueueBrowser. You might be getting a TopicConnection or a TopicSession here, but I'm not sure.
Are you trying to implement a system where you have a Sonic Service that polls a queue and then looks at all the messages and decides which one it wants to process (and then creates a QueueReceiver on that message using a selector).?
I ask because this pattern is not terribly efficient from a JMS standpoint. If you have any sort of performance requirements, you might find them hard to meet. Generally, this pattern is treating the SonicMQ queue as a storage you want to get random access to. Queuing systems are not optimized for this. A better pattern is to have a service that implements the JMS queuelistener with SINGLE_MESSAGE_ACKNOWLEDGE. Read all the messages that come in, and take the ones you want and (a) forward them (b) ack them. You do need to eventually process all messages, so you can't leave things forever in an unack'd state as this will fill up your memory.
Hi William,
Thanks for replying.Actually I have 2 use-cases to be addressed.This is a bi-directional message flow.
1)In one direction I have to batch messages based on certain criteria.There I am going to use database for storage as you adviced.
2) In another direction, in ceratin circumstances sonic is processing messages faster than what some of the down-stream non-sonic components can handle.When messages are directed for this slow guys we need to slow down the message throughput.How-ever some other downstream applications are fast enough. When message is directed towards these faster guys performance is a criteria.But when the slower ones are at the downstream then performance is not a critera.We have to only ensure no message gets lost.
3)We thought of using in-memory data storage (java List)inside the sonic service to temporarily hold the message so that we can slow down the message flow.Downside of that approach is, if the sonic service or container goes down, as the service has already taken the message from queue the message is lost.
4) So we are thinking of using a hybrid approach. For cases where we want speed the sonic service will directly forward the message to destination.When we want to slow it down we will send the messages to a retry queue.We will have some timertasks which will keep polling the retry queue, based on message selector will pick up messages and send to the same queue where it was originally so that the service will pick it up again and may be this time will send to the destination directly.
Hopefully we don't burden sonic so much that the broker goes down.
Regards,
Subhendu
If you use a SINGLE_MESSAGE_ACKNOWLEDGE at the JMS level, you can hold things in memory and they won't be lost in the event of a crash. They will just be re-enqueued.
If the issue is the downstream speed of services, it seems most natural to just toss in on the entry queue for these services and let Sonic Queues (or Durable Subscriptions) just handle the work.
Hi William,
Many thanks for the help.
What will be the downside if we introduce Thread.sleep(msg_thruput_downstream_app_can_handle) in the service method itself?
As I undestand listeners are just threads in a threadpool.Whenever a new message arrives at the entry end point a listener is picked from the threadpool & it executes the service method.So if we introduce sleep message processing will slowdown but if we have hundreds of listeners, some other listener will pick up the next incoming message, and may be that message was not meant to be slowed down, so will be be processesed without sleep.
Slower downstream services are TCP/IP based, ie after processing message will be sent through socket connection.
If this approach is not right I am going to use jms api in synchronous mode with SINGLE_MESSAGE_ACKNOWLEDGE.
Regards,
Subhendu
Introducing a sleep in your service method is not "nice" towards the broker since it means messages are longer indoubt. Also, since you are using PubSub (Topics) this service will become a slow subscriber, causing flow control and having side effects on all subscribers. You can address these problems by using flow to disk (e.g. buffering to the broker db) but it is important that you catch up eventually.
I don't know the details for your downstream tcp service, but i would somehow assume that you at some point cannot send to the socket anymore since buffers are full etc. So the socket send will throttle your subscriber naturally, won't it? Thus, I am wondering if what is the benefit if introducing an artificial sleep. Can you elaborate?
Hi Thomas,
Thank you for replying.
Some of the downstream services are http based, some are web-service based, some are tcp-ip based.We do not have problem with http & web-service ones, but for tcp-ip based ones socket buffer gets full, resulting in throttling of message. So we need to either re-send the messages for which negative acknowledgement came or we have to somehow slowdown the message throughput.
One way could be to store the failed messages in database & send them at throttle rate.
Regards,
Subhendu
So the problem in the TCP case is that you are getting NACKs, and you want to avoid that, right?
Or rather, in the case of a NACK you need to try again once the backend system is free again.
Well in that case using a queue (note: purposefully not topic) that buffers all messages towards the system is the right choice. In other words:
E.g. the key is to separate dispatching by destination from the actual send.
Note: As Bill pointed out earlier, messaging systems are not databases. So the outbound queue could equally be a db, but the concepts still apply.
HTH
Thomas