业务场景
项目公司是主php做开发的,框架为thinkphp。众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。首先我想到了php的workerman与swoole,但是这里应上面的哈,想将耗时任务交给另一个服务器,同时列队处理。所以这里我想独立部署一个rabbitMQ服务器用于处理列队任务。
当rabbitMQ服务器我们准备好了,建立了一个持久化命名为ceshi的列队,如下:
项目上生产者和消费者的开发我这里全部采用tinkphp6+workerman,为便于管理。这里这么做也是因为发现workerman中对rabbitMQ的文档解释太少了!
所以开始踩坑!
1、首先部署好thinkphp6框架
过程去看thinkphp6手册
2、安装workerman扩展
过程去看thinkphp6手册
3、生产者
配置一个workerman类
创建的Send类代码如下:
<?php
namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
//websocket地址,一会用于测试。
protected $socket = 'websocket://127.0.0.1:2345';
public function onMessage($connection, $data)
{
//websocket发送过来的消息
$connection->send('我收到你的信息了:'.$data);
//rabbitMQ配置
$options = [
'host'=>'127.0.0.1',//rabbitMQ IP
'port'=>5672,//rabbitMQ 通讯端口
'user'=>'admin',//rabbitMQ 账号
'password'=>'123456'//rabbitMQ 密码
];
(new Client($options))->connect()->then(function (Client $client) {
return $client->channel();
})->then(function (Channel $channel) {
return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) use($data){
echo "发送消息内容:".$data."\n";
return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) {
//echo " [x] Sent 'Hello World!'\n";
$client = $channel->getClient();
return $channel->close()->then(function () use ($client) {
return $client;
});
})->then(function (Client $client) {
$client->disconnect();
});
}
public function onConnect($connection)
{
}
public function onClose($connection)
{
}
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
public function onWorkerStart($worker)
{
}
}
上述都OK以后咱们可以项目路径下通过命令启动这个生产者:
php think worker:server
测试发送数据:
通过这个网站
连接【ws://127.0.0.1:2345】后发送数据!
前往rabbitMQ控制台
列队中有一条消息产生并且等待了!
这个时候你可能问,如果我发送数据不想通过ws发送而是接口发送怎么办?
笨思路呗:接口给内置服务器发消息->内置服务去发消息给rabbitMQ
将协议改为tcp
然后重新启动服务
然后去tp6创建一个路由接口
接口代码
<?php
namespace app\controller;
use app\BaseController;
class Index extends BaseController
{
public function index(string $msg)
{
//连接本地tcp服务
$client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
//发送字符串
fwrite($client, $msg."\n");
//断开服务
fclose($client);
return 'OK';
}
}
执行结果:
说明接口成功的将数据发送给了本地内置的tcp服务。
同时,内置服务将收到的数据给了rabbitMQ服务列队中。
生产者完成。
4、消费者
同生产者一样新创建一个thinkphp6及安装workerman扩展,注意端口别和生产者冲突!这里我设置的是2346端口
创建的Receive类代码如下:
<?php
namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
protected $socket = 'tcp://127.0.0.1:2346';
public function onMessage($connection, $data)
{
}
public function onConnect($connection)
{
}
public function onClose($connection)
{
}
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
public function onWorkerStart($worker)
{
//rabbitMQ配置
$options = [
'host'=>'127.0.0.1',//rabbitMQ IP
'port'=>5672,//rabbitMQ 通讯端口
'user'=>'admin',//rabbitMQ 账号
'password'=>'123456'//rabbitMQ 密码
];
(new Client($options))->connect()->then(function (Client $client) {
return $client->channel();
})->then(function (Channel $channel) {
return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
return $channel;
});
})->then(function (Channel $channel) {
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$channel->consume(
function (Message $message, Channel $channel, Client $client) {
echo "接收消息内容:", $message->content, "\n";
},
'ceshi',
'',
false,
true
);
});
}
}
都OK以后咱们可以项目路径下通过命令启动这个消费者:
php think worker:server
此时应该会自动消费掉rabbitMQ中等待的消息!
到这里消费者也就结束啦!
5、整体测试
接下来我用cmd来启动两个服务,然后用接口发送消息和消费测试!
至于具体怎么灵活应用自行开拓大脑哦~
比如php项目有些业务吃力,可以去做个java的消费端,让java来完成任务~
以上就是PHP实现RabbitMQ消息列队的示例代码的详细内容,更多关于PHP RabbitMQ消息列队的资料请关注编程网其它相关文章!