MQTT 生产者(同步)代码解读

发布时间 2023-06-20 09:36:11作者: eiSouthBoy

一、问题引入

官方给出了MQTT Client的同步和异步发布的例子,本随笔就是同步发布的example。同步和异步都有一套API函数和结构体。

同步发布消息算是最简单的案例了,这里总结一下代码。

二、解决过程

简要介绍编写 MQTT Producer的消息发布(同步)过程:

  • 第1步:创建客户端
LIBMQTT_API int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
		int persistence_type, void* persistence_context);
  • 第2步:设置客户端与服务器的连接选项属性
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

conn_opts.keepAliveInterval = 30;
//conn_opts的其它选项
  • 第3步:连接服务器
LIBMQTT_API int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options);
  • 第4步:设置消息发布选项属性
MQTTClient_message pubmsg = MQTTClient_message_initializer;

pubmsg.qos = 1;
//pubmsg的其他选项
  • 第5步:发布消息
LIBMQTT_API int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, 
        MQTTClient_message* msg, MQTTClient_deliveryToken* dt);
  • 第6步:等待消息发布完成(同步消息发布模式才有这一步)
LIBMQTT_API int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken dt, unsigned long timeout);
  • 第7步:断开与服务器的连接
LIBMQTT_API int MQTTClient_disconnect(MQTTClient handle, int timeout);
  • 第8步:释放内存
LIBMQTT_API void MQTTClient_destroy(MQTTClient* handle);

2-1 数据类型的声明

创建客户端需要定义一个客户端指针(其实是一个无类指针),客户端成功创建会申请一段内存,该指针指向这段内存,且程序结束后要进行释放

/**
 * A handle representing an MQTT client. A valid client handle is available
 * following a successful call to MQTTClient_create().
 */
typedef void* MQTTClient;

MQTTClient client;

连接客户端,需要通过一个结构体传送设置选项信息

// MQTTClient_connectOptions 类型的变量通过指定宏进行初始化
#define MQTTClient_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 8, 60, 1, 1, NULL, NULL, NULL, 30, 0, NULL,\
0, NULL, MQTTVERSION_DEFAULT, {NULL, 0, 0}, {0, NULL}, -1, 0, NULL, NULL, NULL}

MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

MQTTClient_connectOptions结构体的成员:

成员名称 取值范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~8 结构体号码
int keepAliveInterval 心跳包时间间隔
int cleansession True or False 清除会话标志
int reliable True or False 可靠性,控制单条发送 or 批量发送
MQTTClient_willOptions* will 遗嘱功能选项
const char* username 用户名,服务器支持MQTT v3.1.1版本的用户名和密码的授权和认证机制
const char* password 密码,同上
int connectTimeout 连接超时时长
int retryInterval 消息重发时间间隔,MQTT v3.1.1以及之后的版本不要了
MQTTClient_SSLOptions* ssl 加密功能
int serverURIcount 可选服务器数量,默认是0
char* const* serverURIs 可选服务器列表,即MQTT集群功能
int MQTTVersion 0:默认3.1.1,3:3.1,4:3.1.1,5:5.0 MQTT版本
struct returned MQTT v3.1.1连接返回值
struct binarypwd 可选的二进制密码
int maxInflightMessages 传输中的最大消息数
int cleanstart MQTT v5.0版本,清除开始标志位
const MQTTClient_nameValue* httpHeaders websockets的http头部
const char* httpProxy http代理
const char* httpsProxy https代理

MQTTClient_willOptions结构体的成员

成员名称 取值范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~1 结构体号码
const char* topicName 带有遗嘱的主题
const char* message 遗嘱消息
int retained True or False 消息保留标志位
int qos 0~2 消息质量
struct payload 遗嘱消息二进制格式

消息发布中的属性通过一个结构体传送

// MQTTClient_message 类型的变量通过指定宏进行初始化
#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 1, 0, NULL, 0, 0, 0, 0, MQTTProperties_initializer }

MQTTClient_message pubmsg = MQTTClient_message_initializer;

MQTTClient_message结构体的成员

成员名称 取值范围 功能
char struct_id[4] 结构体识别序号
int struct_version 0~1 结构体号码
int payloadlen 消息长度
void* payload 消息
int qos 0~2 消息质量
int retained True or False 消息保留标志位
int dup True or False 消息副本标志,仅在接收QOS=1的消息时有效
int msgid 消息标识符
MQTTProperties properties MQTT v5.0关联的消息属性

消息发布后,会产生一个token令牌,该令牌被用作检查消息是否被分发到目的地。

typedef int MQTTClient_deliveryToken;
typedef int MQTTClient_token;

三、反思总结

验证同步发布消息是否成功,通过函数:MQTTClient_waitForCompletion() 进行阻塞判断,返回值为:MQTTCLIENT_SUCCESS 表示消息成功发布到Broker。

同步发布消息的效率很低,但可靠性很强,不会造成消息未发送完成,但消息丢失的情况。

四、参考引用