文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

XXL-JOB真的要凉了?出现了一个王炸级别的分布式任务调度与计算框架?

2024-11-30 04:51

关注

适用场景

同类产品对比

基本概念

分组概念

核心概念

  1. 单机任务(STANDALONE):一个 JobInstance 对应一个 Task
  2. 广播任务(BROADCAST):一个 JobInstance 对应 N 个 Task,N为集群机器数量,即每一台机器都会生成一个 Task
  3. Map/MapReduce任务:一个 JobInstance 对应若干个 Task,由开发者手动 map 产生

扩展概念

定时任务类型

备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务。

搭建PowerJob环境

本地启动

初始化项目

git clone https://github.com/PowerJob/PowerJob.git

导入 IDE,源码结构如下,我们需要启动调度服务器(powerjob-server),同时在 samples 工程中编写自己的处理器代码

启动调度服务器

  1. 创建数据库(仅需要创建数据库):找到你的 DB,运行 SQL CREATE DATABASE IF NOT EXISTS `powerjob-daily` DEFAULT CHARSET utf8mb4,搞定~
  2. 修改配置文件:配置文件的说明官方文档写的非常详细,此处不再赘述。需要修改的地方为数据库配置spring.datasource.core.jdbc-url、spring.datasource.core.username和spring.datasource.core.password,当然,有 mongoDB 的同学也可以修改spring.data.mongodb.uri以获取完全版体验。

powerjob-server 日常环境配置文件:application-daily.properties

oms.env=DAILY
logging.cnotallow=classpath:logback-dev.xml


####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimeznotallow=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5


####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily


####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true


####### 资源清理配置 #######
oms.instanceinfo.retentinotallow=1
oms.container.retention.local=1
oms.container.retention.remote=-1


####### 缓存配置 #######
oms.instance.metadata.cache.size=1024
  1. 启动应用:完成配置文件的修改后,可以直接通过启动类 tech.powerjob.server.PowerJobServerApplication 启动调度服务器(注意:需要使用 daily 配置文件启动,可自行百度搜索“SpringBoot 指定配置文件启动”),观察启动日志,查看是否启动成功~启动成功后,访问 http://127.0.0.1:7700/ ,如果能顺利出现 Web 界面,则说明调度服务器启动成功!
  2. 注册应用:点击主页应用注册按钮,填入 powerjob-agent-test 和控制台密码(用于进入控制台),注册示例应用(当然你也可以注册其他的 appName,只是别忘记在示例程序中同步修改~)

图片

docker-compose启动

环境要求

本地需要安装docker和docker-compose

下载代码

git clone --depth=1 https://github.com/PowerJob/PowerJob.git

运行

cd PowerJob
docker-compose up
docker-compose up -d

刚开始启动时,powerjob-worker-samples会启动失败,等powerjob-server启动成功后,powerjob-worker-samples才会启动成功。这大概需要几分钟。

运行成功后,浏览器访问 http://127.0.0.1:7700/ 
应用名称:powerjob-worker-samples
密码:powerjob123

停止

docker-compose down
Stopping powerjob-worker-samples ... done
Stopping powerjob-server         ... done
Stopping powerjob-mysql          ... done
Removing powerjob-worker-samples ... done
Removing powerjob-server         ... done
Removing powerjob-mysql          ... done
cd PowerJob
rm -rf powerjob-data

SpringBoot集成PowerJob

添加相关maven依赖


  tech.powerjob
  powerjob-worker-spring-boot-starter
  ${latest.powerjob.version}

配置文件配置

powerjob:
  worker:
    # akka 工作端口,可选,默认 27777
    akka-port: 27777
    # 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
    app-name: ${spring.application.name}
    # 调度服务器地址,IP:Port 或 域名,多值逗号分隔
    server-address: 81.70.117.188:7700
    # 持久化方式,可选,默认 disk
    store-strategy: disk
    # 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认值 8192
    max-result-length: 8192
    # 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认值 8192
    max-appended-wf-context-length: 8192

处理器(Processor)开发

处理器概述

基本概念

PowerJob 支持 Python、Shell、HTTP、SQL 等众多通用任务的处理,开发者只需要引入依赖,在控制台配置好相关参数即可,关于这部分详见 官方处理器 ,此处不再赘述。本章将重点阐述 Java 处理器开发方法与使用技巧。

单机处理器(BasicProcessor)对应了单机任务,即某个任务的某次运行只会有某一台机器的某一个线程参与运算。

