Jboss Queues And Topics For Argumentative Essays

The JMS API stands for Java Message Service Application Programming Interface, and it is used by applications to send asynchronous business-quality messages to other applications. In the messaging world, messages are not sent directly to other applications. Instead, messages are sent to destinations, known as queues or topics. Applications sending messages do not need to worry if the receiving applications are up and running, and conversely, receiving applications do not need to worry about the sending application's status. Both senders, and receivers only interact with the destinations.

The JMS API is the standardized interface to a JMS provider, sometimes called a Message Oriented Middleware (MOM) system. JBoss comes with a JMS 1.1 compliant JMS provider called JBoss Messaging or JBossMQ. When you use the JMS API with JBoss, you are using the JBoss Messaging engine transparently. JBoss Messaging fully implements the JMS specification; therefore, the best JBoss Messaging user guide is the JMS specification. For more information about the JMS API please visit the JMS Tutorial or JMS Downloads & Specifications.

This chapter focuses on the JBoss specific aspects of using JMS and message driven beans as well as the JBoss Messaging configuration and MBeans.

In this section we discuss the basics needed to use the JBoss JMS implementation. JMS leaves the details of accessing JMS connection factories and destinations as provider specific details. What you need to know to use the JBoss Messaging layer is:

  • The location of the queue and topic connect factories: In JBoss both connection factory implementations are located under the JNDI name .

  • How to lookup JMS destinations (queues and topics): Destinations are configured via MBeans as we will see when we discuss the messaging MBeans. JBoss comes with a few queues and topics preconfigured. You can find them under the domain in the JMX Console..

  • Which JARS JMS requires: These include , , , , and .

In the following sections we will look at examples of the various JMS messaging models and message driven beans. The chapter example source is located under the directory of the book examples.

Let's start out with a point-to-point (P2P) example. In the P2P model, a sender delivers messages to a queue and a single receiver pulls the message off of the queue. The receiver does not need to be listening to the queue at the time the message is sent. Example 6.1, “A P2P JMS client example” shows a complete P2P example that sends a to the queue and asynchronously receives the message from the same queue.

Example 6.1. A P2P JMS client example

