Swoole整合ThinkPHP3.2系列教程二

swoole和ThinkPHP的整合

先上一份在我们系统内部的swoole整合的架构预览

├─Application                           应用目录
│  │
│  ├─Cli                                Cli模块
│  │  ├─Controller                      制器类
│  │  │  ├─StartController              TP框架加载时默认加载的控制器
│  │  │  ├─SwooleController             我们的业务逻辑写在这里面
│  │  └─ ...
├─Swoole
│  │
│  ├─log                                swoole运行日志
│  ├─Server.php                         swoole的服务代码
│  ├─swoole.php                         用于cli模式下启动和软重启swoole服务

最早测试的时候不是这样搭建的,而是把Server.php里的关于服务的东西放在了TP控制器里,在cli模式下调用

php index.php(入口文件) Swoole/start (控制器/方法)

这种模式是把swoole套在了TP里运行,也是可行的,但是总觉得启动个swoole服务为毛还要告诉TP一声?

我们想要的一种模式是业务服务器独立运行,swoole服务作为守护进程常驻内存,当浏览器需要运行比较耗时的操作时,需要跟swoole服务进程建立长连接,当耗时的任务执行完毕时会通知浏览器已经完毕。

整个过程下来,swoole服务和业务服务不应该耦合在一起的。最完美的状态是swoole独立运行(使用swoole框架重新写耗时操作的业务逻辑代码),独立连接数据库。当浏览器执行这些任务时,就不再找业务服务器了,而是直接跟swoole服务打交道。

可是我们没有那么多的时间和精力,我们只能在swoole服务的进程里调用TP框架的东西,来执行我们在TP里写的代码。最后参考了网上的一些方案,选择了这样与TP结合。


代码逻辑

/Swoole/Server.php
  • 关于swoole服务的一些配置,比如监听地址,端口号(默认为9501),和一些基础配置

  • 一些回调函数,里面的代码注释很完整,可以直接看代码


// +----------------------------------------------------------------------
// 2017-8-23 09:03:40
// 此次修改为只作为websocket的服务端
// +----------------------------------------------------------------------
class Server{protected $swoole;// 监听所有地址protected $host = '0.0.0.0';// 监听 9501 端口protected $port = 9501;// 配置项protected $option = [//设置启动的worker进程数'worker_num' => 2,//task进程的数量'task_worker_num' => 4,//指定task任务使用消息队列模式,3表示完全争抢模式,task进程会争抢队列,无法使用定向投递'task_ipc_mode' => 3,//task进程的最大任务数'task_max_request' => 1000,// 守护进程化'daemonize'  => false,// 监听队列的长度'backlog'    => 128,//绑定uid时用'dispatch_mode' => 5,//设置日志路径'log_file' => SWOOLE_LOG_PATH,];protected function init(){//异步非阻塞多进程的websocket$this->swoole = new swoole_websocket_server($this->host, $this->port);$eventList    = ['Open', 'Message', 'Close', 'HandShake' , 'Task', 'Finish' , 'WorkerStart' , 'Receive'];// 设置参数if (!empty($this->option)) {$this->swoole->set($this->option);}// 设置回调foreach ($eventList as $event) {if (method_exists($this, 'on' . $event)) {$this->swoole->on($event, [$this, 'on' . $event]);}}}public function start(){$this->init();$this->swoole->start();}public function getHost(){return $this->host;}public function getPort(){return $this->port;}/*** [onOpen 建立连接时的回调函数]* @method onOpen* @param  swoole_server       $serv [description]* @param  swoole_http_request $req    [description]* @return [type]                      [description]*/public function onOpen(swoole_server $serv, swoole_http_request $req){//将连接绑定到uid上面if(!empty($req->get)&&$req->get['uid']){$serv->bind($req->fd , $req->get['uid']);}}/*** [onMessage 接收到socket客户端发送数据的回调函数]* @method onMessage* @param  swoole_server          $serv [description]* @param  swoole_websocket_frame $frame  [description]* @return [type]                         [description]*/public function onMessage(swoole_server $serv,  swoole_websocket_frame $frame){//收到数据时处理数据//根据收到的cmd名字去调用指定的方法$receive = json_decode($frame->data,true);//为了避免数据处理量过大阻塞当前进程,导致服务响应变慢,我们把耗时的操作扔到TaskWorker进程池中执行//当要执行的方法存在并且已经在swoole_log表里备案过的可以丢到task进程池$swooleLog = D('SwooleLog');$swooleController = A('Swoole');//$receive['args']['id']是业务服务器那边数据库SwooleLog表里的idif (method_exists($swooleController, $receive['cmd']) && $receive['args']['id']) {$task_id = $serv->task($receive);//记录task_id信息//...其他你想要做的精细化处理}else{if($receive['cmd'] === 'reload'){//利用Swoole提供的柔性终止/重启的机制$rs = $serv -> reload();$serv->push($frame->fd , $rs);}elseif($receive['args']['id']){$returnData = $this->_returnStr('submit_error');$serv->push($frame->fd , $returnData);}elseif(method_exists($this, $receive['cmd'])){$this->{$receive['cmd']}($serv , $frame);}}}/*** [onTask 在task_worker进程池内被调用时的回调函数]* @method onTask* @param  swoole_server $serv          [description]* @param  int           $task_id       [description]* @param  int           $src_worker_id [description]* @param  mixed         $data          [description]* @return [type]                       [description]*/public function onTask(swoole_server $serv, $task_id, $src_worker_id, $data){//记录任务开始执行的时间//...自己发挥  我们项目的业务逻辑部分已经删除//try{$swooleController = A('Swoole');$rs = $swooleController->{$data['cmd']}($data['args']);return json_encode(['id'=>$data['args']['id'] , 'status' => true , 'other' => $rs]);}catch(\Exception $e){return json_encode(['id'=>$data['args']['id'] , 'status' => false , 'other' => $e->getMessage()]);}}/*** [onFinish 任务执行完毕时的回调函数]* @method onFinish* @param  swoole_server $serv    [description]* @param  int           $task_id [description]* @param  string        $data    [description]* @return [type]                 [description]*/public function onFinish(swoole_server $serv, $task_id, string $data){$rs = json_decode($data , true);$swooleLog = D('SwooleLog');//...//通知用户该任务已经执行完毕了 可以来查看数据了//检查客户端链接是否存在 如果存在的话发送消息$returnData = $this->_returnStr('task_excute_success');foreach($serv->connections as $fd){$conn = $serv->connection_info($fd);//根据uid判断当前连接是活跃的//这里是业务服务器那边能取到的用户信息,这里已经删除,保护我们的项目$serv->push($fd , $returnData);}}/*** [onWorkerStart 为了应用能热加载 把框架的东西放到worker进程启动]* @method onWorkerStart* @param  swoole_server $server    [description]* @param  [type]        $worker_id [description]* @return [type]                   [description]*/public function onWorkerStart(swoole_server $server, $worker_id){// 定义应用目录define('APP_PATH', '../Application/');// 定义应用模式define('APP_Mode', 'cli');define('BIND_MODULE','Cli');// 引入ThinkPHP入口文件require '../ThinkPHP/ThinkPHP.php';}/*** [_returnStr 返回给客户端的数据]* @method _returnStr* @param  [type]     $type [description]* @return [type]           [description]*/private function _returnStr($type){switch ($type) {case 'submit_success':$returnData = ['code' => 202,'info' => '任务已经提交,请等待服务器计算数据!'];break;case 'submit_error':$returnData = ['code' => 404,'info' => '任务提交失败,请联系管理员进行处理!'];break;case 'task_excute_success':$returnData = ['code' => 200,'info' => '任务执行完毕!'];break;}return json_encode($returnData);}/*** [clients 获取所有的客户端连接信息,返回我们的uid]* @method clients* @return [type] [description]*/public function clients(swoole_server $serv , swoole_websocket_frame $frame){//...$serv->push($frame->fd , json_encode($serv->connections));}public function __call($method, $args){call_user_func_array([$this->swoole, $method], $args);}
}
/Swoole/swoole.php

