博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mqtt的messageId是怎么回事
阅读量:4036 次
发布时间:2019-05-24

本文共 10856 字,大约阅读时间需要 36 分钟。

昨天去巡检线上环境的时候,偶然发现了某个服务报了一个错误,而且是每隔90秒报一次,错误信息如下:

意思是内部错误,没有新的messageid可以使用了。

消息队列就不多说了。正常的情况就是一个消息会有一个消息id,如果不了解mqtt的消息id的话,我们正常人的思维就是这个消息id是个随机数,因为消息的id是int类型,所以最大值是2^31-1,大概是21亿,对于一个庞大的系统而且是持续运行的系统,消息肯定会满天飞,数量级可能是百亿甚至千亿,那这个id能装得下么。结合着这个疑问和这个报错,捋了一下这块的代码。

 

对于消息队列来讲,就是生产者、消费者两个角色。 生产者很简单,生产消息,简单的可以理解为建立连接、发送消息、关闭连接。 消费者是监听某个队列,如果有消息就去接收,并做处理,那么对于消费者来讲就是一个持续连接的状态。

下面先贴下伪代码:

1、相关依赖:

org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.1.0
com.aliyun.openservices
ons-client
1.3.2.Final
com.alibaba
fastjson
1.2.32

2、消费者,采用的是异步监听的方式