package org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; import org.apache.log4j.Logger; import org.jboss.util.ChapterExRepository; /** * A complete JMS client example program that sends a * TextMessage to a Queue and asynchronously receives the * message from the same Queue. * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class SendRecvClient { static Logger log; static CountDown done = new CountDown(1); QueueConnection conn; QueueSession session; Queue que; public static class ExListener implements MessageListener { public void onMessage(Message msg) { done.release(); TextMessage tm = (TextMessage) msg; try { log.info("onMessage, recv text=" + tm.getText()); } catch(Throwable t) { t.printStackTrace(); } } } public void setupPTP() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); QueueConnectionFactory qcf = (QueueConnectionFactory) tmp; conn = qcf.createQueueConnection(); que = (Queue) iniCtx.lookup("queue/testQueue"); session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendRecvAsync(String text) throws JMSException, NamingException { log.info("Begin sendRecvAsync"); // Setup the PTP connection, session setupPTP(); // Set the async listener QueueReceiver recv = session.createReceiver(que); recv.setMessageListener(new ExListener()); // Send a text msg QueueSender send = session.createSender(que); TextMessage tm = session.createTextMessage(text); send.send(tm); log.info("sendRecvAsync, sent text=" + tm.getText()); send.close(); log.info("End sendRecvAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { ChapterExRepository.init(SendRecvClient.class); log = Logger.getLogger("SendRecvClient"); log.info("Begin SendRecvClient, now=" + System.currentTimeMillis()); SendRecvClient client = new SendRecvClient(); client.sendRecvAsync("A text msg"); client.done.acquire(); client.stop(); log.info("End SendRecvClient"); System.exit(0); } }

The client may be run using the following command line:

[examples]$ ant -Dchap=chap6 -Dex=1p2p run-example ... run-example1p2p: [java] [INFO,SendRecvClient] Begin SendRecvClient, now=1102808673386 [java] [INFO,SendRecvClient] Begin sendRecvAsync [java] [INFO,SendRecvClient] onMessage, recv text=A text msg [java] [INFO,SendRecvClient] sendRecvAsync, sent text=A text msg [java] [INFO,SendRecvClient] End sendRecvAsync [java] [INFO,SendRecvClient] End SendRecvClient

6.1.1. A Point-To-Point Example

The JMS publish/subscribe (Pub-Sub) message model is a one-to-many model. A publisher sends a message to a topic and all active subscribers of the topic receive the message. Subscribers that are not actively listening to the topic will miss the published message. shows a complete JMS client that sends a to a topic and asynchronously receives the message from the same topic.

Example 6.2. A Pub-Sub JMS client example

package org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * A complete JMS client example program that sends a TextMessage to * a Topic and asynchronously receives the message from the same * Topic. * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class TopicSendRecvClient { static CountDown done = new CountDown(1); TopicConnection conn = null; TopicSession session = null; Topic topic = null; public static class ExListener implements MessageListener { public void onMessage(Message msg) { done.release(); TextMessage tm = (TextMessage) msg; try { System.out.println("onMessage, recv text=" + tm.getText()); } catch(Throwable t) { t.printStackTrace(); } } } public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendRecvAsync(String text) throws JMSException, NamingException { System.out.println("Begin sendRecvAsync"); // Setup the PubSub connection, session setupPubSub(); // Set the async listener TopicSubscriber recv = session.createSubscriber(topic); recv.setMessageListener(new ExListener()); // Send a text msg TopicPublisher send = session.createPublisher(topic); TextMessage tm = session.createTextMessage(text); send.publish(tm); System.out.println("sendRecvAsync, sent text=" + tm.getText()); send.close(); System.out.println("End sendRecvAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicSendRecvClient, now=" + System.currentTimeMillis()); TopicSendRecvClient client = new TopicSendRecvClient(); client.sendRecvAsync("A text msg, now="+System.currentTimeMillis()); client.done.acquire(); client.stop(); System.out.println("End TopicSendRecvClient"); System.exit(0); } }

The client may be run using the following command line:

[examples]$ ant -Dchap=chap6 -Dex=1ps run-example ... run-example1ps: [java] Begin TopicSendRecvClient, now=1102809427043 [java] Begin sendRecvAsync [java] onMessage, recv text=A text msg, now=1102809427071 [java] sendRecvAsync, sent text=A text msg, now=1102809427071 [java] End sendRecvAsync [java] End TopicSendRecvClient

Now let's break the publisher and subscribers into separate programs to demonstrate that subscribers only receive messages while they are listening to a topic. Example 6.3, “A JMS publisher client” shows a variation of the previous pub-sub client that only publishes messages to the topic. The subscriber only client is shown in Example 6.3, “A JMS publisher client”.

Example 6.3. A JMS publisher client

package org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSlistubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that sends a TextMessage to a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class TopicSendClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendAsync(String text) throws JMSException, NamingException { System.out.println("Begin sendAsync"); // Setup the pub/sub connection, session setupPubSub(); // Send a text msg TopicPublisher send = session.createPublisher(topic); TextMessage tm = session.createTextMessage(text); send.publish(tm); System.out.println("sendAsync, sent text=" + tm.getText()); send.close(); System.out.println("End sendAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicSendClient, now=" + System.currentTimeMillis()); TopicSendClient client = new TopicSendClient(); client.sendAsync("A text msg, now="+System.currentTimeMillis()); client.stop(); System.out.println("End TopicSendClient"); System.exit(0); } }

Example 6.4. A JMS subscriber client

package org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that synchronously receives a message a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class TopicRecvClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void recvSync() throws JMSException, NamingException { System.out.println("Begin recvSync"); // Setup the pub/sub connection, session setupPubSub(); // Wait upto 5 seconds for the message TopicSubscriber recv = session.createSubscriber(topic); Message msg = recv.receive(5000); if (msg == null) { System.out.println("Timed out waiting for msg"); } else { System.out.println("TopicSubscriber.recv, msgt="+msg); } } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicRecvClient, now=" + System.currentTimeMillis()); TopicRecvClient client = new TopicRecvClient(); client.recvSync(); client.stop(); System.out.println("End TopicRecvClient"); System.exit(0); } }

Run the followed by the as follows:

[examples]$ ant -Dchap=chap6 -Dex=1ps2 run-example ... run-example1ps2: [java] Begin TopicSendClient, now=1102810007899 [java] Begin sendAsync [java] sendAsync, sent text=A text msg, now=1102810007909 [java] End sendAsync [java] End TopicSendClient [java] Begin TopicRecvClient, now=1102810011524 [java] Begin recvSync [java] Timed out waiting for msg [java] End TopicRecvClient

The output shows that the topic subscriber client () fails to receive the message sent by the publisher due to a timeout.

6.1.2. A Pub-Sub Example

JMS supports a messaging model that is a cross between the P2P and pub-sub models. When a pub-sub client wants to receive all messages posted to the topic it subscribes to even when it is not actively listening to the topic, the client may achieve this behavior using a durable topic. Let's look at a variation of the preceding subscriber client that uses a durable topic to ensure that it receives all messages, include those published when the client is not listening to the topic. Example 6.5, “A durable topic JMS client example” shows the durable topic client with the key differences between the Example 6.4, “A JMS subscriber client” client highlighted in bold.

Example 6.5. A durable topic JMS client example

package org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that synchronously receives a message a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class DurableTopicRecvClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection("john", "needle"); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void recvSync() throws JMSException, NamingException { System.out.println("Begin recvSync"); // Setup the pub/sub connection, session setupPubSub(); // Wait upto 5 seconds for the message TopicSubscriber recv = session.createDurableSubscriber(topic, "chap6-ex1dtps"); Message msg = recv.receive(5000); if (msg == null) { System.out.println("Timed out waiting for msg"); } else { System.out.println("DurableTopicRecvClient.recv, msgt=" + msg); } } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin DurableTopicRecvClient, now=" + System.currentTimeMillis()); DurableTopicRecvClient client = new DurableTopicRecvClient(); client.recvSync(); client.stop(); System.out.println("End DurableTopicRecvClient"); System.exit(0); } }

Now run the previous topic publisher with the durable topic subscriber as follows:

[examples]$ ant -Dchap=chap6 -Dex=1psdt run-example ... run-example1psdt: [java] Begin DurableTopicSetup [java] End DurableTopicSetup [java] Begin TopicSendClient, now=1102899834273 [java] Begin sendAsync [java] sendAsync, sent text=A text msg, now=1102899834345 [java] End sendAsync [java] End TopicSendClient [java] Begin DurableTopicRecvClient, now=1102899840043 [java] Begin recvSync [java] DurableTopicRecvClient.recv, msgt=SpyTextMessage { [java] Header { [java] jmsDestination : TOPIC.testTopic.DurableSubscription[ clientId=DurableSubscriberExample name=chap6-ex1dtps selector=null] [java] jmsDeliveryMode : 2 [java] jmsExpiration : 0 [java] jmsPriority : 4 [java] jmsMessageID : ID:3-11028998375501 [java] jmsTimeStamp : 1102899837550 [java] jmsCorrelationID: null [java] jmsReplyTo : null [java] jmsType : null [java] jmsRedelivered : false [java] jmsProperties : {} [java] jmsPropReadWrite: false [java] msgReadOnly : true [java] producerClientId: ID:3 [java] } [java] Body { [java] text :A text msg, now=1102899834345 [java] } [java] } [java] End DurableTopicRecvClient

Items of note for the durable topic example include:

  • The creation in the durable topic client used a username and password, and the creation was done using the method. This is a requirement of durable topic subscribers. The messaging server needs to know what client is requesting the durable topic and what the name of the durable topic subscription is. We will discuss the details of durable topic setup in the configuration section.

  • An client was run prior to the . The reason for this is a durable topic subscriber must have registered a subscription at some point in the past in order for the messaging server to save messages. JBoss supports dynamic durable topic subscribers and the client simply creates a durable subscription receiver and the exits. This leaves an active durable topic subscriber on the and the messaging server knows that any messages posted to this topic must be saved for latter delivery.

  • The does not change for the durable topic. The notion of a durable topic is a subscriber only notion.

  • The sees the message published to the even though it was not listening to the topic at the time the message was published.

6.1.3. A Pub-Sub With Durable Topic Example

Example 6.6, “A TextMessage processing MDB” shows an message driven bean (MDB) that transforms the it receives and sends the transformed messages to the queue found in the incoming message header.

Example 6.6. A TextMessage processing MDB

package org.jboss.chap6.ex2; import javax.ejb.MessageDrivenBean; import javax.ejb.MessageDrivenContext; import javax.ejb.EJBException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * An MDB that transforms the TextMessages it receives and send the * transformed messages to the Queue found in the incoming message * JMSReplyTo header. * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class TextMDB implements MessageDrivenBean, MessageListener { private MessageDrivenContext ctx = null; private QueueConnection conn; private QueueSession session; public TextMDB() { System.out.println("TextMDB.ctor, this="+hashCode()); } public void setMessageDrivenContext(MessageDrivenContext ctx) { this.ctx = ctx; System.out.println("TextMDB.setMessageDrivenContext, this=" + hashCode()); } public void ejbCreate() { System.out.println("TextMDB.ejbCreate, this="+hashCode()); try { setupPTP(); } catch (Exception e) { throw new EJBException("Failed to init TextMDB", e); } } public void ejbRemove() { System.out.println("TextMDB.ejbRemove, this="+hashCode()); ctx = null; try { if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch(JMSException e) { e.printStackTrace(); } } public void onMessage(Message msg) { System.out.println("TextMDB.onMessage, this="+hashCode()); try { TextMessage tm = (TextMessage) msg; String text = tm.getText() + "processed by: "+hashCode(); Queue dest = (Queue) msg.getJMSReplyTo(); sendReply(text, dest); } catch(Throwable t) { t.printStackTrace(); } } private void setupPTP() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("java:comp/env/jms/QCF"); QueueConnectionFactory qcf = (QueueConnectionFactory) tmp; conn = qcf.createQueueConnection(); session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); conn.start(); } private void sendReply(String text, Queue dest) throws JMSException { System.out.println("TextMDB.sendReply, this=" + hashCode() + ", dest="+dest); QueueSender sender = session.createSender(dest); TextMessage tm = session.createTextMessage(text); sender.send(tm); sender.close(); } }

The MDB and deployment descriptors are shown in Example 6.7, “The MDB ejb-jar.xml descriptor” and Example 6.8, “The MDB jboss.xml descriptor”.

Example 6.7. The MDB ejb-jar.xml descriptor

<?xml version="1.0"?> <!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd"> <ejb-jar> <enterprise-beans> <message-driven> <ejb-name>TextMDB</ejb-name> <ejb-class>org.jboss.chap6.ex2.TextMDB</ejb-class> <transaction-type>Container</transaction-type> <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode> <message-driven-destination> <destination-type>javax.jms.Queue</destination-type> </message-driven-destination> <res-ref-name>jms/QCF</res-ref-name> <resource-ref> <res-type>javax.jms.QueueConnectionFactory</res-type> <res-auth>Container</res-auth> </resource-ref> </message-driven> </enterprise-beans> </ejb-jar>

Example 6.8. The MDB jboss.xml descriptor

<?xml version="1.0"?> <jboss> <enterprise-beans> <message-driven> <ejb-name>TextMDB</ejb-name> <destination-jndi-name>queue/B</destination-jndi-name> <resource-ref> <res-ref-name>jms/QCF</res-ref-name> <jndi-name>ConnectionFactory</jndi-name> </resource-ref> </message-driven> </enterprise-beans> </jboss>

Example 6.9, “A JMS client that interacts with the TextMDB” shows a variation of the P2P client that sends several messages to the destination and asynchronously receives the messages as modified by from queue .

Example 6.9. A JMS client that interacts with the TextMDB

package org.jboss.chap6.ex2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * A complete JMS client example program that sends N TextMessages to * a Queue B and asynchronously receives the messages as modified by * TextMDB from Queue A. * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */ public class SendRecvClient { static final int N = 10; static CountDown done = new CountDown(N); QueueConnection conn; QueueSession session; Queue queA; Queue queB; public static class ExListener implements MessageListener { public void onMessage(Message msg) { done.release(); TextMessage tm = (TextMessage) msg; try { System.out.println("onMessage, recv text="+tm.getText()); } catch(Throwable t) { t.printStackTrace(); } } } public void setupPTP() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); QueueConnectionFactory qcf = (QueueConnectionFactory) tmp; conn = qcf.createQueueConnection(); queA = (Queue) iniCtx.lookup("queue/A"); queB = (Queue) iniCtx.lookup("queue/B"); session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendRecvAsync(String textBase) throws JMSException, NamingException, InterruptedException { System.out.println("Begin sendRecvAsync"); // Setup the PTP connection, session setupPTP(); // Set the async listener for queA QueueReceiver recv = session.createReceiver(queA); recv.setMessageListener(new ExListener()); // Send a few text msgs to queB QueueSender send = session.createSender(queB); for(int m = 0; m < 10; m ++) { TextMessage tm = session.createTextMessage(textBase+"#"+m); tm.setJMSReplyTo(queA); send.send(tm); System.out.println("sendRecvAsync, sent text=" + tm.getText()); } System.out.println("End sendRecvAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin SendRecvClient,now=" + System.currentTimeMillis()); SendRecvClient client = new SendRecvClient(); client.sendRecvAsync("A text msg"); client.done.acquire(); client.stop(); System.exit(0); System.out.println("End SendRecvClient"); } }

Run the client as follows:

[examples]$ ant -Dchap=chap6 -Dex=2 run-example ... run-example2: [copy] Copying 1 file to /tmp/jboss-4.0.2/server/default/deploy [echo] Waiting 5 seconds for deploy... [java] Begin SendRecvClient, now=1102900541558 [java] Begin sendRecvAsync [java] sendRecvAsync, sent text=A text msg#0 [java] sendRecvAsync, sent text=A text msg#1 [java] sendRecvAsync, sent text=A text msg#2 [java] sendRecvAsync, sent text=A text msg#3 [java] sendRecvAsync, sent text=A text msg#4 [java] sendRecvAsync, sent text=A text msg#5 [java] sendRecvAsync, sent text=A text msg#6 [java] sendRecvAsync, sent text=A text msg#7 [java] sendRecvAsync, sent text=A text msg#8 [java] sendRecvAsync, sent text=A text msg#9 [java] End sendRecvAsync [java] onMessage, recv text=A text msg#0processed by: 12855623 [java] onMessage, recv text=A text msg#5processed by: 9399816 [java] onMessage, recv text=A text msg#9processed by: 6598158 [java] onMessage, recv text=A text msg#3processed by: 8153998 [java] onMessage, recv text=A text msg#4processed by: 10118602 [java] onMessage, recv text=A text msg#2processed by: 1792333 [java] onMessage, recv text=A text msg#7processed by: 14251014 [java] onMessage, recv text=A text msg#1processed by: 10775981 [java] onMessage, recv text=A text msg#8processed by: 6056676 [java] onMessage, recv text=A text msg#6processed by: 15679078

The corresponding JBoss server console output is:

19:15:40,232 INFO [EjbModule] Deploying TextMDB 19:15:41,498 INFO [EJBDeployer] Deployed: file:/private/tmp/jboss-4.0.2/server/default/deplo y/chap6-ex2.jar 19:15:45,606 INFO [TextMDB] TextMDB.ctor, this=10775981 19:15:45,620 INFO [TextMDB] TextMDB.ctor, this=1792333 19:15:45,627 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=10775981 19:15:45,638 INFO [TextMDB] TextMDB.ejbCreate, this=10775981 19:15:45,640 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=1792333 19:15:45,640 INFO [TextMDB] TextMDB.ejbCreate, this=1792333 19:15:45,649 INFO [TextMDB] TextMDB.ctor, this=12855623 19:15:45,658 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=12855623 19:15:45,661 INFO [TextMDB] TextMDB.ejbCreate, this=12855623 19:15:45,742 INFO [TextMDB] TextMDB.ctor, this=8153998 19:15:45,744 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=8153998 19:15:45,744 INFO [TextMDB] TextMDB.ejbCreate, this=8153998 19:15:45,763 INFO [TextMDB] TextMDB.ctor, this=10118602 19:15:45,764 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=10118602 19:15:45,764 INFO [TextMDB] TextMDB.ejbCreate, this=10118602 19:15:45,777 INFO [TextMDB] TextMDB.ctor, this=9399816 19:15:45,779 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=9399816 19:15:45,779 INFO [TextMDB] TextMDB.ejbCreate, this=9399816 19:15:45,792 INFO [TextMDB] TextMDB.ctor, this=15679078 19:15:45,798 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=15679078 19:15:45,799 INFO [TextMDB] TextMDB.ejbCreate, this=15679078 19:15:45,815 INFO [TextMDB] TextMDB.ctor, this=14251014 19:15:45,816 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=14251014 19:15:45,817 INFO [TextMDB] TextMDB.ejbCreate, this=14251014 19:15:45,829 INFO [TextMDB] TextMDB.ctor, this=6056676 19:15:45,831 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=6056676 19:15:45,864 INFO [TextMDB] TextMDB.ctor, this=6598158 19:15:45,903 INFO [TextMDB] TextMDB.ejbCreate, this=6056676 19:15:45,906 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=6598158 19:15:45,906 INFO [TextMDB] TextMDB.ejbCreate, this=6598158 19:15:46,236 INFO [TextMDB] TextMDB.onMessage, this=12855623 19:15:46,238 INFO [TextMDB] TextMDB.sendReply, this=12855623, dest=QUEUE.A 19:15:46,734 INFO [TextMDB] TextMDB.onMessage, this=9399816 19:15:46,736 INFO [TextMDB] TextMDB.onMessage, this=8153998 19:15:46,737 INFO [TextMDB] TextMDB.onMessage, this=6598158 19:15:46,768 INFO [TextMDB] TextMDB.sendReply, this=9399816, dest=QUEUE.A 19:15:46,768 INFO [TextMDB] TextMDB.sendReply, this=6598158, dest=QUEUE.A 19:15:46,774 INFO [TextMDB] TextMDB.sendReply, this=8153998, dest=QUEUE.A 19:15:46,903 INFO [TextMDB] TextMDB.onMessage, this=10118602 19:15:46,904 INFO [TextMDB] TextMDB.sendReply, this=10118602, dest=QUEUE.A 19:15:46,927 INFO [TextMDB] TextMDB.onMessage, this=1792333 19:15:46,928 INFO [TextMDB] TextMDB.sendReply, this=1792333, dest=QUEUE.A 19:15:47,002 INFO [TextMDB] TextMDB.onMessage, this=14251014 19:15:47,007 INFO [TextMDB] TextMDB.sendReply, this=14251014, dest=QUEUE.A 19:15:47,051 INFO [TextMDB] TextMDB.onMessage, this=10775981 19:15:47,051 INFO [TextMDB] TextMDB.sendReply, this=10775981, dest=QUEUE.A 19:15:47,060 INFO [TextMDB] TextMDB.onMessage, this=6056676 19:15:47,061 INFO [TextMDB] TextMDB.sendReply, this=6056676, dest=QUEUE.A 19:15:47,064 INFO [TextMDB] TextMDB.onMessage, this=15679078 19:15:47,065 INFO [TextMDB] TextMDB.sendReply, this=15679078, dest=QUEUE.A

Items of note in this example include:

  • The JMS client has no explicit knowledge that it is dealing with an MDB. The client simply uses the standard JMS APIs to send messages to a queue and receive messages from another queue.

  • The MDB declares whether it will listen to a queue or topic in the descriptor. The name of the queue or topic must be specified using a descriptor. In this example the MDB also sends messages to a JMS queue. MDBs may act as queue senders or topic publishers within their callback.

  • The messages received by the client include a "processed by: NNN" suffix, where NNN is the value of the MDB instance that processed the message. This shows that many MDBs may actively process messages posted to a destination. Concurrent processing is one of the benefits of MDBs.

6.1.4. A Point-To-Point With MDB Example

JBossMQ is composed of several services working together to provide JMS API level services to client applications. The services that make up the JBossMQ JMS implementation are introduced in this section.

The Invocation Layer (IL) services are responsible for handling the communication protocols that clients use to send and receive messages. JBossMQ can support running different types of Invocation Layers concurrently. All Invocation Layers support bidirectional communication which allows clients to send and receive messages concurrently. ILs only handle the transport details of messaging. They delegate messages to the JMS server JMX gateway service known as the invoker. This is similar to how the detached invokers expose the EJB container via different transports.

Each IL service binds a JMS connection factory to a specific location in the JNDI tree. Clients choose the protocol they wish to use by the JNDI location used to obtain the JMS connection factory. JBossMQ currently has several different invocation layers.

  • UIL2 IL: The Unified Invocation Layer version 2(UIL2) is the preferred invocation layer for remote messaging. A multiplexing layer is used to provide bidirectional communication. The multiplexing layer creates two virtual sockets over one physical socket. This allows communication with clients that cannot have a connection created from the server back to the client due to firewall or other restrictions. Unlike the older UIL invocation layer which used a blocking round-trip message at the socket level, the UIL2 protocol uses true asynchronous send and receive messaging at the transport level, providing for improved throughput and utilization.

  • JVM IL: The Java Virtual Machine (JVM) Invocation Layer was developed to cut out the TCP/IP overhead when the JMS client is running in the same JVM as the server. This IL uses direct method calls for the server to service the client requests. This increases efficiency since no sockets are created and there is no need for the associated worker threads. This is the IL that should be used by Message Driven Beans (MDB) or any other component that runs in the same virtual machine as the server such as servlets, MBeans, or EJBs.

  • HTTP IL: The HTTP Invocation Layer (HTTPIL) allows for accessing the JBossMQ service over the HTTP or HTTPS protocols. This IL relies on the servlet deployed in the to handle the http traffic. This IL is useful for access to JMS through a firewall when the only port allowed requires HTTP.

6.2.1. Invocation Layer

The JBossMQ is the service that enforces an access control list to guard access to your destinations. This subsystem works closely with the service.

6.2.2. Security Manager

The can be thought as being the central service in JBossMQ. It keeps track of all the destinations that have been created on the server. It also keeps track of the other key services such as the , , and .

6.2.3. Destination Manager

Messages created in the server are passed to the for memory management. JVM memory usage goes up as messages are added to a destination that does not have any receivers. These messages are held in the main memory until the receiver picks them up. If the notices that the JVM memory usage starts passing the defined limits, the starts moving those messages from memory to persistent storage on disk. The uses a least recently used (LRU) algorithm to determine which messages should go to disk.

6.2.4. Message Cache

The (SM) is in charge of keeping track of who is allowed to log into the server and what their durable subscriptions are.

6.2.5. State Manager

The (PM) is used by a destination to store messages marked as being persistent. JBossMQ has several different implementations of the persistent manager, but only one can be enabled per server instance. You should enable the persistence manager that best matches your requirements.

  • JDBC2 persistence manager: The JDBC2 persistence manager allows you to store persistent messages to a relational database using JDBC. The performance of this PM is directly related to the performance that can be obtained from the database. This PM has a very low memory overhead compared to the other persistence managers. Furthermore it is also highly integrated with the to provide efficient persistence on a system that has a very active .

  • Null Persistence Manager: A wrapper persistence manager that can delegate to a real persistence manager. Configuration on the destinations decide whether persistence and caching is actually performed. The example configuration can be found in . To use the null persistence manager backed by a real persistence manager, you need to change the of the real persistence manager and link the new name to the null persistence manager.

6.2.6. Persistence Manager

A destination is the object on the JBossMQ server that clients use to send and receive messages. There are two types of destination objects, and . References to the destinations created by JBossMQ are stored in JNDI.

Clients that are in the point-to-point paradigm typically use queues. They expect that message sent to a queue will be receive by only one other client once and only once. If multiple clients are receiving messages from a single queue, the messages will be load balanced across the receivers. Queue objects, by default, will be stored under the JNDI sub context.

Topics are used in the publish-subscribe paradigm. When a client publishes a message to a topic, he expects that a copy of the message will be delivered to each client that has subscribed to the topic. Topic messages are delivered in the same manner a television show is delivered. Unless you have the TV on and are watching the show, you will miss it. Similarly, if the client is not up, running and receiving messages from the topics, it will miss messages published to the topic. To get around this problem of missing messages, clients can start a durable subscription. This is like having a VCR record a show you cannot watch at its scheduled time so that you can see what you missed when you turn your TV back on.

6.2. JBoss Messaging Overview

This section defines the MBean services that correspond to the components introduced in the previous section along with their MBean attributes. The configuration and service files that make up the JBossMQ system include:

  • deploy/hsqldb-jdbc-state-service.xml: This configures the JDBC state service for storing state in the embedded Hypersonic database.

  • deploy/jms/hsqldb-jdbc2-service.xml: This service descriptor configures the , , and jdbc2 for the embedded Hypersonic database.

  • deploy/jms/jbossmq-destinations-service.xml: This service describes defines default JMS queue and topic destination configurations used by the testsuite unit tests. You can add/remove destinations to this file, or deploy another descriptor with the destination configurations.

  • jbossmq-httpil.sar: This SAR file configures the HTTP invocation layer.

  • deploy/jms/jbossmq-service.xml: This service descriptor configures the core JBossMQ MBeans like the , , , and core interceptor stack. It also defines the MDB default dead letter queue .

  • deploy/jms/jms-ds.xml: This is a JCA connection factory and JMS provider MDB integration services configuration which sets JBossMQ as the JMS provider.

  • deploy/jms/jms-ra.rar: This is a JCA resource adaptor for JMS providers.

  • deploy/jms/jvm-il-service.xml: This service descriptor configures the which provides the JVM IL transport.

  • deploy/jms/rmi-il-service.xml: This service descriptor configures the which provides the RMI IL. The queue and topic connection factory for this IL is bound under the name .

  • deploy/jms/uil2-service.xml: This service descriptor configures the which provides the UIL2 transport. The queue and topic connection factory for this IL is bound under the name as well as to replace the deprecated version 1 UIL service.

We will discuss the associated MBeans in the following subsections.

The MBean is used to configure the JVM IL. The configurable attributes are as follows:

  • Invoker: This attribute specifies JMX ObjectName of the JMS entry point service that is used to pass incoming requests to the JMS server. This is not something you would typically change from the setting unless you change the entry point service.

  • ConnectionFactoryJNDIRef: The JNDI location that this IL will bind a setup to use this IL.

  • XAConnectionFactoryJNDIRef: The JNDI location that this IL will bind a setup to use this IL.

  • PingPeriod: How often, in milliseconds, the client should send a ping message to the server to validate that the connection is still valid. If this is set to zero, then no ping message will be sent. Since it is impossible for JVM IL connection to go bad, it is recommended that you keep this set to 0.

6.3.1. org.jboss.mq.il.jvm.JVMServerILService

The is used to configure the UIL2 IL. The configurable attributes are as follows:

  • Invoker: This attribute specifies JMX of the JMS entry point service that is used to pass incoming requests to the JMS server. This is not something you would typically change from the setting unless you change the entry point service.

  • ConnectionFactoryJNDIRef: The JNDI location that this IL will bind a setup to use this IL.

  • XAConnectionFactoryJNDIRef: The JNDI location that this IL will bind a setup to use this IL.

  • PingPeriod: How often, in milliseconds, the client should send a ping message to the server to validate that the connection is still valid. If this is set to zero, then no ping message will be sent.

  • ReadTimeout: The period in milliseconds is passed onto as the value of the UIL2 socket. This allows detection of dead sockets that are not responsive and are not capable of receiving ping messages. Note that this setting should be longer in duration than the setting.

  • BufferSize: The size in bytes used as the buffer over the basic socket streams. This corresponds to the buffer size.

  • ChunkSize: The size in bytes between stream listener notifications. The UIL2 layer uses the and implementations that support the notion of a heartbeat that is triggered based on data read/written to the stream. Whenever bytes are read/written to a stream. This allows serves as a ping or keepalive notification when large reads or writes require a duration greater than the .

  • ServerBindPort: The protocol listening port for this IL. If not specified default is 0, which means that a random port will be chosen.

  • BindAddress: The specific address this IL listens on. This can be used on a multi-homed host for a that will only accept connection requests on one of its addresses.

  • EnableTcpNoDelay: causes TCP/IP packets to be sent as soon as the request is flushed. This may improve request response times. Otherwise request packets may be buffered by the operating system to create larger IP packets.

  • ServerSocketFactory: The implementation class name to use to create the service . If not specified the default factory will be obtained from .

  • ClientAddress: The address passed to the client as the address that should be used to connect to the server.

  • ClientSocketFactory: The implementation class name to use on the client. If not specified the default factory will be obtained from .

  • SecurityDomain: Specify the security domain name to use with JBoss SSL aware socket factories. This is the JNDI name of the security manager implementation as described for the element of the and descriptors in Section 8.3.1, “Enabling Declarative Security in JBoss Revisited”.

The UIL2 service support the use of SSL through custom socket factories that integrate JSSE using the security domain associated with the IL service. An example UIL2 service descriptor fragment that illustrates the use of the custom JBoss SSL socket factories is shown in Example 6.10, “An example UIL2 config fragment for using SSL”.

Example 6.10. An example UIL2 config fragment for using SSL

<mbean code="org.jboss.mq.il.uil2.UILServerILService" name="jboss.mq:service=InvocationLayer,type=HTTPSUIL2"> <depends optional-attribute-name="Invoker">jboss.mq:service=Invoker</depends> <attribute name="ConnectionFactoryJNDIRef">SSLConnectionFactory</attribute> <attribute name="XAConnectionFactoryJNDIRef">SSLXAConnectionFactory</attribute> <!-- ... --> <!-- SSL Socket Factories --> <attribute name="ClientSocketFactory"> org.jboss.security.ssl.ClientSocketFactory </attribute> <attribute name="ServerSocketFactory"> org.jboss.security.ssl.DomainServerSocketFactory </attribute> <!-- Security domain - see below --> <attribute name="SecurityDomain">java:/jaas/SSL</attribute> </mbean> <!-- Configures the keystore on the "SSL" security domain This mbean is better placed in conf/jboss-service.xml where it can be used by other services, but it will work from anywhere. Use keytool from the sdk to create the keystore. --> <mbean code="org.jboss.security.plugins.JaasSecurityDomain" name="jboss.security:service=JaasSecurityDomain,domain=SSL"> <!-- This must correlate with the java:/jaas/SSL above --> <constructor> <arg type="java.lang.String" value="SSL"/> </constructor> <!-- The location of the keystore resource: loads from the classpath and the server conf dir is a good default --> <attribute name="KeyStoreURL">resource:uil2.keystore</attribute> <attribute name="KeyStorePass">changeme</attribute> </mbean>

6.3.2.1. Configuring UIL2 for SSL

There are several system properties that a JMS client using the UIL2 transport can set to control the client connection back to the server

  • org.jboss.mq.il.uil2.useServerHost: This system property allows a client to connect to the server rather than the value. This will only make a difference if name resolution differs between the server and client environments.

  • org.jboss.mq.il.uil2.localAddr: This system property allows a client to define the local interface to which its sockets should be bound.

  • org.jboss.mq.il.uil2.localPort: This system property allows a client to define the local port to which its sockets should be bound

  • org.jboss.mq.il.uil2.serverAddr: This system property allows a client to override the address to which it attempts to connect to. This is useful for networks where NAT is occcurring between the client and JMS server.

  • org.jboss.mq.il.uil2.serverPort: This system property allows a client to override the port to which it attempts to connect. This is useful for networks where port forwarding is occurring between the client and jms server.

  • org.jboss.mq.il.uil2.retryCount: This system property controls the number of attempts to retry connecting to the JMS server. Retries are only made for failures. A value <= 0 means no retry attempts will be made.

  • org.jboss.mq.il.uil2.retryDelay: This system property controls the delay in milliseconds between retries due to failures.

6.3.2.2. JMS client properties for the UIL2 transport

6.3.2. org.jboss.mq.il.uil2.UILServerILService

The is used to manage the HTTP/S IL. This IL allows for the use of the JMS service over HTTP or HTTPS connections. The relies on the servlet deployed in the to handle the HTTP traffic. The configurable attributes are as follows:

  • TimeOut: The default timeout in seconds that the client HTTP requests will wait for messages. This can be overridden on the client by setting the system property to the number of seconds.

  • RestInterval: The number of seconds the client will sleep after each request. The default is 0, but you can set this value in conjunction with the value to implement a pure timed based polling mechanism. For example, you could simply do a short lived request by setting the value to 0 and then setting the to 60. This would cause the client to send a single non-blocking request to the server, return any messages if available, then sleep for 60 seconds, before issuing another request. Like the value, this can be explicitly overridden on a given client by specifying the with the number of seconds you wish to wait between requests.

  • URL: Set the servlet URL. This value takes precedence over any individual values set (i.e. the , , , etc.) It my be a actual URL or a property name which will be used on the client side to resolve the proper URL by calling . If not specified the URL will be formed from .

  • URLPrefix: The prefix portion of the servlet URL.

  • URLHostName: The hostname portion of the servlet URL.

  • URLPort: The port portion of the URL.

  • URLSuffix: The trailing path portion of the URL.

  • UseHostName: A flag that if set to true the default setting for the attribute will be taken from . If false the default setting for the attribute will be taken from .

6.3.3. org.jboss.mq.il.http.HTTPServerILService

The is used to pass IL requests down to the destination manager service through an interceptor stack. The configurable attributes are as follows:

  • NextInterceptor: The JMX of the next request interceptor. This attribute is used by all the interceptors to create the interceptor stack. The last interceptor in the chain should be the .

6.3.4. org.jboss.mq.server.jmx.Invoker

The is used to load a generic interceptor and make it part of the interceptor stack. This MBean is typically used to load custom interceptors like , which is can be used to efficiently log all client requests via trace level log messages. The configurable attributes are as follows:

  • NextInterceptor: The JMX of the next request interceptor. This attribute is used by all the interceptors to create the interceptor stack. The last interceptor in the chain should be the . This attribute should be setup via a XML tag.

  • InterceptorClass: The class name of the interceptor that will be loaded and made part of the interceptor stack. This class specified here must extend the class.

6.3.5. org.jboss.mq.server.jmx.InterceptorLoader

The MBean is used as the default state manager assigned to the DestinationManager service. It stores user and durable subscriber information in the database. The configurable attributes are as follows:

  • ConnectionManager: This is the of the datasource that the JDBC state manager will write to. For Hypersonic, it is .

  • SqlProperties: The define the SQL statements to be used to persist JMS state data. If the underlying database is changed, the SQL statements used may need to change.

6.3.6. org.jboss.mq.sm.jdbc.JDBCStateManager

If the is part of the interceptor stack, then it will enforce the access control lists assigned to the destinations. The uses JAAS, and as such requires that at application policy be setup for in the JBoss file. The default configuration is shown below.

<application-policy name="jbossmq"> <authentication> <login-module code="org.jboss.security.auth.spi.DatabaseServerLoginModule" flag="required"> <module-option name="unauthenticatedIdentity">guest</module-option> <module-option name="dsJndiName">java:/DefaultDS</module-option> <module-option name="principalsQuery">SELECT PASSWD FROM JMS_USERS WHERE USERID=?</module-option> <module-option name="rolesQuery">SELECT ROLEID, 'Roles' FROM JMS_ROLES WHERE USERID=?</module-option> </login-module> </authentication> </application-policy>

The configurable attributes of the SecurityManager are as follows:

  • NextInterceptor: The JMX of the next request interceptor. This attribute is used by all the interceptors to create the interceptor stack. The last interceptor in the chain should be the .

  • SecurityDomain: Specify the security domain name to use for authentication and role based authorization. This is the JNDI name of the JAAS domain to be used to perform authentication and authorization against.

  • DefaultSecurityConfig: This element specifies the default security configuration settings for destinations. This applies to temporary queues and topics as well as queues and topics that do not specifically specify a security configuration. The should declare some number of elements which represent each role that is allowed access to a destination. Each should have the following attributes:

    • name: The attribute defines the name of the role.

    • create: The attribute is a true/false value that indicates whether the role has the ability to create durable subscriptions on the topic.

    • read: The attribute is a true/false value that indicates whether the role can receive messages from the destination.

    • write: The attribute is a true/false value that indicates whether the role can send messages to the destination.

6.3.7. org.jboss.mq.security.SecurityManager

The must be the last interceptor in the interceptor stack. The configurable attributes are as follows:

  • PersistenceManager: The JMX of the persistence manager service the server should use.

  • StateManager: The JMX of the state manager service the server should use.

  • MessageCache: The JMX of the message cache service the server should use.

Additional read-only attributes and operations that support monitoring include:

  • ClientCount: The number of clients connected to the server.

  • Clients: A instances for the clients connected to the server.

  • MessageCounter: An array of org.jboss.mq.server.MessageCounter instances that provide statistics for a JMS destination.

  • listMessageCounter(): This operation generates an HTML table that contains:

    • Type: Either or indicating the destination type.

    • Name: The name of the destination.

    • Subscription: The subscription ID for a topic.

    • Durable: A boolean indicating if the topic subscription is durable.

    • Count: The number of message delivered to the destination.

    • CountDelta: The change in message count since the previous access of count.

    • Depth: The number of messages in the destination.

    • DepthDelta: The change in the number of messages in the destination since the previous access of depth.

    • Last Add: The date/time string in / format of the last time a message was added to the destination.

  • resetMessageCounter(): This zeros all destination counts and last added times.

Queues and topics can be created and destroyed at runtime through the MBean. The provides and operations for this. Both methods have a one argument version which takes the destination name and a two argument version which takes the destination and the JNDI name of the destination. Queues and topics can be removed using the and operations, both of which take a destination name is input.

6.3.8. org.jboss.mq.server.jmx.DestinationManager

The server determines when to move messages to secondary storage by using the MBean. The configurable attributes are as follows:

  • CacheStore: The JMX of the service that will act as the cache store. The cache store is used by the to move messages to persistent storage. The value you set here typically depends on the type of persistence manager you are using.

  • HighMemoryMark: The amount of JVM heap memory in megabytes that must be reached before the starts to move messages to secondary storage.

  • MaxMemoryMark: The maximum amount of JVM heap memory in megabytes that the considers to be the max memory mark. As memory usage approaches the max memory mark, the will move messages to persistent storage so that the number of messages kept in memory approaches zero.

  • MakeSoftReferences: This controls whether or not the message cache will keep soft references to messages that need to be removed. The default is true.

  • MinimumHard: The minimum number of the in memory cache. JBoss won't try to go below this number of messages in the cache. The default value is 1.

  • MaximumHard: The upper bound on the number of hard references to messages in the cache. JBoss will soften messages to reduce the number of hard references to this level. A value of 0 means that there is no size based upper bound. The default is 0.

  • SoftenWaitMillis: The maximum wait time before checking whether messages need softening. The default is 1000 milliseconds (1 second).

  • SoftenNoMoreOftenThanMillis: The minimum amount of time between checks to soften messages. A value of 0 means that this check should be skipped. The default is 0 milliseconds.

  • SoftenAtLeastEveryMillis: The maximum amount of time between checks to soften messages. A value of 0 means that this check should be skipped. The default is 0.

Additional read-only cache attribute that provide statistics include:

  • CacheHits: The number of times a hard referenced message was accessed

  • CacheMisses: The number of times a softened message was accessed.

  • HardRefCacheSize: The number of messages in the cache that are not softened.

  • SoftRefCacheSize: The number of messages that are currently softened.

  • SoftenedSize: The total number of messages softened since the last boot.

  • TotalCacheSize: The total number of messages that are being managed by the cache.

6.3.9. org.jboss.mq.server.MessageCache

The should be used as the persistence manager assigned to the if you wish to store messages in a database. This PM has been tested against the HypersonSQL, MS SQL, Oracle, MySQL and Postgres databases. The configurable attributes are as follows:

  • MessageCache: The JMX of the that has been assigned to the ..

  • ConnectionManager: The JMX of the JCA data source that will be used to obtain JDBC connections.

  • ConnectionRetryAttempts: An integer count used to allow the PM to retry attempts at getting a connection to the JDBC store. There is a 1500 millisecond delay between each connection failed connection attempt and the next attempt. This must be greater than or equal to 1 and defaults to 5.

  • SqlProperties: A property list is used to define the SQL Queries and other JDBC2 Persistence Manager options. You will need to adjust these properties if you which to run against another database other than Hypersonic. Example 6.11, “Default JDBC2 PeristenceManager SqlProperties” shows default setting for this attribute for the Hypersonic database.

Example 6.11. Default JDBC2 PeristenceManager SqlProperties

<attribute name="SqlProperties"> CREATE_TABLES_ON_STARTUP = TRUE CREATE_USER_TABLE = CREATE TABLE JMS_USERS \ (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, \ CLIENTID VARCHAR(128), PRIMARY KEY(USERID)) CREATE_ROLE_TABLE = CREATE TABLE JMS_ROLES \ (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, \ PRIMARY KEY(USERID, ROLEID)) CREATE_SUBSCRIPTION_TABLE = CREATE TABLE JMS_SUBSCRIPTIONS \ (CLIENTID VARCHAR(128) NOT NULL, \ SUBNAME VARCHAR(128) NOT NULL, TOPIC VARCHAR(255) NOT NULL, \ SELECTOR VARCHAR(255), PRIMARY KEY(CLIENTID, SUBNAME)) GET_SUBSCRIPTION = SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS \ WHERE CLIENTID=? AND SUBNAME=? LOCK_SUBSCRIPTION = SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS \ WHERE CLIENTID=? AND SUBNAME=? GET_SUBSCRIPTIONS_FOR_TOPIC = SELECT CLIENTID, SUBNAME, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE TOPIC=? INSERT_SUBSCRIPTION = \ INSERT INTO JMS_SUBSCRIPTIONS (CLIENTID, SUBNAME, TOPIC, SELECTOR) VALUES(?,?,?,?) UPDATE_SUBSCRIPTION = \ UPDATE JMS_SUBSCRIPTIONS SET TOPIC=?, SELECTOR=? WHERE CLIENTID=? AND SUBNAME=? REMOVE_SUBSCRIPTION = DELETE FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=? GET_USER_BY_CLIENTID = SELECT USERID, PASSWD, CLIENTID FROM JMS_USERS WHERE CLIENTID=? GET_USER = SELECT PASSWD, CLIENTID FROM JMS_USERS WHERE USERID=? POPULATE.TABLES.01 = INSERT INTO JMS_USERS (USERID, PASSWD) \ VALUES ('guest', 'guest') POPULATE.TABLES.02 = INSERT INTO JMS_USERS (USERID, PASSWD) \ VALUES ('j2ee', 'j2ee') POPULATE.TABLES.03 = INSERT INTO JMS_USERS (USERID, PASSWD, CLIENTID) \ VALUES ('john', 'needle', 'DurableSubscriberExample') POPULATE.TABLES.04 = INSERT INTO JMS_USERS (USERID, PASSWD) \ VALUES ('nobody', 'nobody') POPULATE.TABLES.05 = INSERT INTO JMS_USERS (USERID, PASSWD) \ VALUES ('dynsub', 'dynsub') POPULATE.TABLES.06 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('guest','guest') POPULATE.TABLES.07 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('j2ee','guest') POPULATE.TABLES.08 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('john','guest') POPULATE.TABLES.09 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('subscriber','john') POPULATE.TABLES.10 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('publisher','john') POPULATE.TABLES.11 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('publisher','dynsub') POPULATE.TABLES.12 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('durpublisher','john') POPULATE.TABLES.13 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('durpublisher','dynsub') POPULATE.TABLES.14 = INSERT INTO JMS_ROLES (ROLEID, USERID) \ VALUES ('noacc','nobody') </attribute>

Example 6.12, “A sample JDBC2 PeristenceManager SqlProperties for Oracle” shows an alternate setting for Oracle.

Example 6.12. A sample JDBC2 PeristenceManager SqlProperties for Oracle

<attribute name="SqlProperties"> BLOB_TYPE=BINARYSTREAM_BLOB INSERT_TX = INSERT INTO JMS_TRANSACTIONS (TXID) values(?) INSERT_MESSAGE = \ INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) \ VALUES(?,?,?,?,?) SELECT_ALL_UNCOMMITED_TXS = SELECT TXID FROM JMS_TRANSACTIONS SELECT_MAX_TX = SELECT MAX(TXID) FROM JMS_MESSAGES SELECT_MESSAGES_IN_DEST = \ SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=? SELECT_MESSAGE = \ SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=? MARK_MESSAGE = \ UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=? UPDATE_MESSAGE = \ UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=? UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? UPDATE_MARKED_MESSAGES_WITH_TX = \ UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=? DELETE_MARKED_MESSAGES_WITH_TX = \ DELETE FROM JMS_MESSAGES MESS WHERE TXOP=:1 AND EXISTS \ (SELECT TXID FROM JMS_TRANSACTIONS TX WHERE TX.TXID = MESS.TXID) DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ? DELETE_MARKED_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=? DELETE_TEMPORARY_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXOP='T' DELETE_MESSAGE = DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=? CREATE_MESSAGE_TABLE = CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, \ DESTINATION VARCHAR(255) NOT NULL, TXID INTEGER, TXOP CHAR(1), \ MESSAGEBLOB BLOB, PRIMARY KEY (MESSAGEID, DESTINATION) ) CREATE_IDX_MESSAGE_TXOP_TXID = \ CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID) CREATE_IDX_MESSAGE_DESTINATION = \ CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION) CREATE_TX_TABLE = CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) ) CREATE_TABLES_ON_STARTUP = TRUE </attribute>

