文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

TP5 queue队列详解

2023-08-31 12:15

关注

thinkphp-queue 笔记

前言

当前笔记中的内容针对的是 thinkphp-queue 的 v2.0 版本

传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。

一般来说,可以抽离的任务具有以下的特点:

thinkphp-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

thinkphp-queue 内置了 RedisDatabaseTopthinkSync这四种驱动。本文主要介绍 thinkphp-queue 结合其内置的 redis 驱动的使用方式和基本原理。

注1:如无特殊说明,下文中的 ‘消息’ 和 ‘任务’两个词指代的是同一个概念,即队列中的一个成员。该成员对消息队列而言是其内部保存的消息; 对业务应用而言是一个待执行的任务。请根据语境区分。

注2:本文编写时(2019-01-30)使用的 thinkphp-queue 的版本号是 v2.0 。如有变更,请以官方最新版为准。

一 代码示例

先通过一段代码,了解一下 thinkphp-queue 的基本使用流程。

目标:

在业务控制器中推送一个新消息到一个名为 ‘helloJobQueue’ 的队列中,该消息中包含我们自定义的业务数据,然后,编写一个名为 Hello 的消费者类,并通过命令行去调用该消费者类获取这个消息,拿到定义的数据。

1.1 安装 thinkphp-queue

复制composer install topthink/think-queue

1.2 搭建消息队列的存储环境

1.3 配置消息队列的驱动

根据选择的存储方式,在 \application\config\queue.php 这个配置文件中,添加消息队列对应的驱动配置

复制   return [       'connector'  => 'Redis',       // Redis 驱动       'expire'     => 60,    // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null        'default'    => 'default',     // 默认的队列名称       'host'       => '127.0.0.1',    // redis 主机ip       'port'       => 6379,      // redis 端口       'password'   => '',    // redis 密码       'select'     => 0,     // 使用哪一个 db,默认为 db0       'timeout'    => 0,     // redis连接的超时时间       'persistent' => false,     // 是否是长连接

// ‘connector’ => ‘Database’, // 数据库驱动
// ‘expire’ => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
// ‘default’ => ‘default’, // 默认的队列名称
// ‘table’ => ‘jobs’, // 存储消息的表名,不带前缀
// ‘dsn’ => [],

// ‘connector’ => ‘Topthink’, // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
// ‘token’ => ‘’,
// ‘project_id’ => ‘’,
// ‘protocol’ => ‘https’,
// ‘host’ => ‘qns.topthink.com’,
// ‘port’ => 443,
// ‘api_version’ => 1,
// ‘max_retries’ => 3,
// ‘default’ => ‘default’,

// ‘connector’ => ‘Sync’, // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
];

1.3.1 配置文件中的 expire 参数说明

expire 参数指的是任务的过期时间, 单位为秒。 过期的任务,其准确的定义是

  1. 任务的状态为执行中
  2. 任务的开始执行的时刻 + expire > 当前时刻

expire 不为null 时 ,thinkphp-queue 会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。

expire 为null 时,thinkphp-queue 不会检查过期的任务,性能相对较高一点。但是需要注意:

对expire 参数理解或者使用不当时,很容易产生一些bug,后面会举例提到。

1.4 消息的创建与推送

我们在业务控制器中创建一个新的消息,并推送到 helloJobQueue 队列

新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法

复制namespace app\index\controller;  use think\Exception;

use thinkQueue;

class JobTest {

public function actionWithHelloJob(){

// 1.当前任务将由哪个类来负责处理。
// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
$jobHandlerClassName = ‘app\index\job\Hello’;

// 2.当前任务归属的队列名称,如果为新队列,会自动创建
$jobQueueName = “helloJobQueue”;

// 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// ( jobData 为对象时,存储其public属性的键值对 )
$jobData = [ ‘ts’ => time(), ‘bizId’ => uniqid() , ‘a’ => 1 ] ;

// 4.将该任务推送到消息队列,等待对应的消费者去执行
i s P u s h e d < / s p a n > = Q u e u e : : p u s h ( < s p a n c l a s s = " h l j s − v a r i a b l e " > isPushed = Queue::push( isPushed</span>=Queue::push(<spanclass="hljsvariable">jobHandlerClassName , j o b D a t a < / s p a n > , < s p a n c l a s s = " h l j s − v a r i a b l e " > jobData , jobData</span>,<spanclass="hljsvariable">jobQueueName );

// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
if( $isPushed !== false ){
echo date(‘Y-m-d H:i:s’) . " a new Hello Job is Pushed to the MQ".
;
}else{
echo ‘Oops, something went wrong.’;
}
}
}

注意: 在这个例子当中,我们是手动指定的 $jobHandlerClassName ,更合理的做法是先定义好消息名称与消费者类名的映射关系,然后由某个可以获取该映射关系的类来推送这个消息。这样,生产者只需要知道消息的名称,而无需指定哪个消费者类来处理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );这种方式之外,还可以直接传入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 这时,需要在 $jobHandlerObject 中定义一个 handle() 方法,消息队列在执行到该任务时会自动反序列化该对象,并调用其 handle()方法。 该方式中, 数据需要提前挂载在 $jobHandlerObject 对象上。

1.5 消息的消费与删除

编写 Hello 消费者类,用于处理 helloJobQueue 队列中的任务

新增 \application\index\job\Hello.php 消费者类,并编写其 fire() 方法

复制     namespace app\index\job;

use thinkqueueJob;

class Hello {