广播处理器(BroadcastProcessor)对应了广播任务,即某个任务的某次运行会调动集群内所有机器参与运算。

Map处理器(MapProcessor)对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。

MapReduce 处理器(MapReduceProcessor)对应了 MapReduce 任务,在 Map 任务的基础上,增加了所有任务结束后的汇总统计。

入参 TaskContext

TaskContext 包含了本次任务的上下文信息,具体信息如下

属性列表(红色标注的为常用属性)

属性名称

意义/用法

jobId

任务 ID,开发者一般无需关心此参数

instanceId

任务实例 ID,全局唯一,开发者一般无需关心此参数

subInstanceId

子任务实例 ID,秒级任务使用,开发者一般无需关心此参数

taskId

采用链式命名法的 ID,在某个任务实例内唯一,开发者一般无需关心此参数

taskName

task 名称,Map/MapReduce 任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数

jobParams

任务参数

对于非工作流中的任务其值等同于控制台录入的任务参数;如果该任务为工作流中的任务且有配置节点参数信息,那么接收到的是节点配置的参数信息

instanceParams

任务实例参数

对于非工作流中的任务 其值 等同于 OpenAPI 传递的实例参数,非 OpenAPI 触发的任务则一定为空。如果该任务为工作流中的任务那么这里实际接收到的是工作流上下文信息,建议使用 getWorkflowContext 方法获取上下文信息 


maxRetryTimes

Task 的最大重试次数

currentRetryTimes

Task 的当前重试次数,和 maxRetryTimes 联合起来可以判断当前是否为该 Task 的最后一次运行机会

subTask

子 Task,Map/MapReduce 处理器专属,开发者调用map方法时传递的子任务列表中的某一个

omsLogger

在线日志,用法同 Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,滥用在线日志会对 Server 造成巨大的压力

userContext

用户在 PowerJobWorkerConfig 中设置的自定义上下文

workflowContext

工作流上下文,更多信息见下方说明

工作流上下文( WorkflowContext )

该属性是 v4.0.0 版本的重大变更之一,移除了原来的参数传递机制,提供了 API 让开发者可以更加灵活便捷地在工作流中实现信息的传递。

属性列表

属性名称

意义/用法

wfInstanceId

工作流实例 ID

data

工作流上下文数据,键值对

appendedContextData

当前任务向工作流上下文中追加的数据。在任务执行完成后 ProcessorTracker 会将其上报给 TaskTracker,TaskTracker 在当前任务执行完成后会将这个信息上报给 server ,追加到当前的工作流上下文中,供下游任务消费

上游任务通过  WorkflowContext#appendData2WfContext(String key,Object value)  方法向工作流上下文中追加数据,下游任务便可以通过 WorkflowContext#fetchWorkflowContext()  方法获取到相应的数据进行消费。注意,当追加的上下文信息的 key 已经存在于当前的上下文中时,新的 value 会覆盖之前的值。另外,每次任务实例追加的上下文数据大小也会受到 worker 的配置项  powerjob.worker.max-appended-wf-context-length 的限制,超过这个长度的会被直接丢弃。

返回值 ProcessResult

方法的返回值为 ProcessResult,代表了本次 Task 执行的结果,包含 success 和 msg 两个属性,分别用于传递 Task 是否执行成功和 Task 需要返回的信息。

处理器开发示例

单机处理器:BasicProcessor

单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。单机执行任务需要实现接口 BasicProcessor,代码示例如下:


@Component
@Slf4j
public class StandaloneProcessor implements BasicProcessor {


    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("处理器启动成功,context 是 {}.", context);


        log.info("单机处理器正在处理");
        log.info(context.getJobParams());


        omsLogger.info("处理器执行结束");


        boolean success = true;


        return new ProcessResult(success, context + ": " + success);
    }
}

执行结果

图片

广播处理器:BroadcastProcessor

广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor 的基础上额外增加了 preProcess 和 postProcess 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:

@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {


    @Override
    public ProcessResult preProcess(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("预执行,会在所有 worker 执行 process 方法前调用");
        log.info("预执行,会在所有 worker 执行 process 方法前调用");
        // 预执行,会在所有 worker 执行 process 方法前调用
        return new ProcessResult(true, "init success");
    }


    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        // 撰写整个worker集群都会执行的代码逻辑
        omsLogger.info("撰写整个worker集群都会执行的代码逻辑");
        log.info("撰写整个worker集群都会执行的代码逻辑");
        return new ProcessResult(true, "release resource success");
    }


    @Override
    public ProcessResult postProcess(TaskContext context, List taskResults) throws Exception {


        // taskResults 存储了所有worker执行的结果(包括preProcess)
        // 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果");
        log.info("收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果");
        return new ProcessResult(true, "process success");
    }
}

