mqtt js 中乱码_mqtt之上RRPC同步调用实战

617592274544fe8da113338914b34df0.png

1.背景

MQTT协议是基于PUB/SUB的异步通信模式,不适用于服务端同步控制设备端返回结果的场景。物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。物联网平台提供API给服务端,设备端只需要按照固定的格式回复PUB消息,服务端使用API,即可同步获取设备端的响应结果。

2.名词解释

RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能。

3.RRPC原理

5e954787fa85524d1a787fe951aea7f2.png

  1. 物联网平台收到来自用户服务器的RRPC调用,下发一条RRPC请求消息给设备。消息体为用户传入的数据,Topic为物联网平台定义的Topic,其中含有唯一的RRPC消息ID。

  2. 设备收到下行消息后,按照指定Topic格式(包含之前云端下发的唯一的RRPC消息ID)回复一条RRPC响应消息给云端,云端提取出Topic中的消息ID,和之前的RRPC请求消息匹配上,然后回复给用户服务器。

  3. 如果调用时设备不在线,云端会给用户服务器返回设备离线的错误;如果设备没有在超时时间内(5秒内)回复RRPC响应消息,云端会给用户服务器返回超时错误。

3.1 设备端标识 ext=1

为了配合RRPC调用,设备端必须在进行MQTT CONNECT协议设置时,在clientId中增加ext=1参数:

2a61d8259d31c2c21402ef9b5fcf318d.png

3.2 RRPC通信Topic

RRPC调用的Topic格式如下:

5c9bc88f78763fa36bba85202ff2b987.png

其中${messageId}是IoT物联网平台生成的唯一的RRPC消息id,黑体部分是IoT物联网平台约定,红色部分可以根据业务场景自定义。

3.3 云端POP API调用

5b18a2e1e0f3adc4d8e5e7d2369e59d3.png

4.开发实战

4.1 设备端代码(Nodejs)

/**
"dependencies": { "mqtt": "2.18.8" }
*/
const crypto = require('crypto');
const mqtt = require('mqtt');
//设备身份三元组+区域 

const deviceConfig = {
    productKey: "替换productKey",
    deviceName: "替换deviceName",
    deviceSecret: "替换deviceSecret",
    regionId: "cn-shanghai"
};

const url = `tcp://${deviceConfig.productKey}.iot-as-mqtt.${deviceConfig.regionId}.aliyuncs.com:1883`;
//2.建立连接
const client = mqtt.connect(url, makeMqttOptions(deviceConfig));
//3.监听RRPC指令
client.subscribe(`/ext/rrpc/+/this/is/my/topic`)
client.on('message', function(topic, message) {
    console.log("topic=" + topic)
    console.log("payloadJson=" + message)
    //4.响应RRPC指令
    if (topic.indexOf(`/ext/rrpc/`) > -1) {
        client.publish(topic, JSON.stringify({ code: 200, msg: "rrpc ok" }));
    }

})
/*
  生成MQTT连接参数
*/
function makeMqttOptions() {

    const params = {
        productKey: deviceConfig.productKey,
        deviceName: deviceConfig.deviceName,
        timestamp: Date.now(),
        clientId: Math.random().toString(36).substr(2),
    }
    //CONNECT参数
    const options = {
        keepalive: 60, //60s
        clean: false, //cleanSession保持持久会话
        protocolVersion: 4 //MQTT v3.1.1
    }
    //1.生成clientId,username,password
    options.password = signHmacSha1(params, deviceConfig.deviceSecret);
    options.clientId = `${params.clientId}|securemode=3,signmethod=hmacsha1,timestamp=${params.timestamp},ext=1|`;
    options.username = `${params.deviceName}&${params.productKey}`;

    return options;
}

/*
  生成基于HmacSha1的password
  参考文档:https://help.aliyun.com/document_detail/73742.html?#h2-url-1
*/
function signHmacSha1(params, deviceSecret) {

    let keys = Object.keys(params).sort();
    // 按字典序排序
    keys = keys.sort();
    const list = [];
    keys.map((key) => {
        list.push(`${key}${params[key]}`);
    });
    const contentStr = list.join('');
    return crypto.createHmac('sha1', deviceSecret).update(contentStr).digest('hex');
}

4.2 服务端POP API调用代码(Nodejs)

const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = {
    accessKey:"替换accessKey",
    accessKeySecret: "替换accessKeySecret"
};

//1.初始化client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2018-01-20'
});
//2.构造RRPC参数
const params = {
    ProductKey: "你的产品ProductKey",
    DeviceName: "你的设备DeviceName",
    RequestBase64Byte: Buffer.from("Hello World").toString('base64'),
    Timeout: 5000,
    Topic:'/this/is/my/topic'
};

co(function*() {
    try {
        //3.发起API调用
        const response = yield client.request('RRpc', params);

        console.log(JSON.stringify(response));
    } catch (err) {
        console.log(err);
    }
});

4.3 实际运行效果

# 设备端日志
$ node rrpc_client.js 
topic=/ext/rrpc/1128893568908287488/this/is/my/topic
payloadJson=Hello World

# 服务端日志
$ node rrpc_Server.js 
{
    "MessageId": "1128893568908287488",
    "RequestId": "F9540921-8FDB-4671-B50A-84D119DA56D4",
    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ==",
    "Success": true,
    "RrpcCode": "SUCCESS"
}

其中 eyJjb2RlIjoyMDAsIm1zZyI6InJycGMgb2sifQ== 
解码后即:{ code: 200, msg: "rrpc ok" }

4.4 IoT物联网平台 日志服务

94e6d01df91841480d96f0ae2a961dba.png

c7501675c655203dc81bd846e8c46d06.png


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部