Java program for listening to a topic

Posted by Admin on 19-Jul-2010 20:48

Hello everyone,

I am receiving a "progress.message.jimpl.xmessage.MultipartMessage" when listening to a topic. I need a code sample that reads the message content from a "progress.message.jimpl.xmessage.MultipartMessage". Could someone please post one? My code is below.

public class ListenToTopic    implements javax.jms.MessageListener
{
    private static final String DEFAULT_BROKER_NAME = "localhost:2506";
    private static final String DEFAULT_PASSWORD = "Administrator";
    private static final int    MESSAGE_LIFESPAN = 1800000;  // milliseconds (30 minutes)

    private javax.jms.TopicConnection connect = null;
    private javax.jms.TopicSession receiveSession = null;
   
   

    private void talker( String broker, String username, String password, String rTopic)
    {
        // Create a connection.
        try
        {
            javax.jms.TopicConnectionFactory factory;
            factory = (new progress.message.jclient.TopicConnectionFactory (broker));
            connect = factory.createTopicConnection(username, password);
            receiveSession = connect.createTopicSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE);
        }
        catch (javax.jms.JMSException jmse)
        {
            System.err.println("error: Cannot connect to Broker - " + broker);
            jmse.printStackTrace();
            System.exit(1);
        }

        try
        {
            if (rTopic != null)
            {
                javax.jms.Topic receiveTopic = receiveSession.createTopic(rTopic);
                javax.jms.TopicSubscriber tSubscriber = receiveSession.createSubscriber(receiveTopic);
                tSubscriber.setMessageListener(this);
                connect.start();
            }
        }
        catch (javax.jms.JMSException jmse)
        {
            jmse.printStackTrace();
            exit();
        }

       
    }

    /**
     * Handle the message
     * (as specified in the javax.jms.MessageListener interface).
     */
    public void onMessage( javax.jms.Message aMessage)
    {
        try
        {
                if (aMessage instanceof progress.message.jimpl.xmessage.MultipartMessage){
                    progress.message.jimpl.xmessage.MultipartMessage mpMessage = (progress.message.jimpl.xmessage.MultipartMessage)aMessage;
                    Envelope messageEnv = mpMessage.getEnvelope();
                    System.out.println( messageEnv.getMessage().toString() );
                }
               
            }
            catch (Exception jmse)
            {
                jmse.printStackTrace();
            }
       
        catch (java.lang.RuntimeException rte)
        {
            rte.printStackTrace();
        }
    }

    /** Cleanup resources and then exit. */
    private void exit()
    {
        try
        {
            connect.close();
        }
        catch (javax.jms.JMSException jmse)
        {
            jmse.printStackTrace();
        }

        System.exit(0);
    }


    /** Main program entry point. */
    public static void main(String argv[]) {

        // Is there anything to do?
        if (argv.length == 0) {
            printUsage();
            System.exit(1);
        }

        // Values to be read from parameters
        String broker        = DEFAULT_BROKER_NAME;
        String username      = null;
        String password      = DEFAULT_PASSWORD;
        String tReceiver    = null;

        // Check parameters
        for (int i = 0; i < argv.length; i++) {
            String arg = argv[i];

            // Options
            if (!arg.startsWith("-")) {
                System.err.println ("error: unexpected argument - "+arg);
                printUsage();
                System.exit(1);
            }
            else {
                if (arg.equals("-b")) {
                    if (i == argv.length - 1 || argv[i+1].startsWith("-")) {
                        System.err.println("error: missing broker name:port");
                        System.exit(1);
                    }
                    broker = argv[++i];
                    continue;
                }

                if (arg.equals("-u")) {
                    if (i == argv.length - 1 || argv[i+1].startsWith("-")) {
                        System.err.println("error: missing user name");
                        System.exit(1);
                    }
                    username = argv[++i];
                    continue;
                }

                if (arg.equals("-p")) {
                    if (i == argv.length - 1 || argv[i+1].startsWith("-")) {
                        System.err.println("error: missing password");
                        System.exit(1);
                    }
                    password = argv[++i];
                    continue;
                }

                if (arg.equals("-tr")) {
                    if (i == argv.length - 1 || argv[i+1].startsWith("-")) {
                        System.err.println("error: missing receive queue parameter");
                        System.exit(1);
                    }
                    tReceiver = argv[++i];
                    continue;
                }
                if (arg.equals("-h")) {
                    printUsage();
                    System.exit(1);
                }
            }
        }

        // Check values read in.
        if (username == null) {
            System.err.println ("error: user name must be supplied");
            printUsage();
            System.exit(1);
        }

        if (tReceiver == null ) {
            System.err.println ("error: receive queue, or send queue, must be supplied");
            printUsage();
            System.exit(1);
        }

        // Start the JMS client for the "Talk".
        Talk talk = new Talk();
        talk.talker (broker, username, password, tReceiver);

    }

    /** Prints the usage. */
    private static void printUsage() {

        StringBuffer use = new StringBuffer();
        use.append("usage: java ListenToTopic (options) ...\n\n");
        use.append("options:\n");
        use.append("  -b name:port Specify name:port of broker.\n");
        use.append("               Default broker: "+DEFAULT_BROKER_NAME+"\n");
        use.append("  -u name      Specify unique user name. (Required)\n");
        use.append("  -p password  Specify password for user.\n");
        use.append("               Default password: "+DEFAULT_PASSWORD+"\n");
        use.append("  -tr queue    Specify queue for receiving messages.\n");
        use.append("  -ts queue    Specify queue for sending messages.\n");
        use.append("  -h           This help screen.\n");
        System.err.println (use);
    }

}

