ActiveMQ 是一个完全支持 JMS 1.1 和 J2EE 规范的 JMS Provider 实现,并且对 Spring 提供了良好的支持,可以很容易地内嵌到使用 Spring 的系统中。本文列出了 ActiveMQ 的客户端代码以及服务端的安装方式。
所需jar包如下:
activemq-all-5.3.0.jar
activemq-web-5.3.0.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
消息监听器代码如下:
package com.test; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyMessageListener implements MessageListener{ public void onMessage(Message m) { if(m instanceof TextMessage){ try { System.out.println(((TextMessage) m).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
简单封装代码如下:
package com.test; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class MyMessageSender { private JmsTemplate jmsTemplate; private Destination destination; private Destination respDest; private static final Logger LOG = LoggerFactory.getLogger(MyMessageSender.class); public void aSyncSendMsg(final Long seq,final Long txnCd, final String context,final Long hNum){ jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(context); msg.setLongProperty("reqTxnSeq", seq); msg.setStringProperty("txnCd", txnCd.toString()); msg.setLongProperty("hNumber", hNum); msg.setBooleanProperty("isaSync", true); return msg; } }); } /** * 发送信息并获取返回信息 * * @param seq * @param txnCd * @param context * @return */ public Message sendMsg(final Long reqTxnSeq, final Long txnCd, final String context,final Long hNum) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(context); msg.setJMSReplyTo(respDest); msg.setLongProperty("reqTxnSeq", reqTxnSeq); msg.setStringProperty("txnCd", txnCd.toString()); msg.setLongProperty("hNumber", hNum); msg.setBooleanProperty("isaSync", false); return msg; } }); LOG.info(Thread.currentThread().getName()+"=============成功向{}发送了一条JMS消息,txnCd_reqTxnSeq_context:{}", destination.toString(), txnCd + "_" + reqTxnSeq + "_" + context); Message msg = jmsTemplate.receiveSelected(respDest, "reqTxnSeq = " + reqTxnSeq); if (msg != null) { LOG.info(Thread.currentThread().getName()+"=============成功从{}收到了一条JMS消息,reqTxnSeq_returnMsg:{}", destination.toString(), reqTxnSeq + "_" + msg); return msg; } else { 
LOG.error("=============接受消息超时了!reqTxnSeq:{}", reqTxnSeq); return null; } } /** * 发送信息并获取返回文字信息 * @param seq * @param txnCd * @param context * @return */ public String sendTextRtnMessage(Long seq, Long txnCd, String context,final Long hNum) { TextMessage msg = (TextMessage) sendMsg(seq, txnCd, context,hNum); String resp = null; try { resp = msg.getText(); LOG.info(resp); } catch (Exception e) { LOG.error("get text msg from return message result error : {}", e.getMessage()); } return resp; } public String recivber(){ TextMessage msg = (TextMessage) jmsTemplate.receive(destination); try { System.out.println("reviced msg is:" + msg.getText()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getDestination() { return destination; } public void setDestination(Destination destination) { this.destination = destination; } public Destination getRespDest() { return respDest; } public void setRespDest(Destination respDest) { this.respDest = respDest; } }
配置文件代码如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring application context wiring an ActiveMQ connection factory, a JmsTemplate,
  the MyMessageSender producer and a listener container driving MyMessageListener.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="destination" ref="messageQueue"></property>
        <property name="messageListener" ref="MyMessageListener"></property>
    </bean>
    <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:comp/env/myJMS/ConnectionFactory"></property>
    </bean>
    <bean id="messageQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiName" value="java:comp/env/myJMS/MessageQueue"></property>
    </bean>
    <bean id="MyMessageListener" class="com.test.MyMessageListener"></bean>
    <bean id="messageSender" class="com.test.MyMessageSender">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"></property>
        <property name="defaultDestination" ref="messageQueue"></property>
    </bean> -->

    <!-- JMS connection factory. brokerURL is the key setting: it names the broker endpoint
         this client connects to, so it should match one of the broker's
         amq:transportConnectors entries (the broker itself is not defined in this file). -->
    <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />

    <!-- A message destination. Using amq:queue (as opposed to a topic element) is what
         makes it a queue.
         NOTE(review): no bean in this file references this "destination" queue; the beans
         below use txnQueue / txnReplyQueue instead. Confirm whether it is still needed. -->
    <amq:queue name="destination" physicalName="ossQueue" />

    <!-- The JmsTemplate used for sending and receiving. Its connection factory is a
         SingleConnectionFactory wrapping jmsConnectionFactory; txnQueue is its default
         destination. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
            </bean>
        </property>
        <property name="defaultDestination" ref="txnQueue" />
    </bean>

    <!-- Message producer. The referenced destination beans decide whether messages go to a
         queue or a topic: requests are sent to txnQueue, replies are read from txnReplyQueue. -->
    <bean id="messageSender" class="com.test.MyMessageSender">
        <property name="jmsTemplate" ref="jmsTemplate" />
        <property name="destination" ref="txnQueue" />
        <property name="respDest" ref="txnReplyQueue" />
    </bean>

    <!-- Listener instance invoked by the container below. -->
    <bean id="MyMessageListener" class="com.test.MyMessageListener">
    </bean>

    <!-- Message listener container. Its properties mean:
           connectionFactory: the broker connection to listen on (here jmsConnectionFactory,
                              configured above as tcp://localhost:61616);
           destination:       the destination whose messages are consumed;
           messageListener:   the receiver that handles each message. -->
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="txnReplyQueue" />
        <property name="messageListener" ref="MyMessageListener" />
    </bean>

    <!-- Sender side.
         NOTE(review): txnReplyQueue and txnQueue are both created with the queue name "data",
         so requests and replies share one physical queue. Confirm this is intended. -->
    <bean id="txnReplyQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="data" />
    </bean>

    <!-- Receiver side. -->
    <bean id="txnQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="data" />
    </bean>
</beans>
ActiveMQ 服务端下载地址:
http://activemq.apache.org/download.html 下载后按提示一路点击“下一步”即可完成安装;默认账号/密码为 admin/admin。