执行结果

图片

并行处理器:MapReduceProcessor

MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的不二之选!实现 MapReduce 处理器需要继承 MapReduceProcessor类,具体用法如下示例代码所示:

@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        final OmsLogger omsLogger = context.getOmsLogger();
        // 判断是否为根任务
        if (isRootTask()) {


            // 构造子任务
            List subTaskList = Lists.newLinkedList();
            SubTask subTask=new SubTask();
            subTask.setSiteId(1L);
            subTask.setName("iron.guo");
            subTaskList.add(subTask);
            


            // 调用 map 方法,派发子任务(map 可能会失败并抛出异常,做好业务操作)
            map(subTaskList, "DATA_PROCESS_TASK");
            omsLogger.info("执行根任务-派发子任务");
            return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");
        }


        // 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支
        if (context.getSubTask() instanceof SubTask) {
            omsLogger.info("执行子任务开始");
            omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());
            // 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器
            return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");
        }


        return new ProcessResult(false, "UNKNOWN_BUG");
    }


    @Override
    public ProcessResult reduce(TaskContext taskContext, List taskResults) {


        // 所有 Task 执行结束后,reduce 将会被执行
        // taskResults 保存了所有子任务的执行结果


        // 用法举例,统计执行结果
        AtomicLong successCnt = new AtomicLong(0);
        taskResults.forEach(tr -> {
            if (tr.isSuccess()) {
                successCnt.incrementAndGet();
            }
        });
        // 该结果将作为任务最终的执行结果
        return new ProcessResult(true, "success task num:" + successCnt.get());
    }


    // 自定义的子任务
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    private static class SubTask {
        private Long siteId;
        private String name;




    }


}

执行结果

图片

注:Map 处理器相当于 MapReduce 处理器的阉割版本(阉割了 reduce 方法),此处不再单独举例。

工作流

图片

点击右上角按钮 新建工作流,即可录入新的工作流,具体界面和说明如下所示。

CRON -> 填写 CRON 表达式(在线生成网站)

API -> 不需要填写任何参数,表明该任务由 OpenAPI 触发

DAG 操作指南

编辑依赖关系

v4.0.0 以后支持节点的自由拖拉拽,不用再点点点了,哈哈哈 ~

导入任务节点

任务为之前创建的任务,可用工作流形式串联起来执行。

图片

编辑节点信息

点击需要编辑的节点,在右侧会弹出一个编辑框,如下图所示

图片

特殊节点说明

判断节点

图片

判断节点 不允许失败跳过以及禁用,节点参数中存储的是 Groovy 代码(执行 Groovy 代码时会将当前工作流上下文作为 context 变量注入到代码执行的上下文中),其执行结果仅能返回 "true" 或者 "false",同时判断节点仅有且必须有两条“输出”路径。会根据该代码的执行结果决定下游需要执行的节点。这里处理的原则是, 仅 cancel 那些只能通过被 disable 掉的边可达的节点

举个两个栗子,灰色代表相应的边 或者 节点被 disable 或 cancel,菱形代表判断节点,假定执行结果为 true

图片

case 3 以及 case 4 中的节点 3 都会被 cancel ,因为它只能通过节点 1 -> 节点 3 的边可达(该边的属性为 false),但对于节点 5 而言,在 case 4 中因为判断节点 2 的执行结果为 true ,那么其可以通过节点 2 -> 节点 5 的边可达,所以不会被 disable 。

备注:如果需要根据上游节点的执行结果决定下游节点,可以将上游节点的执行结果注入上下文中,再在判断节点中做相应的判断。

工作流嵌套节点

图片

该节点代表对某个工作流的引用,节点的 jobId 属性存储的是工作流 id,其他属性和普通的任务节点一致。不允许出现循环引用以及多级嵌套的情况,即嵌套节点中指向的工作流一定是一个不含嵌套节点的工作流。

执行到该节点时,如果该节点处于启用状态,那么将启动该节点所引用工作流的一个新实例,待该实例执行完成后再同步更新该节点的状态。

注意,创建子工作流时,会透传当前的上下文作为工作流的实例参数,在子工作流执行完成时会合并子工作流的上下文至父工作流的上下文中。

重试子工作流不会联动重试父工作流,但失败的子工作流会随着父工作流的重试而原地重试(不会生成新的实例)

来源:HELLO程序员内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