本文共 10856 字,大约阅读时间需要 36 分钟。
昨天去巡检线上环境的时候,偶然发现了某个服务报了一个错误,而且是每隔90秒报一次,错误信息如下:
意思是内部错误,没有新的messageid可以使用了。
消息队列就不多说了。正常的情况就是一个消息会有一个消息id,如果不了解mqtt的消息id的话,我们正常人的思维就是这个消息id是个随机数,因为消息的id是int类型,所以最大值是2^31-1,大概是21亿,对于一个庞大的系统而且是持续运行的系统,消息肯定会满天飞,数量级可能是百亿甚至千亿,那这个id能装得下么。结合着这个疑问和这个报错,捋了一下这块的代码。
对于消息队列来讲,就是生产者、消费者两个角色。 生产者很简单,生产消息,简单的可以理解为建立连接、发送消息、关闭连接。 消费者是监听某个队列,如果有消息就去接收,并做处理,那么对于消费者来讲就是一个持续连接的状态。
下面先贴下伪代码:
1、相关依赖:
org.eclipse.paho org.eclipse.paho.client.mqttv3 1.1.0 com.aliyun.openservices ons-client 1.3.2.Final com.alibaba fastjson 1.2.32
2、消费者,采用的是异步监听的方式
@Componentpublic class MQTTAsyncClientFactory implements ApplicationRunner { private static MqttAsyncClient mqttAsyncClient; private MQTTAsyncClientFactory() { } public static MqttAsyncClient getMqttAsyncProducerClient() throws MqttException { return mqttAsyncClient; } @Override public void run(ApplicationArguments args) throws Exception { LMQProperties lmqProperties = LMQProperties.getInstance(); String clientId = lmqProperties.getGroupId() + "@@@" + lmqProperties.getDeviceId() + InetAddress.getLocalHost().getHostAddress().toString().replaceAll("\\.", ""); final MemoryPersistence memoryPersistence = new MemoryPersistence(); mqttAsyncClient = new MqttAsyncClient(lmqProperties.getBrokerUrl(), clientId, memoryPersistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(lmqProperties.getUserName()); connOpts.setPassword(lmqProperties.getPassWord().toCharArray()); connOpts.setCleanSession(lmqProperties.getCleanSession()); connOpts.setKeepAliveInterval(10); connOpts.setAutomaticReconnect(true); connOpts.setMaxInflight(lmqProperties.getMaxInflight()); mqttAsyncClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { try { //订阅Topic,可以订阅多个 final String topicFilter[] = {lmqProperties.getTopic() + "/" + lmqProperties.getRecieveTopic2() + "/" + lmqProperties.getTopic3()}; //设置QoS级别 final int[] qos = {lmqProperties.getQos()}; Thread.sleep(2000); System.out.println("连接成功,订阅消息"); IMqttToken subscribe = mqttAsyncClient.subscribe(topicFilter, qos); System.out.println("消息ID:"+subscribe.getMessageId()+" Thread="+Thread.currentThread().getName()); } catch (MqttException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { //处理pos返回消息 rececive(new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //this notice make sense when qos >0 int serialid = iMqttDeliveryToken.getMessageId(); System.out.println("消息id是"+serialid+" Thread="+Thread.currentThread().getName()); } }); mqttAsyncClient.connect(connOpts).waitForCompletion(); System.out.println("mqtt连接成功"); } public static boolean rececive(String msg){ //TODO 处理pos返回消息 return true; }}
消费者其实实现起来也很简单,就是实现接口
ApplicationRunner,并且把类注册到spring容器中就行了。
ApplicationRunner接口很简单:
public interface ApplicationRunner { /** * Callback used to run the bean. * @param args incoming application arguments * @throws Exception on error */ void run(ApplicationArguments args) throws Exception;}
这个接口的注释说的也很明确:
Interface used to indicate that a bean should run when it is contained within a {@link SpringApplication}. Multiple {@link ApplicationRunner} beans can be defined within the same application context and can be ordered using the {@link Ordered} interface or {@link Order @Order} annotation.
意思是只要你这个bean实现了这个接口,并且包含在spring容器中,就会执行实现的run方法。多个bean可以在同一个spring上下文中,并且可以通过order接口或注解来进行指定执行顺序。
run方法会在spring容器启动之后进行执行,
这里面有一个对象MqttAsyncClient,是在spring容器启动之后进行实例化而且实例化了一次。
正常同步的话,我们可以在
mqttAsyncClient.connect(connOpts)之后,进行订阅
mqttAsyncClient.subscribe(topicFilter, qos);
但是在这个demo里面,我们是在连接成功之后,回调确认连接成功的时候进行的监听,应该是一样的。因为确认连接的过程实际上也是发送了一条消息。
LMQProperties这个类是一个单例,封装了mqtt连接的信息,具体配置的值我已经去掉了。
@Componentpublic class LMQProperties { //服务地址 private String brokerUrl; //一级topic private String topic; //二级发送消息topic 商户ID_标识符 标识符:用1/0标记,1:消息从云平台到POS,0:消息从POS到云平台 private String sendTopic2; //二级接收消息topic private String recieveTopic2; //三级topic private String topic3; //服务系统用户名 private String userName; // private String passWord; //客户端配置分组id 在控制台创建后使用 private String groupId; //Device ID: 每个设备独一无二的标识,由业务方自己指定。需要保证全局唯一,例如每个传感器设备的序列号。 //与groupId 一起生成 clientID = groupId+@@@+deviceId private String deviceId; //QoS级别 private Integer qos; //设置客户端是否使用持久化模式 private Boolean cleanSession; //允许未能及时收到broker回复的ack时的MQ消息最大数量 private int maxInflight; //老版本队列的企业号白名单 private String companyIdsWhite; private LMQProperties(){ brokerUrl = ""; topic = ""; sendTopic2 = "test"; recieveTopic2 = ""; topic3 = ""; userName=""; passWord=""; groupId=""; deviceId=""; qos=1; cleanSession= false; maxInflight=100000; } private static LMQProperties INSTANCE = new LMQProperties(); public static LMQProperties getInstance(){ return INSTANCE; } public String getBrokerUrl() { return brokerUrl; } public void setBrokerUrl(String brokerUrl) { this.brokerUrl = brokerUrl; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getSendTopic2() { return sendTopic2; } public void setSendTopic2(String sendTopic2) { this.sendTopic2 = sendTopic2; } public String getRecieveTopic2() { return recieveTopic2; } public void setRecieveTopic2(String recieveTopic2) { this.recieveTopic2 = recieveTopic2; } public String getTopic3() { return topic3; } public void setTopic3(String topic3) { this.topic3 = topic3; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassWord() { return passWord; } public void setPassWord(String passWord) { this.passWord = passWord; } public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getDeviceId() { return deviceId; } public void setDeviceId(String deviceId) { this.deviceId = deviceId; } public Integer getQos() { return qos; } public void setQos(Integer qos) { this.qos = qos; } public Boolean getCleanSession() { return cleanSession; } public void setCleanSession(Boolean cleanSession) { this.cleanSession = cleanSession; } public int getMaxInflight() { return maxInflight; } public void setMaxInflight(int maxInflight) { this.maxInflight = maxInflight; } public String getCompanyIdsWhite() { return companyIdsWhite; } public void setCompanyIdsWhite(String companyIdsWhite) { this.companyIdsWhite = companyIdsWhite; }
3、生产者
public static void publishMsg(int cnt) throws Exception { MqttAsyncClient mqttClient = MQTTAsyncClientFactory.getMqttAsyncProducerClient(); LMQProperties lmqProperties = LMQProperties.getInstance(); //拼接topic final String mqttSendTopic = lmqProperties.getTopic() + "/" + lmqProperties.getRecieveTopic2() + "/" + lmqProperties.getTopic3(); MqttMessage message = new MqttMessage("我是消息内容".getBytes()); //设置QoS级别 message.setQos(lmqProperties.getQos()); mqttClient.publish(mqttSendTopic, message); if(cnt%1000==0) System.out.println("这是第"+cnt+"次发送消息"); }
生产者很简单,就是执行一下publish就可以了。
消息想产生一定是会在发消息的时候生成,所以接下来就可以跟下publish方法是怎么生成消息id的。
publish调用的是
ClientComms的sendNoWait,
最终调用的是ClientState的send方法
getNextMessageId()方法就是生成新消息id的方法:
private synchronized int getNextMessageId() throws MqttException { int startingMessageId = nextMsgId; // Allow two complete passes of the message ID range. This gives // any asynchronous releases a chance to occur int loopCount = 0; do { nextMsgId++; if ( nextMsgId > MAX_MSG_ID ) { nextMsgId = MIN_MSG_ID; } if (nextMsgId == startingMessageId) { loopCount++; if (loopCount == 2) { throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_NO_MESSAGE_IDS_AVAILABLE); } } } while( inUseMsgIds.containsKey( new Integer(nextMsgId) ) ); Integer id = new Integer(nextMsgId); inUseMsgIds.put(id, id); return nextMsgId; }
这个方法是个同步方法,说的就是给消息获取新的消息id,并且把它存到一个已经在使用的集合里面作为标记。
而这个方法里面的这个异常便是我们的那个异常了
在这个方法里面,涉及到这几个变量:
nextMsgId默认是0, 最小消息id是1,最大消息id是65535。inUseMsgIds这个标记已经使用的消息id的集合是个hashtable。
这个方法大致解释一下,就是进入这个方法之后,会将当前nextMsgId的值存到startingMessageId这个变量上。
定义一个循环数量的变量loopCount,如果循环了达到两次,就抛出那个异常。
进入循环的判断条件就是nextMsgId在inUseMsgIds的集合里面。
循环里面,如果nextMsgId大于最大消息id的时候,会被设置为最小消息id。 所以从这里可以看出,这里的消息id是循环重复使用的。那么什么情况下会出现那个异常呢,就是当从1~65535的消息id都放到了inUseMsgIds里面,并且nextMsgId达到了最大值65535的时候,nextMsgId会被重置为1,会重新再跑一遍1~65535的while循环,就会报那个错了。
inUseMsgIds是记录已经使用的消息id。那么他里面的内容什么时候被清理的呢,ClientState里面提供了
clearState(),close()方法,进行inUseMsgIds.clear()。 而clearState()是在
shutdownConnection的时候执行的。 也就是说在mqtt的client失去连接或者关闭的时候,就会清理inUseMsgIds。
如果只有这个机制才能清理,那么如果一直不失去连接或者不关闭连接,岂不是就会报那个错误,设计一定不会这么傻。所以ClientState还提供了以下方法:
private synchronized void releaseMessageId(int msgId) { inUseMsgIds.remove(new Integer(msgId));}
这个方法总共有三个调用者:
CommsSender,CommsReceiver,CommsCallback
CommsSender在发送成功之后,会调用ClientState.notifySend()方法,进而调用releaseMssageId()。
CommsReceiver在收到ack的消息的时候,会调用ClientState.notifyReceivedAck()方法,进而调用releaseMssageId()。
CommsCallback当等待者和回调处理完消息后调用,会调用ClientState.notifyComplete()方法,进而调用releaseMssageId()。
所以正常情况下,只要消息发送成功,消息成功消费,基本就不存在报那个错。因为消息的id产生了,只要不用了就会被释放。
所以非常遗憾,线上服务重启后就没有再出现这个问题,所以也没有排查到当时为啥会出现那个错误。
转载地址:http://uqcdi.baihongyu.com/