ThinkPHP使用rabbitMQ

背景说明

百度搜索php+rabbitMQ的资料虽然很多,但大多数不在TP框架的,即使有TP框架的资料也都不是非常细,我这重新整理了下,希望小伙伴不要走弯路!另外本文所使用的是PHP的amqp扩展,不是composer安装的php-amqplib,php-amqplib相对来说比较简单,有空我再写一个php-amqplib的使用说明

1.环境安装

rabbitMQ的安装->点此查看

wget https://pecl.php.net/get/amqp-1.9.3.tgz #下载
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
tar -xvf amqp-1.9.3.tgz #解压
tar zxf rabbitmq-c-0.8.0.tar.gz
cd rabbitmq-c-0.8.0
./configure --prefix=/usr/local/rabbitmq-c
make && make install
cd amqp-1.9.3
/www/server/php/72/bin/phpize
./configure --with-php-config=/www/server/php/72/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c/
make
make install

在你的php扩展包(/www/server/php/extensions)下查看是否已经有了扩展文件

在这里插入图片描述
打开php的配置文件,在底部添加扩展

extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/amqp.so

重启php

完成后phpInfo()如果能看到下图说明是安装成功了
在这里插入图片描述

2.封装消费者和生产者

消费者和生产者的大多数代码都是一样的,搜索到的资料也都是写2个php文件,没有封装好的,我这就简单整一下
在tp框架目录下的extend/下创建RabbitMqUtilsOri类,代码如下:

class RabbitMqUtilsOri
{private $conn;private $channel;private $ex;private $queue;private $routingKey;function __construct($exchangeName,$queueName,$routingKey){$this->exchangeName = $exchangeName;$this->queueName = $queueName;$this->routingKey = $routingKey;// 配置建议放到config目录下,这里方便大家看就直接写了$conf = ['host' => '127.0.0.1','port' => 5672,'login' => 'admin','password' => '111111',];//  1、创建链接对象$this->conn = new \AMQPConnection($conf);if (!$this->conn->connect()) {return json(["st" => 505, "errMsg" => "无法链接rabbitMQ"]);}//  2、创建通道$ch = new \AMQPChannel($this->conn);//  3、创建交换机$this->ex = new \AMQPExchange($ch);//  4、设置交换机名称$this->ex->setName($exchangeName);//  5、设置交换机的类型$this->ex->setType(AMQP_EX_TYPE_TOPIC);//  6、设置交换机持久化$this->ex->setFlags(AMQP_DURABLE);//  7、声明交换机$this->ex->declareExchange();//  8、创建消息队列$this->queue = new \AMQPQueue($ch);//  9、设置消息队列的名称$this->queue->setName($queueName);//  10、设置消息队列持久化(消息要想持久化,交换机和消息队列必须要持久化)$this->queue->setFlags(AMQP_DURABLE);//  11、声明消息队列$this->queue->declareQueue();//  12、绑定交换机和路由键$this->queue->bind($exchangeName, $routingKey);}/*** @param $msg array 消息内容* @throws AMQPChannelException* @throws AMQPConnectionException* @throws AMQPExchangeException*/public function sendMsg($msg){$msgBody = json_encode($msg);$this->ex->publish($msgBody,$this->routingKey,AMQP_NOPARAM,array('delivery_mode' => 2));}/*** @param $parma array [$message,"processMessage"]* $message为Message的对象,processMessage为Message的方法* 调用该方法需要用php的cli模式* @throws AMQPChannelException* @throws AMQPConnectionException* @throws AMQPEnvelopeException*/public function consume($parma){// 阻塞while(True){$this->queue->consume($parma);}}
}

3.生产者发送消息

这里模拟一个创建用户的队列消息,在mysql中创建一个rabbit_user的表,一会让消费者把队列中的用户插进去
控制器中创建一个发消息的方法:

 public function sendMsg(Request $request){$msg = ["name" => "小张", "age" => 22];$mq = new \RabbitMqUtilsOri("user_ex","user_que","user.insert");// RabbitMqUtilsOri是上面自己封装的类,第一个参数是交换机名称,第二个参数是队列名称,第三个参数是路由键名称,这里采用的是TOPIC模型,可以匹配字符串$mq->sendMsg($msg);return json(["st"=>200,"msg"=>"success"]);
}

3行代码搞定,看,封装完了后调用发消息是不是很简单,看下效果吧
首先打开rabbitmq的控制界面,不知道怎么操作的回到本文顶部环境安装有链接
在这里插入图片描述
现在队列是干净的,现在开始访问这个发消息的方法
浏览器返回
在这里插入图片描述
然后看下控制界面,首先交换机下有一个刚才我们创建的user_ex,并且是持久化的
在这里插入图片描述
然后看下队列
在这里插入图片描述
哎队列中也有了,并且有一条消息,这条消息是是ready状态,说明还没被消费,可以点进去偷偷看下
在这里插入图片描述
emmm,就是我们刚才发的这条,那么发送消息就完成了!

4.消费者接收消息

消费者一般都是cli模式在xshell下运行一个php 文件,如果我偏要用TP框架怎么操作呢,首先要进入项目的public目录,

cd /www/wwwroot/pro/public

然后运行

php index.php 模块/控制器/方法 &

&代表守护进程运行
然后是消费者的代码,实际业务中,消费者在接到消息的时候,肯定要根据不同的消息做不同的逻辑处理,比如我们这就是需要把消息中的user信息插入到数据库,所以如果直接把消息处理的逻辑写到消费者代码中耦合度太高,相关资料中呢都是这样写的

 $q->consume("receive");function receive(){// TODO
}

这种只适合在但php脚本文件中运行,在TP框架没法弄啊,我是这么操作的
创建一个Message的类也好,控制器也行,代码如下

class Message extends Controller
{function processMessage($envelope, $queue) {$msg = json_decode($envelope->getBody(),true);// RabbitUser是modelRabbitUser::create($msg);$queue->ack($envelope->getDeliveryTag());}
}

一会接到消息让这个方法去处理,这个类下也可以写很多的方法,根据不同的消息处理不同的业务逻辑
好,下面是消费者代码:

 public function receiveMsg(Request $request){$mq = new \RabbitMqUtilsOri("user_ex","user_que","user.*");// 第一个参数是交换机名称,第二个参数是队列名称,第三个参数是路由键名称,因为采用的topic模型,所以可以用*代替任何user.xxx的操作$message = new Message();$mq->consume([$message,"processMessage"]);}   

当监听到消息时,会调用Message中的processMessage方法,改方法中RabbitUser::create($msg);会向数据库插入一条纪录
运行代码,在/public下,运行

php index.php index/Consume/receiveMsg &

![在这里插入图片描述](https://img-blog.csdnimg.cn/20210129142514989.png在这里插入图片描述
先看下rabbitmq,刚才的消息被消费掉了
在这里插入图片描述
然后看下数据库
在这里插入图片描述
顺利插入了一条纪录,完工!!!


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部