MTQQ协议初探

MQTT协议

协议简介

MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。

1、术语

  • 网络连接(Network Connection):MQTT协议使用的底层传输协议基础设施(客户端使用它连接服务器、提供有序双向可靠的字节流服务)。
  • 应用消息(Application Message):应用消息通过MQTT传输时,他们有关联的服务质量(QoS)和主题(Topic)。
  • 客户端(Client):发布应用消息给其他的客户端、订阅(取消订阅)以请求相关的应用消息、从服务端端开连接。
  • 服务端(Server):作为发送消息的客户端和请求消息的客户端的中介。接受来自客户端的网络连接、接受客户端发布的应用消息、处理客户端的订阅或取消订阅请求、转发应用请求给符合条件的客户端。
  • 订阅(Subscription):订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。
  • 主题名(Topic Name):附加在应用消息上的一个标签,服务端已知且与订阅匹配。服务端发送应用消息的一个副本给每一个匹配的客户端订阅。
  • 主题过滤器(Topic Filter):订阅中包含的一个表达式,用于表示相关的一个或多个主题。主题过滤器可以使用通配符。
  • 会话(Session):客户端和服务端之间的状态交互。一些会话持续时长与网络连接一样,另一些可以在客户端和服务端的多个连续网络连接间扩展。
  • 传输报文(MQTT Control Packet):通过网络连接发送的信息数据包。MQTT规范定义了十四种不同类型的控制报文,会在后续详细讲解。

2、控制报文

MQTT协议通过交换预定义的MQTT控制报文来通信。由三部分组成,按照下表描述的顺序。

报文英文名称中文名称备注
Fixed header固定报头所有控制报文都包含
Variable header可变报头部分控制报文包含
Payload有效载荷部分控制报文包含

具体协议内部信息可见:mtqq中文文档

协议服务器搭建及搭建

c语言版本

1、安装依赖库

(1) cmake、gcc、openssl-devel yum install cmakeyum install gcc-c++yum install openssl-devel //mosquitto默认支持openssl(2) c-areswget http://c-ares.haxx.se/download/c-ares-1.10.0.tar.gztar xvf c-ares-1.10.0.tar.gzcd c-ares-1.10.0./configuremakemake install(3) lib-uuidyum install libuuid-devel(4) 安装libwebsocket

2、安装mosquitto

wget http://mosquitto.org/files/source/mosquitto-1.4.4.tar.gz
tar -xzvf mosquitto-1.4.4.tar.gz
cd mosquitto-1.4.4
make
make install

安装完成后主要信息路径如下:

路径程序文件
/usr/local/sbinmosquiotto server
/etc/mosquittoconfiguration
/usr/local/binutility command

注:启动客户端可能出现“连接库无法被找到相关错误”,解决方法如下:

vim /etc/ld.so.conf.d/liblocal.conf
/usr/local/lib64
/usr/local/lib
ldconfig

3、配置文件主要信息说明

(1) 生成程序配置文件mv /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf(2) 部分配置项说明# 服务进程的PID# pid_file /var/run/mosquitto.pid# 服务进程的系统用户# user mosquitto# 服务绑定的IP地址# bind_address# 服务绑定的端口号# port 1883# 允许的最大连接数,-1表示没有限制# max_connections -1# 允许匿名用户# allow_anonymous true

4、启动服务

/usr/local/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf -d

5、客户端测试

(1)订阅主题mosquitto_sub -t location(2)推送消息mosquitto_pub -t location -h localhost -m "new location"

6、libmosquitto部分函数说明

