Hello everyone,
I am receiving a "progress.message.jimpl.xmessage.MultipartMessage" when listening to a topic. I need a code sample that reads the content of a "progress.message.jimpl.xmessage.MultipartMessage". Could someone please post one? My code is below.
/**
 * Command-line JMS client that subscribes to a SonicMQ topic and prints the
 * parts of every multipart message delivered to it.
 *
 * <p>Usage: {@code java ListenToTopic -u user -tr topic [-b host:port] [-p password]}.
 * Messages are handled asynchronously through {@link #onMessage(javax.jms.Message)}.
 */
public class ListenToTopic implements javax.jms.MessageListener
{
    private static final String DEFAULT_BROKER_NAME = "localhost:2506";
    private static final String DEFAULT_PASSWORD = "Administrator";

    private javax.jms.TopicConnection connect = null;
    private javax.jms.TopicSession receiveSession = null;

    /**
     * Connects to the broker, subscribes this object to the given topic and
     * starts message delivery.
     *
     * <p>Terminates the JVM with status 1 if the connection or session cannot
     * be created; calls {@link #exit()} if the subscription fails.
     *
     * @param broker   broker address in {@code host:port} form
     * @param username user name used to authenticate
     * @param password password used to authenticate
     * @param rTopic   name of the topic to listen on; nothing is subscribed when {@code null}
     */
    private void talker(String broker, String username, String password, String rTopic)
    {
        // Create a connection and a non-transacted, auto-acknowledging session.
        try
        {
            javax.jms.TopicConnectionFactory factory =
                new progress.message.jclient.TopicConnectionFactory(broker);
            connect = factory.createTopicConnection(username, password);
            receiveSession = connect.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
        }
        catch (javax.jms.JMSException jmse)
        {
            System.err.println("error: Cannot connect to Broker - " + broker);
            jmse.printStackTrace();
            System.exit(1);
        }

        // Register this object as the listener, then start delivery.
        try
        {
            if (rTopic != null)
            {
                javax.jms.Topic receiveTopic = receiveSession.createTopic(rTopic);
                javax.jms.TopicSubscriber tSubscriber = receiveSession.createSubscriber(receiveTopic);
                tSubscriber.setMessageListener(this);
                connect.start();
            }
        }
        catch (javax.jms.JMSException jmse)
        {
            jmse.printStackTrace();
            exit();
        }
    }

    /**
     * Handle the message
     * (as specified in the javax.jms.MessageListener interface).
     *
     * <p>Multipart messages are read through the public
     * {@code progress.message.jclient} API rather than the internal
     * {@code jimpl} implementation class: each part's content type is
     * printed, followed by the content itself for {@code text/*} parts.
     * Messages of any other type are ignored.
     *
     * @param aMessage the message delivered by the JMS provider
     */
    public void onMessage(javax.jms.Message aMessage)
    {
        try
        {
            if (aMessage instanceof progress.message.jclient.MultipartMessage)
            {
                progress.message.jclient.MultipartMessage mpMessage =
                    (progress.message.jclient.MultipartMessage) aMessage;
                for (int i = 0; i < mpMessage.getPartCount(); i++)
                {
                    progress.message.jclient.Part part = mpMessage.getPart(i);
                    progress.message.jclient.Header header = part.getHeader();
                    String contentType = header.getContentType();
                    System.out.println("--- Part " + (i + 1) + " (" + contentType + ") ---");
                    // Only textual parts are printed; binary content is skipped.
                    if (contentType != null && contentType.startsWith("text/"))
                    {
                        System.out.println(part.getContent());
                    }
                }
            }
        }
        catch (Exception e)
        {
            // A listener must not let exceptions escape to the provider.
            // A single catch suffices: Exception already covers RuntimeException,
            // and a second "catch (RuntimeException)" after it does not compile.
            e.printStackTrace();
        }
    }

    /** Cleanup resources and then exit. */
    private void exit()
    {
        try
        {
            if (connect != null)
            {
                connect.close();
            }
        }
        catch (javax.jms.JMSException jmse)
        {
            jmse.printStackTrace();
        }
        System.exit(0);
    }

    /**
     * Main program entry point.
     *
     * <p>Parses the command line, validates that a user name and a receive
     * topic were supplied, and starts the listener. Any malformed or unknown
     * argument prints an error and terminates with status 1.
     *
     * @param argv command-line arguments; see {@link #printUsage()}
     */
    public static void main(String argv[]) {
        // Is there anything to do?
        if (argv.length == 0) {
            printUsage();
            System.exit(1);
        }

        // Values to be read from parameters
        String broker = DEFAULT_BROKER_NAME;
        String username = null;
        String password = DEFAULT_PASSWORD;
        String tReceiver = null;

        // Check parameters
        for (int i = 0; i < argv.length; i++) {
            String arg = argv[i];

            if (!arg.startsWith("-")) {
                System.err.println ("error: unexpected argument - "+arg);
                printUsage();
                System.exit(1);
            }
            else if (arg.equals("-b")) {
                broker = optionValue(argv, i, "broker name:port");
                i++;
            }
            else if (arg.equals("-u")) {
                username = optionValue(argv, i, "user name");
                i++;
            }
            else if (arg.equals("-p")) {
                password = optionValue(argv, i, "password");
                i++;
            }
            else if (arg.equals("-tr")) {
                tReceiver = optionValue(argv, i, "receive topic parameter");
                i++;
            }
            else if (arg.equals("-h")) {
                printUsage();
                System.exit(1);
            }
            else {
                // Previously unknown options were silently ignored.
                System.err.println ("error: unknown option - "+arg);
                printUsage();
                System.exit(1);
            }
        }

        // Check values read in.
        if (username == null) {
            System.err.println ("error: user name must be supplied");
            printUsage();
            System.exit(1);
        }
        if (tReceiver == null) {
            System.err.println ("error: receive topic must be supplied");
            printUsage();
            System.exit(1);
        }

        // Start the JMS client.
        ListenToTopic listener = new ListenToTopic();
        listener.talker (broker, username, password, tReceiver);
    }