    public function fire(Job $job,$data)  {      // 有些消息在到达消费者时,可能已经不再需要执行了      $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);      if(!$isJobStillNeedToBeDone){          $job->delete();          return;      }      $isJobDone = $this->doHelloJob($data);      if ($isJobDone) {          // 如果任务执行成功, 记得删除任务          $job->delete();          print("<info>Hello Job has been done and deleted"."</info>\n");      }else{          if ($job->attempts() > 3) {              //通过这个方法可以检查这个任务已经重试了几次了              print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");                $job->delete();              // 也可以重新发布这个任务              //print("<info>Hello Job will be availabe again after 2s."."</info>\n");              //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行          }      }  }    private function checkDatabaseToSeeIfJobNeedToBeDone($data){      return true;  }    private function doHelloJob($data)   {      print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");      print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");      print("<info>Hello Job is Done!"."</info> \n");      return true;  }

}

至此,所有的代码都已准备完毕,在运行消息队列之前,我们先看一下现在的目录结构:

目录结构-代码示例

1.6 发布任务

在浏览器中访问 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到消息推送成功。

浏览器提示消息推送结果

1.7 处理任务

切换当前终端窗口的目录到项目根目录下,执行

复制php think queue:work --queue helloJobQueue

可以看到执行的结果类似如下:

命令行执行结果

至此,我们成功地经历了一个消息的 创建 -> 推送 -> 消费 -> 删除 的基本流程

下文,将介绍 thinkphp-queue 的详细使用方法。如配置介绍,基本原理,各种特殊情况的处理等

二 详细介绍

2.1 命令模式

2.2 命令行参数

2.3 work 模式和 listen 模式的区别

两者都可以用于处理消息队列中的任务

区别在于:

复制- expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的 work 进程) 。expire 针对的对象是 **任务**- timeout 是指 work 进程的超时时间。这个时间只对当前执行的 listen 命令有效。timeout 针对的对象是 **work 进程**

2.4 消息队列的开始,停止与重启

2.5 多模块,多任务的处理

taskType= taskType = taskType</span>=<spanclass="hljsvariable">_GET[‘taskType’];
switch (KaTeX parse error: Expected '}', got 'EOF' at end of input: …hljs-variable">jobHandlerClassName = ‘application\index\job\MultiTask@taskA’;
KaTeX parse error: Expected 'EOF', got '&' at position 63: …'a' =&̲gt; jobQueueName = “multiTaskJobQueue”;
break;
case ‘taskB’:
KaTeX parse error: Undefined control sequence: \index at position 69: …g">'application\̲i̲n̲d̲e̲x̲\job\MultiTask@…jobDataArr = [‘b’ => ‘2’];
$jobQueueName = “multiTaskJobQueue”;
break;
default:
break;
}

isPushed=Queue::push( isPushed = Queue::push( isPushed</span>=Queue::push(<spanclass="hljsvariable">jobHandlerClassName, jobDataArr, jobDataArr, jobDataArr</span>,<spanclass="hljsvariable">jobQueueName);
if (KaTeX parse error: Expected '}', got 'EOF' at end of input: …s="hljs-subst">taskType of MultiTask Job has been Pushed to “.KaTeX parse error: Expected 'EOF', got '&' at position 49: …"hljs-string">"&̲lt;br>"taskType of MultiTask Job Failed!”);
}
}

复制namespace app\index\job;

use thinkqueueJob;

class MultiTask {

public function taskA(Job $job,$data){    $isJobDone = $this->_doTaskA($data);    if ($isJobDone) {        $job->delete();        print("Info: TaskA of Job MultiTask has been done and deleted"."\n");    }else{        if ($job->attempts() > 3) {            $job->delete();          }    }}public function taskB(Job $job,$data){    $isJobDone = $this->_doTaskA($data);    if ($isJobDone) {        $job->delete();        print("Info: TaskB of Job MultiTask has been done and deleted"."\n");    }else{        if ($job->attempts() > 2) {            $job->release();             }    }}private function _doTaskA($data) {    print("Info: doing TaskA of Job MultiTask "."\n");    return true;}private function _doTaskB($data) {    print("Info: doing TaskB of Job MultiTask "."\n");    return true;}

2.6 消息的延迟执行与定时执行

延迟执行,相对于即时执行,是用来限制某个任务的最早可执行时刻。在到达该时刻之前,该任务会被跳过。

可以利用该功能实现定时任务

使用方式:

复制// 即时执行$isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);

// 延迟 2 秒执行
isPushed=Queue::later(2, isPushed = Queue::later( 2, isPushed</span>=Queue::later(<spanclass="hljsnumber">2</span>,<spanclass="hljsvariable">jobHandlerClassName, jobDataArr, jobDataArr, jobDataArr</span>,<spanclass="hljsvariable">jobQueueName);

// 延迟到 2017-02-18 01:01:01 时刻执行
time2wait=strtotime( ′ 2017−02−1801:01:0 1 ′ )−strtotime( ′ no w ′ ); time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); time2wait</span>=strtotime(<spanclass="hljsstring">2017021801:01:01</span>)strtotime(<spanclass="hljsstring">now</span>);<spanclass="hljsvariable">isPushed = Queue::later( time2wait, time2wait, time2wait</span>,<spanclass="hljsvariable">jobHandlerClassName, jobDataArr, jobDataArr, jobDataArr</span>,<spanclass="hljsvariable">jobQueueName);

复制// 重发,即时执行$job->release();

// 重发,延迟 2 秒执行
$job->release(2);

// 延迟到 2017-02-18 01:01:01 时刻执行
time2wait=strtotime( ′ 2017−02−1801:01:0 1 ′ )−strtotime( ′ no w ′ ); time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); time2wait</span>=strtotime(<spanclass="hljsstring">2017021801:01:01</span>)strtotime(<spanclass="hljsstring">now</span>);<spanclass="hljsvariable">job->release($time2wait);

复制//如果消费者类的fire()方法抛出了异常且任务未被删除时,将自动重发该任务,重发时,会设置其下次执行前延迟多少秒,默认为0php think queue:work --delay 3

2.7 消息的重发

thinkphp-queue 中,消息的重发时机有3种:

复制if( $isJobDone === false){    $job->release();}

补充:

在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。

而在redis 模式下,3种重发都是先删除再插入。

不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么

2.8 任务的失败回调及告警

当同时满足以下条件时,将触发任务失败回调:

注意, queue_failed 标签需要在安装了 thinkphp-queue 之后 手动\application\tags.php 文件中添加。

首先,我们添加 queue_failed 事件标签, 及其对应的回调方法

复制// 文件路径: \application\tags.php// 应用行为扩展定义文件return [    // 应用初始化    'app_init'     => [],    // 应用开始    'app_begin'    => [],    // 模块初始化    'module_init'  => [],    // 操作开始执行    'action_begin' => [],    // 视图内容过滤    'view_filter'  => [],    // 日志写入    'log_write'    => [],    // 应用结束    'app_end'      => [],
// 任务失败统一回调,有四种定义方式'queue_failed'=> [     // 数组形式,[ 'ClassName' , 'methodName']    ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']     // 字符串(静态方法),'StaicClassName::methodName'     // 'MyQueueFailedLogger::logAllFailedQueues'        // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法     // 'application\\behavior\\MyQueueFailedLogger'     // 闭包形式     ]

];

这里,我们选择数组形式的回调方式,新增 \application\behavior\MyQueueFailedLogger 类,添加一个 logAllFailedQueues() 方法

复制

namespace appbehavior;

class MyQueueFailedLogger {

const should_run_hook_callback = true;public function logAllFailedQueues(&$jobObject){    $failedJobLog = [        'jobHandlerClassName'   => $jobObject->getName(), // 'application\index\job\Hello'        'queueName' => $jobObject->getQueue(),          // 'helloJobQueue'           'jobData'   => $jobObject->getRawBody()['data'],  // '{'a': 1 }'        'attempts'  => $jobObject->attempts(),            // 3    ];    var_export(json_encode($failedJobLog,true));       // $jobObject->release();     //重发任务      //$jobObject->delete();         //删除任务      //$jobObject->failed();    //通知消费者类任务执行失败    return self::should_run_hook_callback;         }

}

需要注意该回调方法的返回值:

最后,在消费者类中,添加 failed() 方法

复制

public function failed( jobDatastringarray...//jobData/</span><spanclass="hljskeyword">public</span><spanclass="hljsfunction"><spanclass="hljskeyword">function</span><spanclass="hljstitle">failed</span>(<spanclass="hljsparams"><spanclass="hljsvariable">jobData){
send_mail_to_somebody() ;

print(“Warning: Job failed after max retries. job data is :”.var_export($data,true).“\n”;
}

这样,就可以做到任务失败的记录告警

2.9 处理过期的任务

过期这个概念用文字比较难描述清楚,建议先看一下 深入理解3.4 消息处理的详细流程图

三 深入理解

3.1 thinkphp-queue 中消息与队列的保存方式

3.2 thinkphp-queue 的目录结构和类关系图

thinkphp-queue的文件目录

这些类构成了消息队列中的几个角色:

角色类名说明
命令行Command + Worker负责解析命令行参数,控制队列的启动,重启
驱动Queue + Connector负责队列的创建,以及消息的入队,出队等操作
任务Job用于将消息转化为一个任务对象,供消费者使用
生产者业务代码负责消息的创建与发布
消费者业务代码负责任务的接收与执行

各个类之间的关系图如下:

thinkphp-queue类关系图

3.3 Deamon模式的执行流程

Daemon模式与非daemon模式状态图

3.4 Database模式下消息处理的详细流程

下图中,展示了database 模式下消息处理的详细流程,redis 驱动下大体类似

Database模式下消息获取和执行的具体流程

3.5 redis 驱动下的任务重发细节

在redis驱动下,为了实现任务的延迟执行和过期重发,任务将在这三个key中来回转移。

在3.4 Database模式下消息处理的消息流程中,我们知道,如果配置的expire 不是null ,那么 thinkphp-queue的work进程每次在获取下一个可执行任务之前,会先尝试重发所有过期的任务。而在redis驱动下,这个步骤则做了更多的事情,详情如下:

  1. queue:xxx:delayed 的key中查询出有哪些任务在当前时刻已经可以开始执行,然后将这些任务转移到 queue:xxx 的key的尾部。
  2. queue:xxx:reserved 的key中查询出有哪些任务在当前时刻已经过期,然后将这些任务转移到 queue:xxx的key的尾部。
  3. 尝试从 queue:xxx 的key的头部取出一个任务,如果取出成功,那么,将这个任务转移到 queue:xxx:reserved 的key 的头部,同时将这个任务实例化成任务对象,交给消费者去执行。

用图来表示这个步骤的具体过程如下:

redis队列中的过期任务重发步骤—执行前:

redis队列中的过期任务重发步骤-执行前

redis队列中的过期任务重发步骤—执行后:

redis队列中的过期任务重发步骤--执行后

3.6 thinkphp-queue的性能

注意:由于在测试时,Host 机本身的cpu和内存长期100%,并且虚拟机中的各项服务并未专门调优,因此该测试结果并不具备参考性

3.7 thinkphp-queue 的注意事项

四 拓展

4.1 队列的稳定性和拓展性

4.2 消息队列的可视化管理工具

4.2 编写自定义的 thinkphp-queue 驱动

//TBD

4.3 编写消息队列的单元测试

//TBD

4.4 与其他PHP消息队列库的对比

TP5的消息队列与Laravel的消息队列比较相似,下面是与laravel 中的消息队列的一些对比:

thinkphp-queue (v1.1.2)laravel-queue (v5.3)
内置的驱动Database,Redis,Sync,TopThinkDatabase,Redis, Sync(在laravel中称为 null)。
Redis驱动要求安装redis的C扩展安装 predis 包 + LUA脚本
推送任务允许推送 消费者类名,消费者对象允许推送消费者类名,消费者对象,闭包
失败任务处理触发失败回调事件 (有Bug)触发失败回调事件 + 移动任务到 failed_jobs表?
消息订阅subscribe 命令+ Topthink驱动(注:未实现/未提供)subscribe 命令 + 安装IronMQ 驱动
删除任务消费者类中手动删除任务完成后自动删除
推送到多个队列需自己实现原生支持
延迟执行支持 (有Bug)支持
消息重发支持支持
检查已执行次数原生支持需在消费者类中显式 use 相关的 trait
执行方式work 模式 + listen 模式work 模式 + listen 模式
进程命令开启,停止,重启开启,停止,重启
任务命令展示失败任务列表,重试某个失败任务,删除某个失败任务
支持的事件失败回调事件失败回调事件,支持消费前事件,消费后事件

来源地址:https://blog.csdn.net/qq_42958118/article/details/128144243

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     801人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     348人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     311人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     432人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     220人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