函数名函数作用
mosquitto_lib_version获取libmosquitto版本信息
mosquitto_lib_init初始化,任何mosquitto库函数之前调用
mosquitto_lib_cleanup释放lib资源
mosquitto_new建立一个新的mosq客户端对象
mosquitto_destroy销毁一个mosq客户端对象
mosquitto_reinitialise重新定义一个mosq客户端对象
mosquitto_will_set在connect之前调用,配置mosq对象topic
mosquitto_will_clear删除mosq对象提前配置信息
mosquitto_username_pw_set设置mosq对象的用户名和密码
mosquitto_connect连接到mosq代理
mosquitto_connect_bind连接到mosq代理,可以绑定特定接口
mosquitto_connect_async连接到mosq代理,非阻塞(与mosquittoloopstart一起使用)
mosquitto_connect_bind_async以上两个功能相加
mosquitto_reconnect与代理重连
mosquitto_reconnect_async与代理重连(异步)
mosquitto_disconnect与代理断开连接
mosquitto_publish在一个特定topic上发布一条消息
mosquitto_subscribe订阅一个话题
mosquitto_unsubscribe取消订阅一个话题
mosquitto_message_copy消息结构体复制
mosquitto_message_free完全释放mosquitto_message结构体
mosquitto_message_free_contents仅释放mosquitto_message结构体内容
mosquitto_loop客户端启动mosq客户端服务
mosquitto_loop_forever客户端一直启动mosq客户端服务
mosquitto_loop_start另起一个线程启动mosq客户端服务
mosquitto_loop_stop关闭一个mosq客户端服务
mosquitto_socket将mosq对象转换成fd套接字
mosquitto_loop_read启动度操作
mosquitto_loop_write启动写操作
mosquitto_loop_miscping等杂项操作
mosquitto_want_write有数据准备好在套接字上写,则返回true
mosquitto_threaded_set设置使用的线程
mosquitto_opts_set为客户端设置选项
mosquitto_tls_set配置客户端tsl/ssl证书
mosquitto_tls_opts_set设置高级tsl/ssl选项
mosquitto_tls_psk_set配置客户端psk
mosquitto_connect_callback_set连接应答回调
mosquitto_disconnect_callback_set关闭连接回调
mosquitto_publish_callback_set发布消息回调
mosquitto_message_callback_set获得推送消息回调
mosquitto_subscribe_callback_set订阅消息回调
mosquitto_unsubscribe_callback_set取消订阅回调
mosquitto_log_callback_set日志回调
mosquitto_reconnect_delay_set重连延时设置
mosquitto_max_inflight_messages_set允许多大数量的QoS为1或2消息被同时进行传输处理
mosquitto_user_data_set用户数据设置

6、推送客户端demo和订阅客户端demo

推送客户端(mqtt_client_pub.c)

/******************************************************************************                                                                                                   
** Coypright(C) 2014-2024 Qiware technology Co., Ltd                           
**                                                                             
** 文件名: client_pub.c                                                        
** 版本号: 1.0                                                                 
** 描  述:                                                                     
** 作  者: # Zengyao.pang # 2017年09月04日 星期一 11时36分40秒 #               
******************************************************************************/
#include                                                           
#include                                                               
#include                                                              void my_connect_callback(struct mosquitto *mosq, void *obj, int result)         
{                                                                               int mid = 0;                                                                if (!result) {                                                              mosquitto_publish(mosq, &mid, "subb", 0, NULL, 2, true);                } else {                                                                    fprintf(stderr, "Connect failed\n");                                    }                                                                           
}                                                                               void my_log_callback(struct mosquitto *mosq, void *userdata, int level,     const char *str)
{                                                                               printf("%s\n", str);                                                        
}                                                                               void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)            
{                                                                               fprintf(stderr, "publish (mid: %d)\n", mid);                                
}                                                                               int main()                                                                      
{                                                                               int i;                                                                      char *host = "localhost";                                                   int port = 1883;                                                            int keepalive = 60;                                                         bool clean_session = false;                                                 struct mosquitto *mosq = NULL;                                              char message[100];                                                          int ret;                                                                    int mid = 1;                                                                char *will_mess = "will_mess";                                              char *userid = "test_pub";                                                  /* > mosquitto_lib 初始化(第一句)*/                                       mosquitto_lib_init();/* > 新建一个mosq对象 */                                                    mosq = mosquitto_new(userid, clean_session, NULL);                          if (!mosq) {                                                                fprintf(stderr, "Error: Out of memory.\n");                             return -1;                                                              }                                                                           /* > 日志回调 */                                                            mosquitto_log_callback_set(mosq, my_log_callback);                          /* > 收到连接应答回调 */                                                    mosquitto_connect_callback_set(mosq, my_connect_callback);                  /* > 发布消息回调 */                                                        mosquitto_publish_callback_set(mosq, my_publish_callback);                  /* > 设置遗嘱信息 */                                                        mosquitto_will_set(mosq, "subb", strlen(will_mess) + 1, will_mess, 2, 0);   /* > 连接 */                                                                mosquitto_connect(mosq, host, port, keepalive);                             /* > 在另外一个线程启动 */                                                  mosquitto_loop_start(mosq);                                                 /* > 从终端得到消息发布 */                                                  while (scanf("%s", message)) {                                              getchar();                                                              message[strlen(message)] = '\0';                                        ret = mosquitto_publish(mosq, &mid, "subb", strlen(message) + 1, message, 2, true);++mid;                                                                  if (ret) {                                                              mosquitto_disconnect(mosq);                                         fprintf(stderr, "mosquitto_publish ERROR");                         }                                                                       }                                                                           mosquitto_destroy(mosq);                                                    mosquitto_lib_cleanup();                                                    return 0;                                                                   
}