Additional examples can be found in the directory of the distribution.

6.3.10. org.jboss.mq.pm.jdbc2.PersistenceManager

This section describes the destination MBeans used in the and descriptors.

The is used to define a queue destination in JBoss. The following shows the configuration of one of the default JBoss queues.

<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=testQueue"> <depends optional-attribute-name="DestinationManager"> jboss.mq:service=DestinationManager </depends> <depends optional-attribute-name="SecurityManager"> jboss.mq:service=SecurityManager </depends> <attribute name="MessageCounterHistoryDayLimit">-1</attribute> <attribute name="SecurityConf"> <security> <role name="guest" read="true" write="true"/> <role name="publisher" read="true" write="true" create="false"/> <role name="noacc" read="false" write="false" create="false"/> </security> </attribute> </mbean>

The attribute of the JMX object name of this MBean is used to determine the destination name. For example. In the case of the queue we just looked at, the name of the queue is . The configurable attributes are as follows:

6.3.11.1. org.jboss.mq.server.jmx.Queue

6.3.11. Destination MBeans

6.3. JBoss Messaging Configuration and MBeans

RabbitMQ JMS Client

Introduction

RabbitMQ JMS Client is a client library for Pivotal RabbitMQ. RabbitMQ is not a JMS provider but includes a plugin needed to support the JMS Queue and Topic messaging models. JMS Client for RabbitMQ implements the JMS 1.1 specification on top of the RabbitMQ Java client, thus allowing new and existing JMS applications to connect to RabbitMQ.

