原创

SpringMVC+JMS +ActiveMQ 配置应用

ActiveMQ 是一个完全支持JMS1.1和J2EE规范的 JMS Provider实现,而对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去。本文列出了ActiveMQ的客户端代码以及服务端安装方式。


所需jar包如:

activemq-all-5.3.0.jar
activemq-web-5.3.0.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar


消息监听器代码如下:

package com.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener{
    public void onMessage(Message m) {
if(m instanceof TextMessage){
    try {
System.out.println(((TextMessage) m).getText());
    } catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
    }
}
    }

}

简单封装代码如下:

package com.test;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class MyMessageSender {
    private JmsTemplate jmsTemplate;
    
    private Destination destination;
    
    private Destination respDest;

    private static final Logger LOG = LoggerFactory.getLogger(MyMessageSender.class);
    

    
    public void aSyncSendMsg(final Long seq,final Long txnCd, final String context,final Long hNum){

jmsTemplate.send(destination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(context);
msg.setLongProperty("reqTxnSeq", seq);
msg.setStringProperty("txnCd", txnCd.toString());
msg.setLongProperty("hNumber", hNum);
msg.setBooleanProperty("isaSync", true);
return msg;
    }
});
    }
    /**
     * 发送信息并获取返回信息
     * 
     * @param seq
     * @param txnCd
     * @param context
     * @return
     */
    public Message sendMsg(final Long reqTxnSeq, final Long txnCd, final String context,final Long hNum) {
jmsTemplate.send(destination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(context);
msg.setJMSReplyTo(respDest);
msg.setLongProperty("reqTxnSeq", reqTxnSeq);
msg.setStringProperty("txnCd", txnCd.toString());
msg.setLongProperty("hNumber", hNum);
msg.setBooleanProperty("isaSync", false);
return msg;
    }
});
LOG.info(Thread.currentThread().getName()+"=============成功向{}发送了一条JMS消息,txnCd_reqTxnSeq_context:{}",
destination.toString(), txnCd + "_" + reqTxnSeq + "_" + context);

Message msg = jmsTemplate.receiveSelected(respDest, "reqTxnSeq = "
+ reqTxnSeq);
if (msg != null) {
    LOG.info(Thread.currentThread().getName()+"=============成功从{}收到了一条JMS消息,reqTxnSeq_returnMsg:{}",
    destination.toString(), reqTxnSeq + "_" + msg);
    return msg;
} else {
    LOG.error("=============接受消息超时了!reqTxnSeq:{}", reqTxnSeq);
    return null;
}
    }

    /**
     * 发送信息并获取返回文字信息
     * @param seq
     * @param txnCd
     * @param context
     * @return
     */
    public String sendTextRtnMessage(Long seq, Long txnCd, String context,final Long hNum) {
TextMessage msg = (TextMessage) sendMsg(seq, txnCd, context,hNum);
String resp = null;
try {
    resp = msg.getText();
    LOG.info(resp);
} catch (Exception e) {
    LOG.error("get text msg from return message result error : {}",
    e.getMessage());
}
return resp;
    }
    
    public String recivber(){
TextMessage msg = (TextMessage) jmsTemplate.receive(destination);
  try {
    System.out.println("reviced msg is:" + msg.getText());
} catch (Exception e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}
  return null;
    }
    
    public JmsTemplate getJmsTemplate() {
return jmsTemplate;
    }
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
    }


    public Destination getDestination() {
return destination;
    }


    public void setDestination(Destination destination) {
this.destination = destination;
    }
    public Destination getRespDest() {
return respDest;
    }
    public void setRespDest(Destination respDest) {
this.respDest = respDest;
    }
}

配置文件代码如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
  <!-- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
   
 <property name="connectionFactory" 
ref="connectionFactory"></property> <property 
name="destination" ref="messageQueue"></property> 
    <property name="messageListener" 
ref="MyMessageListener"></property> </bean> <bean 
id="connectionFactory" 
    class="org.springframework.jndi.JndiObjectFactoryBean"> <property 
name="jndiName" 
value="java:comp/env/myJMS/ConnectionFactory"></property> 
   
 </bean> <bean id="messageQueue" 
class="org.springframework.jndi.JndiObjectFactoryBean"> <property 
name="jndiName" 
    value="java:comp/env/myJMS/MessageQueue"></property> 
</bean> <bean id="MyMessageListener" 
class="com.test.MyMessageListener"></bean> 
    <bean id="messageSender" class="com.test.MyMessageSender"> <property name="jmsTemplate" 
   
 ref="jmsTemplate"></property> </bean> <bean 
id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="connectionFactory"></property> <property name="defaultDestination" 
    ref="messageQueue"></property> </bean> -->

  <!-- 配置JMS连接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,因为这里指定连接的对象) -->
  <amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />
  <!-- 消息发送的目的地(注:”amq:queue”是用于指定是发送topic还是queue) -->
  <amq:queue name="destination" physicalName="ossQueue" />
  
  <!-- 创建JMS的Session生成类,也就是jmsTemplate -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
      </bean>
    </property>
    <property name="defaultDestination" ref="txnQueue" />
  </bean>
  
  <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->
  <bean id="messageSender" class="com.test.MyMessageSender">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="destination" ref="txnQueue" />
    <property name="respDest" ref="txnReplyQueue" />
  </bean>
  <bean id="MyMessageListener" class="com.test.MyMessageListener">
  </bean>
  
  <!-- 消息监听容器,其各属性的意义为: connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ; destination:监听的消息模式; 
    messageListener:接收者 ) -->
  <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="txnReplyQueue" />
    <property name="messageListener" ref="MyMessageListener" />
  </bean>
<!--发送方-->
  <bean id="txnReplyQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="data" />
  </bean>
<!--接收方-->
  <bean id="txnQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="data" />
  </bean>
</beans>


Activemq 服务端下载地址
http://activemq.apache.org/download.html 直接下一步,下一步就可以,账号密码admin/admin

~阅读全文-人机检测~

微信公众号“Java精选”(w_z90110),专注Java技术干货分享!让你从此路人变大神!回复关键词领取资料:如Mysql、Hadoop、Dubbo、Spring Boot等,免费领取视频教程、资料文档和项目源码。微信搜索小程序“Java精选面试题”,内涵3000+道Java面试题!

涵盖:互联网那些事、算法与数据结构、SpringMVC、Spring boot、Spring Cloud、ElasticSearch、Linux、Mysql、Oracle等

评论

分享:

支付宝

微信