All Replies

Posted by tsteinbo on 20-Jul-2010 04:03

Check the sample in SONIC_HOME\MQX.Y\samples\QueuePTP\MultipartMessage

Also check out the JavaDoc at SONIC_HOME\Docs\api\sonicmq_api

Thomas

Posted by sk185050 on 20-Jul-2010 06:14

public void onMessage( javax.jms.Message message){
        String msgBody = null;
        String msgClass = null;
       
       
        if (message instanceof javax.jms.TextMessage)
        {
               
            msgClass = "javax.jms.TextMessage";
            try
            {
                msgBody = ((javax.jms.TextMessage)message).getText();
            }
            catch (javax.jms.JMSException jmse)
            {
                msgBody = "";
            }
        }
        else if (message instanceof javax.jms.BytesMessage)
        {
            msgClass = "javax.jms.BytesMessage";
            //int writeIndex = 0;
            byte[] buffer = new byte[2048];
            byte[] body = new byte[0];
            javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
            try
            {
                while (true)
                {
                    int numBytesRead = bytesMessage.readBytes(buffer);
                    if (numBytesRead == -1)
                    {
                        // No more bytes to read.
                        break;
                    }

                    byte[] tmp1 = new byte[body.length + numBytesRead];
                    System.arraycopy(body,0,tmp1,0,body.length);
                    System.arraycopy(buffer,0,tmp1,body.length,numBytesRead);
                    body = tmp1;
                }
            }
            catch (javax.jms.JMSException jsme)
            {
                ; // Finished reading body.
            }
            msgBody = new String(body);
        }
        else if (message instanceof javax.jms.MapMessage)
        {
            msgClass = "javax.jms.MapMessage";
        }
        else if (message instanceof javax.jms.ObjectMessage)
        {
            msgClass = "javax.jms.ObjectMessage";
        }
        else if (message instanceof javax.jms.StreamMessage)
        {
            msgClass = "javax.jms.StreamMessage";
        }
        else if (message instanceof progress.message.jclient.XMLMessage)
        {
            msgClass = "progress.message.jclient.XMLMessage";
            try
            {
                msgBody = ((progress.message.jclient.XMLMessage)message).getText();
            }
            catch (javax.jms.JMSException jmse)
            {
                msgBody = "";
            }
        }
        else if (message instanceof progress.message.jclient.MultipartMessage)
        {
            msgClass = "progress.message.jclient.MultipartMessage";
            StringBuffer body = new StringBuffer();
            progress.message.jclient.MultipartMessage mpMessage = (progress.message.jclient.MultipartMessage) message;
            try
            {
                for (int ip=0; ip
                {
                    progress.message.jclient.Part prt = mpMessage.getPart(ip);
                    progress.message.jclient.Header hdr = prt.getHeader();
                    body.append("--- Part " + String.valueOf (ip + 1) + " Headers ---\n");
                    body.append("Content-Type: " + hdr.getContentType() + "\n");
                    if (hdr.getContentId() != null) body.append("Content-Id: " + hdr.getContentId() + "\n");
                    java.util.Enumeration hdrFldNames = hdr.getHeaderFieldNames();
                    while (hdrFldNames.hasMoreElements())
                    {
                        String fld = (String) hdrFldNames.nextElement();
                        body.append(fld + ": " + hdr.getHeaderField(fld) + "\n");
                    }
                    if (hdr.getContentType().startsWith("text/"))
                    {
                        body.append("--- Part " + String.valueOf (ip + 1) + " Content ---\n");
                        body.append(prt.getContent() + "\n");
                    }
                }
            } catch (javax.jms.JMSException jsme)
            {
                ; // Finished reading body.
            }
            msgBody = new String(body);
        } 
       
    log.info("msgBody" + msgBody);
    log.info("==================================");
    log.info("=============COUNT OF MSG ||"+(++msgCount) +"||==================");
    log.info("==================================");
   
     }

Posted by Admin on 20-Jul-2010 09:51

Thanks a lot, Thomas Steinborn and SK SK. I am getting the message now; next I have to find a way to separate the XML message from the original message and print it out.

Thanks again.

Regards,

Swaroop Kunduru.

This thread is closed