@Componentpublic class MQTTAsyncClientFactory implements ApplicationRunner {    private static MqttAsyncClient mqttAsyncClient;    private MQTTAsyncClientFactory() {    }    public static MqttAsyncClient getMqttAsyncProducerClient() throws MqttException {        return mqttAsyncClient;    }    @Override    public void run(ApplicationArguments args) throws Exception {        LMQProperties lmqProperties = LMQProperties.getInstance();        String clientId = lmqProperties.getGroupId() + "@@@" + lmqProperties.getDeviceId() + InetAddress.getLocalHost().getHostAddress().toString().replaceAll("\\.", "");        final MemoryPersistence memoryPersistence = new MemoryPersistence();        mqttAsyncClient = new MqttAsyncClient(lmqProperties.getBrokerUrl(), clientId, memoryPersistence);        MqttConnectOptions connOpts = new MqttConnectOptions();        connOpts.setUserName(lmqProperties.getUserName());        connOpts.setPassword(lmqProperties.getPassWord().toCharArray());        connOpts.setCleanSession(lmqProperties.getCleanSession());        connOpts.setKeepAliveInterval(10);        connOpts.setAutomaticReconnect(true);        connOpts.setMaxInflight(lmqProperties.getMaxInflight());        mqttAsyncClient.setCallback(new MqttCallbackExtended() {            @Override            public void connectComplete(boolean reconnect, String serverURI) {                try {                    //订阅Topic,可以订阅多个                    final String topicFilter[] = {lmqProperties.getTopic() + "/" + lmqProperties.getRecieveTopic2() + "/" + lmqProperties.getTopic3()};                    //设置QoS级别                    final int[] qos = {lmqProperties.getQos()};                    Thread.sleep(2000);                    System.out.println("连接成功,订阅消息");                    IMqttToken subscribe = mqttAsyncClient.subscribe(topicFilter, qos);                    System.out.println("消息ID:"+subscribe.getMessageId()+" Thread="+Thread.currentThread().getName());                } catch (MqttException e) {                    e.printStackTrace();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            @Override            public void connectionLost(Throwable throwable) {                throwable.printStackTrace();            }            @Override            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {                //处理pos返回消息                rececive(new String(mqttMessage.getPayload()));            }            @Override            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {                //this notice make sense when qos >0                int serialid = iMqttDeliveryToken.getMessageId();                System.out.println("消息id是"+serialid+" Thread="+Thread.currentThread().getName());            }        });        mqttAsyncClient.connect(connOpts).waitForCompletion();        System.out.println("mqtt连接成功");    }    public static boolean rececive(String msg){        //TODO 处理pos返回消息        return true;    }}

消费者其实实现起来也很简单,就是实现接口

ApplicationRunner,并且把类注册到spring容器中就行了。

ApplicationRunner接口很简单:

public interface ApplicationRunner {	/**	 * Callback used to run the bean.	 * @param args incoming application arguments	 * @throws Exception on error	 */	void run(ApplicationArguments args) throws Exception;}

这个接口的注释说的也很明确:

Interface used to indicate that a bean should run when it is contained within a {@link SpringApplication}. Multiple {@link ApplicationRunner} beans can be defined within the same application context and can be ordered using the {@link Ordered} interface or {@link Order @Order} annotation.

意思是只要你这个bean实现了这个接口,并且包含在spring容器中,就会执行实现的run方法。多个bean可以在同一个spring上下文中,并且可以通过order接口或注解来进行指定执行顺序。

run方法会在spring容器启动之后进行执行,

这里面有一个对象MqttAsyncClient,是在spring容器启动之后进行实例化而且实例化了一次。

正常同步的话,我们可以在

mqttAsyncClient.connect(connOpts)之后,进行订阅
mqttAsyncClient.subscribe(topicFilter, qos);

但是在这个demo里面,我们是在连接成功之后,回调确认连接成功的时候进行的监听,应该是一样的。因为确认连接的过程实际上也是发送了一条消息。

LMQProperties这个类是一个单例,封装了mqtt连接的信息,具体配置的值我已经去掉了。
@Componentpublic class LMQProperties {    //服务地址    private String brokerUrl;    //一级topic    private String topic;    //二级发送消息topic   商户ID_标识符   标识符:用1/0标记,1:消息从云平台到POS,0:消息从POS到云平台    private String sendTopic2;    //二级接收消息topic    private String recieveTopic2;    //三级topic    private String topic3;    //服务系统用户名    private String userName;    //    private String passWord;    //客户端配置分组id 在控制台创建后使用    private String groupId;    //Device ID: 每个设备独一无二的标识,由业务方自己指定。需要保证全局唯一,例如每个传感器设备的序列号。    //与groupId 一起生成 clientID = groupId+@@@+deviceId    private String deviceId;    //QoS级别    private Integer qos;    //设置客户端是否使用持久化模式    private Boolean cleanSession;    //允许未能及时收到broker回复的ack时的MQ消息最大数量    private int maxInflight;    //老版本队列的企业号白名单    private String companyIdsWhite;    private LMQProperties(){        brokerUrl = "";        topic = "";        sendTopic2 = "test";        recieveTopic2 = "";        topic3 = "";        userName="";        passWord="";        groupId="";        deviceId="";        qos=1;        cleanSession= false;        maxInflight=100000;    }    private static LMQProperties INSTANCE = new LMQProperties();    public static LMQProperties getInstance(){        return INSTANCE;    }    public String getBrokerUrl() {        return brokerUrl;    }    public void setBrokerUrl(String brokerUrl) {        this.brokerUrl = brokerUrl;    }    public String getTopic() {        return topic;    }    public void setTopic(String topic) {        this.topic = topic;    }    public String getSendTopic2() {        return sendTopic2;    }    public void setSendTopic2(String sendTopic2) {        this.sendTopic2 = sendTopic2;    }    public String getRecieveTopic2() {        return recieveTopic2;    }    public void setRecieveTopic2(String recieveTopic2) {        this.recieveTopic2 = recieveTopic2;    }    public String getTopic3() {        return topic3;    }    public void setTopic3(String topic3) {        this.topic3 = topic3;    }    public String getUserName() {        return userName;    }    public void setUserName(String userName) {        this.userName = userName;    }    public String getPassWord() {        return passWord;    }    public void setPassWord(String passWord) {        this.passWord = passWord;    }    public String getGroupId() {        return groupId;    }    public void setGroupId(String groupId) {        this.groupId = groupId;    }    public String getDeviceId() {        return deviceId;    }    public void setDeviceId(String deviceId) {        this.deviceId = deviceId;    }    public Integer getQos() {        return qos;    }    public void setQos(Integer qos) {        this.qos = qos;    }    public Boolean getCleanSession() {        return cleanSession;    }    public void setCleanSession(Boolean cleanSession) {        this.cleanSession = cleanSession;    }    public int getMaxInflight() {        return maxInflight;    }    public void setMaxInflight(int maxInflight) {        this.maxInflight = maxInflight;    }    public String getCompanyIdsWhite() {        return companyIdsWhite;    }    public void setCompanyIdsWhite(String companyIdsWhite) {        this.companyIdsWhite = companyIdsWhite;    }

3、生产者

public static void publishMsg(int cnt) throws Exception {        MqttAsyncClient mqttClient = MQTTAsyncClientFactory.getMqttAsyncProducerClient();        LMQProperties lmqProperties = LMQProperties.getInstance();        //拼接topic        final String mqttSendTopic = lmqProperties.getTopic() + "/" + lmqProperties.getRecieveTopic2() + "/" + lmqProperties.getTopic3();        MqttMessage message = new MqttMessage("我是消息内容".getBytes());        //设置QoS级别        message.setQos(lmqProperties.getQos());        mqttClient.publish(mqttSendTopic, message);        if(cnt%1000==0)        System.out.println("这是第"+cnt+"次发送消息");    }

生产者很简单,就是执行一下publish就可以了。

 

消息想产生一定是会在发消息的时候生成,所以接下来就可以跟下publish方法是怎么生成消息id的。

publish调用的是

ClientComms的sendNoWait,

 

 

最终调用的是ClientState的send方法

getNextMessageId()方法就是生成新消息id的方法:
private synchronized int getNextMessageId() throws MqttException {		int startingMessageId = nextMsgId;		// Allow two complete passes of the message ID range. This gives		// any asynchronous releases a chance to occur		int loopCount = 0;	    do {	        nextMsgId++;	        if ( nextMsgId > MAX_MSG_ID ) {	            nextMsgId = MIN_MSG_ID;	        }	        if (nextMsgId == startingMessageId) {	        	loopCount++;	        	if (loopCount == 2) {	        		throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_NO_MESSAGE_IDS_AVAILABLE);	        	}	        }	    } while( inUseMsgIds.containsKey( new Integer(nextMsgId) ) );	    Integer id = new Integer(nextMsgId);	    inUseMsgIds.put(id, id);	    return nextMsgId;	}

这个方法是个同步方法,说的就是给消息获取新的消息id,并且把它存到一个已经在使用的集合里面作为标记。

 

而这个方法里面的这个异常便是我们的那个异常了

在这个方法里面,涉及到这几个变量:

nextMsgId默认是0, 最小消息id是1,最大消息id是65535。inUseMsgIds这个标记已经使用的消息id的集合是个hashtable。

这个方法大致解释一下,就是进入这个方法之后,会将当前nextMsgId的值存到startingMessageId这个变量上。

定义一个循环数量的变量loopCount,如果循环了达到两次,就抛出那个异常。

进入循环的判断条件就是nextMsgId在inUseMsgIds的集合里面。

循环里面,如果nextMsgId大于最大消息id的时候,会被设置为最小消息id。 所以从这里可以看出,这里的消息id是循环重复使用的。那么什么情况下会出现那个异常呢,就是当从1~65535的消息id都放到了inUseMsgIds里面,并且nextMsgId达到了最大值65535的时候,nextMsgId会被重置为1,会重新再跑一遍1~65535的while循环,就会报那个错了。

inUseMsgIds是记录已经使用的消息id。那么他里面的内容什么时候被清理的呢,ClientState里面提供了

clearState(),close()方法,进行inUseMsgIds.clear()。 而clearState()是在
shutdownConnection的时候执行的。 也就是说在mqtt的client失去连接或者关闭的时候,就会清理inUseMsgIds。

如果只有这个机制才能清理,那么如果一直不失去连接或者不关闭连接,岂不是就会报那个错误,设计一定不会这么傻。所以ClientState还提供了以下方法:

private synchronized void releaseMessageId(int msgId) {   inUseMsgIds.remove(new Integer(msgId));}

这个方法总共有三个调用者:

CommsSender,CommsReceiver,CommsCallback

 

CommsSender在发送成功之后,会调用ClientState.notifySend()方法,进而调用releaseMssageId()。

CommsReceiver在收到ack的消息的时候,会调用ClientState.notifyReceivedAck()方法,进而调用releaseMssageId()。
CommsCallback当等待者和回调处理完消息后调用,会调用ClientState.notifyComplete()方法,进而调用releaseMssageId()。

所以正常情况下,只要消息发送成功,消息成功消费,基本就不存在报那个错。因为消息的id产生了,只要不用了就会被释放。

所以非常遗憾,线上服务重启后就没有再出现这个问题,所以也没有排查到当时为啥会出现那个错误。

 

 

 

转载地址:http://uqcdi.baihongyu.com/

你可能感兴趣的文章
关于无人驾驶的过去、现在以及未来,看这篇文章就够了!
查看>>
所谓的进步和提升,就是完成认知升级
查看>>
昨夜今晨最大八卦终于坐实——人类首次直接探测到了引力波
查看>>
如何优雅、机智地和新公司谈薪水?
查看>>
为什么读了很多书,却学不到什么东西?
查看>>
长文干货:如何轻松应对工作中最棘手的13种场景?
查看>>
如何用好碎片化时间,让思维更有效率?
查看>>
No.147 - LeetCode1108
查看>>
No.174 - LeetCode1305 - 合并两个搜索树
查看>>
No.175 - LeetCode1306
查看>>
No.176 - LeetCode1309
查看>>
No.182 - LeetCode1325 - C指针的魅力
查看>>
mysql:sql create database新建utf8mb4 数据库
查看>>
mysql:sql alter database修改数据库字符集
查看>>
mysql:sql drop table (删除表)
查看>>
mysql:sql truncate (清除表数据)
查看>>
scrapy:xpath string(.)非常注意问题
查看>>
yuv to rgb 转换失败呀。天呀。谁来帮帮我呀。
查看>>
yuv420 format
查看>>
YUV420只绘制Y通道
查看>>