MQTT协议

发布时间 2023-08-31 13:29:10作者: 肖恩雷

1.MQTT协议介绍

官网: http://mqtt.p2hp.com/

image-20230830142113093

MQTT

image-20230830142250638

https://blog.csdn.net/weixin_36173034/article/details/112511014

2.MQTT协议原理

3.MQTT协议数据包结构

image-20230830150152818

image-20230830150242838

Byte1:低4位

image-20230830150337891

image-20230830151032791

MQTT消息质量QoS

取决于发布者发布消息的Qos与订阅者订阅消息的Qos,取他们两者Qos最小的,即”木桶原理“中的最短者

image-20230830151421219

image-20230830151644467

  • QoS0消息发布订阅(最多一次)

发布者发布消息后,服务器直接讲消息发送给订阅者,之间是没有消息确认的

  • QoS1消息发布订阅(最少一次)

    image-20230830152037855

  • QoS2消息发布订阅(只有一次)

image-20230830152349563

4.EMQX简介

官网:https://www.emqx.io/zh

image-20230830155404662

1.安装部署

用docker安装部署

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26
# 端口介绍
# 1883:MQTT 协议端口
# 8084:MQTT/SSL 端口
# 8083:MQTT/WebSocket 端口
# 8080:HTTP API 端口
# 18083:Dashboard 管理控制台端口

部署成功后访问测试:IP:18083

image-20230830161506675

默认登录名:admin ,密码:public

参考文档: https://www.kuangstudy.com/bbs/1401763436836229121

实战项目:https://github.com/xiongsihao/yikekong/tree/master

Java开发MQTT

  1. 试用 mica-mqtt-client-spring-boot-starter进行开发

    1. 添加POM依赖

      <dependency>
          <groupId>net.dreamlu</groupId>
          <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
          <version>2.1.1</version>
      </dependency>
      
    2. MQTT客户端配置项

      mqtt:
        client:
          enabled: true               # 是否开启客户端,默认:true
          ip: 127.0.0.1               # 连接的服务端 ip ,默认:127.0.0.1
          port: 1883                  # 端口:默认:1883
          name: Mica-Mqtt-Client      # 名称,默认:Mica-Mqtt-Client
          clientId: 000001            # 客户端Id(非常重要,一般为设备 sn,不可重复)
          user-name: mica             # 认证的用户名
          password: 123456            # 认证的密码
          timeout: 5                  # 超时时间,单位:秒,默认:5秒
          reconnect: true             # 是否重连,默认:true
          re-interval: 5000           # 重连时间,默认 5000 毫秒
          version: mqtt_3_1_1         # mqtt 协议版本,可选 MQTT_3_1、mqtt_3_1_1、mqtt_5,默认:mqtt_3_1_1
          read-buffer-size: 8KB       # 接收数据的 buffer size,默认:8k
          max-bytes-in-message: 10MB  # 消息解析最大 bytes 长度,默认:10M
          buffer-allocator: heap      # 堆内存和堆外内存,默认:堆内存
          keep-alive-secs: 60         # keep-alive 时间,单位:秒
          clean-session: true         # mqtt clean session,默认:true
          ssl:
            enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证
            keystore-path:            # 可选参数:ssl 双向认证 keystore 目录,支持 classpath:/ 路径。
            keystore-pass:            # 可选参数:ssl 双向认证 keystore 密码
            truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。
            truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码
      

参考文档: https://gitee.com/596392912/mica-mqtt/blob/master/starter/mica-mqtt-client-spring-boot-starter/README.md

			3. 客户端上下线监听

   ```java
   @Service
   @Slf4j
   public class MqttClientConnectListener {
   
       @Autowired
       private MqttClientCreator mqttClientCreator;
   
       @EventListener
       public void onConnected(MqttConnectedEvent event)
       {
           log.info("Mqtt连接上了:{}", event);
       }
       @EventListener
       public void onDisConnected(MqttDisconnectEvent event)
       {
           log.info("Mqtt断开连接:{}",event);
           mqttClientCreator.clientId("newClient" + System.currentTimeMillis());
       }
   
   }
   ```

			4. 订阅消息服务 MqttClientSubscribeListener

   ```java
   @Service
   @Slf4j
   public class MqttClientSubscribeListener {
       @Resource
       private MqttClientTemplate client;
   
       public void subQos0() {
           client.subQos0("/test/#",(context, topic, message, payload)->{
               log.info("context:{},message:{}",context,message);
               log.info(topic + '\t' + new String(payload.array(), StandardCharsets.UTF_8));
               //获取到信息后可以进行后续的处理,存数据等
           });
       }
   
   }
   ```

			5. 在服务启动时,多线程执行订阅消息

   ```java
   @Component
   @Slf4j
   public class StartRunner implements CommandLineRunner {
       @Autowired
       private MqttClientSubscribeListener subscribeListener;
       @Override
       public void run(String... args) throws Exception {
           new Thread(()->{
               while (true)
               {
                   subscribeListener.subQos0();
                   try {
                       Thread.sleep(1000*1);
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }
               }
           }).start();
       }
   }
   ```

   `CommandLineRunner` 是一个 Spring Boot 接口,用于编写在应用程序启动时执行的代码。当您的应用程序启动时,Spring Boot 会自动查找并执行实现了 `CommandLineRunner` 接口的 bean。这样,您可以在应用程序启动时运行一些特定的代码,例如初始化数据、加载配置等。
  1. 使用MQTTX客户端模拟发布者发布消息

    image-20230831131518027

  2. 验证,观察日志中打印出了发布者发布的消息

    image-20230831131613042

连接到EMQX 服务器上有2个客户端,一个是QTTX连接的发布者客户端,一个是java程序连接的订阅者客户端

image-20230831131806354