简介
TP 中使用 think-queue 可以实现普通队列和延迟队列。
think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:
- 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
- 队列的多队列, 内存限制 ,启动,停止,守护等
- 消息队列可降级为同步执行
消息队列实现过程
- 通过生产者推送消息到消息队列服务中
- 消息队列服务将收到的消息存入redis队列中(zset)
- 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
- 处理获取下来的消息调用业务类进行处理相关业务
- 业务处理后,需要从队列中删除消息
安装queue
composer 安装 think-queue
composer require topthink/think-queue
配置文件
在 \application\extra 新建queue.php 为 queue 的配置文件
声明启动redis (服务器redis需要先开启安装redis扩展)
return [ 'connector' => 'Redis', // Redis 驱动 'expire' => null, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 0, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, // 是否是长连接];
queue服务基类
首先要写一个声明一个服务基类 命名空间及文件夹名 自己定义好 到直接位置
我这里是在 \application\common\library\task\OrderClose.php 这个路径
namespace app\common\library\task;use app\common\controller\Frontend;use think\Lang;use think\Response;use think\queue\Job;class OrderClose{ public function fire(Job $job, $data) { // 此处做一些 check,提前判断是否需要执行 // $isJobStillNeedToBeDone = $this->checkJob($data); // if(! $isJobStillNeedToBeDone){ // $job->delete(); // return; // } // if ($job->attempts() > 6) { // $job->delete(); // // 也可以重新发布这个任务 // //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行 // } // 执行逻辑处理(即:你需要该消息队列做什么) $isJobDone = $this->doJob($data); if ($isJobDone) { // 如果任务执行成功,记得删除任务 $job->delete(); // $job->release(10); } else { //删除任务 $job->delete(); // 通过这个方法可以检查这个任务已经重试了几次了 } } private function checkJob($data){ // $order_id = !empty($data['order_id']) ? $data['order_id'] : ''; // if(!$order_id) return false; // $orderModel = new \app\api\model\order\Order; // $orderInfo = $orderModel->where(['id'=>$order_id])->find(); // if(!$orderInfo) return false; // if($orderInfo->status != '1') return false; // //更新订单数据 // $updateData = [ // 'status'=>'10', // 'close_time'=>time(), // 'remarks'=>'未支付订单超时自动关闭' // ]; // $ret = $orderModel->save($updateData); // if($ret){ // return true; // }else{ // return false; // } } private function doJob($data) { $order_id = !empty($data['order_id']) ? $data['order_id'] : ''; if(!$order_id) return false; $orderInfo = \app\api\model\order\Order::where(['id'=>$order_id])->find(); // \think\Log::write(['msg'=>'测试队列7878','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error'); if(!$orderInfo) return false; if($orderInfo->status != '1') return false; //更新订单数据 $updateData = [ 'status'=>'10', 'close_time'=>time(), 'remarks'=>'未支付订单超时自动关闭' ]; $ret = \app\api\model\order\Order::where(['id'=>$order_id])->update($updateData); //归还库存 if($orderInfo['activity_id']){ // \think\Log::write(['msg'=>'测试队列返还库存','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error'); $orderProductInfo = \app\api\model\order\OrderProduct::where('order_id',$order_id)->find(); $backStock = \app\api\model\activity\Activity::where('id',$orderInfo['activity_id'])->setInc('stock',$orderProductInfo['number']); } if($ret){ return true; }else{ return false; } } }
$job->release(2);
重新调用任务 2是延迟2秒执行
$job->delete();
删除任务
调用
在实际应用中调用该基类
//创建订单自动关闭任务$jobHandlerClassName = 'app\common\library\task\OrderClose';$jobData = ['order_id'=>$orderProductDatas['order_id']];$jobQueueName = 'order_close';//延迟执行$isPushed = \think\Queue::later(30 * 60,$jobHandlerClassName, $jobData,$jobQueueName);//立即执行$isPushed = \think\Queue::push($jobHandlerClassName, $jobData,$jobQueueName);
两个方法,前者是(30 * 60)秒后执行,后者立即执行
注意的是
later 延迟执行 无反参 我执行成功 返回了null
push 有反参 返回随机的字符串
开启守护进程
我服务器是用宝塔的守护进程
在这里安装Supervisor管理器
安装完需要配置一下守护进程的命令行
名称必须是英文
运行目录要选择项目根目录
命令行如下 注意order_close 是你项目基类名称
php think queue:listen --queue order_close
填写完成之后 保存就可以了
如果不是宝塔环境 请看一下 supervisor—进程管理神器
给我大哥点点关注
结束
这里的配置就基本结束了 基类中的 注释 表明了一下需要参数和方法 需要自己多揣摩一下
还有自己的业务逻辑也要修改 这里只作为参考 大家自行删除就可以
来源地址:https://blog.csdn.net/weixin_43866089/article/details/126436143