原创

SpringMVC+JMS +ActiveMQ 配置应用

ActiveMQ 是一个完全支持JMS1.1和J2EE规范的 JMS Provider实现,而对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去。本文列出了ActiveMQ的客户端代码以及服务端安装方式。


所需jar包如:

activemq-all-5.3.0.jar
activemq-web-5.3.0.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar


消息监听器代码如下:

package com.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener{
    public void onMessage(Message m) {
        if(m instanceof TextMessage){
            try {
                System.out.println(((TextMessage) m).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

简单封装代码如下:

package com.test;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class MyMessageSender {
    private JmsTemplate jmsTemplate;
    
    private Destination destination;
    
    private Destination respDest;

    private static final Logger LOG = LoggerFactory.getLogger(MyMessageSender.class);
    

    
    public void aSyncSendMsg(final Long seq,final Long txnCd, final String context,final Long hNum){

        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(context);
                msg.setLongProperty("reqTxnSeq", seq);
                msg.setStringProperty("txnCd", txnCd.toString());
                msg.setLongProperty("hNumber", hNum);
                msg.setBooleanProperty("isaSync", true);
                return msg;
            }
        });
    }
    /**
     * 发送信息并获取返回信息
     * 
     * @param seq
     * @param txnCd
     * @param context
     * @return
     */
    public Message sendMsg(final Long reqTxnSeq, final Long txnCd, final String context,final Long hNum) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(context);
                msg.setJMSReplyTo(respDest);
                msg.setLongProperty("reqTxnSeq", reqTxnSeq);
                msg.setStringProperty("txnCd", txnCd.toString());
                msg.setLongProperty("hNumber", hNum);
                msg.setBooleanProperty("isaSync", false);
                return msg;
            }
        });
        LOG.info(Thread.currentThread().getName()+"=============成功向{}发送了一条JMS消息,txnCd_reqTxnSeq_context:{}",
                destination.toString(), txnCd + "_" + reqTxnSeq + "_" + context);

        Message msg = jmsTemplate.receiveSelected(respDest, "reqTxnSeq = "
                + reqTxnSeq);
        if (msg != null) {
            LOG.info(Thread.currentThread().getName()+"=============成功从{}收到了一条JMS消息,reqTxnSeq_returnMsg:{}",
                    destination.toString(), reqTxnSeq + "_" + msg);
            return msg;
        } else {
            LOG.error("=============接受消息超时了!reqTxnSeq:{}", reqTxnSeq);
            return null;
        }
    }

    /**
     * 发送信息并获取返回文字信息
     * @param seq
     * @param txnCd
     * @param context
     * @return
     */
    public String sendTextRtnMessage(Long seq, Long txnCd, String context,final Long hNum) {
        TextMessage msg = (TextMessage) sendMsg(seq, txnCd, context,hNum);
        String resp = null;
        try {
            resp = msg.getText();
            LOG.info(resp);
        } catch (Exception e) {
            LOG.error("get text msg from return message result error : {}",
                    e.getMessage());
        }
        return resp;
    }
    
    public String recivber(){
        TextMessage msg = (TextMessage) jmsTemplate.receive(destination);
          try {
            System.out.println("reviced msg is:" + msg.getText());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
          return null;
    }
    
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }


    public Destination getDestination() {
        return destination;
    }


    public void setDestination(Destination destination) {
        this.destination = destination;
    }
    public Destination getRespDest() {
        return respDest;
    }
    public void setRespDest(Destination respDest) {
        this.respDest = respDest;
    }
}

配置文件代码如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
  <!-- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
   
 <property name="connectionFactory" 
ref="connectionFactory"></property> <property 
name="destination" ref="messageQueue"></property> 
    <property name="messageListener" 
ref="MyMessageListener"></property> </bean> <bean 
id="connectionFactory" 
    class="org.springframework.jndi.JndiObjectFactoryBean"> <property 
name="jndiName" 
value="java:comp/env/myJMS/ConnectionFactory"></property> 
   
 </bean> <bean id="messageQueue" 
class="org.springframework.jndi.JndiObjectFactoryBean"> <property 
name="jndiName" 
    value="java:comp/env/myJMS/MessageQueue"></property> 
</bean> <bean id="MyMessageListener" 
class="com.test.MyMessageListener"></bean> 
    <bean id="messageSender" class="com.test.MyMessageSender"> <property name="jmsTemplate" 
   
 ref="jmsTemplate"></property> </bean> <bean 
id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="connectionFactory"></property> <property name="defaultDestination" 
    ref="messageQueue"></property> </bean> -->

  <!-- 配置JMS连接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,因为这里指定连接的对象) -->
  <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />
  <!-- 消息发送的目的地(注:”amq:queue”是用于指定是发送topic还是queue) -->
  <amq:queue name="destination" physicalName="ossQueue" />
  
  <!-- 创建JMS的Session生成类,也就是jmsTemplate -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <bean class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
      </bean>
    </property>
    <property name="defaultDestination" ref="txnQueue" />
  </bean>
  
  <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->
  <bean id="messageSender" class="com.test.MyMessageSender">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="destination" ref="txnQueue" />
    <property name="respDest" ref="txnReplyQueue" />
  </bean>
  <bean id="MyMessageListener" class="com.test.MyMessageListener">
  </bean>
  
  <!-- 消息监听容器,其各属性的意义为: connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ; destination:监听的消息模式; 
    messageListener:接收者 ) -->
  <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="txnReplyQueue" />
    <property name="messageListener" ref="MyMessageListener" />
  </bean>
<!--发送方-->
  <bean id="txnReplyQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="data" />
  </bean>
<!--接收方-->
  <bean id="txnQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="data" />
  </bean>
</beans>


Activemq 服务端下载地址
http://activemq.apache.org/download.html 直接下一步,下一步就可以,账号密码admin/admin

关注下方微信公众号“Java精选”(w_z90110),回复关键字领取资料:如HadoopDubboCAS源码等等,免费领取资料视频和项目。 

涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis缓存、Spring源码、各大主流框架、Web开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL非关系型数据库、运维等。

评论

分享:

支付宝

微信