mqtt js 中乱码_mqtt之上RRPC同步调用实战

1.背景
MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果。
2.名词解释
RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能。
3.RRPC原理

物联网平台收到来自用户服务器的RRPC调用,下发一条RRPC请求消息给设备。消息体为用户传入的数据,Topic为物联网平台定义的Topic,其中含有唯一的RRPC消息ID。
设备收到下行消息后,按照指定Topic格式(包含之前云端下发的唯一的RRPC消息ID)回复一条RRPC响应消息给云端,云端提取出Topic中的消息ID,和之前的RRPC请求消息匹配上,然后回复给用户服务器。
如果调用时设备不在线,云端会给用户服务器返回设备离线的错误;如果设备没有在超时时间内(5秒内)回复RRPC响应消息,云端会给用户服务器返回超时错误。
3.1 设备端标识 ext=1
为了配合RRPC调用,设备端必须在进行MQTT CONNECT协议设置时,在clientId中增加ext=1参数:

3.2 RRPC通信Topic
RRPC调用的Topic格式如下:

其中${messageId}是IoT物联网平台生成的唯一的RRPC消息id,黑体部分是IoT物联网平台约定,红色部分可以根据业务场景自定义。
3.3 云端POP API调用

4.开发实战
4.1 设备端代码(Nodejs)
/**
"dependencies": { "mqtt": "2.18.8" }
*/
const crypto = require('crypto');
const mqtt = require('mqtt');
//设备身份三元组+区域
const deviceConfig = {
productKey: "替换productKey",
deviceName: "替换deviceName",
deviceSecret: "替换deviceSecret",
regionId: "cn-shanghai"
};
const url = `tcp://${deviceConfig.productKey}.iot-as-mqtt.${deviceConfig.regionId}.aliyuncs.com:1883`;
//2.建立连接
const client = mqtt.connect(url, makeMqttOptions(deviceConfig));
//3.监听RRPC指令
client.subscribe(`/ext/rrpc/+/this/is/my/topic`)
client.on('message', function(topic, message) {
console.log("topic=" + topic)
console.log("payloadJson=" + message)
//4.响应RRPC指令
if (topic.indexOf(`/ext/rrpc/`) > -1) {
client.publish(topic, JSON.stringify({ code: 200, msg: "rrpc ok" }));
}
})
/*
生成MQTT连接参数
*/
function makeMqttOptions() {
const params = {
productKey: deviceConfig.productKey,
deviceName: deviceConfig.deviceName,
timestamp: Date.now(),
clientId: Math.random().toString(36).substr(2),
}
//CONNECT参数
const options = {
keepalive: 60, //60s
clean: false, //cleanSession保持持久会话
protocolVersion: 4 //MQTT v3.1.1
}
//1.生成clientId,username,password
options.password = signHmacSha1(params, deviceConfig.deviceSecret);
options.clientId = `${params.clientId}|securemode=3,signmethod=hmacsha1,timestamp=${params.timestamp},ext=1|`;
options.username = `${params.deviceName}&${params.productKey}`;
return options;
}
/*
生成基于HmacSha1的password
参考文档:https://help.aliyun.com/document_detail/73742.html?#h2-url-1
*/
function signHmacSha1(params, deviceSecret) {
let keys = Object.keys(params).sort();
// 按字典序排序
keys = keys.sort();
const list = [];
keys.map((key) => {
list.push(`${key}${params[key]}`);
});
const contentStr = list.join('');
return crypto.createHmac('sha1', deviceSecret).update(contentStr).digest('hex');
}
4.2 服务端POP API调用代码(Nodejs)
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = {
accessKey:"替换accessKey",
accessKeySecret: "替换accessKeySecret"
};
//1.初始化client
const client = new RPCClient({
accessKeyId: options.accessKey,
secretAccessKey: options.accessKeySecret,
endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
apiVersion: '2018-01-20'
});
//2.构造RRPC参数
const params = {
ProductKey: "你的产品ProductKey",
DeviceName: "你的设备DeviceName",
RequestBase64Byte: Buffer.from("Hello World").toString('base64'),
Timeout: 5000,
Topic:'/this/is/my/topic'
};
co(function*() {
try {
//3.发起API调用
const response = yield client.request('RRpc', params);
console.log(JSON.stringify(response));
} catch (err) {
console.log(err);
}
});
4.3 实际运行效果
# 设备端日志
$ node rrpc_client.js
topic=/ext/rrpc/1128893568908287488/this/is/my/topic
payloadJson=Hello World
# 服务端日志
$ node rrpc_Server.js
{
"MessageId": "1128893568908287488",
"RequestId": "F9540921-8FDB-4671-B50A-84D119DA56D4",
"PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ==",
"Success": true,
"RrpcCode": "SUCCESS"
}
其中 eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ==
解码后即:{ code: 200, msg: "rrpc ok" }
4.4 IoT物联网平台 日志服务


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
