Swoole入门教程(二):搭建物联网服务

多端口监听

在开发服务器的时候,我们常常要同事对外开放很多端口,比如开放80端口提供http服务,开放9501端口提供Tcp服务,开放9502端口提供websocket服务,这时我们不需要new多个server,只需要用listen方法新增端口监听和协议即可
由于协程服务类Swoole\Coroutine\Http\Server没有多端口函数,我们用异步服务类Swoole\Http\Server来编写,代码如下


$server = new \Swoole\Http\Server('0.0.0.0',88);
$server->on('request',function (\Swoole\Http\Request $request,\Swoole\Http\Response $response){$route = $request->server['request_uri'];echo $route;$response->end("

Hello 你来自{$route}

"
); }); $server->on('start',function(){echo '服务启动'; }); $tcp = $server->listen('0.0.0.0',9501,SWOOLE_SOCK_TCP); $webSocket = $server->listen('0.0.0,0',9502,SWOOLE_SOCK_TCP); $webSocket->set(['open_websocket_protocol'=>true]); $webSocket->on('receive',function(){echo 'Websocket收到了数据'; }); $server->start();

$webSocket端口对象要添加on方法,不然会报 Swoole\Server::start(): require onMessage callback 错误

关于Server的构造器参数,请自行阅读官方文档:https://wiki.swoole.com/#/server/tcp_init

通过上面的代码我们就建立了3个服务88端口的http服务、9501端口的TCP服务,9502端口的websocket服务
那么如何接收请求呢,会不会混乱因为已经用端口区分开了,让每个端口各自处理请求,用的是Port对象的on方法,listen函数会返回一个\Swoole\Server\Port对象,根据协议不同,Port对象可以处理的事件不同,具体如下
TCP 服务器

  • onConnect
  • onClose
  • onReceive

UDP 服务器

  • onPacket
  • onReceive

HTTP 服务器

  • onRequest

WebSocket 服务器

  • onMessage
  • onOpen
  • onHandshake

Swoole在物联网的应用

MODBUS通讯

用Swoole可以开发物联网应用,假设我们手上有一个环境监测设备,设备采用标准 MODBUS-RTU 通信协议,RS485 信号输出,现在我们就来开发一个服务器和这个设备通讯,
1、首先,我们一般会拿到设备的对接文档,大概长这样子的
image.png
image.png
image.png
2、然后我们随便找一个网络调试工具,用于模拟设备端来开发调试
image.png
3、我们就开始写服务端代码
假如我们要向03号设备查询当前环境的温度值,根据文档,我们需要构造的发送端数据是

地址码功能码起始地址数据长度校验码低位校验码高位
0x030x03num2hex(501)(2个字节)num2hex(2)(2个字节)crc16低位crc16高位
function num2hex($num,$len){$hex = base_convert($num,10,16);//一个字节=2个16进制位$len = $len*2;if(strlen($hex)<$len){$hex = str_repeat('0',$len-strlen($hex)).$hex;}return $hex;
}function crc16($string)
{$hex = pack('H*', $string);$crc = 0xFFFF;for ($x = 0; $x < strlen($hex); $x++) {$crc = $crc ^ ord($hex[$x]);for ($y = 0; $y < 8; $y++) {if (($crc & 0x0001) == 0x0001) {$crc = (($crc >> 1) ^ 0xA001);} else {$crc = $crc >> 1;}}}return $crc;
}$low = sprintf('%02s',dechex($crc %256));// 低八位$high = sprintf('%02s',dechex(floor($crc /256)));// 高8位

通过函数的处理,便可得出发送数据为
03 03 01 f5 00 02 d4 27