订阅客户端(mqtt_client_sub.c)

/******************************************************************************                                                                                                   
** Coypright(C) 2014-2024 Qiware technology Co., Ltd                              
**                                                                                
** 文件名: mtqq_client.c                                                          
** 版本号: 1.0                                                                    
** 描  述:                                                                        
** 作  者: # Zengyao.Pang # 2017年09月01日 星期五 16时26分10秒 #                  
******************************************************************************/
#include                                                              
#include                                                                  
#include                                                                 void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{                                                                                  if (message->payloadlen) {                                                     printf("%s %s\n", message->topic, message->payload);                       } else {                                                                       printf("%s (null)\n", message->topic);                                     }                                                                              fflush(stdout);                                                                
}                                                                                  void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)       
{                                                                                  int i;                                                                         if(!result){                                                                   //mosquitto_subscribe(mosq, NULL, "$SYS/#", 2);                            /* > 进行消息订阅 */                                                       mosquitto_subscribe(mosq, NULL, "subb", 2);                                } else {                                                                       fprintf(stderr, "Connect failed\n");                                       }                                                                              
}                                                                                  void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{                                                                                  int i;                                                                         printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);                       for(i=1; i

编译运行

gcc mqtt_client_pub.c –lmosquitto –o pub
gcc mqtt_client_sub.c –lmosquitto –o sub
./pub
./sub

在pub终端输入任意字符串,按回车进行推送,在sub终端收到信息

go语言版本

相关库安装

1、  git clone https://github.com/golang/net.git
2、  拷贝net目录到“$GO_PATH/src/golang.org/x”目录下
3、  进入“$GO_PATH/src/golang.org/x/net/websocket",运行go build
4、  运行"go get github.com/eclipse/paho.mqtt.golang"
5、  go get golang.org/x/net/websocket
6、  go get golang.org/x/net/proxy

paho.mqtt.golang相关说明

接口文档连接 https://godoc.org/github.com/eclipse/paho.mqtt.golang

相关函数简要说明(跟c语言实现的功能类似),主要接口实现:
1、type Client
type Client interface {/*  判断是否连接 */IsConnected() bool/* 进行连接 */Connect() Token/* 关闭连接 */Disconnect(quiesce uint)/* 发布消息 */Publish(topic string, qos byte, retained bool, payload interface{}) Token/* 订阅消息,主题 */Subscribe(topic string, qos byte, callback MessageHandler) Token/*订阅消息,主题过滤器 */SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token/* 取消订阅 */Unsubscribe(topics ...string) Token/* 增加订阅主题 */AddRoute(topic string, callback MessageHandler)
}2、type ClientOptions以下结构体内容均可以调用相关函数进行设置。type ClientOptions struct {Servers          []*url.URL     /* 设置mqtt代理服务 */ClientID         string         /* 客户端id */Username         string         /* 用户名 */Password         string         /* 用户密码 */CleanSession     bool           /* 断线后是否清理会话 */Order            bool           /*  是否保证每个QoS级别顺序 */WillEnabled      bool           /* 是否设定遗嘱 */WillTopic        string         /* 遗嘱话题 */WillPayload      []byte         /* 遗嘱消息 */WillQos          byte           /* 遗嘱级别 */WillRetained     bool           /* 断线后是否保留遗嘱会话 */ProtocolVersion  uint           /* 协议版本 */TLSConfig             tls.Config               /* tls配置 */KeepAlive             int64                    /* 保活 */PingTimeout           time.Duration            /* ping */ConnectTimeout        time.Duration            /* 连接超时 */MaxReconnectInterval  time.Duration            /* 最大重连间隔 */AutoReconnect         bool                     /* 是否自动重连 */Store                 Store                    /* 提供持久性的消息接口 */DefaultPublishHandler MessageHandler           /* 发布消息回调 */OnConnect             OnConnectHandler         /* 连接回调 */OnConnectionLost      ConnectionLostHandler    /* 连接意外中断回调 */WriteTimeout          time.Duration            /* 写超时回调 */MessageChannelDepth   uint                     /* 设置离线期间队列大小*/// contains filtered or unexported fields}3、ClientOptionsReader与2相反,此对象主要提供读取option相关信息4、ConnectTokenclient类型建立连接时返回类型type ConnectToken interface {/* 返回错误信息 */Error() error/* 返回在connect过程中connack相应的code */ReturnCode() byte/* 无限期的等到连接完成 */Wait() bool/* 在d时间范围内等待连接完成WaitTimeout(d time.Duration) bool}5、DisconnectTokenclient类型关闭连接时返回类型type DisconnectToken interface {/* 返回错误信息 */Error() error/* 无限期的等到连接中断完成 */Wait() bool/* 在d时间范围内等待连接中断完成WaitTimeout(d time.Duration) bool}注:还有PublishToken、SubscribeToken、UnsubscribeToken接口类似在此不一一列举6、MId消息id(uint16)7、MemoryStoreMemoryStore实现了存储界面,以提供完全存储在内存中的“持久性”机制。 只要客户机实例存在,store就存在。type MemoryStore struct {sync.RWMutex// contains filtered or unexported fields}/* 返回一个指向MemoryStore新的实例指针,没有初始化,等待open调用 */func NewMemoryStore() *MemoryStore/* 返回一个包含memorystore所有当前键的字符串 */func (store *MemoryStore) All() []string/* 执行此操作后,不允许修改MemoryStore状态func (store *MemoryStore) Close()/* 删除一个键 */func (store *MemoryStore) Del(key string)/* 通过键值获取消息指针 */func (store *MemoryStore) Get(key string) packets.ControlPacket/* 初始化一个MemoryStore实例 */func (store *MemoryStore) Open()/* 将一个键值和一个消息指针放入store中 */func (store *MemoryStore) Put(key string, message packets.ControlPacket)/* 删除所有持久化消息 */func (store *MemoryStore) Reset()注:Store接口与其类似8、Messagetype Message interface {Duplicate() bool         /* dup标志 */Qos() byte               /* 服务质量 */Retained() bool          /* retain标志 */Topic() string           /* 主题 */MessageID() uint16       /* 消息id */Payload() []byte         /* 消息内容 */}

客户端demo

MTQQ_client.go 功能:消息订阅、发送、并接收 go run MTQQ_client.go

package main                                                                                                                                                                      import (                                                                           "fmt"                                                                          MQTT "github.com/eclipse/paho.mqtt.golang"                                     "os"                                                                           "time"                                                                         
)                                                                                  /* 消息到来处理回调 */                                                             
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {           fmt.Printf("TOPIC: %s\n", msg.Topic())                                         fmt.Printf("MSG: %s\n", msg.Payload())                                         
}                                                                                  func main() {                                                                      /* 构建一个客户端配置对象,可以包含客户端id,代理地址,handler等*/             opts := MQTT.NewClientOptions().AddBroker("tcp://iot.eclipse.org:1883")        opts.SetClientID("go-simple")                                                  opts.SetDefaultPublishHandler(f)                                               /* 用opts建立一个mqtt对象 */                                                   c := MQTT.NewClient(opts)                                                      if token := c.Connect(); token.Wait() && token.Error() != nil {                panic(token.Error())                                                       }                                                                              /* 订阅 */                                                                     if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())                                                 os.Exit(1)                                                                 }                                                                              /* 发布 */                                                                     for i := 0; i < 5; i++ {                                                       text := fmt.Sprintf("this is msg #%d!", i)                                 token := c.Publish("go-mqtt/sample", 0, false, text)                       token.Wait()                                                               }                                                                              time.Sleep(3 * time.Second)                                                    /* 取消订阅 */                                                                 if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {fmt.Println(token.Error())                                                 os.Exit(1)                                                                 }                                                                              /* 断开连接 */                                                                 c.Disconnect(250)                                                              
} 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部