如何用SSL 和ibm mq 教程建立连接

5438人阅读
端口配置选项
一般最常用的URI是连接到代理的端口URI,通常为TCP或VM端口。
要注意空格:所有的URI都是基于java.net.URI类,它并不允许使用空格。所以,如果你使用failover:或者static:URI,在逗号内不要留有空白。
虚拟机端口
虚拟机端口允许客户可以在虚拟机内部互相连接而不用使用其上的网络连接。这个连接不是socket连接,而是直接使用方法调用来增加嵌入式消息系统的高性能。
使用虚拟机的第一个客户将用来引导一个嵌入的代理,随后连接将附到到相同的代理。一旦连接到代理的所用虚拟机被关闭则嵌入式代理也将被关闭。
更多的信息请查阅。
TCP端口允许客户使用TCP socket连接一个远程ActiveMQ。
更多的信息请查阅。
除去使用新的I/O包其余的和TCP端口一样,这也许会提供更好的性能。注意java NIO包不要和IBM的AIO4J包引起冲突。
从TCP切换到NIO只需要简单的更改URI配置方案,下面是一个定义NIO的XML配置文件。
&&transportConnectors&
&&&&transportConnector name=&nio& uri=&nio://0.0.0.0:61616&/&&
&&/&transportConnectors&
允许你通过TCP使用SSL(通过网络通道的信息进行加密)。
更多的信息请查阅。
NIO SSL端口
通过NIO实现SSL传输。这允许你使用一个代理实例可以连接大量的SSL客户端。这只是服务端端口选项。
&&transportConnectors&
&&&&transportConnector name=&nio+ssl& uri=&nio+ssl://0.0.0.0:61616&/&&
&&/&transportConnectors&
在客户端使用nio+ssl端口URL会初始化常规SSL端口。
对等网络端口
这个端口提供用户使用ActiveMQ的端对端网络。其实它使用虚拟机端口建立本地代理的连接,本地代理建立网络配置连接到另一端的嵌入代理。
更多信息请参阅。
允许你通过UDP进行交谈。
更多的信息请查看。
允许你通过组播进行交谈。
更多的信息请参阅。
HTTP和HTTPS端口
允许ActiveMQ客户和代理通过HTTP建立通道。如果客户不是JMS,你也许想知道或者支持。
更多的信息请查阅端口参考。
WebSockets端口
这个端口使用HTML5WebSockets来和代理交互信息。更多的信息请查阅。
你可以通过URI语法进行其他特性的一些配置。
连接配置URI
任何ActiveMQ JMS连接可以使用URL来进行配置或者直接在ActiveMQConnectionFactory或者ActiveMQConnection对象中设置属性。
你可以使用URI语法来配置不同的。
代理配置URI
你能使用来配置嵌入式代理,也可以在java代码中使用BrokerFactory的helper,或者使用activemq外壳脚本。更多的信息请查阅。
配置Wire格式
通过网络端口传输编组消息推荐使用OpenWire格式,这也是可配置的。
详细信息请参阅。
高级协议URI
通过URI可以配置更高级的协议。
故障切换端口
故障切换端口层从新连接优先其他端口。这曾经用于ActiveMQ 3。它的配置语法允许你指定许多复合的URI。故障切换端口随机选择一个复合URI来尝试建立连接。如果不成功,则从URI列表中挑一个建立新的连接。
详细的信息请查阅。
扇出端口层在其他端口的顶部从新连接并复制逻辑。它用于复制命令到多个代理中。
更多的信息请参阅。
通常使用TCP端口的时候你想查找并定位到有效的代理上。从使用、说法和组播上来说是不同的。使用TCP作为主要的通信方式而不是组播,发现功能纯粹用于发现代理的位置。
发现端口工作起来就像可靠连接的端口一样,只是它使用发现代理来定位要连接的URI列表。
更多信息请查看发现。
零配置端口
零配置端口可以提供发现并向发现端口一样工作,但是它是基于发现机制使用的是自己的组播方式(这允许你配置精确的组播地址和端口),
更多的信息请查看。
服务器端选项
这有许多选项组成,它们用于更改服务器端的行为。
缺省值
和端口进行绑定
端口连接实例名
discoveryURI
如果设置了,客户连接的组播发现地址用于发现代理。
enableStatusMonitor
监视连接来决定是否要阻塞。
updateClusterClients
启用将更新更改代理群的客户连接
rebalanceClusterClients
通过群拓扑更改自动从新平衡客户端。
updateClusterClientsOnRemove
如果代理从群中删除将更新代理。
updateClusterFilter
逗号分隔的正则表达式,名称匹配模板的代理将会进行客户更新。
注意:红色的属性只对版本5.4有效。
配置示例:
&&transportConnectors&
&&&&transportConnector name=&openwire& uri=&tcp://0.0.0.0:61616&enableStatusMonitor=&true&/&&
&&/&transportConnectors&
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:167864次
积分:2066
积分:2066
排名:第10476名
原创:21篇
转载:22篇
译文:24篇
评论:16条
(1)(2)(3)(2)(3)(1)(1)(1)(8)(1)(1)(3)(7)(4)(1)(1)(2)(3)(2)(4)(3)(1)(1)(1)(2)(3)(2)(1)(2)IBM WebSphere MQ配置管理手册_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
IBM WebSphere MQ配置管理手册
上传于||文档简介
&&I​B​M​ ​W​e​b​S​p​h​e​r​e​ ​M​Q​配​置​管​理​手​册
阅读已结束,如果下载本文需要使用
想免费下载本文?
下载文档到电脑,查找使用更方便
还剩9页未读,继续阅读
你可能喜欢3227人阅读
1.ActiveMQ概述
& &&企业消息软件从80年代起就存在,它不只是一种应用间消息传递风格,也是一种集成风格。因此,消息传递可以满足应用间的通知和互相操作。但是开源的解决方案是到最近10年才出现的。Apache ActiveMQ就是其中一种。它使应用间能以异步,松耦合方式交流。ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。
&? &&ActiveMQ是Apache软件基金下的一个开源软件,它遵循JMS规范(Java Message Service),是消息驱动中间件软件(MOM)。它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。ActiveMQ使用Apache许可协议。因此,任何人都可以使用和修改它而不必反馈任何改变。这对于商业上将ActiveMQ用在重要用途的人尤为关键。MOM的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者。所以高可用,高性能,高可扩展性尤为关键。
2.ActiveMQ特性
& & ⒈支持多种语言客户端,如:Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议有 OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
& & ⒉ 完全支持JMS1.1和J2EE1.4规范,它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等等。依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特性(持久化,XA消息,事务)都是有效的。
& & ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去。
& & ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
& & ⒌ ActiveMQ提供各种连接选择,包括HTTP,HTTPS,IP多点传送,SSL,STOMP,TCP,UDP,XMPP等。大量的连接协议支持使之具有更好的灵活性。很多现有的系统使用一种特定协议并且不能改变,所以一个支持多种协议的消息平台降低了使用的门槛。虽然连接很重要,但是和其他容器集成也同样重要。
& & 6.ActiveMQ提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权。例如,ActiveMQ通过KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持标准的JDBC方案。ActiveMQ可以通过配置文件提供简单的验证和授权,也提供标准的JAAS登陆模块。
& & 7.ActiveMQ是为开发者设计的。它并不需要专门的管理工具,因为它提供各种易用且强大的管理特性。有很多方法去监控ActiveMQ的各个方面,可以通过JMX使用JConsole或ActiveMQ web console;可以运行ActiveMQ消息报告;可以用命令行脚本;可以通过日志。
& & 8.代理器集群(Broker clustering)----为了利于扩展,多个ActiveMQ broker能够联合工作。这个方式就是network of brokers并且能支持多种拓扑结构;支持客户端-服务器,点对点。
& & 9.支持Ajax, 支持与Axis的整合
3.ActiveMQ优势
& & 1.与OpenJMS、JbossMQ等开源jms&provider相比,ActiveMQ有Apache的支持,持续发展的优势明显。
& & 2.速度很快,JBossMQ的十倍(没有测试)
& & 3.提高系统资源的利用率,主要是任务的派发不是24小时平均的,而是高峰时期任务量很多,比如1秒1000多个,有的时候很低,比如十几秒钟才来一个。应用服务通过JMS队列一个一个的取任务,做完一个再领一个,使系统资源的运用趋于平均。而JMS,比如JMS接收消息的效率是很高的,比如ActiveMQ,在赛扬(2.40GHz)机器上能够达到2000/s,消息大小为1-2k。好一些的服务器可以达到2万以上/秒。
4.ActiveMQ安装
& & 首先去&下载最新版本, 解压apache-activemq-5.8.0-bin.zip(或者)
目录如下:
& &+bin (windows下面的bat和unix/linux下面的sh)
& &+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
& &+data (默认是空的)
& &+docs (只有index.html)
& &+example (几个例子)
& &+lib (activemMQ使用到的lib)
& &+webapps(后台管理页面)
+webapps-demo(后台管理消息发送页面)
& &+activemq-all-5.8.0.jar&(java开发的jar包)
& &-LICENSE.txt
& &-NOTICE.txt
& &-README.txt
& &-user-guide.html
你可以使用bin\activemq.bat(activemq)启动。
启动ActiveMQ以后,登陆:&默认用户/密码为admin/admin。
⒈ 这个仅仅是最基础的ActiveMQ的配置,很多地方都没有配置因此不要直接使用这个配置用于生存环境。
⒉ 有的时候由于端口被占用,导致ActiveMQ错误,ActiveMQ可能需要以下端口1099(JMX),61616(默认的TransportConnector)。
⒊ 如果没有物理网卡,或者MS的LoopBackAdpater Multicast会报一个错误
5.运行附带的示例程序&
& &1、Queue消息示例:&
& & &*&启动Queue消息消费者&
& & & &cd&example&
& ant&consumer&
& & &*&启动Queue消息生产者&
& & & &cd&example&
& & & &ant&producer&
& & &简要说明:生产者(producer)发消息,消费者(consumer)接消息,发送/接收2000个消息后自动关闭&
& &2、Topic消息示例:&
& & &*&启动Topic消息消费者&
& & & &cd&example&
& & & &ant&topic-listener&
& & &*&启动Topic消息生产者&
& & & &cd&example&
& & & &ant&topic-publisher&
& & &简要说明:重复10轮,publisher每轮发送2000个消息,并等待获取listener的处理结果报告,然后进入下一轮 & & &发送,最后
统计全局发送时间。&
& &3、Queue消息和Topic消息发送之后,可以通过后台点击Queues和Topics查看发送消息具体信息。
6.ActiveMQ类别及开发流程
& &1)、Point-to-Point&(点对点)消息模式开发流程 :
& & & &1、生产者(producer)开发流程(ProducerTool.java):&
& & & & &1.1&创建Connection:&根据url,user和password创建一个jms&Connection。&
& & & & &1.2&创建Session:&在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。&
& & & & &1.3&创建Destination对象:&需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息。&
& & & & &1.4&创建MessageProducer:&根据Destination创建MessageProducer对象,同时设置其持久模式。&
& & & & &1.5&发送消息到队列(Queue):&封装TextMessage消息,使用MessageProducer的send方法将消息发送出去。&
& & & &2、消费者(consumer)开发流程(ConsumerTool.java):&
& & & & &2.1&实现MessageListener接口:&消费者类必须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。&
& & & & &2.2&创建Connection:&根据url,user和password创建一个jms&Connection,如果是durable模式,还需要给connection设置一个clientId。&
& & & & &2.3&创建Session和Destination:&与ProducerTool.java中的流程类似,不再赘述。&
& & & & &2.4 创建replyProducer【可选】:可以用来将消息处理结果发送给producer。&
& & & & &2.5&创建MessageConsumer:&&根据Destination创建MessageConsumer对象。&
& & & & & & & & & & & & &&2.6&消费message: &在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer&
if&(message.getJMSReplyTo()&!=&null)&{&&&&
&&replyProducer.send(message.getJMSReplyTo(),&&&&&
&session.createTextMessage(&Reply:&&&+&message.getJMSMessageID()));&&
& &2)、Publisher/Subscriber(发布/订阅者)消息模式开发流程&
& & & &1、订阅者(Subscriber)开发流程(TopicListener.java):&
& & & & &1.1&实现MessageListener接口:&在onMessage()方法中监听发布者发出的消息队列,并做相应处理。&
& & & & &1.2&创建Connection:&根据url,user和password创建一个jms&Connection。&
& & & & &1.3&创建Session:&在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。&
&&& & & &1.4&创建Topic: &创建2个Topic,&topictest.messages用于接收发布者发出的消息,topictest.control 用于向发布者发送消息,实现双方的交互。&
& & & & &1.5&创建consumer和producer对象:根据topictest.messages创建consumer,根据topictest.control创建 producer。&
& & & & &1.6&接收处理消息:在onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消息,或者根 据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作等等),并且可以使用producer对象将处理结果返回给发布者。&
& & & &2、发布者(Publisher)开发流程(TopicPublisher.java):
& & & & &2.1&实现MessageListener接口:在onMessage()方法中接收订阅者的反馈消息。&
& & & & &2.2&创建Connection:&根据url,user和password创建一个jms&Connection。&
& & & & & & & & & & & & & &2.3&创建Session:&在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。&
& & & & &2.4&创建Topic:&创建2个Topic,topictest.messages用于向订阅者发布消息,topictest.control用于接 收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应的。&
& & & & &2.5&创建consumer和producer对象: 根据topictest.messages创建publisher;&根据topictest.control 创建consumer,同时监听订阅者反馈的消息。
& & & & &2.6&给所有订阅者发送消息,并接收反馈消息:&&示例代码中,一共重复10轮操作。&每轮先向所有订阅者 发送2000个消息;&然后堵塞线程,开始等待;&最后通过onMessage()方法,接收到订阅者反馈的“REPORT”类信息后,才print反馈信息并解除线程堵塞,进入下一轮。&
& & & & & & &注:可同时运行多个订阅者测试查看此模式效果&
7.Eclipse代码开发
& &1.建立一个Web Probject 项目,将activemq-all-5.8.0.jar放在lib里面
& &2.Queue(点对点)方式:生产者
package&&&
import&javax.jms.C&&
import&javax.jms.ConnectionF&&
import&javax.jms.DeliveryM&&
import&javax.jms.D&&
import&javax.jms.MessageP&&
import&javax.jms.Q&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.apache.activemq.ActiveMQC&&
import&org.apache.activemq.ActiveMQConnectionF&&
public&class&QueueProducer&{&&
&&&&private&static&String&user&=&ActiveMQConnection.DEFAULT_USER;&&
&&&&private&static&String&password&=ActiveMQConnection.DEFAULT_PASSWORD;&&
&&&&private&static&String&url&=&&&tcp://localhost:61616&;&&
&&&&public&static&void&main(String[]&args)throws&Exception&{&&
&&&&&&&&&&&
&&&&&&&&ConnectionFactory&connectionFactory&=&new&ActiveMQConnectionFactory(user,password,url);&&
&&&&&&&&&&
&&&&&&&&Connection&connection&=&connectionFactory.createConnection();&&
&&&&&&&&&&
&&&&&&&&connection.start();&&
&&&&&&&&System.out.println(&Connection&is&start...&);&&
&&&&&&&&&&
&&&&&&&&Session&session&=&connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);&&
&&&&&&&&&&
&&&&&&&&Queue&&destination&=&session.createQueue(&example.A&);&&
&&&&&&&&&&
&&&&&&&&MessageProducer&producer&=&session.createProducer(destination);&&
&&&&&&&&&&
&&&&&&&&producer.setDeliveryMode(DeliveryMode.PERSISTENT);&&
&&&&&&&&&&&
&&&&&&&&sendMessage(session,&producer);&&
&&&&&&&&mit();&&
&&&&&&&&connection.close();&&
&&&&&&&&System.out.println(&send&text&ok.&);&&
&&&&public&static&void&sendMessage(Session&session,&MessageProducer&producer)&&
&&&&&&&&&&&&throws&Exception&{&&
&&&&&&&&for&(int&i&=&1;&i&&=&100;&i++)&{&&
&&&&&&&&&&&&TextMessage&message&=&session.createTextMessage(&ActiveMq&发送的消息&&+&i);&&
&&&&&&&&&&&&&&
&&&&&&&&&&&&System.out.println(&发送消息:&&+&&ActiveMq&发送的消息&&+&i);&&
&&&&&&&&&&&&producer.send(message);&&
&&&&&&&&}&&
& 3.Queue(点对点)方式:消费者
package&&&
import&javax.jms.C&&
import&javax.jms.ConnectionF&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.MessageC&&
import&javax.jms.MessageL&&
import&javax.jms.Q&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.apache.activemq.ActiveMQC&&
import&org.apache.activemq.ActiveMQConnectionF&&
public&class&QueueConsumer&{&&
&&&&private&static&String&user&=&ActiveMQConnection.DEFAULT_USER;&&
&&&&private&static&String&password&=ActiveMQConnection.DEFAULT_PASSWORD;&&
&&&&private&static&String&url&=&&tcp://localhost:61616&;&&
&&&&public&static&void&main(String[]&args)&throws&Exception{&&
&&&&&&&&&&
&&&&&&&&ConnectionFactory&connectionFactory&=&new&ActiveMQConnectionFactory(user,password,url);&&
&&&&&&&&&&
&&&&&&&&Connection&connection&=&connectionFactory.createConnection();&&
&&&&&&&&connection.start();&&
&&&&&&&&&&
&&&&&&&&final&Session&session&=&connection.createSession(Boolean.TRUE,&Session.AUTO_ACKNOWLEDGE);&&
&&&&&&&&&&
&&&&&&&&Queue&destination=session.createQueue(&example.A&);&&
&&&&&&&&&&
&&&&&&&&MessageConsumer&consumer&=&session.createConsumer(destination);&&
&&&&&&&&consumer.setMessageListener(new&MessageListener(){&&
&&&&&&&&&&&&@Override&&
&&&&&&&&&&&&public&void&onMessage(Message&message)&{&&
&&&&&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&&&&&TextMessage&textMessage=(TextMessage)&&
&&&&&&&&&&&&&&&&&&&&System.out.println(textMessage.getText());&&
&&&&&&&&&&&&&&&&}&catch&(JMSException&e1)&{&&
&&&&&&&&&&&&&&&&&&&&e1.printStackTrace();&&
&&&&&&&&&&&&&&&&}&&
&&&&&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&&&&&mit();&&
&&&&&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&&&&&}&&
&&&&&&&&&&&&}&&
&&&&&&&&});&&
&&&&&&&&&&
&4.Topic(发布/订阅)方式:发布者
package&&&
import&java.util.D&&
import&javax.jms.C&&
import&javax.jms.ConnectionF&&
import&javax.jms.DeliveryM&&
import&javax.jms.JMSE&&
import&javax.jms.MapM&&
import&javax.jms.MessageP&&
import&javax.jms.Q&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&javax.jms.T&&
import&org.apache.activemq.ActiveMQC&&
import&org.apache.activemq.ActiveMQConnectionF&&
public&class&TopicPublisher&{&&
&&&&private&static&String&user&=&ActiveMQConnection.DEFAULT_USER;&&
&&&&private&static&String&password&=ActiveMQConnection.DEFAULT_PASSWORD;&&
&&&&private&static&String&url&=&&&tcp://localhost:61616&;&&
&&&&public&static&void&main(String[]&args)throws&Exception&{&&
&&&&&&&&&&&
&&&&&&&&ConnectionFactory&connectionFactory&=&new&ActiveMQConnectionFactory(user,password,url);&&
&&&&&&&&&&
&&&&&&&&Connection&connection&=&connectionFactory.createConnection();&&
&&&&&&&&&&
&&&&&&&&connection.start();&&
&&&&&&&&System.out.println(&Connection&is&start...&);&&
&&&&&&&&&&
&&&&&&&&Session&session&=&connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);&&
&&&&&&&&&&
&&&&&&&&Topic&&destination&=&session.createTopic(&example.A&);&&
&&&&&&&&&&
&&&&&&&&MessageProducer&producer&=&session.createProducer(destination);&&
&&&&&&&&&&
&&&&&&&&producer.setDeliveryMode(DeliveryMode.PERSISTENT);&&
&&&&&&&&&&&
&&&&&&&&sendMessage(session,&producer);&&
&&&&&&&&mit();&&
&&&&&&&&connection.close();&&
&&&&&&&&System.out.println(&send&text&ok.&);&&
&&&&public&static&void&sendMessage(Session&session,&MessageProducer&producer)&&
&&&&&&&&&&&&throws&Exception&{&&
&&&&&&&&for&(int&i&=&1;&i&&=&100;&i++)&{&&
&&&&&&&&&&&&TextMessage&message&=&session.createTextMessage(&ActiveMq&发送的消息&&+&i);&&
&&&&&&&&&&&&&&
&&&&&&&&&&&&System.out.println(&发送消息:&&+&&ActiveMq&发送的消息&&+&i);&&
&&&&&&&&&&&&producer.send(message);&&
&&&&&&&&}&&
&5.Topic(发布/订阅)方式:订阅者
package&&&
import&javax.jms.C&&
import&javax.jms.ConnectionF&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.MessageC&&
import&javax.jms.MessageL&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&javax.jms.T&&
import&org.apache.activemq.ActiveMQC&&
import&org.apache.activemq.ActiveMQConnectionF&&
public&class&TopicSubscriber&{&&
&&&&private&static&String&user&=&ActiveMQConnection.DEFAULT_USER;&&
&&&&private&static&String&password&=ActiveMQConnection.DEFAULT_PASSWORD;&&
&&&&private&static&String&url&=&&tcp://localhost:61616&;&&
&&&&public&static&void&main(String[]&args)&throws&Exception{&&
&&&&&&&&&&
&&&&&&&&ConnectionFactory&connectionFactory&=&new&ActiveMQConnectionFactory(user,password,url);&&
&&&&&&&&&&
&&&&&&&&Connection&connection&=&connectionFactory.createConnection();&&
&&&&&&&&connection.start();&&
&&&&&&&&&&
&&&&&&&&final&Session&session&=&connection.createSession(Boolean.TRUE,&Session.AUTO_ACKNOWLEDGE);&&
&&&&&&&&&&
&&&&&&&&Topic&destination=session.createTopic(&example.A&);&&
&&&&&&&&&&
&&&&&&&&MessageConsumer&consumer&=&session.createConsumer(destination);&&
&&&&&&&&consumer.setMessageListener(new&MessageListener(){&&
&&&&&&&&&&&&@Override&&
&&&&&&&&&&&&public&void&onMessage(Message&message)&{&&
&&&&&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&&&&&TextMessage&textMessage=(TextMessage)&&
&&&&&&&&&&&&&&&&&&&&System.out.println(textMessage.getText());&&
&&&&&&&&&&&&&&&&}&catch&(JMSException&e1)&{&&
&&&&&&&&&&&&&&&&&&&&e1.printStackTrace();&&
&&&&&&&&&&&&&&&&}&&
&&&&&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&&&&&mit();&&
&&&&&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&&&&&}&&
&&&&&&&&&&&&}&&
&&&&&&&&});&&
&&&&&&&&&&
Queue(点对点)方式和Topic(发布/订阅)方式 的运行结果最明显的区别是:在Queue(点对点)方式中先运行生产者,再运行消费者,消费者还能接受到消息;
而Topic(发布/订阅)方式就不同了,先运行发布者,再运行订阅者,订阅者收到的消息
可能没有或者是不完全的。
在上一章(&)说到能与spring进行整合,与Spring进行整合有一定的好处,首先是可配置化,然后是能使用Spring的aop,tx等特性进行项目开发.
一.准备工作
我使用的是spring版本是4.0.0.M2,其他版本的也可以,只是配置不同,去Spring官网下载zip包,解开后将dist目录下的所有jar包(根据自己选择)拷贝到项目lib目录下并加入项目项目中的lib中,一般jms所需要的Spring的jar有:
二.代码开发
1.在src目录下新建applicationContext.xml文件并输入一下内容:
&version=&1.0&&encoding=&GBK&&&
&xmlns=&http://www.springframework.org/schema/beans&&&
&&&&xmlns:xsi=&http://www.w3.org/2001/XMLSchema-instance&&&
&&&&xsi:schemaLocation=&http://www.springframework.org/schema/beans&&
&&&&&&&&&&&http://www.springframework.org/schema/beans/spring-beans.xsd&&&
2.引入spring,打开web.xml并将其内容修改为以下内容:
&version=&1.0&&encoding=&GBK&&&
&version=&2.5&&xmlns=&/xml/ns/javaee&&&
&&&&xmlns:xsi=&http://www.w3.org/2001/XMLSchema-instance&&&
&&&&xsi:schemaLocation=&/xml/ns/javaee&&&
&&&&/xml/ns/javaee/web-app_2_5.xsd&&&
&&&&&&&&contextConfigLocation&&
&&&&&&&&classpath*:applicationContext*.xml&&
&&&&&&&&spring&&
&&&&&&&&&&
&&&&&&&&&&&&org.springframework.web.servlet.DispatcherServlet&&
&&&&&&&&&&
&&&&&&&&1&&
&&&&&&&&spring&&
&&&&&&&&/&&
&&&&&&&&index.jsp&&
3.配置JMSTemplate模板
类似于jdbcTemplate,首先要配置一个ConnectionFactory,之后要开始配置JmsTemplate模板了。最后是配置消息目标了。消息分为队列(Queue)和主题(Topic)两大类。在applicationContext.xml中加入如下内容:
&&&&&id=&connectionFactory&&&
&&&&&&&&class=&org.apache.activemq.ActiveMQConnectionFactory&&&
&&&&&&&&&name=&brokerURL&&value=&tcp://localhost:61616&&&&
&&&&&id=&queueDest&&&
&&&&&&&&class=&org.mand.ActiveMQQueue&&&
&&&&&&&&&&
&&&&&&&&&index=&0&&value=&myQueue&&&&
&&&&&id=&jmsQueueTemplate&&&
&&&&&&&&class=&org.springframework.jms.core.JmsTemplate&&&
&&&&&&&&&name=&connectionFactory&&ref=&connectionFactory&&&&
&&&&&&&&&name=&defaultDestination&&ref=&queueDest&&&&
&&&&&&&&&&
&&&&&id=&topicDest&&&
&&&&&&&&class=&org.mand.ActiveMQTopic&&&
&&&&&&&&&&
&&&&&&&&&index=&0&&value=&myTopic&&&&
&&&&&id=&jmsTopicTemplate&&&
&&&&&&&&class=&org.springframework.jms.core.JmsTemplate&&&
&&&&&&&&&name=&connectionFactory&&ref=&connectionFactory&&&&
&&&&&&&&&name=&defaultDestination&&ref=&topicDest&&&&
&&&&&&&&&&
&&&&&&&&&name=&pubSubDomain&&value=&true&&&
receiveTimeout表示接收消息时的超时时间,设置的为10秒,因为如果不设置的话,加入接收消息时是阻塞着的,那么将一直阻塞下去。配置完成了。但是我不建议设置这个时间,如果到达设置时间之后,生产者没有运行,消费者接受到Message对象为null,测试可能会出现异常,而且消费者将停止接受消息.那么如何使用JmsTemplate发送消息呢?
spring的beanfactory得到一个jmsTemplate的实例和消息目标的实例,发送消息,够简单的吧。首先我们还从queue方式开始。下面我们就来编写具体代码。
4、编写Point-to-Point&(点对点)代码
新建生产者类QueueProducerService.java,代码如下:
package&jms.mq.&&
import&java.util.D&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
import&org.springframework.jms.core.MessageC&&
public&class&QueueProducerService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&send()&{&&
&&&&&&&&MessageCreator&messageCreator&=&new&MessageCreator()&{&&
&&&&&&&&&&&&public&Message&createMessage(Session&session)&throws&JMSException&{&&
&&&&&&&&&&&&&&&&TextMessage&message&=&session.createTextMessage();&&
&&&&&&&&&&&&&&&&message.setText(&QueueProducerService发送消息&+new&Date());&&
&&&&&&&&&&&&&&&&return&&&
&&&&&&&&&&&&}&&
&&&&&&&&};&&
&&&&&&&&jmsTemplate.send(this.destination,messageCreator);&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
生产者编写完了,下面我们来编写消费者,上面说了,发送消息的时候,spring的beanfactory得到一个jmsTemplate的实例和消息目标的实例,然后发送,那么接受的时候肯定也是得到一个jmsTemplate的实例和消息目标的实例,然后接受,下面我们来看具体代码。
新建一个消费者类QueueConsumerService.java,具体代码如下:
package&jms.mq.&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
public&class&QueueConsumerService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&receive()&{&&
&&&&&&&&TextMessage&message&=&(TextMessage)&jmsTemplate.receive();&&
&&&&&&&&try&{&&
&&&&&&&&&&&&System.out.println(&QueueConsumerService收到消息:&+message.getText());&&
&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&}&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
代码编写完毕,下面要进行bean的配置,在applicationContext.xml中加入如下代码实例化对象和依赖注入:
&id=&queueProducerService&&class=&jms.mq.spring.QueueProducerService&&&
&&&&&name=&jmsTemplate&&ref=&jmsQueueTemplate&&&&
&&&&&name=&destination&&ref=&queueDest&&&&
&id=&queueConsumerService&&class=&jms.mq.spring.QueueConsumerService&&&
&&&&&name=&jmsTemplate&&ref=&jmsQueueTemplate&&&&
&&&&&name=&destination&&ref=&queueDest&&&&
需要的业务代码都已编写完毕,下面编写测试代码。新建一个生产者的测试类QueueProducerTest.java。具体代码如下:
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&QueueProducerTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&send()&{&&
&&&&&&&&QueueProducerService&producerService&=&(QueueProducerService)&appContext.getBean(&queueProducerService&);&&
&&&&&&&&producerService.send();&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&send();&&
再建一个消费者的测试类,QueueConsumerTest.java,具体代码如下:
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&QueueConsumerTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&receive()&{&&
&&&&&&&&QueueConsumerService&consumerService&=&(QueueConsumerService)&appContext.getBean(&queueConsumerService&);&&
&&&&&&&&consumerService.receive();&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&receive();&&
5、运行point-point(点对点)程序
所有代码都编写完了,我们来看一下我们的劳动成果。运行生产者测试类。控制台打印出如下内容,画线标注的就是我们发送的内容:
6、编写Publisher/Subscriber(发布/订阅者)代码
新建发布者TopicPublisherService.java,内容如下:
package&jms.mq.&&
import&java.util.D&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.MapM&&
import&javax.jms.M&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
import&org.springframework.jms.core.MessageC&&
import&jms.spring.QueueProducerS&&
public&class&TopicPublisherService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&send()&{&&
&&&&&&&&MessageCreator&messageCreator&=&new&MessageCreator()&{&&
&&&&&&&&&&&&public&Message&createMessage(Session&session)&throws&JMSException&{&&
&&&&&&&&&&&&&&&&TextMessage&message&=&session.createTextMessage();&&
&&&&&&&&&&&&&&&&message.setText(&QueueProducerService发送消息&+new&Date());&&
&&&&&&&&&&&&&&&&return&&&
&&&&&&&&&&&&}&&
&&&&&&&&};&&
&&&&&&&&jmsTemplate.send(this.destination,messageCreator);&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
再新建一个订阅者TopicSubscriberService.java,代码如下。
package&jms.mq.&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
import&jms.spring.QueueConsumerS&&
public&class&TopicSubscriberService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&receive()&{&&
&&&&&&&&TextMessage&message&=&(TextMessage)&jmsTemplate.receive();&&
&&&&&&&&try&{&&
&&&&&&&&&&&&System.out.println(&QueueConsumerService收到消息:&+message.getText());&&
&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&}&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
在配置文件中applicationContext.xml增加如下配置:
&style=&white-space:pre&&&&id=&topicPublisherService&&class=&jms.mq.spring.TopicPublisherService&&&
&&&&&&&&name=&jmsTemplate&&ref=&jmsTopicTemplate&&&
&&&&&&&&name=&destination&&ref=&topicDest&&&
&&&&&id=&topicSubscriberService&&class=&jms.mq.spring.TopicSubscriberService&&&
&&&&&&&&name=&jmsTemplate&&ref=&jmsTopicTemplate&&&
&&&&&&&&name=&destination&&ref=&topicDest&&&
编写测试程序发布者测试类,TopicPublisherTest.java
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&TopicPublisherTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&send()&{&&
&&&&&&&&TopicPublisherService&topicPublisherService&=&(TopicPublisherService)&appContext.getBean(&topicPublisherService&);&&
&&&&&&&&topicPublisherService.send();&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&send();&&
编写测试程序订阅者测试类,TopicSubscriberTest.java
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&TopicSubscriberTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&receive()&{&&
&&&&&&&&TopicSubscriberService&topicSubscriberService&=&(TopicSubscriberService)&appContext.getBean(&topicSubscriberService&);&&
&&&&&&&&topicSubscriberService.receive();&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&receive();&&
7.Publisher/Subscriber(发布/订阅者)程序
先运行订阅者,再运行发布者,可以看到订阅者能打印信息;但是反之就不行,这就是Publisher/Subscriber(发布/订阅者)的特性;
跟Point-Point(点对点)对比的话,不管运行生存者还是消费者,都会打印信息,可以阅读前一章了解这两种模式的区别和联系。
附加完整的applicationContext.xml配置文件
&version=&1.0&&encoding=&GBK&&&
&xmlns=&http://www.springframework.org/schema/beans&&&
&&&&xmlns:xsi=&http://www.w3.org/2001/XMLSchema-instance&&&
&&&&xsi:schemaLocation=&http://www.springframework.org/schema/beans&&
&&&&&&&&&&&http://www.springframework.org/schema/beans/spring-beans.xsd&&&
&&&&&id=&connectionFactory&&&
&&&&&&&&class=&org.apache.activemq.ActiveMQConnectionFactory&&&
&&&&&&&&&name=&brokerURL&&value=&tcp://localhost:61616&&&&
&&&&&id=&queueDest&&&
&&&&&&&&class=&org.mand.ActiveMQQueue&&&
&&&&&&&&&&
&&&&&&&&&index=&0&&value=&myQueue&&&&
&&&&&id=&jmsQueueTemplate&&&
&&&&&&&&class=&org.springframework.jms.core.JmsTemplate&&&
&&&&&&&&&name=&connectionFactory&&ref=&connectionFactory&&&&
&&&&&&&&&name=&defaultDestination&&ref=&queueDest&&&&
&&&&&&&&&&
&&&&&id=&topicDest&&&
&&&&&&&&class=&org.mand.ActiveMQTopic&&&
&&&&&&&&&&
&&&&&&&&&index=&0&&value=&myTopic&&&&
&&&&&id=&jmsTopicTemplate&&&
&&&&&&&&class=&org.springframework.jms.core.JmsTemplate&&&
&&&&&&&&&name=&connectionFactory&&ref=&connectionFactory&&&&
&&&&&&&&&name=&defaultDestination&&ref=&topicDest&&&&
&&&&&&&&&&
&&&&&&&&&name=&pubSubDomain&&value=&true&&&
&&&&&id=&queueProducerService&&class=&jms.mq.spring.QueueProducerService&&&
&&&&&&&&&name=&jmsTemplate&&ref=&jmsQueueTemplate&&&&
&&&&&&&&&name=&destination&&ref=&queueDest&&&&
&&&&&id=&queueConsumerService&&class=&jms.mq.spring.QueueConsumerService&&&
&&&&&&&&&name=&jmsTemplate&&ref=&jmsQueueTemplate&&&&
&&&&&&&&&name=&destination&&ref=&queueDest&&&&
&&&&&id=&topicPublisherService&&class=&jms.mq.spring.TopicPublisherService&&&
&&&&&&&&name=&jmsTemplate&&ref=&jmsTopicTemplate&&&
&&&&&&&&name=&destination&&ref=&topicDest&&&
&&&&&id=&topicSubscriberService&&class=&jms.mq.spring.TopicSubscriberService&&&
&&&&&&&&name=&jmsTemplate&&ref=&jmsTopicTemplate&&&
&&&&&&&&name=&destination&&ref=&topicDest&&&
对于让spring管理监听的实现方式有两种方法,一种是自己写监听器,然后交给spring的监听适配器管理,再由监听容器管理监听适配器,另一种是写一个实现MessageListener接口的类。第一种在第一章涉及到,但是没有交给spring托管.其实实现的方法很简单,在j2ee mvc模式中,用到spring的aop特性.这章讲第
二种方法,这样消费者可以时时接受到生产者的消息,订阅者可以时时接受到发布者的消息.不用循环接受。
1.Queue(点对点)方式
消费者QueueConsumerMessageListener类,具体如下:
package&jms.mq.&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.MessageL&&
import&javax.jms.TextM&&
public&class&QueueConsumerMessageListener&implements&MessageListener{&&
&&&&public&void&onMessage(Message&msg)&{&&
&&&&&&&&if(msg&instanceof&TextMessage){&&
&&&&&&&&&&&&TextMessage&textMessage&=&(TextMessage)&&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&System.out.println(&--队列&MessageListener收到信息:&+textMessage.getText());&&
&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&}&&
2.Topic(发布/订阅)方式
订阅者实现类TopicSubscriberMessageListener类,具体如下:
package&jms.mq.&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.MessageL&&
import&javax.jms.TextM&&
public&class&TopicSubscriberMessageListener&implements&MessageListener{&&
&&&&public&void&onMessage(Message&msg)&{&&
&&&&&&&&if(msg&instanceof&TextMessage){&&
&&&&&&&&&&&&TextMessage&textMessage&=&(TextMessage)&&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&System.out.println(&--订阅者&MessageListener收到信息:&+textMessage.getText());&&
&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&}&&
3.修改配置文件
&!--&实现接口的方式&--&&&
&&&&&bean&id=&queueConsumerMessageListener&&class=&jms.mq.spring.QueueConsumerMessageListener&&&&
&&&&&/bean&&&
&&&&&!--&新增一个队列地址名字不能跟上面的重复&--&&&
&&&&&bean&id=&queueConsumerMessageListenerDest&&class=&org.mand.ActiveMQQueue&&&&
&&&&&&&&&constructor-arg&index=&0&&value=&myMessageListenerQueue&&/&&&
&&&&&/bean&&&
&&&&&bean&id=&myMsgQueuelistenerContainer&&&
&&&&&&&&class=&org.springframework.jms.listener.DefaultMessageListenerContainer&&&&
&&&&&&&&&property&name=&connectionFactory&&ref=&connectionFactory&&/&&&
&&&&&&&&&property&name=&destination&&ref=&queueConsumerMessageListenerDest&&/&&&
&&&&&&&&&property&name=&messageListener&&ref=&queueConsumerMessageListener&&/&&&
&&&&&&&&&property&name=&receiveTimeout&&value=&10000&&/&&&
&&&&&/bean&&&
&&&&&bean&id=&topicSubscriberMessageListener&&class=&jms.mq.spring.TopicSubscriberMessageListener&&&/bean&&&
&&&&&bean&id=&topicSubscriberMessageListenerDest&&class=&org.mand.ActiveMQTopic&&&&
&&&&&&&&&constructor-arg&index=&0&&value=&myMessageListenerTopic&&/&&&
&&&&&/bean&&&
&&&&&bean&id=&myMsgTopiclistenerContainer&&&
&&&&&&&&class=&org.springframework.jms.listener.DefaultMessageListenerContainer&&&&
&&&&&&&&&property&name=&connectionFactory&&ref=&connectionFactory&&/&&&
&&&&&&&&&property&name=&destination&&ref=&topicSubscriberMessageListenerDest&&/&&&
&&&&&&&&&property&name=&messageListener&&ref=&topicSubscriberMessageListener&&/&&&
&&&&&&&&&property&name=&pubSubDomain&&value=&true&&/&&&
&&&&&&&&&property&name=&receiveTimeout&&value=&10000&&/&&&
&&&&&/bean&&&
顺便将上一章的配置也改一下
&bean&id=&queueProducerService&&class=&jms.mq.spring.QueueProducerService&&&&
&&&&&&&&&property&name=&jmsTemplate&&ref=&jmsQueueTemplate&&/&&&
&&&&&&&&&!--&property&name=&destination&&ref=&queueDest&&/&&--&&&
&&&&&&&&&property&name=&destination&&ref=&queueConsumerMessageListenerDest&&/&&&
&&&&&/bean&&&
&&&&&bean&id=&queueConsumerService&&class=&jms.mq.spring.QueueConsumerService&&&&
&&&&&&&&&property&name=&jmsTemplate&&ref=&jmsQueueTemplate&&/&&&
&&&&&&&&&property&name=&destination&&ref=&queueDest&&/&&&&
&&&&&/bean&&&
&&&&&bean&id=&topicPublisherService&&class=&jms.mq.spring.TopicPublisherService&&&&
&&&&&&&&&property&name=&jmsTemplate&&ref=&jmsTopicTemplate&&/&&&
&&&&&&&&&!--&&property&name=&destination&&ref=&topicDest&&/&&--&&&
&&&&&&&&&property&name=&destination&&ref=&topicSubscriberMessageListenerDest&&/&&&
&&&&&/bean&&&
&&&&&bean&id=&topicSubscriberService&&class=&jms.mq.spring.TopicSubscriberService&&&&
&&&&&&&&&property&name=&jmsTemplate&&ref=&jmsTopicTemplate&&/&&&
&&&&&&&&&property&name=&destination&&ref=&topicDest&&/&&&
&&&&&/bean&&&
注意是修改 bean id为queueProducerService和的topicPublisherService的destination
Queue(点对点)方式,只运行QueueProducerTest.java,结果如下
Topic(发布/订阅)方式,只运行TopicPublisherTest.java,结果如下
ok,如果Queue(点对点)方式和Topic(发布/订阅)方式运行的结果如上图所示,说明你成功了.
前几章都是直接发送MapMessage类型的数据,拿前面的例子来讲,如果生产者发送的是TextMessage,消费者也是必须TextM如果我们自己要发送的数据不是TextMessage类型,而消费者还是TextMessage的,那该怎么办?难道每次接受后都要增加一个转换方法么?其实spring早就考虑到这种情况了。转化器在很多组件中都是必不缺少的东西Spring的MessageConverter接口提供了对消息转换的支持。
1、转换类的相关代码POJO
新建一个类MsgPoJo,就是一个简单的Pojo类。具体代码如下:
package&jms.mq.&&
import&java.io.S&&
public&class&MsgPoJo&implements&Serializable{&&
&&&&private&String&&&
&&&&private&String&&&
&&&&public&String&getId()&{&&
&&&&&&&&return&&&
&&&&public&void&setId(String&id)&{&&
&&&&&&&&this.id&=&&&
&&&&public&String&getText()&{&&
&&&&&&&&return&&&
&&&&public&void&setText(String&text)&{&&
&&&&&&&&this.text&=&&&
&&&&}&&&&&
2.转换类的实现
新建一个类MsgConverter.java,实现MessageConverter接口。生成的代码如下
package&jms.mq.&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.springframework.jms.support.converter.MessageConversionE&&
import&org.springframework.jms.support.converter.MessageC&&
public&class&MsgConverter&implements&MessageConverter{&&
&&&&@Override&&
&&&&public&Object&fromMessage(Message&message)&throws&JMSException,&&
&&&&MessageConversionException&{&&
&&&&&&&&if&(!(message&instanceof&TextMessage))&{&&
&&&&&&&&&&&&throw&new&MessageConversionException(&Message&is&not&TextMessage&);&&
&&&&&&&&}&&
&&&&&&&&System.out.println(&--转换接收的消息--&);&&
&&&&&&&&TextMessage&textMessage&=&(TextMessage)&&&
&&&&&&&&MsgPoJo&msgPojo&=&new&MsgPoJo();&&
&&&&&&&&String[]&texts=textMessage.getText().split(&,&);&&
&&&&&&&&msgPojo.setId(texts[0]);&&
&&&&&&&&msgPojo.setText(texts[1]);&&
&&&&&&&&return&msgP&&
&&&&@Override&&
&&&&public&Message&toMessage(Object&object,&Session&session)&throws&JMSException,&&
&&&&MessageConversionException&{&&
&&&&&&&&if&(!(object&instanceof&MsgPoJo))&{&&
&&&&&&&&&&&&throw&new&MessageConversionException(&obj&is&not&MsgPojo&);&&
&&&&&&&&}&&
&&&&&&&&System.out.println(&--转换发送的消息--&);&&
&&&&&&&&MsgPoJo&msgPojo&=&(MsgPoJo)&&&
&&&&&&&&TextMessage&textMessage&=&session.createTextMessage();&&
&&&&&&&&textMessage.setText(msgPojo.getId()+&,&+msgPojo.getText());&&
&&&&&&&&return&&textM&&
代码很简单就是做些转换,有fromMessage和toMessage两个方法,真好对应发送转换toMessage和接受转换fromMessage。此时,发送和接收消息要换成template.convertAndSend(message);template.receiveAndConvert()。接下来我做一些配置,让spring知道我们的转换类。修改applicationContext.xml中jms模版配置的代码,修改后的代码如下:
&id=&msgConverter&&class=&jms.mq.spring.MsgConverter&&&
&id=&jmsQueueTemplate&&class=&org.springframework.jms.core.JmsTemplate&&&
&&&&&name=&connectionFactory&&ref=&connectionFactory&&&&
&&&&&name=&defaultDestination&&ref=&queueDest&&&&
&&&&&name=&messageConverter&&ref=&msgConverter&&&
注意:如果你有队列监听容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能与队列容器配置冲突。
3、业务相关代码和配置
在QueueProducerService.java增加convertAndSend()方法并在其实现类中实现,实现类的代码如下:
package&jms.mq.&&
import&java.util.D&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.M&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
import&org.springframework.jms.core.MessageC&&
public&class&QueueProducerService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&send()&{&&
&&&&&&&&MessageCreator&messageCreator&=&new&MessageCreator()&{&&
&&&&&&&&&&&&public&Message&createMessage(Session&session)&throws&JMSException&{&&
&&&&&&&&&&&&&&&&TextMessage&message&=&session.createTextMessage();&&
&&&&&&&&&&&&&&&&message.setText(&QueueProducerService发送消息&+new&Date());&&
&&&&&&&&&&&&&&&&return&&&
&&&&&&&&&&&&}&&
&&&&&&&&};&&
&&&&&&&&jmsTemplate.send(this.destination,messageCreator);&&
&&&&public&void&convertAndSend(){&&
&&&&&&&&MsgPoJo&msgPojo&=&new&MsgPoJo();&&
&&&&&&&&msgPojo.setId(&1&);&&
&&&&&&&&msgPojo.setText(&first&msg&);&&
&&&&&&&&System.out.println(&--发送消息:msgPojo.id为&+msgPojo.getId()+&;msgPojo.text为&+msgPojo.getText());&&
&&&&&&&&jmsTemplate.convertAndSend(this.destination,&msgPojo);&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
同样在QueueConsumerService.java中增加receiveAndConvert()方法并在其实现类中实现,实现类的代码如下:
package&jms.mq.&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
public&class&QueueConsumerService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&receive()&{&&
&&&&&&&&TextMessage&message&=&(TextMessage)&jmsTemplate.receive();&&
&&&&&&&&try&{&&
&&&&&&&&&&&&System.out.println(&QueueConsumerService收到消息:&+message.getText());&&
&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&}&&
&&&&public&void&receiveAndConvert()&{&&
&&&&&&&&MsgPoJo&msgPojo&=&(MsgPoJo)jmsTemplate.receiveAndConvert();&&
&&&&&&&&if(msgPojo!=null){&&
&&&&&&&&&&&&System.out.println(&--收到消息:msgPojo.id为&+msgPojo.getId()+&;msgPojo.text为&+msgPojo.getText());&&
&&&&&&&&}&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
修改我们的两个测试类,增加对转换方法的调用,不再赘述,直接上代码:
QueueConsumerTest.java测试类
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&QueueConsumerTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&receive()&{&&
&&&&&&&&QueueConsumerService&consumerService&=&(QueueConsumerService)&appContext.getBean(&queueConsumerService&);&&
&&&&&&&&consumerService.receive();&&
&&&&private&static&void&receiveAndConvert()&{&&
&&&&&&&&QueueConsumerService&consumerService&=&(QueueConsumerService)&appContext.getBean(&queueConsumerService&);&&
&&&&&&&&consumerService.receiveAndConvert();&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&&&
&&&&&&&&receiveAndConvert();&&
QueueProducerTest.java测试类
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&QueueProducerTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&send()&{&&
&&&&&&&&QueueProducerService&producerService&=&(QueueProducerService)&appContext.getBean(&queueProducerService&);&&
&&&&&&&&producerService.send();&&
&&&&private&static&void&convertAndSend()&{&&
&&&&&&&&QueueProducerService&producerService&=&(QueueProducerService)&appContext.getBean(&queueProducerService&);&&
&&&&&&&&producerService.convertAndSend();&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&&&
&&&&&&&&convertAndSend();&&
代码编写完毕,我们看一下我们的劳动成果。首先运行生产者类和消费者控制台信息如下:
收到的内容与发的内容相同,说明转换成功了。如果这一部分的程序使用的队列跟上面的一样,那你会发现发送的时候打印出的信息不值上面的一个,还包括一个接收的信息,这是为什么呢?了解spring原理的人应该知道,spring是把所有类都加载到内容中,当然也包括我们上门写的按个实现MessageListener的一个消费者类,他们也在运行,如果监听的地址跟你送的地址正好相同的话,他也有可能收到这个信息。所以在测试的时候要注意修改配置文件。
&id=&queueProducerService&&class=&jms.mq.spring.QueueProducerService&&&
&&&&&name=&jmsTemplate&&ref=&jmsQueueTemplate&&&&
&&&&&name=&destination&&ref=&queueDest&&&&&&
&id=&queueConsumerService&&class=&jms.mq.spring.QueueConsumerService&&&
&&&&&name=&jmsTemplate&&ref=&jmsQueueTemplate&&&&
&&&&&name=&destination&&ref=&queueDest&&&&&
4、监听器上的使用方式
我再来学习一下跟监听器联合使用的方式,只在发布订阅者模式上演示一下。我们先来修改发布者的实现方式,在发布者中增加convertAndSend方法并在其实现类中实现,订阅者监听器没有类转换,不用修改,发布者修改后的代码如下:
package&jms.mq.&&
import&java.util.D&&
import&javax.jms.D&&
import&javax.jms.JMSE&&
import&javax.jms.MapM&&
import&javax.jms.M&&
import&javax.jms.S&&
import&javax.jms.TextM&&
import&org.springframework.jms.core.JmsT&&
import&org.springframework.jms.core.MessageC&&
import&jms.spring.QueueProducerS&&
public&class&TopicPublisherService{&&
&&&&JmsTemplate&jmsT&&
&&&&Destination&&&
&&&&public&void&send()&{&&
&&&&&&&&MessageCreator&messageCreator&=&new&MessageCreator()&{&&
&&&&&&&&&&&&public&Message&createMessage(Session&session)&throws&JMSException&{&&
&&&&&&&&&&&&&&&&TextMessage&message&=&session.createTextMessage();&&
&&&&&&&&&&&&&&&&message.setText(&QueueProducerService发送消息&+new&Date());&&
&&&&&&&&&&&&&&&&return&&&
&&&&&&&&&&&&}&&
&&&&&&&&};&&
&&&&&&&&jmsTemplate.send(this.destination,messageCreator);&&
&&&&public&void&convertAndSend(Object&obj)&{&&
&&&&&&&&System.out.println(&--发送PoJo对象...&);&&
&&&&&&&&jmsTemplate.convertAndSend(destination,&obj);&&
&&&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{&&
&&&&&&&&this.jmsTemplate&=&jmsT&&
&&&&public&void&setDestination(Destination&destination)&{&&
&&&&&&&&this.destination&=&&&
发布订阅者配置文件如下
&id=&jmsTopicTemplate&&class=&org.springframework.jms.core.JmsTemplate&&&
&&&&&name=&connectionFactory&&ref=&connectionFactory&&&&
&&&&&name=&defaultDestination&&ref=&topicDest&&&&
&&&&&name=&pubSubDomain&&value=&true&&&&
&&&&&name=&messageConverter&&ref=&msgConverter&&&
&id=&topicPublisherService&&class=&jms.mq.spring.TopicPublisherService&&&
&&&&&&&&&name=&jmsTemplate&&ref=&jmsTopicTemplate&&&&
&&&&&&&&&&
&&&&&&&&&name=&destination&&ref=&topicSubscriberMessageListenerDest&&&&&
&&&&&id=&topicSubscriberService&&class=&jms.mq.spring.TopicSubscriberService&&&
&&&&&&&&&name=&jmsTemplate&&ref=&jmsTopicTemplate&&&&
&&&&&&&&&name=&destination&&ref=&topicDest&&&&
修改上面的发布测试类,修改增加对新增方法的调用,修改后的内容如下:
package&jms.mq.&&
import&org.springframework.context.ApplicationC&&
import&org.springframework.context.support.ClassPathXmlApplicationC&&
public&class&TopicPublisherTest&{&&
&&&&private&static&ApplicationContext&appContext&=&new&ClassPathXmlApplicationContext(&&applicationContext.xml&);&&
&&&&private&static&void&send()&{&&
&&&&&&&&TopicPublisherService&topicPublisherService&=&(TopicPublisherService)&appContext.getBean(&topicPublisherService&);&&
&&&&&&&&topicPublisherService.send();&&
&&&&private&static&void&convertAndSend()&{&&
&&&&&&&&TopicPublisherService&topicPublisherService&=&(TopicPublisherService)&appContext.getBean(&topicPublisherService&);&&
&&&&&&&&MsgPoJo&msgPoJo&=&new&MsgPoJo();&&
&&&&&&&&msgPoJo.setId(&1&);&&
&&&&&&&&msgPoJo.setText(&测试内容&);&&
&&&&&&&&topicPublisherService.convertAndSend(msgPoJo);&&
&&&&public&static&void&main(String[]&args)&{&&
&&&&&&&&&&
&&&&&&&&convertAndSend();&&
运行发布测试类,运行结果如下:
写在到这里,ActiveMQ与spring整合就讲完了,主要讲了ActiveMQ与spring的简单整合,监听器和类转换这些主要功能.
呵呵,写到不好,请大家不要拍砖。
前面几章讲了ActiveMQ以及ActiveMQ与spring的整合,这是jms的ActiveMQ实现。但是jms到底是什么呢。如果开始就讲解这些理论比较强的东西,可能比较难理解,但是现在结合前面几章的实例对比就比较容易上手了。
1、.JMS 的概念
&& &JMS(Java Message Service),即 Java 消息服务,是一组 Java 应用程序接口,用以提供创建、发接、接收、读取消息的服务。SUN 只提供接口,由不同的厂商根据该接口提供自己的实现。
& &更好地理解 JMS 有助于了解 JMS 规范的制定者设置的目标。现在,市场上有许多企业消息产品,生产这些产品的几家公司也参与了 JMS 的开发。
& &现有的这些系统的能力和功能各不相同。这些制定者知道如果 JMS 结合所有现有系统的所有功能,那么它会变得过于复杂。同样,他们相信,他们也不能让 JMS 只局限于所有系统共有的那些特性。
& &制定者相信,让 JMS 包括实现“高级的企业应用程序”所需要的所有功能是很重要的。
& &JMS 规范中声明, JMS 的目标是:
定义一组消息公用概念和实用工具。
最少化程序员使用消息时必须学习的概念。
最大化消息应用程序的可移植性。&
最小化实现一个提供者所需的工作量。
为点对点和 pub/sub&域&提供客户机接口。“域”是用于在前面讨论的消息模型的 JMS 术语。(注意:提供者不需要实现两个域。)
3、.JMS 中间件
& 1).IBM MQ 系列产品提供的服务使得应用程序可以使用消息队列进行交流,通过一系列基于
Java 的 API 提供了 MQseries 在 Java 中应用开发的方法。它支持点到点和发布/订阅两种消息模式。
& 2).WebLogic是BEA公司实现的基于工业标准的J2EE应用服务器,支持大多数企业级JavaAPI,它完全兼容 JMS 规范,支持点到点和发布/订阅消息模式,它支持消息的多点广播、持久消息存储的文件和数据库、XML 消息,以及动态创建持久队列和主题。
& 3).JBoss 是 JBoss 公司开发的一个免费开源的应用服务器,它提供了 EJB 运行的环境,并能够结合 EJB 进行 JMS 消息的收取,支持点到点模型和发布/订阅模型。
& 4).ActiveMQ 是一个基于 Apache 2.0 Licenced 发布的开放源代码的 JMS 产品,它能够提供点到点消息模式和发布/订阅消息模式,支持 JBoss、Geronimo 等开源应用服务器,支持 Spring 框架的消息驱动,新增了一个 P2P 传输层,可以用于创建可靠的 P2P JMS 网络连接,拥有消息持久化、事务、集群支持等 JMS 基础设施服务。
& 5).OpenJMS 是一个开源的 JMS 规范的实现,它支持点对点模型和发布/订阅模型,支持同步与异步消息发送、可视化管理界面,支持 Applet,能够与 Jakarta Tomcat 这样的 Servlet容器结合,支持 RMI、TCP、HTTP 与 SSL 协议。
4、.JMS特性
& & &在 JMS 之前,每一家 MOM 厂商都用专有 API 为应用程序提供对其产品的访问,通常可用于许多种语言,其中包括 Java 语言。JMS 通过 MOM 产品为 Java 程序提供了一个发送和接收消息的标准的、便利的方法。用 JMS 编写的程序可以在任何实现 JMS 标准的 MOM
上运行。JMS 是由 Sun Microsystems 开发的,它为 Java 程序员提供一种访问企业消息系统的方法,即&面向消息的中间件&(MOM)。MOM 通过中间媒介以间接方式在应用程序之间异步传送数据,用这种方法提供一种以松散耦合的灵活方式集成应用程序的机制。
& & 要执行并测试这些程序,需要访问 JMS 的供应商实现。大多数 Java 2 Enterprise Edition (J2EE)供应商都会提供 JMS 的一种实现(也就是上面说的JMS中间件)。要想获得设置
JMS 运行时程序和执行程序的有关信息,请参阅供应商文档。
& & 它为 Java 程序提供一种访问&企业消息系统&的方法。在讨论
JMS 之前,我们分来析一下企业消息系统。
& &企业消息系统,即&面向消息的中间件(MOM),提供了以松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于&存储和转发&的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的
MOM 通信。
MOM 提供了有保证的消息发送(至少是在尽可能地做到这一点),应用程序开发人员无需了解远程过程调用(PRC)和网络/通信协议的细节。
& &&1)消息灵活性
MOM 将消息路由给应用程序 B,这样,消息就可以存在于完全不同的计算机上,MOM 负责处理网络通信。如果网络连接不可用, MOM 会存储消息,直到连接变得可用时,再将消息转发给应用程序 B。
灵活性的另一方面体现在,当应用程序 A 发送其消息时,应用程序 B 甚至可以不处于执行状态。MOM 将保留这个消息,直到应用程序 B 开始执行并试着检索消息为止。这还防止了应用程序 A 因为等待应用程序 B 检索消息而出现阻塞。
这种异步通信要求应用程序的设计与现在大多数应用程序不同,不过,对于时间无关或并行处理,它可能是一个极其有用的方法。
& &&2)松散耦合
企业消息系统的真正威力在于应用程序的&松散耦合。在前一页的图表中,由应用程序 A 发送消息指定一个特定目标,如“订单处理”。而现在,是由应用程序 B 提供订单处理功能。
但是在将来,我们可以用不同的订单处理程序替换应用程序 B,应用程序 A 将不再是明智之选。替换应用程序将继续发送消息完成“订单处理”,而消息也仍将得到处理。
同样,我们也可以替换应用程序 A,只要替换应用程序继续发送消息进行“订单处理”,订单处理程序就无需知道是否有一个新的应用程序在发送订单。
最初,开发企业消息系统是为了实现&点对点模型&(PTP),由一个应用程序生成的每个消息都由另一个应用程序接收。近年来,出现了一种新的模型,叫做&发布与订阅&(或者
pub/sub)。
Pub/sub 用称为&主题(topic)&的内容分层结构代替了 PTP 模型中的惟一目的地,发送应用程序&发布&自己的消息,指出消息描述的是有关分层结构中的一个主题的信息。
希望接收这些消息的应用程序&订阅了 这个主题。订阅包含子主题的分层结构中的主题的订阅者可以接收该主题和其子主题发表的所有消息。
多个应用程序可以就一个主题发布和订阅消息,而应用程序对其他人仍然是匿名的。MOM 起着&代理(broker)&的作用,将一个主题已发表的消息路由给该主题的所有订阅者。
JMS 应用程序由以下元素组成:
JMS 客户机。&用 JMS API 发送和接收消息的 Java 程序。
非 JMS 客户机。&认识到传统程序通常整个 JMS 应用程序的一部分是非常重要的,在规划时必须优先考虑它们的存在。
消息。就JMS 应用程序设计而言,通过JMS 和非 JMS 客户机所交换的消息的格式和内容是完整的。
JMS 提供者。&如前所述,JMS 定义了一组接口,供应者必须提供特定于其 MOM 产品的具体实现。
管理对象。&消息系统提供者的管理员创建的、独立于提供者的专有技术的对象。
6、.JMS管理对象
MOM 产品的提供者在实现消息时使用的机制和技术有很大不同。为了保持 JMS 客户机的可移植性,实现了 JMS 接口的对象必须与提供者的专有技术隔离。
完成这项任务的机制是&管理对象。这些实现 JMS 接口的对象由提供者的消息系统的管理员创建,并被放置在 JNDI 名字空间中。
然后由 JMS 程序检索这些对象,通过它们实现的 JMS 接口访问这些对象。JMS 提供者必须提供允许创建受管理对象及它们在 JNDI 名字空间中的存放地的工具。
有两种受管理对象:
ConnectionFactory:用于创建到提供者的底层消息系统的连接。
Destination:用
JMS 客户机来指定正发送消息的目的地或正接收消息的来源。
尽管受管理对象本身就是特定于提供者实现的类的例子,但可以使用可移植机制(JNDI)检索它们,并且可以通过可移植接口(JMS)访问它们。JMS 程序只需要知道管理对象的 JNDI 名称和 JMS 接口类型即可,无需了解特定于提供者的知识。
7、.JMS接口
JMS 定义了一组封装各种消息概念的高级接口。而这些接口又因为两个消息域——PTP 和 pub/sub——进行了进一步地定义和定制。
高级接口包括:
ConnectionFactory:一个创建&Connection&的受管理对象。
Connection:连接到提供者的活动连接。
Destination:一个封装消息目的地的身份的受管理对象,如消息的来源地和发送地。
Session:发送和接收消息的单线程环境。为了简化,并且因为&Session&控制事务的缘故,通过多个线程进行并发访问受到了限制。可以将多个&Session&用于多线程应用程序。
MessageProducer:用于发送消息。
MessageConsumer:用于接收消息。
下表列出了从每一个高级接口继承的特定于域的接口。
高级接口PTP
域Pub/sub 域
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver,QueueBrowser
TopicSubscriber
在 JMS 以前的版本中,高级接口是特定于域的接口的父接口,并且只包含在两个域中共有的那些功能。JMS 提供者没有提供高级接口的实现。在 JMS 1.1 中,一些高级接口现在则被认为是“公用接口”,并且它们包含两个域的所有功能; JMS 提供者必须提供这些接口的实现。尽管公用接口仍然是特定于域的接口的父接口,但它们现在是 JMS 客户机编程的首选方法,并且它们提供特定于域的接口只为了后向兼容。
下面重新列出了前一小节中的表,展示了一些公用接口。
JMS 公用接口PTP
域Pub/sub 域
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver,QueueBrowser
TopicSubscriber
统一有公用接口的域会导致继承这些方法的一些特定于域的类不再适合它的域。如果在客户机代码中出现这种情况,JMS 提供者需要使用 IllegalStateException。
8、jms开发
一个典型的 JMS 程序要经过以下步骤才能开始产生和使用消息:
通过 JNDI 查询&ConnectionFactory&。
通过 JNDI 查询一个或者多个&Destination。
用&ConnectionFactory&创建一个&Connection。
用&Connection&创建一个或者多个&Session。
用&Session&和&Destination&创建所需要的&MessageProducer&和&MessageConsumer。
启动&Connection。
这时,消息就可以开始流动,应用程序可以根据需要接收、处理和发送消息。 在后面几节中,我们将开发 JMS 程序,您将会看到这些步骤的细节。
9、jms消息
消息系统的核心当然是消息。JMS 为不同类型的内容提供了几种消息类型,但所有消息都是从&Message&接口派生出来的。
Message&分为三个组成部分:
header&是一组标准字段,客户机和提供者都用它们来标识和路由消息。
Properties&提供了一个给消息添加可选标题字段的实用工具。如果应用程序需要用标准标题字段没有提供的方法对消息进行归类或分类,那么可以为消息添加一个属性来实现这种归类和分类;提供了&set&Type&Property(...)&和&get&Type&Property(...)&方法来设置和获得各种
Java 类型的属性,其中包括&Object。JMS 定义了提供者可以选择性提供的一组标准属性。
消息的&body&包含将 发送到接收应用程序的内容。每一个消息接口都专用于它所支持的内容类型。
10、header属性
下面列出了&Message&的每一个标题字段的名称、它对应的
Java 类型和字段的描述:
JMSMessageID——类型为&string
惟一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的&JMSMessageID。
JMSDestination——类型为&Destination
消息发送的&Destination,在发送过程中由提供者设置。
JMSDeliveryMode——类型为&int
包含值&DeliveryMode.PERSISTENT&或者&DeliveryMode.NON_PERSISTENT。持久性消息被传输并且只被传输一次,非持久性消息最多被传输一次。要知道“最多一次”包括根本不传输。非持久性消息在应用程序或者系统出故障时被提供者弄丢。因此要格外小心,确保持久性消息不受故障的影响。这比开销通常被认为是发送持久性消息方面的开销,在决定消息的发送模式时,必须仔细考虑,在可靠性和性能之间进行权衡。
JMSTimestamp——类型为&long
提供者发送消息的时间,由提供者在发送过程中设置。
JMSExpiration——类型为&long
消息失效的时间。这个值是在发送过程中计算的,是发送方法的生存时间(time-to-live)值和当前时间值的和。提供者不应发送过期的消息。值 0 表明消息不会过期。
JMSPriority——类型为&int
消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。
JMSCorrelationID——类型为&string
通常用来链接响应消息与请求消息,由发送消息的 JMS 程序设置。响应来自另一个 JMS 程序的消息的 JMS 程序将正响应消息的&JMSMessageID&拷贝到这个字段中,这样,正作出响应的程序就可以与它所发出的特定请求的响应相&关联。
JMSReplyTo——类型为&Destination
请求程序用它来指出回复消息应发送的地方,由发送消息的 JMS 程序设置。
JMSType——类型为&string
JMS 程序用它来指出消息的类型。一些提供者维护着一个消息类型仓库,并用该字段引用仓库中的定义类型,在这里,JMS 程序不应该使用这个字段。
JMSRedelivered——类型为&boolean
指出消息被过早地发送给了 JMS 程序,程序不知道消息的接收者是谁;由提供者在接收过程中设置。
11、标准属性
下面列表给出了&Message&的每一个标准属性的名称、它对应的
Java 类型和该属性的说明。提供者对标准属性的支持是可选的。JMS 为这些属性和将来 JMS 定义的属性保留了 “JMSX” 属性名。
JMSXUserID——类型为&string
发送消息的用户的身份。
JMSXApplID——类型为&string
发送消息的应用程序的身份。
JMSXDeliveryCount——类型为&int
已经尝试发送消息的次数。
JMSXGroupID——类型为&string
该消息所属的消息组的身份。
JMSXGroupSeq——类型为&int
该消息在消息组中的序号。
JMSXProducerTXID——类型为&string
生成该消息的事务的身份。
JMSXConsumerTXID——类型为&string
使用该消息的事务的身份。
JMSXRcvTimestamp——类型为&long
JMS 将消息发送给客户的时间。
JMSXState——类型为&int
提供者用它来维护消息的消息仓库,通常,它与 JMS 生产者和客户关系不大。
JMSX_&vendor_name&
为特定于提供者的属性而保留。
1、product发送JMS消息
Java代码&&
public&class&QueueSend&{&&
&&&&public&static&void&main(String[]&args)&throws&JMSException&{&&
&&&&&&&&ConnectionFactory&connectionfactory&=&new&ActiveMQConnectionFactory(&tcp://localhost:61616&);&&
&&&&&&&&&&
&&&&&&&&Connection&connection&=&connectionfactory.createConnection();&&
&&&&&&&&Session&session&=connection.createSession(false,Session.AUTO_ACKNOWLEDGE&);&&
&&&&&&&&QueueSend&qs&=&new&QueueSend();&&
&&&&&&&&&&
&&&&&&&&qs.sendTextMsg(session,&使用jms发送文本消息&,&queue.msgText&);&&
&&&&&&&&MapMessage&mapMsg&=&session.createMapMessage();&&
&&&&&&&&mapMsg.setString(&name&,&&李寻欢1&);&&
&&&&&&&&mapMsg.setBoolean(&IsHero&,&true);&&
&&&&&&&&mapMsg.setInt(&age&,&35);&&
&&&&&&&&qs.sendMap(session,&mapMsg,&&queue.msgMap&);&&
&&&&&&&&&&
&&&&&&&&Person&person&=&new&Person(&阿飞&,&23,&&北京.大兴&);&&
&&&&&&&&qs.sendObj(session,&person,&&queue.msgObj&);&&
&&&&&&&&&&
&&&&&&&&session.close();&&
&&&&&&&&connection.close();&&
&&&&public&void&sendTextMsg(Session&session,String&MsgContent,String&name)&throws&JMSException{&&
&&&&&&&&Queue&queue&=&new&ActiveMQQueue(name);&&
&&&&&&&&MessageProducer&msgProducer&=&session.createProducer(queue);&&
&&&&&&&&Message&msg&=&session.createTextMessage(MsgContent);&&
&&&&&&&&msgProducer.send(msg);&&
&&&&&&&&System.out.println(&文本消息已发送&);&&
&&&&public&void&sendMap(Session&session,MapMessage&map,String&name)&throws&JMSException{&&
&&&&&&&&Queue&queue&=&new&ActiveMQQueue(name);&&
&&&&&&&&MessageProducer&msgProducer&=&session.createProducer(queue);&&
&&&&&&&&msgProducer.send(map);&&
&&&&&&&&System.out.println(&Map格式的消息已发送&);&&
&&&&public&void&sendObj(Session&session,Object&obj,String&name)&throws&JMSException{&&
&&&&&&&&Destination&&queue&=&new&ActiveMQQueue(name);&&
&&&&&&&&MessageProducer&msgProducer&=&session.createProducer(queue);&&
&&&&&&&&ObjectMessage&objMsg=session.createObjectMessage((Serializable)&obj);&&
&&&&&&&&MessageProducer&msgPorducer&=session.createProducer(queue);&&
&&&&&&&&msgPorducer.send(objMsg);&&
&&&&&&&&System.out.println(&Object类型的消息已发送&);&&
&&&&&&&&&&
&当消息发送成功时我们可以再web消息管理端看到产生了如下图所示的三条消息。
&2、consumer消费消息
接受jms消息代码&&
public&class&QueueAccept&implements&MessageListener{&&
&&&&public&static&void&main(String[]&args)&throws&JMSException&{&&
&&&&&&&&ConnectionFactory&connectionfactory&=&&
&&&&&&&&Connection&connection=&&
&&&&&&&&Session&session=&&
&&&&&&&&if(connectionfactory==null){&&
&&&&&&&&&&&&connectionfactory&=&new&ActiveMQConnectionFactory(&tcp://localhost:61616&);&&
&&&&&&&&}&&
&&&&&&&&if(connection==null){&&
&&&&&&&&&connection&=&connectionfactory.createConnection();&&
&&&&&&&&&&&&connection.start();&&
&&&&&&&&&}&&
&&&&&&&&&session&=&connection.createSession(false,&Session.AUTO_ACKNOWLEDGE);&&
&&&&&&&&Queue&queue&=&new&ActiveMQQueue(&queue.msgText&);//根据发送的名称接受消息&&
&&&&&&&&MessageConsumer&consumer&=&session.createConsumer(queue);&&
&&&&&&&&consumer.setMessageListener(new&QueueAccept());//不继承MessageListener时可以用consumer.receive()手动接受消息&&
&&&&&&&&&&
&&&&&&&&Queue&queue1&=&new&ActiveMQQueue(&queue.msgMap&);&&
&&&&&&&&MessageConsumer&consumer1&=&session.createConsumer(queue1);&&
&&&&&&&&consumer1.setMessageListener(new&QueueAccept());&&
&&&&&&&&&&
&&&&&&&&Queue&queue2&=&new&ActiveMQQueue(&queue.msgObj&);&&
&&&&&&&&MessageConsumer&consumer2&=&session.createConsumer(queue2);&&
&&&&&&&&consumer2.setMessageListener(new&QueueAccept());&&
&&&&public&void&onMessage(Message&message)&{&&
&&&&&&&&/**&&
&&&&&&&&&*&接受文本类型的消息&&&
&&&&&&&&&*/&&
&&&&&&&&if(message&instanceof&TextMessage){&//instanceof&测试它所指向的对象是否是TextMessage类&&
&&&&&&&&&&&&TextMessage&text&=&(TextMessage)&&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&System.out.println(&发送的文本消息内容为:&+text.getText());&//接受文本消息&&&
&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&}&&
&&&&&&&&/**&&
&&&&&&&&&*&接受Map类型的消息&&
&&&&&&&&&*/&&
&&&&&&&&if(message&instanceof&MapMessage){&&
&&&&&&&&&&&&MapMessage&map&=&(MapMessage)&&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&System.out.println(&姓名:&+map.getString(&name&));&&
&&&&&&&&&&&&&&&&System.out.println(&是否是英雄:&+map.getBoolean(&IsHero&));&&
&&&&&&&&&&&&&&&&System.out.println(&年龄:&+map.getInt(&age&));&&
&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&}&&
&&&&&&&&if(message&instanceof&ObjectMessage){&&
&&&&&&&&&&&&ObjectMessage&objMsg&=&(ObjectMessage)&&&
&&&&&&&&&&&&try&{&&
&&&&&&&&&&&&&&&&Person&person=(Person)&objMsg.getObject();&&
&&&&&&&&&&&&&&&&System.out.println(&用户名:&+person.getName()+&年龄:&+person.getAge()+&地址:&+person.getAddress());&&
&&&&&&&&&&&&}&catch&(JMSException&e)&{&&
&&&&&&&&&&&&&&&&e.printStackTrace();&&
&&&&&&&&&&&&}&&
&&&&&&&&}&&
&客户端运行后接受到的消息:
& & 在上边的代码中,我们在发送Object对象时,我们发送的对象需要实现serializable接口。记得刚找工作的时候有一回有个面试官问到,为什么一个类要实现serializable接口,也许许多人新人都不知道为什么,只能按照背的面试题中的答案去说。刚好这个demo中遇到了这个问题,就继续复习一下java的基础吧。一个类只有实现了serializable才是可以序列化的,通俗的讲实现了serializable接口后我们将可以把这个类,在网络上进行发送,或者将这个类存入到硬盘,序列化的目的就是保存一个对象。
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:385707次
积分:5856
积分:5856
排名:第2129名
原创:188篇
转载:90篇
评论:55条
(2)(1)(2)(6)(2)(2)(1)(3)(1)(1)(8)(17)(5)(3)(14)(4)(1)(1)(2)(4)(13)(36)(42)(11)(26)(74)

我要回帖

更多关于 ibm mq 客户端连接mq 的文章

 

随机推荐