go(function(){$server = new \Swoole\Coroutine\Server('0.0.0.0',9501,false,true);$server->handle(function (\Swoole\Coroutine\Server\Connection $conn){echo '您好,欢迎进入聊天室'."\n";$socket = $conn->exportSocket();//向设备发送数据$command = '030301f50002d427';$conn->send(hex2bin($command));while (true){$data = $conn->recv();if ($data === '' || $data === false) {
//                $errCode = swoole_last_error();
//                $errMsg = socket_strerror($errCode);
//                echo "协程风格下链接是动态创建和销毁的,可以设置5秒没收到数据自动超时关闭\n";
//                $conn->close();break;}else{echo '收到数据:'.bin2hex($data);}}//向设备发送请求数据echo '客户端:'.$socket->fd.'的链接已经关闭';});$server->start();
});

在设备端点击连接后就收到了数据请求了
image.png
这时设备端需要回复应答帧给服务端去解析
image.png
image.png
解析的过程刚好是刚才组装数据的逆向,由于设备发来的数据可能存在丢包,所以要先做一下crc校验,校验非常简单,去掉最后2个字节,用0303040292ff9b通过crc16函数算出crc的低位和高位,然后对比79fd看是否对应上,如果对应上了,那数据就是正确的了。拿到数据后,再通过进制换算就可以拿到温度数据了。
0xFF9B (十六进制)= -101 => 温度 = -10.1℃

TCP 粘包问题

Swoole在没有并发的情况下快速启动中的代码可以正常运行,但是并发高了就会有 TCP 数据包边界问题,TCP 协议在底层机制上解决了 UDP 协议的顺序和丢包重传问题,但相比 UDP 又带来了新的问题,TCP 协议是流式的,数据包没有边界,应用程序使用 TCP 通信就会面临这些难题,俗称** TCP 粘包问题**。
因为 TCP 通信是流式的,在接收 1 个大数据包时,可能会被拆分成多个数据包发送。多次 Send 底层也可能会合并成一次进行发送。这里就需要 2 个操作来解决:

  • 分包:Server 收到了多个数据包,需要拆分数据包
  • 合包:Server 收到的数据只是包的一部分,需要缓存数据,合并成完整的包

所以 TCP 网络通信时需要设定通信协议。常见的 TCP 通用网络通信协议有 HTTP、HTTPS、FTP、SMTP、POP3、IMAP、SSH、Redis、Memcache、MySQL 。
值得一提的是,Swoole 内置了很多常见通用协议的解析,来解决这些协议的服务器的 TCP 数据包边界问题,只需要简单的配置即可,参考 open_http_protocol/open_http2_protocol/open_websocket_protocol/open_mqtt_protocol
前面的例子中,我们用的modbus不在通用协议里面,那么如何处理呢,其实,除了通用协议外还可以自定义协议,Swoole 支持了 2 种类型的自定义网络通信协议

  • EOF 结束符协议

此协议处理的原理是每个数据包结尾加一串特殊字符表示包已结束,所以才用此处理方式的话要保证每个数据段里面不能有EOF结束符,否则会造成分包错误,设置的代码如下

$server->set(array('open_eof_check' => true,'package_eof' => "\r\n",
));
$client->set(array('open_eof_check' => true,'package_eof' => "\r\n",
));
  • 固定包头 + 包体协议

固定包头的方法是最通用的,我们前面用的modbus可以通过这种方式来解决粘包问题。在服务器端程序中经常能看到。这种协议的特点是一个数据包总是由包头 + 包体 2 部分组成。包头由一个字段指定了包体或整个包的长度,长度一般是使用 2 字节 /4 字节整数来表示。服务器收到包头后,可以根据长度值来精确控制需要再接收多少数据就是完整的数据包。Swoole 的配置可以很好的支持这种协议,可以灵活地设置 4 项参数应对所有情况,具体文档在这:https://wiki.swoole.com/#/server/setting?id=open_length_check,这对前面的modbus,我们的需要的配置如下

    $server->set(array('open_length_check'   => true,'package_length_func' => function ($data) {//提取前面3个字节,第3个字节为数据区长度,然后+5(包括2个字节校验码)就是总长度if (strlen($data) < 3) {return 0;}$length = hexdec(bin2hex($data[2]));if ($length <= 0) {return -1;}$total = $length + 5;echo $total;return $total;},'package_max_length'  => 2000000,  //协议最大长度));

搭建Mqtt服务

关于什么是MQTT,请看《什么是MQTT,物联网MQTT协议详解》
通过设置 open_mqtt_protocol 选项,启用后Swoole会解析 MQTT 包头,Worker 进程的 onReceive 事件每次会返回一个完整的 MQTT 数据包。
可以使用 Swoole 作为 MQTT 服务端(broker)或客户端(client),实现一套完整物联网(IOT)解决方案
以下是实现一个MQTT服务端的代码


function decodeValue($data)
{return 256 * ord($data[0]) + ord($data[1]);
}function decodeString($data)
{$length = decodeValue($data);return substr($data, 2, $length);
}function mqttGetHeader($data)
{$byte = ord($data[0]);$header['type'] = ($byte & 0xF0) >> 4;$header['dup'] = ($byte & 0x08) >> 3;$header['qos'] = ($byte & 0x06) >> 1;$header['retain'] = $byte & 0x01;return $header;
}function eventConnect($header, $data)
{$connect_info['protocol_name'] = decodeString($data);$offset = strlen($connect_info['protocol_name']) + 2;$connect_info['version'] = ord(substr($data, $offset, 1));$offset += 1;$byte = ord($data[$offset]);$connect_info['willRetain'] = ($byte & 0x20 == 0x20);$connect_info['willQos'] = ($byte & 0x18 >> 3);$connect_info['willFlag'] = ($byte & 0x04 == 0x04);$connect_info['cleanStart'] = ($byte & 0x02 == 0x02);$offset += 1;$connect_info['keepalive'] = decodeValue(substr($data, $offset, 2));$offset += 2;$connect_info['clientId'] = decodeString(substr($data, $offset));return $connect_info;
}$server = new Swoole\Server('0.0.0.0', 9501, SWOOLE_BASE);$server->set(['open_mqtt_protocol' => true, // 启用 MQTT 协议'worker_num' => 1,
]);$server->on('Connect', function ($server, $fd) {echo "mqtt客户端连接进来.\n";
});$server->on('Receive', function ($server, $fd, $reactor_id, $data) {$header = mqttGetHeader($data);//swoole默认为什么解析了var_dump($header);if ($header['type'] == 1) {$resp = chr(32) . chr(2) . chr(0) . chr(0);eventConnect($header, substr($data, 2));$server->send($fd, $resp);} elseif ($header['type'] == 3) {$offset = 2;$topic = decodeString(substr($data, $offset));$offset += strlen($topic) + 2;$msg = substr($data, $offset);echo "client msg: {$topic}\n----------\n{$msg}\n";//file_put_contents(__DIR__.'/data.log', $data);}echo "收到消息,长度为" . strlen($data) . "\n";
});$server->on('Close', function ($server, $fd) {echo "客户端关闭连接.\n";
});
$server->on('start',function (){echo "Mqtt Server启动了!";
});$server->start();

启动后,我们通过Mqtt工具MQTTX来测试
image.png
image.png

异步任务

异步任务在任何系统中都至关重要,在 Server 程序中如果需要执行很耗时的操作,比如一个聊天服务器发送广播,比如文件传输,比如Web 服务器中发送邮件。如果直接去执行这些函数就会阻塞当前进程,导致服务器响应变慢,因此,我们需要异步任务
下面演示用代码演示,下载一本8M多的MQTT pdf书籍,书籍地址:https://www.laojunsay.com/wp-content/uploads/2023/04/Gaston-C.-Hillar-MQTT-Essentials-A-Lightweight-IoT-Protocol-2017-Packt-Publishing-libgen.li_.pdf

$server = new \Swoole\Server('0.0.0.0',9501,SWOOLE_PROCESS,SWOOLE_SOCK_TCP);
$server->set(array('worker_num'    => 4,     // 进程数'task_worker_num'=>4
));//此回调函数在worker进程中执行。
$server->on('Receive', function($serv, $fd, $reactor_id, $data) {//投递异步任务$url = $data;echo "收到一个下载地址:地址为:{$url}\n";$taskInfo = ['fd'=>$fd,'url'=>$url];$task_id = $serv->task($taskInfo);echo "立即指派给异步任务去下载: 任务id={$task_id},我要去处理别的事情了\n";
});//处理异步任务(此回调函数在task进程中执行)。
$server->on('Task', function ($serv, $task_id, $reactor_id, $data) {echo "异步开始下载,[id={$task_id}]".PHP_EOL;$url = $data['url'];$ch = curl_init();$timeout = 60;curl_setopt($ch, CURLOPT_URL, $url);curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $timeout);//在需要用户检测的网页里需要增加下面两行curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);//curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_ANY);//curl_setopt($ch, CURLOPT_USERPWD, US_NAME.”:”.US_PWD);$contents = curl_exec($ch);curl_close($ch);file_put_contents('./mqtt.pdf',$contents);//返回任务执行的结果$finishInfo = ['msg'=>"资源:{$data['url']} -> 下载完成!",'fd'=>$data['fd']];$serv->finish($finishInfo);
});//处理异步任务的结果(此回调函数在worker进程中执行)。
$server->on('Finish', function ($serv, $task_id, $data) {echo "异步任务[{$task_id}] 处理完成: {$data['msg']}".PHP_EOL;$serv->send($data['fd'],iconv('utf-8','gb2312',$data['msg']));
});$server->on('start',function (){echo '服务器启动!'."\r\n";
});
$server->start();

演示结果
image.png
image.png
image.png
Swoole入门教程(一):服务器开发


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部