是一个面向cli编程的代码,主要提供了start和reload两个命令

#!/bin/env php

/*** 定义项目根目录&swoole-task pid*/
define('SWOOLE_PATH', __DIR__);
define('SWOOLE_LOG_PATH', SWOOLE_PATH . DIRECTORY_SEPARATOR . 'log' . DIRECTORY_SEPARATOR . 'Swoole' . '.log');/*** 加载 swoole server*/
include SWOOLE_PATH .  DIRECTORY_SEPARATOR . 'Server.php';//提示帮助信息
if ($argc != 2 || in_array($argv[1], array('--help', '-help', '-h', '-?'))) {echo <<exit;
}//执行命令行选项
switch($argv[1]){case 'start':start();break;case 'reload':reload();break;case 'list':clients();break;default:exit("输入命令有误 : {$argv[1]}, 请查看帮助文档\n");break;
}//启动swoole服务
function start(){$server = new Server();$server->start();
}//柔性重启swoole服务
function reload(){echo "正在柔性重启swoole的worker进程..." . PHP_EOL;try {$server = new Server();$port = $server->getPort();$host = '127.0.0.1';$cli = new swoole_http_client($host, $port);$cli->set(['websocket_mask' => true]);$cli->on('message', function ($_cli, $frame) {if($frame->data){exit('swoole重启成功!'.PHP_EOL);}});$cli->upgrade('/', function ($cli) {$cli->push(json_encode(['cmd' => 'reload']));});} catch (Exception $e) {exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());}
}function clients(){try {$server = new Server();$port = $server->getPort();$host = '127.0.0.1';$cli = new swoole_http_client($host, $port);$cli->set(['websocket_mask' => true]);$cli->on('message', function ($_cli, $frame) {$users = json_decode($frame->data , true);if(empty($users)){echo '当前没有客户端连接'.PHP_EOL;}else{foreach($users as $user){echo '---'.$user.PHP_EOL;}}exit();});$cli->upgrade('/', function ($cli) {$cli->push(json_encode(['cmd' => 'clients']));});} catch (Exception $e) {exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());}
}

存在的问题

0.以上的代码是从项目里摘出来的,大体思路还在,业务部分代码删除了。不保证正常运行,但是问题不大,如果报错了聪明的你一定一看就知道是什么错误。

1.由于我们将TP框架的东西放在workstart回调函数里启动,这里只要启动worker进程和task进程,都会加载一次TP框架的东西到内存里,对内存的占用问题还需要仔细研究。

2.目前还不支持那些不支持HTML5 WebSocket的浏览器 我会尽快想办法解决。用flash模拟socket请求的方法没有试成功,近期重新再搞一波。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部