The plugin and the JMS client are meant to work and be used together.

Components

To fully leverage JMS with RabbitMQ, you need the following components:

  • the JMS client library and its dependent libraries.
  • RabbitMQ JMS topic selector plugin that is included with RabbitMQ starting with version 3.6.3. To support message selectors for JMS topics, the RabbitMQ Topic Selector plugin must be installed on the RabbitMQ server. Message selectors allow a JMS application to filter messages using an expression based on SQL syntax. Message selectors for Queues are not currently supported.

JMS and AMQP 0-9-1

JMS is the standard messaging API for the JEE platform. It is available in commercial and open source implementations. Each implementation includes a JMS provider, a JMS client library, and additional, implementation-specific components for administering the messaging system. The JMS provider can be a standalone implementation of the messaging service, or a bridge to a non-JMS messaging system.

The JMS client API is standardized, so JMS applications are portable between vendors’ implementations. However, the underlying messaging implementation is unspecified, so there is no interoperability between JMS implementations. Java applications that want to share messaging must all use the same JMS implementation unless bridging technology exists. Furthermore, non-Java applications cannot access JMS without a vendor-specific JMS client library to enable interoperability.

AMQP 0-9-1 is a messaging protocol, rather than an API like JMS. Any client that implements the protocol can access a broker that supports AMQP 0-9-1. Protocol-level interoperability allows AMQP 0-9-1 clients written in any programming language and running on any operating system to participate in the messaging system with no need to bridge incompatible vendor implementations.

