MTQQ协议初探
MQTT协议
协议简介
MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。
1、术语
- 网络连接(Network Connection):MQTT协议使用的底层传输协议基础设施(客户端使用它连接服务器、提供有序双向可靠的字节流服务)。
- 应用消息(Application Message):应用消息通过MQTT传输时,他们有关联的服务质量(QoS)和主题(Topic)。
- 客户端(Client):发布应用消息给其他的客户端、订阅(取消订阅)以请求相关的应用消息、从服务端端开连接。
- 服务端(Server):作为发送消息的客户端和请求消息的客户端的中介。接受来自客户端的网络连接、接受客户端发布的应用消息、处理客户端的订阅或取消订阅请求、转发应用请求给符合条件的客户端。
- 订阅(Subscription):订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。
- 主题名(Topic Name):附加在应用消息上的一个标签,服务端已知且与订阅匹配。服务端发送应用消息的一个副本给每一个匹配的客户端订阅。
- 主题过滤器(Topic Filter):订阅中包含的一个表达式,用于表示相关的一个或多个主题。主题过滤器可以使用通配符。
- 会话(Session):客户端和服务端之间的状态交互。一些会话持续时长与网络连接一样,另一些可以在客户端和服务端的多个连续网络连接间扩展。
- 传输报文(MQTT Control Packet):通过网络连接发送的信息数据包。MQTT规范定义了十四种不同类型的控制报文,会在后续详细讲解。
2、控制报文
MQTT协议通过交换预定义的MQTT控制报文来通信。由三部分组成,按照下表描述的顺序。
| 报文英文名称 | 中文名称 | 备注 |
|---|---|---|
| Fixed header | 固定报头 | 所有控制报文都包含 |
| Variable header | 可变报头 | 部分控制报文包含 |
| Payload | 有效载荷 | 部分控制报文包含 |
具体协议内部信息可见:mtqq中文文档
协议服务器搭建及搭建
c语言版本
1、安装依赖库
(1) cmake、gcc、openssl-devel yum install cmakeyum install gcc-c++yum install openssl-devel //mosquitto默认支持openssl(2) c-areswget http://c-ares.haxx.se/download/c-ares-1.10.0.tar.gztar xvf c-ares-1.10.0.tar.gzcd c-ares-1.10.0./configuremakemake install(3) lib-uuidyum install libuuid-devel(4) 安装libwebsocket
2、安装mosquitto
wget http://mosquitto.org/files/source/mosquitto-1.4.4.tar.gz
tar -xzvf mosquitto-1.4.4.tar.gz
cd mosquitto-1.4.4
make
make install
安装完成后主要信息路径如下:
| 路径 | 程序文件 |
|---|---|
| /usr/local/sbin | mosquiotto server |
| /etc/mosquitto | configuration |
| /usr/local/bin | utility command |
注:启动客户端可能出现“连接库无法被找到相关错误”,解决方法如下:
vim /etc/ld.so.conf.d/liblocal.conf
/usr/local/lib64
/usr/local/lib
ldconfig
3、配置文件主要信息说明
(1) 生成程序配置文件mv /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf(2) 部分配置项说明# 服务进程的PID# pid_file /var/run/mosquitto.pid# 服务进程的系统用户# user mosquitto# 服务绑定的IP地址# bind_address# 服务绑定的端口号# port 1883# 允许的最大连接数,-1表示没有限制# max_connections -1# 允许匿名用户# allow_anonymous true
4、启动服务
/usr/local/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf -d
5、客户端测试
(1)订阅主题mosquitto_sub -t location(2)推送消息mosquitto_pub -t location -h localhost -m "new location"
6、libmosquitto部分函数说明
| 函数名 | 函数作用 |
|---|---|
| mosquitto_lib_version | 获取libmosquitto版本信息 |
| mosquitto_lib_init | 初始化,任何mosquitto库函数之前调用 |
| mosquitto_lib_cleanup | 释放lib资源 |
| mosquitto_new | 建立一个新的mosq客户端对象 |
| mosquitto_destroy | 销毁一个mosq客户端对象 |
| mosquitto_reinitialise | 重新定义一个mosq客户端对象 |
| mosquitto_will_set | 在connect之前调用,配置mosq对象topic |
| mosquitto_will_clear | 删除mosq对象提前配置信息 |
| mosquitto_username_pw_set | 设置mosq对象的用户名和密码 |
| mosquitto_connect | 连接到mosq代理 |
| mosquitto_connect_bind | 连接到mosq代理,可以绑定特定接口 |
| mosquitto_connect_async | 连接到mosq代理,非阻塞(与mosquittoloopstart一起使用) |
| mosquitto_connect_bind_async | 以上两个功能相加 |
| mosquitto_reconnect | 与代理重连 |
| mosquitto_reconnect_async | 与代理重连(异步) |
| mosquitto_disconnect | 与代理断开连接 |
| mosquitto_publish | 在一个特定topic上发布一条消息 |
| mosquitto_subscribe | 订阅一个话题 |
| mosquitto_unsubscribe | 取消订阅一个话题 |
| mosquitto_message_copy | 消息结构体复制 |
| mosquitto_message_free | 完全释放mosquitto_message结构体 |
| mosquitto_message_free_contents | 仅释放mosquitto_message结构体内容 |
| mosquitto_loop | 客户端启动mosq客户端服务 |
| mosquitto_loop_forever | 客户端一直启动mosq客户端服务 |
| mosquitto_loop_start | 另起一个线程启动mosq客户端服务 |
| mosquitto_loop_stop | 关闭一个mosq客户端服务 |
| mosquitto_socket | 将mosq对象转换成fd套接字 |
| mosquitto_loop_read | 启动度操作 |
| mosquitto_loop_write | 启动写操作 |
| mosquitto_loop_misc | ping等杂项操作 |
| mosquitto_want_write | 有数据准备好在套接字上写,则返回true |
| mosquitto_threaded_set | 设置使用的线程 |
| mosquitto_opts_set | 为客户端设置选项 |
| mosquitto_tls_set | 配置客户端tsl/ssl证书 |
| mosquitto_tls_opts_set | 设置高级tsl/ssl选项 |
| mosquitto_tls_psk_set | 配置客户端psk |
| mosquitto_connect_callback_set | 连接应答回调 |
| mosquitto_disconnect_callback_set | 关闭连接回调 |
| mosquitto_publish_callback_set | 发布消息回调 |
| mosquitto_message_callback_set | 获得推送消息回调 |
| mosquitto_subscribe_callback_set | 订阅消息回调 |
| mosquitto_unsubscribe_callback_set | 取消订阅回调 |
| mosquitto_log_callback_set | 日志回调 |
| mosquitto_reconnect_delay_set | 重连延时设置 |
| mosquitto_max_inflight_messages_set | 允许多大数量的QoS为1或2消息被同时进行传输处理 |
| mosquitto_user_data_set | 用户数据设置 |
6、推送客户端demo和订阅客户端demo
推送客户端(mqtt_client_pub.c)
/******************************************************************************
** Coypright(C) 2014-2024 Qiware technology Co., Ltd
**
** 文件名: client_pub.c
** 版本号: 1.0
** 描 述:
** 作 者: # Zengyao.pang # 2017年09月04日 星期一 11时36分40秒 #
******************************************************************************/
#include
#include
#include void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
{ int mid = 0; if (!result) { mosquitto_publish(mosq, &mid, "subb", 0, NULL, 2, true); } else { fprintf(stderr, "Connect failed\n"); }
} void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{ printf("%s\n", str);
} void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{ fprintf(stderr, "publish (mid: %d)\n", mid);
} int main()
{ int i; char *host = "localhost"; int port = 1883; int keepalive = 60; bool clean_session = false; struct mosquitto *mosq = NULL; char message[100]; int ret; int mid = 1; char *will_mess = "will_mess"; char *userid = "test_pub"; /* > mosquitto_lib 初始化(第一句)*/ mosquitto_lib_init();/* > 新建一个mosq对象 */ mosq = mosquitto_new(userid, clean_session, NULL); if (!mosq) { fprintf(stderr, "Error: Out of memory.\n"); return -1; } /* > 日志回调 */ mosquitto_log_callback_set(mosq, my_log_callback); /* > 收到连接应答回调 */ mosquitto_connect_callback_set(mosq, my_connect_callback); /* > 发布消息回调 */ mosquitto_publish_callback_set(mosq, my_publish_callback); /* > 设置遗嘱信息 */ mosquitto_will_set(mosq, "subb", strlen(will_mess) + 1, will_mess, 2, 0); /* > 连接 */ mosquitto_connect(mosq, host, port, keepalive); /* > 在另外一个线程启动 */ mosquitto_loop_start(mosq); /* > 从终端得到消息发布 */ while (scanf("%s", message)) { getchar(); message[strlen(message)] = '\0'; ret = mosquitto_publish(mosq, &mid, "subb", strlen(message) + 1, message, 2, true);++mid; if (ret) { mosquitto_disconnect(mosq); fprintf(stderr, "mosquitto_publish ERROR"); } } mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0;
}
订阅客户端(mqtt_client_sub.c)
/******************************************************************************
** Coypright(C) 2014-2024 Qiware technology Co., Ltd
**
** 文件名: mtqq_client.c
** 版本号: 1.0
** 描 述:
** 作 者: # Zengyao.Pang # 2017年09月01日 星期五 16时26分10秒 #
******************************************************************************/
#include
#include
#include void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{ if (message->payloadlen) { printf("%s %s\n", message->topic, message->payload); } else { printf("%s (null)\n", message->topic); } fflush(stdout);
} void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{ int i; if(!result){ //mosquitto_subscribe(mosq, NULL, "$SYS/#", 2); /* > 进行消息订阅 */ mosquitto_subscribe(mosq, NULL, "subb", 2); } else { fprintf(stderr, "Connect failed\n"); }
} void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{ int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i
编译运行
gcc mqtt_client_pub.c –lmosquitto –o pub
gcc mqtt_client_sub.c –lmosquitto –o sub
./pub
./sub
在pub终端输入任意字符串,按回车进行推送,在sub终端收到信息
go语言版本
相关库安装
1、 git clone https://github.com/golang/net.git
2、 拷贝net目录到“$GO_PATH/src/golang.org/x”目录下
3、 进入“$GO_PATH/src/golang.org/x/net/websocket",运行go build
4、 运行"go get github.com/eclipse/paho.mqtt.golang"
5、 go get golang.org/x/net/websocket
6、 go get golang.org/x/net/proxy
paho.mqtt.golang相关说明
接口文档连接 https://godoc.org/github.com/eclipse/paho.mqtt.golang
相关函数简要说明(跟c语言实现的功能类似),主要接口实现:
1、type Client
type Client interface {/* 判断是否连接 */IsConnected() bool/* 进行连接 */Connect() Token/* 关闭连接 */Disconnect(quiesce uint)/* 发布消息 */Publish(topic string, qos byte, retained bool, payload interface{}) Token/* 订阅消息,主题 */Subscribe(topic string, qos byte, callback MessageHandler) Token/*订阅消息,主题过滤器 */SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token/* 取消订阅 */Unsubscribe(topics ...string) Token/* 增加订阅主题 */AddRoute(topic string, callback MessageHandler)
}2、type ClientOptions以下结构体内容均可以调用相关函数进行设置。type ClientOptions struct {Servers []*url.URL /* 设置mqtt代理服务 */ClientID string /* 客户端id */Username string /* 用户名 */Password string /* 用户密码 */CleanSession bool /* 断线后是否清理会话 */Order bool /* 是否保证每个QoS级别顺序 */WillEnabled bool /* 是否设定遗嘱 */WillTopic string /* 遗嘱话题 */WillPayload []byte /* 遗嘱消息 */WillQos byte /* 遗嘱级别 */WillRetained bool /* 断线后是否保留遗嘱会话 */ProtocolVersion uint /* 协议版本 */TLSConfig tls.Config /* tls配置 */KeepAlive int64 /* 保活 */PingTimeout time.Duration /* ping */ConnectTimeout time.Duration /* 连接超时 */MaxReconnectInterval time.Duration /* 最大重连间隔 */AutoReconnect bool /* 是否自动重连 */Store Store /* 提供持久性的消息接口 */DefaultPublishHandler MessageHandler /* 发布消息回调 */OnConnect OnConnectHandler /* 连接回调 */OnConnectionLost ConnectionLostHandler /* 连接意外中断回调 */WriteTimeout time.Duration /* 写超时回调 */MessageChannelDepth uint /* 设置离线期间队列大小*/// contains filtered or unexported fields}3、ClientOptionsReader与2相反,此对象主要提供读取option相关信息4、ConnectTokenclient类型建立连接时返回类型type ConnectToken interface {/* 返回错误信息 */Error() error/* 返回在connect过程中connack相应的code */ReturnCode() byte/* 无限期的等到连接完成 */Wait() bool/* 在d时间范围内等待连接完成WaitTimeout(d time.Duration) bool}5、DisconnectTokenclient类型关闭连接时返回类型type DisconnectToken interface {/* 返回错误信息 */Error() error/* 无限期的等到连接中断完成 */Wait() bool/* 在d时间范围内等待连接中断完成WaitTimeout(d time.Duration) bool}注:还有PublishToken、SubscribeToken、UnsubscribeToken接口类似在此不一一列举6、MId消息id(uint16)7、MemoryStoreMemoryStore实现了存储界面,以提供完全存储在内存中的“持久性”机制。 只要客户机实例存在,store就存在。type MemoryStore struct {sync.RWMutex// contains filtered or unexported fields}/* 返回一个指向MemoryStore新的实例指针,没有初始化,等待open调用 */func NewMemoryStore() *MemoryStore/* 返回一个包含memorystore所有当前键的字符串 */func (store *MemoryStore) All() []string/* 执行此操作后,不允许修改MemoryStore状态func (store *MemoryStore) Close()/* 删除一个键 */func (store *MemoryStore) Del(key string)/* 通过键值获取消息指针 */func (store *MemoryStore) Get(key string) packets.ControlPacket/* 初始化一个MemoryStore实例 */func (store *MemoryStore) Open()/* 将一个键值和一个消息指针放入store中 */func (store *MemoryStore) Put(key string, message packets.ControlPacket)/* 删除所有持久化消息 */func (store *MemoryStore) Reset()注:Store接口与其类似8、Messagetype Message interface {Duplicate() bool /* dup标志 */Qos() byte /* 服务质量 */Retained() bool /* retain标志 */Topic() string /* 主题 */MessageID() uint16 /* 消息id */Payload() []byte /* 消息内容 */}
客户端demo
MTQQ_client.go 功能:消息订阅、发送、并接收 go run MTQQ_client.go
package main import ( "fmt" MQTT "github.com/eclipse/paho.mqtt.golang" "os" "time"
) /* 消息到来处理回调 */
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload())
} func main() { /* 构建一个客户端配置对象,可以包含客户端id,代理地址,handler等*/ opts := MQTT.NewClientOptions().AddBroker("tcp://iot.eclipse.org:1883") opts.SetClientID("go-simple") opts.SetDefaultPublishHandler(f) /* 用opts建立一个mqtt对象 */ c := MQTT.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } /* 订阅 */ if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error()) os.Exit(1) } /* 发布 */ for i := 0; i < 5; i++ { text := fmt.Sprintf("this is msg #%d!", i) token := c.Publish("go-mqtt/sample", 0, false, text) token.Wait() } time.Sleep(3 * time.Second) /* 取消订阅 */ if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {fmt.Println(token.Error()) os.Exit(1) } /* 断开连接 */ c.Disconnect(250)
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