    /**
     * Returns the value following the option at {@code optionIndex}, or prints
     * an error and terminates with status 1 when the value is absent or looks
     * like another option (starts with {@code -}).
     *
     * @param argv        the full argument array
     * @param optionIndex index of the option whose value is wanted
     * @param description what is missing, used in the error message
     * @return the argument immediately after the option
     */
    private static String optionValue(String[] argv, int optionIndex, String description) {
        int valueIndex = optionIndex + 1;
        if (valueIndex >= argv.length || argv[valueIndex].startsWith("-")) {
            System.err.println("error: missing " + description);
            System.exit(1);
        }
        return argv[valueIndex];
    }

    /** Prints the usage. */
    private static void printUsage() {
        StringBuffer use = new StringBuffer();
        use.append("usage: java ListenToTopic (options) ...\n\n");
        use.append("options:\n");
        use.append(" -b name:port Specify name:port of broker.\n");
        use.append(" Default broker: "+DEFAULT_BROKER_NAME+"\n");
        use.append(" -u name Specify unique user name. (Required)\n");
        use.append(" -p password Specify password for user.\n");
        use.append(" Default password: "+DEFAULT_PASSWORD+"\n");
        use.append(" -tr topic Specify topic for receiving messages.\n");
        use.append(" -h This help screen.\n");
        System.err.println (use);
    }
}
Check the sample in SONIC_HOME\MQX.Y\samples\QueuePTP\MultipartMessage
Also check out the JavaDoc at SONIC_HOME\Docs\api\sonicmq_api
Thomas
public void onMessage( javax.jms.Message message){
String msgBody = null;
String msgClass = null;
if (message instanceof javax.jms.TextMessage)
{
msgClass = "javax.jms.TextMessage";
try
{
msgBody = ((javax.jms.TextMessage)message).getText();
}
catch (javax.jms.JMSException jmse)
{
msgBody = "";
}
}
else if (message instanceof javax.jms.BytesMessage)
{
msgClass = "javax.jms.BytesMessage";
//int writeIndex = 0;
byte[] buffer = new byte[2048];
byte[] body = new byte[0];
javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
try
{
while (true)
{
int numBytesRead = bytesMessage.readBytes(buffer);
if (numBytesRead == -1)
{
// No more bytes to read.
break;
}
byte[] tmp1 = new byte[body.length + numBytesRead];
System.arraycopy(body,0,tmp1,0,body.length);
System.arraycopy(buffer,0,tmp1,body.length,numBytesRead);
body = tmp1;
}
}
catch (javax.jms.JMSException jsme)
{
; // Finished reading body.
}
msgBody = new String(body);
}
else if (message instanceof javax.jms.MapMessage)
{
msgClass = "javax.jms.MapMessage";
}
else if (message instanceof javax.jms.ObjectMessage)
{
msgClass = "javax.jms.ObjectMessage";
}
else if (message instanceof javax.jms.StreamMessage)
{
msgClass = "javax.jms.StreamMessage";
}
else if (message instanceof progress.message.jclient.XMLMessage)
{
msgClass = "progress.message.jclient.XMLMessage";
try
{
msgBody = ((progress.message.jclient.XMLMessage)message).getText();
}
catch (javax.jms.JMSException jmse)
{
msgBody = "";
}
}
else if (message instanceof progress.message.jclient.MultipartMessage)
{
msgClass = "progress.message.jclient.MultipartMessage";
StringBuffer body = new StringBuffer();
progress.message.jclient.MultipartMessage mpMessage = (progress.message.jclient.MultipartMessage) message;
try
{
for (int ip=0; ip
{
progress.message.jclient.Part prt = mpMessage.getPart(ip);
progress.message.jclient.Header hdr = prt.getHeader();
body.append("--- Part " + String.valueOf (ip + 1) + " Headers ---\n");
body.append("Content-Type: " + hdr.getContentType() + "\n");
if (hdr.getContentId() != null) body.append("Content-Id: " + hdr.getContentId() + "\n");
java.util.Enumeration hdrFldNames = hdr.getHeaderFieldNames();
while (hdrFldNames.hasMoreElements())
{
String fld = (String) hdrFldNames.nextElement();
body.append(fld + ": " + hdr.getHeaderField(fld) + "\n");
}
if (hdr.getContentType().startsWith("text/"))
{
body.append("--- Part " + String.valueOf (ip + 1) + " Content ---\n");
body.append(prt.getContent() + "\n");
}
}
} catch (javax.jms.JMSException jsme)
{
; // Finished reading body.
}
msgBody = new String(body);
}
log.info("msgBody" + msgBody);
log.info("==================================");
log.info("=============COUNT OF MSG ||"+(++msgCount) +"||==================");
log.info("==================================");
}
Thanks a lot, Thomas Steinborn and SK SK. I can read the message now; next I have to find a way to separate the XML message from the original message and print it out.
Thanks again.
Regards,
Swaroop Kunduru.