Because JMS Client for RabbitMQ is implemented using the RabbitMQ Java client, it is compliant with both the JMS API and the AMQP 0-9-1 protocol.

You can download the JMS 1.1 specification and API documentation from the Oracle Technology Network Web site.

Limitations

Some JMS 1.1 features are unsupported in the RabbitMQ JMS Client:

  • The JMS Client does not support server sessions.
  • XA transaction support interfaces are not implemented.
  • Topic selectors are supported with the RabbitMQ JMS topic selector plugin. Queue selectors are not yet implemented.
  • SSL and socket options for RabbitMQ connections are supported, but only using the (default) SSL connection protocols that the RabbitMQ client provides.
  • The JMS NoLocal subscription feature, which prevents delivery of messages published from a subscriber’s own connection, is not supported with RabbitMQ. You can call a method that includes the NoLocal argument, but it is ignored.

See the JMS API compliance documentation for a detailed list of supported JMS APIs.

Installing and Configuring

Enabling the Topic Selector Plug-in

The topic selector plugin is included with RabbitMQ. Like any RabbitMQ plugin, you need to enable the plugin in order to use it.

Enable the plugin using the rabbitmq-plugins command:

$ rabbitmq-plugins enable rabbitmq_jms_topic_exchange

You don't need to restart the broker to activate the plugin.

