文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

think\queue 消息队列

2023-09-10 22:13

关注

简介

TP 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息队列实现过程

  1. 通过生产者推送消息到消息队列服务中
  2. 消息队列服务将收到的消息存入redis队列中(zset)
  3. 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
  4. 处理获取下来的消息调用业务类进行处理相关业务
  5. 业务处理后,需要从队列中删除消息

安装queue

composer 安装 think-queue

composer require topthink/think-queue

配置文件

在 \application\extra 新建queue.php 为 queue 的配置文件
声明启动redis (服务器redis需要先开启安装redis扩展)

return [    'connector'  => 'Redis',          // Redis 驱动    'expire'     => null,             // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null    'default'    => 'default',        // 默认的队列名称    'host'       => '127.0.0.1',      // redis 主机ip    'port'       => 6379,             // redis 端口    'password'   => '',               // redis 密码    'select'     => 0,                // 使用哪一个 db,默认为 db0    'timeout'    => 0,                // redis连接的超时时间    'persistent' => false,            // 是否是长连接];

queue服务基类

首先要写一个声明一个服务基类 命名空间及文件夹名 自己定义好 到直接位置
我这里是在 \application\common\library\task\OrderClose.php 这个路径

namespace app\common\library\task;use app\common\controller\Frontend;use think\Lang;use think\Response;use think\queue\Job;class OrderClose{            public function fire(Job $job, $data)    {        // 此处做一些 check,提前判断是否需要执行        // $isJobStillNeedToBeDone = $this->checkJob($data);        // if(! $isJobStillNeedToBeDone){        //     $job->delete();        //     return;        // }        // if ($job->attempts() > 6) {        //     $job->delete();        //     // 也可以重新发布这个任务        //     //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行        // }        // 执行逻辑处理(即:你需要该消息队列做什么)        $isJobDone = $this->doJob($data);        if ($isJobDone) {            // 如果任务执行成功,记得删除任务            $job->delete();            // $job->release(10);        } else {            //删除任务            $job->delete();            // 通过这个方法可以检查这个任务已经重试了几次了                    }    }            private function checkJob($data){        // $order_id = !empty($data['order_id']) ? $data['order_id'] : '';        // if(!$order_id) return false;        // $orderModel = new \app\api\model\order\Order;        // $orderInfo = $orderModel->where(['id'=>$order_id])->find();        // if(!$orderInfo) return false;        // if($orderInfo->status != '1') return false;        // //更新订单数据        // $updateData = [        //     'status'=>'10',        //     'close_time'=>time(),        //     'remarks'=>'未支付订单超时自动关闭'        // ];        // $ret = $orderModel->save($updateData);        // if($ret){        //     return true;        // }else{        //     return false;            // }    }            private function doJob($data)    {        $order_id = !empty($data['order_id']) ? $data['order_id'] : '';        if(!$order_id) return false;        $orderInfo = \app\api\model\order\Order::where(['id'=>$order_id])->find();        // \think\Log::write(['msg'=>'测试队列7878','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error');        if(!$orderInfo) return false;        if($orderInfo->status != '1') return false;        //更新订单数据        $updateData = [            'status'=>'10',            'close_time'=>time(),            'remarks'=>'未支付订单超时自动关闭'        ];        $ret = \app\api\model\order\Order::where(['id'=>$order_id])->update($updateData);        //归还库存        if($orderInfo['activity_id']){            // \think\Log::write(['msg'=>'测试队列返还库存','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error');            $orderProductInfo = \app\api\model\order\OrderProduct::where('order_id',$order_id)->find();            $backStock = \app\api\model\activity\Activity::where('id',$orderInfo['activity_id'])->setInc('stock',$orderProductInfo['number']);        }        if($ret){            return true;        }else{            return false;            }    }    }

$job->release(2); 重新调用任务 2是延迟2秒执行
$job->delete(); 删除任务

调用

在实际应用中调用该基类

//创建订单自动关闭任务$jobHandlerClassName = 'app\common\library\task\OrderClose';$jobData = ['order_id'=>$orderProductDatas['order_id']];$jobQueueName = 'order_close';//延迟执行$isPushed = \think\Queue::later(30 * 60,$jobHandlerClassName, $jobData,$jobQueueName);//立即执行$isPushed = \think\Queue::push($jobHandlerClassName, $jobData,$jobQueueName);

两个方法,前者是(30 * 60)秒后执行,后者立即执行
注意的是
later 延迟执行 无反参 我执行成功 返回了null
push 有反参 返回随机的字符串

开启守护进程

我服务器是用宝塔的守护进程
在这里插入图片描述
在这里安装Supervisor管理器
安装完需要配置一下守护进程的命令行

在这里插入图片描述
名称必须是英文
运行目录要选择项目根目录
命令行如下 注意order_close 是你项目基类名称

php think queue:listen --queue order_close

填写完成之后 保存就可以了

如果不是宝塔环境 请看一下 supervisor—进程管理神器
给我大哥点点关注

结束

这里的配置就基本结束了 基类中的 注释 表明了一下需要参数和方法 需要自己多揣摩一下
还有自己的业务逻辑也要修改 这里只作为参考 大家自行删除就可以

来源地址:https://blog.csdn.net/weixin_43866089/article/details/126436143

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