mqtt jmsmessageidd应该放在哪

IBM Bluemix
点击按钮,开始云上的开发!
developerWorks 社区
通过本文,学习如何使用 IBM Lotus Expeditor micro broker 支持 MQ Telemetry Transport (MQTT) 发布 / 订阅消息传递协议。在本文中,我们将创建一个示例发布程序、将消息发布到一个主题并验证消息的接收。
, 顾问软件工程师, High Performance On Demand Services, IBM
Brian O'Connell 是位于 Research Triangle Park, NC 的 IBM 的一位软件工程师。他在 Events Infrastructure 团队工作。他专长的领域包括 Java 并发编程、可伸缩 I/O 设计和发布/订阅系统。他现任发布/订阅系统的首席架构师和开发人员,此系统支持由 IBM 资助的运动项目网站。Brian 获得了弗吉尼亚技术学院暨州立大学(Virginia Polytechnic Institute and State University)计算机科学的学士学位。您可以通过 boc@ 与他联系。
, 高级工程师、杰出发明人、经理, Pervasive Messaging Technologies, EMC
Andy Stanford-Clark 是 IBM 的一名高级工程师,被评为英国 IBM Hursley Park Development Lab 的 “杰出发明人”。他专长的领域包括消息代理技术、远程监控、Internet 技术以及普及计算。他领导 Advanced Technology 团队构建将远程监控与控制设备的数据集成到业务应用程序的技术。Andy 获得了位于英国诺里奇 University of East Anglia 的计算机科学与数学理学士学位和并行计算博士学位。您可以通过
与 Andy 联系。
, 首席开发人员,普及和高级消息传递技术, IBM
Martin Gale 任职于 IBM 在英国的 Hursley 软件开发实验室。他是 Hursley 普及和高级消息传递技术团队的微代理产品的首席开发人员。他的研究背景包括互联网技术在 Web 和普及性领域的应用。
消息传递中间件提供了业务集成解决方案上下文中可靠灵活的连接服务。MQ Telemetry Transport (MQTT) 是 IBM 众多的消息传递中间件技术之一,是受 Lotus Expeditor micro broker 支持的一种协议。MQTT 是基于 TCP/IP 的发布 / 订阅消息传递协议,针对低负荷网络的通信而设计。micro broker 是一种小型的消息代理(只有不足 2 MB 的 Java 代码),主要在小型的应用程序型设备上部署,通常设备都在离企业数据中心很远的位置。在本文中,将创建一个能够连接到 Lotus Expeditor micro broker 的示例发布程序,并用它向一个主题发布消息和验证代理对该消息的接收。学习完本文后,您应该具备了按自己的业务需要创建简单的 MQTT 发布程序所需的知识。有关向兼容 MQTT 的代理进行订阅的历史、设计动机、协议特点和操作指导,可以在另一篇文章 “” 中找到。本文扩展了上述文章中所触及的概念并继续介绍了如何使用 MQTT 发布消息。上述文章涵盖了订阅方面的内容,使用的是 IBM Websphere Message Broker;本文将探究消息发布,使用的是更新的 Lotus Expeditor micro broker。本文所涵盖的方方面面都可以直接应用到 WebSphere Message Broker,因为二者都支持 MQTT 协议。本文是用 Java 编写 MQTT 发布程序的指南。本文所详细介绍的这个示例应用程序模仿了机场上有航班抵达时的通知消息发布。为了简化此示例,我们将此场景限制为只包括两个航空公司(Air Freedom 和 Northern Air)和两个机场(Raleigh-Durham/RDU 和 London Heathrow/LHR)。要完成本示例的编译,Lotus Expeditor micro broker 附带的 MQTT 库必须可用。此外,为了用示例 MQTT 客户机连接和发布消息,还要安装和运行 Lotus Expeditor micro broker。主题在消息的发布 / 订阅中,消息的目的地称为主题。MQTT 协议具有分层的主题空间,这意味着主题可以按这样一种方式构成,即订阅者和发布者可以使用不同精度指定目的主题。MQTT 对于主题空间有一些强制规定,但设计适合您自己应用程序的逻辑信息空间则由您负责。经验表明,详细描述的主题空间要比简单扼要的空间更可取。例如,不建议使用 a、b 或 c 作为主题。使用简短的主题虽然可以节约带宽,但一个设计良好的主题空间则应该更具描述性且允许在订阅应用程序中使用通配符,因此更有实际意义和益处。主题是无格式的字符串,可以包含任何单一字节的字符。但字符 /、+ 和 # 具有特定意义,在本文稍后会详细讨论。主题长度最长可达 32,767 个字符。图 1 的示例展示了为飞机抵达和起飞消息所设计的逻辑主题空间。此主题空间将贯穿全文。图 1. 主题空间该主题空间的层次为:Flight Times - Airport - Airline - Arrivals or Departures - Flight Number。要将主题层次转变成字符串,可以使用前斜线 (/) 字符分隔主题层次中的每个部分,这又增加了额外的逻辑层精度。例如,要发布有关 Air Freedom 1326 次航班抵达 RDU 的消息 , 消息主题应该是 Flight Times/RDU/Air Freedom/Arrivals/Flight 1326。这种逻辑主题空间允许不同的订阅者组可以只订阅他们所感兴趣的消息。而设计糟糕的主题空间则会导致客户的过度订阅,即客户订阅了超出自身所需的主题空间,继而会收到很多无关的消息。这些无关消息需要客户应用程序提供额外的消息过滤以决定消息是否是所感兴趣的。过度订阅会导致带宽浪费和客户机效率的降低。接下来,我们将描述三组截然不同的订阅者,他们均对全球范围内航班的抵达和起飞时间感兴趣。一个人在机场等待飞机抵达机场显示航班信息航空公司跟踪准时航班的百分比在第一个场景中,假设此人的电话上安装了 MQTT 客户应用程序,他在机场等待接机。在本例中,此人只对特定机场(LHR)的某一个航班(Air Freedom 1024 次航班)感兴趣。为了接收航班抵达的通知,客户应用程序订阅了 Flight Times/LHR/Air Freedom/Arrivals/Flight 1024。在第二个订阅场景中,希思罗机场 (LHR) 显示其航班抵达和起飞的信息。在本例中,客户只对 LHR 机场的航班抵达和起飞信息感兴趣。此逻辑主题空间应该允许对 LHR 机场的航班抵达和起飞信息进行单个订阅。所需的订阅主题字符串是 Flight Times/LHR/#。# 字符是个特殊的通配符,可匹配主题空间内合适位置中的所有主题,可以将其视为是订阅主题空间的一个子树。最后一个场景是航空公司跟踪其准点航班的数据。我们的示例航空公司是 Northern Air,该公司很关注其全球范围的准点航班的百分比。因此 Northern Air 需要单一订阅全球范围内的航班抵达时间。在本例中,Northern Air 只关心航班抵达,不关心航班起飞。Northern Air 这一特定需求的主题字符串是 Flight Times/+/Northern Air/Arrivals/#。此主题字符串使用了特殊的通配符 + 字符,它让 Northern Air 得以订阅所有机场的航班抵达信息。与 # 通配符不同,它只匹配主题空间层次中的某一层,并不试图匹配层次中的所有稍低的层。在创建 MQTT 应用程序时,必须认真将主题空间设计得符合逻辑,并且支持灵活订阅。这种方式确保了发布的大多数客户都无需多次订阅和过度订阅。连接 MQTT用 MQ Telemetry Transport 进行消息发布需要连接到 Lotus Expeditor micro broker 或任何支持 MQTT 协议的消息服务器,比如,WebSphere Message Broker。连接到 broker 需要几个步骤。首先,必须构成并向客户机创建工厂提供 MQTT 属性对象。此属性对象提供实例化客户机的配置。属性之一是 Boolean 标志,指定了客户应用程序是否是一个 clean session 客户机,如果是的话,每次连接客户机时都不依赖于先前连接到代理的既有知识(比如任何之前的订阅或任何等待发布的消息)。如果此标志为 false,客户机状态就在连接代理过程中保持不变;例如,客户应用程序无需每次在后续的重连接时都重新订阅。此外,clean session 设置为 false,客户机和代理都试图恢复在连接中断时所打断的进展中的消息交换(根据为消息指定的服务质量)。要使用非 clean session 客户机,必须提供
MqttPersistence 接口实现。包含此接口的实现对客户机创建工厂而言意味着客户应用程序需要使用持久(可靠)消息发布。本例使用的是 clean session 客户机,并假设网络足够可靠。属性配置之后,MQTT 客户实例会从 MQTT 客户机工厂获得。创建 MQTT 客户实例需要几个参数,包括一个惟一客户机 ID、代理 IP 地址和端口以及可选的 MqttProperties 对象。客户机 ID 向代理表明客户机的身份。它主要用来启用持久消息的传递和在客户机数个连接和断开过程中保持订阅状态。每个连接到代理的客户机都要使用不同的客户机 ID。如果两个客户机试图在连接到代理时使用相同的客户机 ID,系统只承认后一个连接,前一个连接会强制断开。这种设计实际上实现了当之前的连接未完全消除时重新连接客户机。客户机 ID 最长为 23 字符。参见清单 1。清单 1. 连接
* Create a MqttClient object after configuring the MqttProperties object as
* required.
private MqttClient createClient() throws MqttException {
MqttProperties mqttProps = new MqttProperties();
// Stateless "clean session" client
mqttProps.setCleanStart(true);
* Create the client from the factory. The client ID for this client is
* "testClient" and the URL in the second parameter describes the
* location of the broker, in this case, on the local machine.
MqttClient mqttClient = MqttClientFactory.INSTANCE.createMqttClient(
"testClient", "tcp://mybroker:1883", mqttProps);
return mqttC
* Connect the MqttClient to a broker.
* @throws MqttException
If an error occurs during connection operations.
private void connect() throws MqttException {
* Register this application for callbacks from the client
client.registerCallback(this);
* Connect the client to a broker.
client.connect();
}发布MQTT 成功连接之后,就可以发布消息了。应用程序通过 MQTT 客户对象发布消息。发布消息的方法签名是 int publish(String, MqttPayload, byte, Boolean)。这四个参数详细解释如下:String:主题参数的类型是字符串,代理用此字符串来针对订阅者的兴趣(使用之前描述的订阅主题语法指定)匹配发布。MqttPayload:第二个参数是一个 MqttPayload 对象。这个 MqttPayload 对象包含应用程序数据和任何此发布的协议头。此外,还提供了一个偏移量以便应用程序能够决定数据在 MqttPayload 中从何处开始。这实现了对底层字节数组的访问,而无需在数据写到网络后创建额外的副本。提供这种访问是为了在对象构成后和进行传输之前直接在载荷内操纵数据。Byte:第三个参数,是此发布的服务质量(Quality of Service,QoS)。QoS 有三级:0、1 或 2:QoS 为 0 表明发布程序和代理都只尝试消息的一次传递,不会采取任何额外的步骤,亦不会超越 TCP/IP 去确保消息的传递。这一级有时又称作 “发后即忘”,原因是向目的地发送消息后并不验证消息的接收。QoS 为 1 表明消息会被确保传递到代理;但消息可能会传递多次。QoS 为 2 则告知 MQTT 传递消息且只传递一次。
QoS 级别增加会导致额外的处理器和网络开销。QoS 的设置会影响到消息传递解决方案的整体可扩展性。而且还会增加客户机存储未传递消息的负担。因而,在为每个所发布的消息选择合适的 QoS 级别时要格外注意。总的来说,若非迫切的传递保证所需,尽量使用较低级别的 QoS。随消息提供的 QoS 值指定了在客户机和代理之间发布消息的服务质量。此外,该值还决定了代理用来向其订阅者发布消息的最大 QoS 级别。
订阅者可以基于每个主题指定消息传递的最大 QoS,所以若消息在 QoS 2 发布,此消息有可能不是在该级别发布给其订阅者。订阅者可以请求为它所收到的消息提供降级 QoS。也许您会觉得消息的端到端 QoS 不由发布程序控制多少有些奇怪,但这样做会提高消息使用者的灵活性。当发布后的消息发送给订阅者时,代理会以订阅过程中由此订阅者指定的最高的 QoS 级别或是以此发布消息的 QoS 级别(如果此级别更低)传递此消息。例如,以 QoS 2 向订阅者发布的消息,若订阅者指定此主题的级别为 QoS 1,那么主题就会以 QoS 1 传递。对于同样级别的标题,若向相同的订阅者发布 QoS 0 级别的消息,那么此消息就会以 QoS 0 级别发送给订阅者。 Boolean:第四个参数是 Boolean 标志,该标志表明它是否是保留发布。保留发布存在于代理之内,作为针对给定主题收到的最后一个消息。保留发布让后续订阅者可以在订阅之后立即收到关于某主题的最新消息,即使是他们在此消息发布之后才连接。对于启动之后即刻填充某个显示应用程序并随后用对此信息的后续更改更新它,这种方式十分有效。如果此标志设为 false,只有当前订阅到该主题的订阅者可以收到消息。清单 2 中所示的示例使用的是一个非保留发布。这种发布方法会返回一个整型消息 ID。它可以与已注册的 MqttAdvancedCallback 方法结合使用来检查消息是否已由代理接收。清单 2 中的代码发布的消息表明 Air Freedom 的 1024 次航班已经抵达伦敦希思罗机场 (LHR)。清单 2. 发布 /**
* Invoke from the command line with a single parameter, the broker URI,
* e.g. tcp://mybroker:1883.
public static void main(String args[]) {
MqttPublisher publisher =
publisher = new MqttPublisher(args[0]);
* Connect the newly created publisher to the supplied broker.
publisher.connect();
* Publish an "Arrival" message.
publisher.publishMessage(
"Flight Times/LHR/Air Freedom/Arrivals/Flight 1024",
(byte) 2, "Arrived");
* Sleep for 1 second waiting to receive notification of
* publication. Real applications should use appropriate
* inter-thread signaling mechanisms such as wait/notify,
* cyclic barriers or latches.
Thread.sleep(1000);
catch (MqttException exception) {
System.err.println("Exception occurred during either instantiation,
connection, or publication: "
+ exception.getMessage());
catch (InterruptedException exception) {
System.err.println("Interrupted while waiting for publication: "
+ exception.getMessage());
* Close the publisher if instantiated.
if (publisher != null) {
publisher.disconnectClient();
catch (MqttException exception) {
System.err.println("Exception occurred closing publisher: "
+ exception.getMessage());
* Construct a new MqttPublisher containing an unconnected MqttClient.
* @param brokerURL
Broker URL to (eventually) connect to.
* @throws MqttException
If an underlying MQTT error occurs instantiating the client
private MqttPublisher(String brokerURL) throws MqttException {
this.brokerURL = brokerURL;
this.client = createClient();
* Publish a string as a message in byte form with the given quality of
* service to the given topic.
public void publishMessage(String topic, byte qos, String message)
throws MqttException {
client.publish(topic, new MqttPayload(message.getBytes(), 0), qos,
}回调此示例客户机连接到代理,并具有发布消息的能力。回调可为发布程序提供增强的功能性,这与我们的另一篇文章中所描述的客户机订阅类似。要收到发布的确认,必须创建回调处理程序,此处理程序还必须用 MQTT 客户机对象注册。有两种回调处理程序,简单回调处理程序和高级回调处理程序。它们分别由 MqttCallback 接口和 MqttAdvancedCallback 接口实现。MqttAdvancedCallback 接口用来扩展 MqttCallback,所以在使用高级回调接口时,也必须实现在简单回调接口中定义的方法。要使用高级接口,除了从简单回调函数中继承的方法之外,还必须实现另外三个方法:subscribed(int, byte[])、unsubscribed(int) 和 published(int)。前两个回调方法:subscribed(int, byte[]) 和 unsubscribed(int) 主要针对想要监视订阅确认的客户机。当代理确认了订阅请求后,即调用订阅方法。同样地,第二个方法 unsubscribed 在从主题取消订阅的请求得到确认后调用。由于本示例只侧重于发布,所以示例客户机没有使用这两个方法;然而,骨架实现还是成功的代码编译所必需的。发布客户机最感兴趣的方法是 published(int)。此方法提供了消息已成功传递给代理的通知。此方法只有一个整型参数,即 messageID。应用程序可能希望将这个 messageID 与发布方法所返回的 messageID 进行匹配。回调只能用于以 QoS 级别 1 或 2 发布的那些消息。MqttPersistence 实现所提供的客户端持久性与 QoS 1 或 2 的结合使用实现了为确保消息传递而使用的回调冗余。MQTT 客户机可跟踪连接故障时的所有发布,并试图一旦重建连接就即刻完成消息传递。但某些应用程序却愿意进行分步消息传递或采用其自身的传递保障机制。要接收通知,高级回调接口的实现必须要用 MQTT 客户端注册。registerCallback 方法提供 MQTT 客户机的注册特性。中的示例代码由清单 2 扩展而来,是一个功能全面的 MQTT 发布程序。MqttAdvancedCallback 接口由此类实现,并由之前创建的 MQTT 对象注册。该类可使用单一一个参数从命令行开启,而此参数中包含代理 URI,比如,tcp://mybroker:1883。结束语MQTT 是面向发布 / 订阅消息范型的一个功能强大的传送机制。在需要小型客户机和低网络负荷的情况下,它能提供在其他发布 / 订阅协议上增强了的实用工具。本文介绍了如何创建功能全面的 MQTT 发布程序。示例客户机连接到代理并向主题发布消息。此示例还展示了 MqttAdvancedCallback 接口,以用来通知消息已发送给代理。
参考资料 您可以参阅本文在 developerWorks 全球网站上的 。
阅读 developerWorks 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 developerWorks Lotus 文章 “”。
阅读 IBM 红皮书 。
developerWorks: 登录
标有星(*)号的字段是必填字段。
保持登录。
单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件。
在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。
所有提交的信息确保安全。
选择您的昵称
当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。昵称长度在 3 至 31 个字符之间。
您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。
标有星(*)号的字段是必填字段。
(昵称长度在 3 至 31 个字符之间)
单击提交则表示您同意developerWorks 的条款和条件。 .
所有提交的信息确保安全。
文章、教程、演示,帮助您构建、部署和管理云应用。
立即加入来自 IBM 的专业 IT 社交网络。
为灾难恢复构建应用,赢取现金大奖。
static.content.url=/developerworks/js/artrating/SITE_ID=10Zone=Lotus, WebSphereArticleID=327506ArticleTitle=使用 IBM Lotus Expeditor micro broker MQTT 客户机发布消息publish-date=Java Code Example org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage
Java Code Examples for org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage
The following are top voted examples for showing how to use
org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage. These examples are extracted from open source projects.
You can vote up the examples you like and your votes will be used in our system to product
more good examples.
+ Save this class to your library
* Inserts a new message to the list, ensuring that list is ordered from lowest to highest in terms of the message id's.
* @param list the list to insert the message into
* @param newMsg the message to insert into the list
private void insertInOrder(Vector list, MqttWireMessage newMsg) {
int newMsgId = newMsg.getMessageId();
for (int i = 0; i & list.size(); i++) {
MqttWireMessage otherMsg = (MqttWireMessage) list.elementAt(i);
int otherMsgId = otherMsg.getMessageId();
if (otherMsgId & newMsgId) {
list.insertElementAt(newMsg, i);
list.addElement(newMsg);
* Cleans up the supplied queue, notifying any tokens waiting for the
* messages on the queue.
private void cleanUpQueue(Vector queue) {
//@TRACE 636=cleanUpQueue
trace.trace(Trace.FINE,636);
Enumeration e = queue.elements();
MqttDeliveryTokenI
MqttException ex = ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
Integer messageId;
while (e.hasMoreElements()) {
message = (MqttWireMessage) e.nextElement();
token = this.tokenStore.getToken(message);
messageId = new Integer(message.getMessageId());
// It may be QoS 2, so prevent the CONFIRM from being sent.
if (outboundQoS2.containsKey(messageId)) {
outboundQoS2.remove(messageId);
// Outbound acks do not have tokens in the store
if (token != null) {
token.notifyException(ex);
tokenStore.removeToken(message);
queue.removeElement(message);
* Inserts a new message to the list, ensuring that list is ordered from lowest to highest in terms of the message id's.
* @param list the list to insert the message into
* @param newMsg the message to insert into the list
private void insertInOrder(Vector list, MqttWireMessage newMsg) {
int newMsgId = newMsg.getMessageId();
for (int i = 0; i & list.size(); i++) {
MqttWireMessage otherMsg = (MqttWireMessage) list.elementAt(i);
int otherMsgId = otherMsg.getMessageId();
if (otherMsgId & newMsgId) {
list.insertElementAt(newMsg, i);
list.addElement(newMsg);
* Called by the CommsReceiver when a message has been received.
* Handles inbound messages and other flows such as PUBREL.
* @param message
* @throws MqttException
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
final String methodName = &notifyReceivedMsg&;
this.lastInboundActivity = System.currentTimeMillis();
// @TRACE 651=received key={0} message={1}
log.fine(className, methodName, &651&, new Object[] {
new Integer(message.getMessageId()), message });
if (!quiescing) {
if (message instanceof MqttPublish) {
MqttPublish send = (MqttPublish)
switch (send.getMessage().getQos()) {
if (callback != null) {
callback.messageArrived(send);
persistence.put(getReceivedPersistenceKey(message),
(MqttPublish) message);
inboundQoS2.put(new Integer(send.getMessageId()), send);
this.send(new MqttPubRec(send), null);
} else if (message instanceof MqttPubRel) {
MqttPublish sendMsg = (MqttPublish) inboundQoS2
.get(new Integer(message.getMessageId()));
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
// Original publish has already been delivered.
MqttPubComp pubComp = new MqttPubComp(message
.getMessageId());
this.send(pubComp, null);
* Inserts a new message to the list, ensuring that list is ordered from lowest to highest in terms of the message id's.
* @param list the list to insert the message into
* @param newMsg the message to insert into the list
private void insertInOrder(Vector list, MqttWireMessage newMsg) {
int newMsgId = newMsg.getMessageId();
for (int i = 0; i & list.size(); i++) {
MqttWireMessage otherMsg = (MqttWireMessage) list.elementAt(i);
int otherMsgId = otherMsg.getMessageId();
if (otherMsgId & newMsgId) {
list.insertElementAt(newMsg, i);
list.addElement(newMsg);
* Called by the CommsReceiver when a message has been received.
* Handles inbound messages and other flows such as PUBREL.
* @param message
* @throws MqttException
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
final String methodName = &notifyReceivedMsg&;
this.lastInboundActivity = System.currentTimeMillis();
// @TRACE 651=received key={0} message={1}
log.fine(className, methodName, &651&, new Object[] {
new Integer(message.getMessageId()), message });
if (!quiescing) {
if (message instanceof MqttPublish) {
MqttPublish send = (MqttPublish)
switch (send.getMessage().getQos()) {
if (callback != null) {
callback.messageArrived(send);
persistence.put(getReceivedPersistenceKey(message),
(MqttPublish) message);
inboundQoS2.put(new Integer(send.getMessageId()), send);
this.send(new MqttPubRec(send), null);
} else if (message instanceof MqttPubRel) {
MqttPublish sendMsg = (MqttPublish) inboundQoS2
.get(new Integer(message.getMessageId()));
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
// Original publish has already been delivered.
MqttPubComp pubComp = new MqttPubComp(message
.getMessageId());
this.send(pubComp, null);
* Inserts a new message to the list, ensuring that list is ordered from lowest to highest in terms of the message id's.
* @param list the list to insert the message into
* @param newMsg the message to insert into the list
private void insertInOrder(Vector list, MqttWireMessage newMsg) {
int newMsgId = newMsg.getMessageId();
for (int i = 0; i & list.size(); i++) {
MqttWireMessage otherMsg = (MqttWireMessage) list.elementAt(i);
int otherMsgId = otherMsg.getMessageId();
if (otherMsgId & newMsgId) {
list.insertElementAt(newMsg, i);
list.addElement(newMsg);
* Called by the CommsReceiver when a message has been received.
* Handles inbound messages and other flows such as PUBREL.
* @param message
* @throws MqttException
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
final String methodName = &notifyReceivedMsg&;
this.lastInboundActivity = System.currentTimeMillis();
// @TRACE 651=received key={0} message={1}
log.fine(CLASS_NAME, methodName, &651&, new Object[] {
new Integer(message.getMessageId()), message });
if (!quiescing) {
if (message instanceof MqttPublish) {
MqttPublish send = (MqttPublish)
switch (send.getMessage().getQos()) {
if (callback != null) {
callback.messageArrived(send);
persistence.put(getReceivedPersistenceKey(message),
(MqttPublish) message);
inboundQoS2.put(new Integer(send.getMessageId()), send);
this.send(new MqttPubRec(send), null);
//should NOT reach here
} else if (message instanceof MqttPubRel) {
MqttPublish sendMsg = (MqttPublish) inboundQoS2
.get(new Integer(message.getMessageId()));
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
// Original publish has already been delivered.
MqttPubComp pubComp = new MqttPubComp(message
.getMessageId());
this.send(pubComp, null);
* Inserts a new message to the list, ensuring that list is ordered from lowest to highest in terms of the message id's.
* @param list the list to insert the message into
* @param newMsg the message to insert into the list
private void insertInOrder(Vector list, MqttWireMessage newMsg) {
int newMsgId = newMsg.getMessageId();
for (int i = 0; i & list.size(); i++) {
MqttWireMessage otherMsg = (MqttWireMessage) list.elementAt(i);
int otherMsgId = otherMsg.getMessageId();
if (otherMsgId & newMsgId) {
list.insertElementAt(newMsg, i);
list.addElement(newMsg);
Example 10
* Cleans up the supplied queue, notifying any tokens waiting for the
* messages on the queue.
private void cleanUpQueue(Vector queue) {
//@TRACE 636=cleanUpQueue
trace.trace(Trace.FINE,636);
Enumeration e = queue.elements();
MqttDeliveryTokenI
MqttException ex = ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
Integer messageId;
while (e.hasMoreElements()) {
message = (MqttWireMessage) e.nextElement();
token = this.tokenStore.getToken(message);
messageId = new Integer(message.getMessageId());
// It may be QoS 2, so prevent the CONFIRM from being sent.
if (outboundQoS2.containsKey(messageId)) {
outboundQoS2.remove(messageId);
// Outbound acks do not have tokens in the store
if (token != null) {
token.notifyException(ex);
tokenStore.removeToken(message);
queue.removeElement(message);

我要回帖

更多关于 flex messageid 的文章

 

随机推荐