Enabling the JMS client in a Java container

To enable the JMS Client in a Java container (e.g. Java EE application server, web container), you must install the JMS client JAR files and its dependencies in the container and then define JMS resources in the container’s naming system so that JMS clients can look them up. The methods for accomplishing these tasks are container-specific, please refer to the vendors’ documentation.

For standalone applications, you need to add the JMS client JAR files and its dependencies to the application classpath. The JMS resources can be defined programmatically or through a dependency injection framework like Spring.

Defining the JMS Connection Factory

To define the JMS ConnectionFactory in JNDI, e.g. in Tomcat:

<Resource name="jms/ConnectionFactory" type="javax.jms.ConnectionFactory" factory="com.rabbitmq.jms.admin.RMQObjectFactory" username="guest" password="guest" virtualHost="/" host="localhost"/>

To define the JMS ConnectionFactory in JNDI, e.g. in WildFly (as of JMS Client 1.7.0):

<object-factory name="java:global/jms/ConnectionFactory" module="org.jboss.genericjms.provider" class="com.rabbitmq.jms.admin.RMQObjectFactory"> <environment> <property name="className" value="javax.jms.ConnectionFactory"/> <property name="username" value="guest"/> <property name="password" value="guest"/> <property name="virtualHost" value="/"/> <property name="host" value="localhost"/> </environment> </object-factory>

