使用swoole实现跨多个服务器聊天室
使用swoole实现跨多个服务器聊天室
本示例中,借助于swoole框架的websocket构建聊天室,并实现了跨服务器之间的消息通信。
需要注意的问题:
1、经实验发现,swoole框架中的websocket与http服务器不支持receive事件,所有要在websocket服务中另起一个端口监听client消息数据;
2、由于swoole在4.3版本以后逐渐开始移除与协程无关的模块,包括异步客户端、消息队列等。所有在高版本的swoole中要使用swoole_async_writefile方法,需要安装swoole_async扩展,或者使用协程写入文件。
3、$client->isConnected() 返回值为true时,这不代表连接一定是可用的,当执行send或recv时仍然有可能返回错误,因为应用层无法获得底层TCP连接的状态,执行send或recv时应用层与内核发生交互,才能得到真实的连接可用状态。
上代码:
基本类,也是后续所有类的基类
/*** websocket 基本类*/
class WsBase{// 服务protected $server;// 客户端protected $clientArr;// 日志记录路径protected $logFile = '/tmp/logs/ws_main.log';// 当前服务器信息protected $host = "0.0.0.0";protected $port;// 监听client端口protected $clientPort;// 转发服务器信息protected $forwardHost = "127.0.0.1";protected $forwardPort = "9540";/*** 服务器初始化*/protected function init(){// 服务器$this->server->on('start',[$this,'onStart']);$this->server->on('shutdown',[$this,'onShutdown']);// worker进程$this->server->on('workerstart',[$this,'onWorkerStart']);$this->server->on('workerstop',[$this,'onWorkerStop']);$this->server->on('workererror',[$this,'onWorkerError']);$this->server->on('task',[$this,'onTask']);$this->server->on('finish',[$this,'onFinish']);//客户端$this->server->on('connect',[$this,'onConnect']);$this->server->on('open',[$this,'onOpen']);$this->server->on('message',[$this,'onMessage']);$this->server->on('close',[$this,'onClose']);if ($this->server->start() == false) {$this->log('服务启动失败');}$this->log('服务已关闭');}/*** 服务器启动*/public function onStart($ser){swoole_set_process_name('ws_test');$this->log('服务 start');}/*** 服务器停止*/public function onShutdown($ser){$this->log('服务正常终止');}/*** 服务器开启worker进程*/public function onWorkerStart($ser,$workerId){$this->log('worker进程启动,当前是'.$ser->taskworker.',进程id:'.$workerId);}/*** worker进程终止*/public function onWorkerStop($ser,$workerId){$this->log('worker进程终止,当前是:'.$ser->taskworker.',进程id:'.$workerId);}/*** worker进程错误*/public function onWorkerError($ser,$workerId,$workerPid,$code,$signal){$this->log('worker错误,进程id:'.$workerId,',进程父id:'.$workerPid.',编码:'.$code.',进程退出信号:'.$signal);}/*** worker接收数据*/public function onReceive($ser,$fd,$reactorId,$data){$this->log('worker接收到数据,fd:'.$fd.',线程id:'.$reactorId.',数据:'.$data);}/*** 客户端连接*/public function onConnect($ser,$fd,$reactorId){$this->log('客户端连接,fd:'.$fd.',来源线程id:'.$reactorId);}/*** 客户端握手成功*/public function onOpen($ser,$req){$this->log('连接打开,连接fd:'.$req->fd);}/*** 客户端发送消息*/public function onMessage($ser,$frame){$this->log('接收到客户端消息,来源fd:'.$frame->fd.',消息内容为:'.$frame->data);}/*** 客户端关闭连接*/public function onClose($ser,$fd,$reactorId){$this->log('客户端关闭连接,fd:'.$fd.',来源线程id:'.$reactorId);}/*** 异步处理*/public function onTask($ser,$taskId,$srcWorkerId,$data){$this->log('异步处理,task_id:'.$taskId.',来源进程id:'.$srcWorkerId.',数据:'.$param['data']);$ser->finish('数据异步处理完成');}/*** 异步进程完成*/public function onFinish($ser,$taskId,$data){$this->log('task 完成,task_id:'.$taskId.',数据:'.$data);}/*** 记录日志*/protected function log($message,$type='log'){$log = $this->host.":".$this->port."【{$type}】".date('Y-m-d H:i:s').'~~'.$message.PHP_EOL;swoole_async_writefile($this->logFile,$log,function(){},FILE_APPEND);}/*** 向客户端发送消息*/protected function connectClient(&$source,$host,$port){if (!$source->connect($host,$port)) {// 连接失败重试$source->colse(true);if (!$source->connect($host,$port)) {$this->log('连接失败,参数:'.$host.':'.$port);return false;}}return true;}
}
核心消息处理类
在本次实验中使用1台服务器多个端口模拟多服务器的情况。
多个服务器则复制多份该类,修改每个类的 h o s t ( 主 服 务 器 地 址 ) 、 host(主服务器地址)、 host(主服务器地址)、port(服务器端口)、$clientPort(监听客户端消息的端口)三个参数即可
require_once __DIR__.'/WsBase.php';
/*** app端ws连接*/
class WsMain extends WsBase
{// 当前服务器信息protected $host = "0.0.0.0";protected $port = "9533";// 监听client端口protected $clientPort = "9541";public function __construct(){$this->server = new Swoole\WebSocket\Server($this->host, $this->port);$this->server->set(['reactor_num' => 2,'worker_num' => 4,'backlog' => 128,'max_request' => 50,//最大请求数'max_connection' => 100,//最大连接数'heartbeat_check_interval' => 30,//心跳检测'heartbeat_idle_time' => 300,'task_worker_num' => 4,'task_async' => true,]);// 由于http与websocket没有监听receive,所以需要新启用一个端口接收Client消息$clientServer = $this->server->listen($this->host,$this->clientPort,SWOOLE_SOCK_TCP);$clientServer->set(['open_websocket_protocol' => false]);$clientServer->on('receive',[$this,'onReceive']);$this->init();}/*** 服务器启动*/public function onStart($ser){swoole_set_process_name('ws_test');$this->log('服务 start');}/*** 客户端握手成功*/public function onOpen($ser,$req){$params = ['fd_src' => $req->fd,'data' => '用户'.$this->port.':'.$req->fd.'上线了',];$ser->task(json_encode($params,JSON_UNESCAPED_UNICODE));$this->log('连接打开,连接fd:'.$req->fd);}/*** worker接收数据*/public function onReceive($ser,$fd,$reactorId,$data){$this->log('worker接收到数据,fd:'.$fd.',线程id:'.$reactorId.',数据:'.$data);$params = json_decode($data,TRUE);$ser->task($data);$ser->send($fd,'服务器收到了你的消息');}/*** 客户端发送消息*/public function onMessage($ser,$frame){$params = ['fd_src' => $frame->fd,'data' => $this->port.'fd:'.$frame->fd.'说:'.$frame->data,];return $ser->task(json_encode($params,JSON_UNESCAPED_UNICODE));}/*** 异步处理*/public function onTask($ser,$taskId,$srcWorkerId,$data){$params = json_decode($data,TRUE);foreach ($ser->connections as $fd) {// 排除掉来源用户if (isset($params['fd_src']) && $params['fd_src'] == $fd) {continue;}// 检测是否是websocket连接$info = $ser->connection_info($fd);if (!isset($info['websocket_status']) || $info['websocket_status'] != 3) {continue;}$ser->push($fd,$params['data']);}if (isset($params['is_forward'])) {return;}$client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);if (!$this->connectClient($client,$this->forwardHost,$this->forwardPort)) {return;}$this->log('开始发送数据');$sendParams = ['server_src' => $this->host.':'.$this->clientPort,//来源服务器'data' => $params['data'],];$client->send(json_encode($sendParams,JSON_UNESCAPED_UNICODE));$client->recv();$this->log('异步处理,task_id:'.$taskId.',来源进程id:'.$srcWorkerId.',数据:'.$params['data']);$ser->finish($params['data']);}
}
new WsMain();
消息转发类
用于多个服务器之间的消息通信。
有多个服务器时,请根据实际情况设置该类中的$hostArr参数。其中:key为websocket服务器信息,value为websocket服务器监听的client信息(使用该写法的目的:为了将服务器与监听的client一一对应,在异步消息转发时,可以根据服务器信息或者监听的client信息排除掉原始服务器)。
require_once __DIR__.'/WsBase.php';
/*** websocket消息转发类*/
class WsForward extends WsBase
{protected $host = '0.0.0.0';protected $port = '9540';protected $hostArr = [// key => value // key:ws服务器信息,value:ws服务器监听的client信息'127.0.0.1:9533' => '127.0.0.1:9541','127.0.0.1:9534' => '127.0.0.1:9542',];public function __construct(){$this->server = new Swoole\Server($this->host, $this->port);$this->server->set(['reactor_num' => 2,'worker_num' => 4,'backlog' => 128,'max_connection' => 100,//最大连接数'heartbeat_check_interval' => 10,//心跳检测'heartbeat_idle_time' => 30,'task_worker_num' => 4,'task_async' => true,'daemonize' => true,]);$this->server->on('receive',[$this,'onReceive']);$this->init();}/*** 服务器启动*/public function onStart($ser){swoole_set_process_name('ws_forward');$this->log('服务 start');}/*** worker接收数据*/public function onReceive($ser,$fd,$reactorId,$data){$this->log('worker接收到数据,fd:'.$fd.',线程id:'.$reactorId.',数据:'.$data);$params = json_decode($data,TRUE);if (empty($params['data'])) {return $ser->send($fd,'消息不能为空');}$ser->task($data);$ser->send($fd,'服务器收到了你的消息');}/*** 客户端发送消息*/public function onMessage($ser,$frame){$params = ['fd_src' => $frame->fd,'msg' => $this->port.'fd:'.$frame->fd.'说:'.$frame->data,];return $ser->task(json_encode($params,JSON_UNESCAPED_UNICODE));}/*** 异步处理*/public function onTask($ser,$taskId,$srcWorkerId,$data){$this->log('开始异步处理数据');$params = json_decode($data,TRUE);foreach ($this->hostArr as $k => $v) {$params['server_src'] = str_replace('0.0.0.0', '127.0.0.1', $params['server_src']);// 排除源服务器if ($params['server_src'] == $v || $params['server_src'] == $k) {continue;}$arr = explode(':', $v);if (count($arr) !== 2) {$this->log('服务器配置错误,错误信息为:'.$v);continue;}$client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);if (!$this->connectClient($client,$arr[0],$arr[1])) {continue;}$sendParams = ['data' => $params['data'],'is_forward' => 1 // 标记是转发的消息];$client->send(json_encode($sendParams,JSON_UNESCAPED_UNICODE));$client->recv();}$ser->finish($params['data']);}
}
new WsForward();
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