Here is the equivalent Spring bean example (Java configuration):

@Bean public ConnectionFactory jmsConnectionFactory() { RMQConnectionFactory connectionFactory = new RMQConnectionFactory(); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("localhost"); return connectionFactory; }

And here is the Spring XML configuration:

<bean id="jmsConnectionFactory" class="com.rabbitmq.jms.admin.RMQConnectionFactory" > <property name="username" value="guest" /> <property name="password" value="guest" /> <property name="virtualHost" value="/" /> <property name="host" value="localhost" /> </bean>

The following table lists all of the attributes/properties that are available.

Attribute/PropertyJNDI only?Description
nameYesName in JNDI.
typeYesName of the JMS interface the object implements, usually javax.jms.ConnectionFactory. Other choices are javax.jms.QueueConnectionFactory and javax.jms.TopicConnectionFactory. You can also use the name of the (common) implementation class, com.rabbitmq.jms.admin.RMQConnectionFactory.
factoryYesJMS Client for RabbitMQ ObjectFactory class, always com.rabbitmq.jms.admin.RMQObjectFactory.
usernameNoName to use to authenticate a connection with the RabbitMQ broker. The default is "guest".
passwordNoPassword to use to authenticate a connection with the RabbitMQ broker. The default is "guest".
virtualHostNoRabbitMQ virtual host within which the application will operate. The default is "/".
hostNoHost on which RabbitMQ is running. The default is "localhost".
portNoRabbitMQ port used for connections. The default is "5672" unless this is an SSL connection, in which case the default is "5671".
sslNoWhether to use an SSL connection to RabbitMQ. The default is "false". See the useSslProtocol methods for more information.
uriNoThe AMQP 0-9-1 URI string used to establish a RabbitMQ connection. The value can encode the host, port, username, password and virtualHost in a single string. Both 'amqp' and 'amqps' schemes are accepted. Note: this property sets other properties and the set order is unspecified.
onMessageTimeoutMsNoHow long to wait for MessageListener#onMessage() to return, in milliseconds. Default is 2000 ms.
preferProducerMessagePropertyNoWhether MessageProducer properties (delivery mode, priority, TTL) take precedence over respective Message properties or not. Default is true (which is compliant to the JMS specification).
requeueOnMessageListenerExceptionNoWhether requeuing messages on a RuntimeException in the MessageListener or not. Default is false.

JMS and AMQP 0-9-1 Destination Interoperability

An interoperability feature allows you to define JMS 'amqp' destinations that read and/or write to non-JMS RabbitMQ resources.

A JMS destination can be defined so that a JMS application can send Messages to a predefined RabbitMQ 'destination' (exchange/routing key) using the JMS API in the normal way. The messages are written "in the clear," which means that any AMQP 0-9-1 client can read them without having to understand the internal format of Java JMS messages. Only BytesMessages and TextMessages can be written in this way.

Similarly, a JMS destination can be defined that reads messages from a predefined RabbitMQ queue. A JMS application can then read these messages using the JMS API. RabbitMQ JMS Client packs them up into JMS Messages automatically. Messages read in this way are, by default, BytesMessages, but individual messages can be marked TextMessage (by adding an AMQP message property called "JMSType" whose value is "TextMessage"), which will interpret the byte-array payload as a UTF8 encoded String and return them as TextMessages.

A single 'amqp' destination can be defined for both reading and writing.

When messages are sent to an 'amqp' Destination, JMS message properties are mapped onto AMQP 0-9-1 headers and properties as appropriate. For example, the JMSPriority property converts to the priority property for the AMQP 0-9-1 message. (It is also set as a header with the name "JMSPriority".) User-defined properties are set as named message header values, provided they are boolean, numeric or String types.

When reading from an 'amqp' Destination, values are mapped back to JMS message properties, except that any explicit JMS property set as a message header overrides the natural AMQP 0-9-1 header value, unless this would misrepresent the message. For example, JMSDeliveryMode cannot be overridden in this way.

JMS 'amqp' RMQDestination Constructor

The com.rabbitmq.jms.admin package contains the RMQDestination class, which implements Destination in the JMS interface. This is extended with a new constructor:

public RMQDestination(String destinationName, String amqpExchangeName, String amqpRoutingKey, String amqpQueueName);

This constructor creates a destination for JMS for RabbitMQ mapped onto an AMQP 0-9-1 resource. The parameters are the following:

  • destinationName - the name of the queue destination
  • amqpExchangeName - the exchange name for the mapped resource
  • amqpRoutingKey - the routing key for the mapped resource
  • amqpQueueName - the queue name of the mapped resource (to listen messages from)

Applications that declare destinations in this way can use them directly, or store them in a JNDI provider for JMS applications to retrieve. Such destinations are non-temporary, queue destinations.

JMS AMQP 0-9-1 Destination Definitions

The RMQDestination object has the following new instance fields:

  • amqp – boolean, indicates if this is an AMQP 0-9-1 destination (if true); the default is false.
  • amqpExchangeName – String, the RabbitMQ exchange name to use when sending messages to this destination, if amqp is true; the default is null.
  • amqpRoutingKey – String, the AMQP 0-9-1 routing key to use when sending messages to this destination, if amqp is true; the default is null.
  • amqpQueueName – String, the RabbitMQ queue name to use when reading messages from this destination, if amqp is true; the default is null.

There are getters and setters for these fields, which means that a JNDI <Resource/> definition or an XML Spring bean definition can use them, for example JNDI with Tomcat:

<Resource name="jms/Queue" type="javax.jms.Queue" factory="com.rabbitmq.jms.admin.RMQObjectFactory" destinationName="myQueue" amqp="true" amqpQueueName="rabbitQueueName" />

This is the equivalent with WildFly (as of JMS Client 1.7.0):

<bindings> <object-factory name="java:global/jms/Queue" module="foo.bar" class="com.rabbitmq.jms.admin.RMQObjectFactory"> <environment> <property name="className" value="javax.jms.Queue"/> <property name="destinationName" value="myQueue"/> <property name="amqp" value="true"/> <property name="amqpQueueName" value="rabbitQueueName"/> </environment> </object-factory> </bindings>

This is the equivalent Spring bean example (Java configuration):

@Bean public Destination jmsDestination() { RMQDestination jmsDestination = new RMQDestination(); jmsDestination.setDestinationName("myQueue"); jmsDestination.setAmqp(true); jmsDestination.setAmqpQueueName("rabbitQueueName"); return jmsDestination; }

And here is the Spring XML configuration:

<bean id="jmsDestination" class="com.rabbitmq.jms.admin.RMQDestination" > <property name="destinationName" value="myQueue" /> <property name="amqp" value="true" /> <property name="amqpQueueName" value="rabbitQueueName" /> </bean>

Following is a complete list of the attributes/properties that are available:

Attribute/Property NameJNDI Only?Description
nameYesName in JNDI.
typeYesName of the JMS interface the object implements, usually javax.jms.Queue. Other choices are javax.jms.Topic and javax.jms.Destination. You can also use the name of the (common) implementation class, com.rabbitmq.jms.admin.RMQDestination.
factoryYesJMS Client for RabbitMQ ObjectFactory class, always com.rabbitmq.jms.admin.RMQObjectFactory.
amqpNo"true" means this is an 'amqp' destination. Default "false".
amqpExchangeNameNoName of the RabbitMQ exchange to publish messages to when an 'amqp' destination. This exchange must exist when messages are published.
amqpRoutingKeyNoThe routing key to use when publishing messages when an 'amqp' destination.
amqpQueueNameNoName of the RabbitMQ queue to receive messages from when an 'amqp' destination. This queue must exist when messages are received.
destinationNameNoName of the JMS destination.

Configuring Logging for the JMS Client

The JMS Client logs messages using SLF4J (Simple Logging Façade for Java). SLF4J delegates to a logging framework, such as Apache log4j or Logback. If no other logging framework is enabled, SLF4J defaults to a built-in, no-op, logger. See the SLF4J documentation for a list of the logging frameworks SLF4J supports.

The target logging framework is configured at deployment time by adding an SLF4J binding for the framework to the classpath. For example, the log4j SLF4J binding is in the slf4j-log4j12-{version}.jar file, which is a part of the SLF4J distribution. To direct JMS client messages to log4j, for example, add the following JARs to the classpath:

  • slf4j-api-1.7.21.jar
  • slf4j-log4j12-1.7.21.jar
  • log4j-1.2.17.jar

The SLF4J API is backwards compatible, so you can use use any version of SLF4J. Version 1.7.5 or higher is recommended. The SLF4J API and bindings, however, must be from the same SLF4J version.

No additional SLF4J configuration is required, once the API and binding JAR files are in the classpath. However, the target framework may have configuration files or command-line options. Refer to the documentation for the target logging framework for configuration details.

Implementation Details

This section provides additional implementation details for specific JMS API classes in the JMS Client.

Deviations from the specification are implemented to support common acknowledgement behaviours.

QueueBrowser Support

Overview of queue browsers

The JMS API includes objects and methods to browse an existing queue destination, reading its messages without removing them from the queue. Topic destinations cannot be browsed in this manner.

A QueueBrowser can be created from a (queue) Destination, with or without a selector expression. The browser has a getEnumeration() method, which returns a Java Enumeration of Messages copied from the queue.

If no selector is supplied, then all messages in the queue appear in the Enumeration. If a selector is supplied, then only those messages that satisfy the selector appear.

Implementation

The destination queue is read when the getEnumeration() method is called. A snapshot is taken of the messages in the queue; and the selector expression, if one is supplied, is used at this time to discard messages that do not match.

The message copies may now be read using the Enumeration interface (nextElement() and hasMoreElements()).

The selector expression and the destination queue of the QueueBrowser may not be adjusted after the QueueBrowser is created.

An Enumeration cannot be "reset", but the getEnumeration() method may be re-issued, taking a new snapshot from the queue each time.

The contents of an Enumeration survive session and/or connection close, but a QueueBrowser may not be used after the session that created it has closed. QueueBrowser.close() has no effect.

Which messages are included

Messages that arrive, expire, are re-queued, or are removed after the getEnumeration() call have no effect on the contents of the Enumeration it produced. If the messages in the queue change while theEnumerationis being built, they may or may not be included. In particular, if messages from the queue are simultaneously read by another client (or session), they may or may not appear in the Enumeration.

Message copies do not "expire" from an Enumeration.

Order of messages

If other client sessions read from a queue that is being browsed, then it is possible that some messages may subsequently be received out of order.

Message order will not be disturbed if no other client sessions read the queue at the same time.

Memory usage

When a message is read from the Enumeration (with nextElement()), then no reference to it is retained in the Java Client. This means the storage it occupies in the client is eligible for release (by garbage collection) if no other references are retained. Retaining an Enumeration will retain the storage for all message copies that remain in it.

If the queue has many messages -- or the messages it contains are very large -- then a getEnumeration() method call may consume a large amount of memory in a very short time. This remains true even if only a few messages are selected. There is currently limited protection against OutOfMemoryError conditions that may arise because of this. See the next section.

Setting a maximum number of messages to browse

Each connection is created with a limit on the number of messages that are examined by a QueueBrowser. The limit is set on the RMQConnectionFactory by RMQConnectionFactory.setQueueBrowserReadMax(int) and is passed to each Connection subsequently created by ConnectionFactory.createConnection().

The limit is an integer that, if positive, stops the queue browser from reading more than this number of messages when building an enumeration. If it is zero or negative, it is interpreted as imposing no limit on the browser, and all of the messages on the queue are scanned.

The default limit for a factory is determined by the rabbit.jms.queueBrowserReadMax system property, if set, and the value is specified as 0 if this property is not set or is not an integer.

If a RMQConnectionFactory value is obtained from a JNDI provider, then the limit set when the factory object was created is preserved.

Release Support

Support for QueueBrowsers is introduced in the JMS Client 1.2.0. Prior to that release, calling Session.createBrowser(Queue queue[, String selector]) resulted in an UnsupportedOperationException.

Group and individual acknowledgement

Prior to version 1.2.0 of the JMS client, in client acknowledgement mode (Session.CLIENT_ACKNOWLEDGE), acknowledging any message from an open session would acknowledge every unacknowledged message of that session, whether they were received before or after the message being acknowledged.

Currently, the behaviour of Session.CLIENT_ACKNOWLEDGE mode is modified so that, when calling msg.acknowledge(), only the message msgand all previously received unacknowledged messages on that session are acknowledged. Messages received aftermsg was received are not affected. This is a form of group acknowledgement, which differs slightly from the JMS 1.1 specification but is likely to be more useful, and is compatible with the vast majority of uses of the existing acknowledge function.

For even finer control, a new acknowledgement mode may be set when creating a session, called RMQSession.CLIENT_INDIVIDUAL_ACKNOWLEDGE.

A session created with this acknowledgement mode will mean that messages received on that session will be acknowledged individually. That is, the call msg.acknowledge() will acknowledge only the message msg and not affect any other messages of that session.

The acknowledgement mode RMQSession.CLIENT_INDIVIDUAL_ACKNOWLEDGE is equivalent to Session.CLIENT_ACKNOWLEDGE in all other respects. In particular the getAcknowledgeMode() method returns Session.CLIENT_ACKNOWLEDGE even if RMQSession.CLIENT_INDIVIDUAL_ACKNOWLEDGE has been set.

Arbitrary Message support

Any instance of a class that implements the javax.jms.Message interface can be sent by a JMS message producer.

All properties of the message required by send() are correctly interpreted except that the JMSReplyTo header and objects (as property values or the body of an ObjectMessage) that cannot be deserialized are ignored.

The implementation extracts the properties and body from the Message instance using interface methods and recreates it as a message of the right (RMQMessage) type (BytesMessage, MapMessage, ObjectMessage, TextMessage, or StreamMessage) before sending it. This means that there is some performance loss due to the copying; but in the normal case, when the message is an instance of com.rabbitmq.jms.client.RMQMessage, no copying is done.

Further Reading

To gain better understanding of AMQP 0-9-1 concepts and interoperability of the RabbitMQ JMS client with AMQP 0-9-1 clients, you may wish to read an Introduction to RabbitMQ Concepts and browse our AMQP 0-9-1 Quick Reference Guide.

0 thoughts on “Jboss Queues And Topics For Argumentative Essays

Leave a Reply

Your email address will not be published. Required fields are marked *