文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

超详细的分布式调度框架 Elastic-job 实践详解

2024-12-03 12:13

关注

本文转载自微信公众号「Java极客技术」,作者鸭血粉丝 。转载本文请联系Java极客技术公众号。  

一、介绍

在前几篇文章中,我们详细的介绍了 Quartz 的架构原理以及应用实践,虽然 Quartz 也可以通过集群方式来保证服务高可用,但是它也有一个的弊端,那就是服务节点数量的增加,并不能提升任务的执行效率,即不能实现水平扩展!

之所以产生这样的结果,是因为 Quartz 在分布式集群环境下是通过数据库锁方式来实现有且只有一个有效的服务节点来运行服务,从而保证服务在集群环境下定时任务不会被重复调用!

如果需要运行的定时任务很少的话,使用 Quartz 不会有太大的问题,但是如果 现在有这么一个需求,例如理财产品,每天6点系统需要计算每个账户昨天的收益,假如这个理财产品,有几个亿的用户,如果都在一个服务实例上跑,可能第二天都无法处理完这项任务!

类似这样场景还有很多很多,很显然 Quartz 很难满足我们这种大批量、任务执行周期长的任务调度!

因此短板,当当网基于 Quartz 开发了一套适合在分布式环境下能高效率的使用服务器资源的 Elastic-Job 定时任务框架!

Elastic-Job-Lite最大的亮点就是支持弹性扩容缩容,怎么实现的呢?

比如现在有个任务要执行,如果将任务进行分片成10个,那么可以同时在10个服务实例上并行执行,互相不影响,从而大大的提升了任务执行效率,并且充分的利用服务器资源!

对于上面的理财产品,如果这个任务需要处理1个亿用户,那么我们可以通过水平扩展,比如对任务进行分片为500,让500个服务实例同时运行,每个服务实例处理20万条数据,不出意外的话,1 - 2个小时可以全部跑完,如果时间还是很长,还可以继续水平扩张,添加服务实例来运行!

2015 年,当当网将其开源,瞬间吸引了一大批程序员的关注,同时登顶开源中国第一名!

下面我们就一起来了解一下这款使用非常广泛的分布式调度框架。

二、项目架构介绍

Elastic-Job 最开始只有一个 elastic-job-core 的项目,定位轻量级、无中心化,最核心的服务就是支持弹性扩容和数据分片!

从 2.X 版本以后,主要分为 Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目。

其中,Elastic-Job-Lite 定位为轻量级 无 中 心 化 解 决 方 案 , 使 用jar 包 的 形 式 提 供 分 布 式 任 务 的 协 调 服 务 。

而 Elastic-Job-Cloud 使用 Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务(跟 Lite 的区别只是部署方式不同,他们使用相同的 API,只要开发一次)。

今天我们主要介绍的是Elastic-Job-Lite,最主要的功能特性如下:

当然,还有失效转移、错过执行作业重触发等等功能,大家可以访问官网文档,以获取更多详细资料。

应用在各自的节点执行任务,通过 zookeeper 注册中心协调。节点注册、节点选举、任务分片、监听都在 E-Job 的代码中完成。下图是官网提供得架构图。

 

啥也不用多说了,下面我们直接通过实践介绍,更容易了解里面是怎么玩的!

三、应用实践

3.1、zookeeper 安装

elastic-job-lite,是直接依赖 zookeeper 的,因此在开发之前我们需要先准备好对应的 zookeeper 环境,关于 zookeeper 的安装过程,就不多说了,非常简单,网上都有教程!

3.2、elastic-job-lite-console 安装

elastic-job-lite-console,主要是一个任务作业可视化界面管理系统。

可以单独部署,与平台不关,主要是通过配置注册中心和数据源来抓取数据。

获取的方式也很简单,直接访问https://github.com/apache/shardingsphere-elasticjob地址,然后切换到2.1.5的版本号,然后执行mvn clean install进行打包,获取对应的安装包将其解压,进行bin文件夹启动服务即可!

 

如果你的网速像蜗牛一样的慢,还有一个办法就是从这个地址https://gitee.com/elasticjob/elastic-job获取对应的源码!

启动服务后,在浏览器访问http://127.0.0.1:8899,输入账户、密码(都是root)即可进入控制台页面,类似如下界面!

 

进入之后,将上文所在的 zookeeper 注册中心进行配置,包括数据库 mysql 的数据源也可以配置一下!

3.3、创建工程

本文采用springboot来搭建工程为例,创建工程并添加elastic-job-lite依赖!

  1. -- 引入elastic-job-lite核心模块 --> 
  2.  
  3.     com.dangdang 
  4.     elastic-job-lite-core 
  5.     2.1.5 
  6.  
  7.  
  8. -- 使用springframework自定义命名空间时引入 --> 
  9.  
  10.     com.dangdang 
  11.     elastic-job-lite-spring 
  12.     2.1.5 
  13.  

在配置文件application.properties中提前配置好 zookeeper 注册中心相关信息!

  1. #zookeeper config 
  2. zookeeper.serverList=127.0.0.1:2181 
  3. zookeeper.namespace=example-elastic-job-test 

3.4、新建 ZookeeperConfig 配置类

  1. @Configuration 
  2. @ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0"
  3. public class ZookeeperConfig { 
  4.  
  5.      
  6.     @Bean(initMethod = "init"
  7.     public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList,  
  8.                                                            @Value("${zookeeper.namespace}") String namespace){ 
  9.         return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace)); 
  10.     } 
  11.  

3.5、新建任务处理类

elastic-job支持三种类型的作业任务处理!

3.6、新建 Simple 类型作业

编写一个SimpleJob接口的实现类MySimpleJob,当前工作主要是打印一条日志。

  1. @Slf4j 
  2. public class MySimpleJob implements SimpleJob { 
  3.  
  4.     @Override 
  5.     public void execute(ShardingContext shardingContext) { 
  6.         log.info(String.format("Thread ID: %s, 作业分片总数: %s, " + 
  7.                         "当前分片项: %s.当前参数: %s," + 
  8.                         "作业名称: %s.作业自定义参数: %s" 
  9.                 , 
  10.                 Thread.currentThread().getId(), 
  11.                 shardingContext.getShardingTotalCount(), 
  12.                 shardingContext.getShardingItem(), 
  13.                 shardingContext.getShardingParameter(), 
  14.                 shardingContext.getJobName(), 
  15.                 shardingContext.getJobParameter() 
  16.         )); 
  17.     } 

创建一个MyElasticJobListener任务监听器,用于监听MySimpleJob的任务执行情况。

  1. @Slf4j 
  2. public class MyElasticJobListener implements ElasticJobListener { 
  3.  
  4.     private long beginTime = 0; 
  5.  
  6.     @Override 
  7.     public void beforeJobExecuted(ShardingContexts shardingContexts) { 
  8.         beginTime = System.currentTimeMillis(); 
  9.         log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(),  DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); 
  10.     } 
  11.  
  12.     @Override 
  13.     public void afterJobExecuted(ShardingContexts shardingContexts) { 
  14.         long endTime = System.currentTimeMillis(); 
  15.         log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime); 
  16.     } 
  17.  

创建一个MySimpleJobConfig类,将MySimpleJob其注入到zookeeper。

  1. @Configuration 
  2. public class MySimpleJobConfig { 
  3.  
  4.      
  5.     @Value("${simpleJob.mySimpleJob.name}"
  6.     private String mySimpleJobName; 
  7.  
  8.      
  9.     @Value("${simpleJob.mySimpleJob.cron}"
  10.     private String mySimpleJobCron; 
  11.  
  12.      
  13.     @Value("${simpleJob.mySimpleJob.shardingTotalCount}"
  14.     private int mySimpleJobShardingTotalCount; 
  15.  
  16.      
  17.     @Value("${simpleJob.mySimpleJob.shardingItemParameters}"
  18.     private String mySimpleJobShardingItemParameters; 
  19.  
  20.      
  21.     @Value("${simpleJob.mySimpleJob.jobParameters}"
  22.     private String mySimpleJobParameters; 
  23.  
  24.     @Autowired 
  25.     private ZookeeperRegistryCenter registryCenter; 
  26.  
  27.     @Bean 
  28.     public MySimpleJob mySimpleJob() { 
  29.         return new MySimpleJob(); 
  30.     } 
  31.  
  32.     @Bean(initMethod = "init"
  33.     public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { 
  34.   //配置任务监听器 
  35.    MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  36.         return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); 
  37.     } 
  38.  
  39.     private LiteJobConfiguration getLiteJobConfiguration() { 
  40.         // 定义作业核心配置 
  41.         JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). 
  42.                 shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); 
  43.         // 定义SIMPLE类型配置 
  44.         SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); 
  45.         // 定义Lite作业根配置 
  46.         LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); 
  47.         return simpleJobRootConfig; 
  48.  
  49.     } 

在配置文件application.properties中配置好对应的mySimpleJob参数!

  1. #elastic job 
  2. #simpleJob类型的job 
  3. simpleJob.mySimpleJob.name=mySimpleJob 
  4. simpleJob.mySimpleJob.cron=0/15 * * * * ? 
  5. simpleJob.mySimpleJob.shardingTotalCount=3 
  6. simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=c 
  7. simpleJob.mySimpleJob.jobParameters=helloWorld 

运行程序,看看效果如何?

 

在上图demo中,配置的分片数为3,这个时候会有3个线程进行同时执行任务,因为都是在一台机器上执行的,这个任务被执行来3次,下面修改一下端口配置,创建三个相同的服务实例,在看看效果如下:

 

很清晰的看到任务被执行一次!

3.7、新建 DataFlowJob 类型作业

DataFlowJob 类型的任务配置和SimpleJob类似,操作也很简单!

创建一个DataflowJob类型的实现类MyDataFlowJob。

  1. @Slf4j 
  2. public class MyDataFlowJob implements DataflowJob { 
  3.  
  4.     private boolean flag = false
  5.  
  6.     @Override 
  7.     public List fetchData(ShardingContext shardingContext) { 
  8.         log.info("开始获取数据"); 
  9.         if (flag) { 
  10.             return null
  11.         } 
  12.         return Arrays.asList("qingshan""jack""seven"); 
  13.     } 
  14.  
  15.     @Override 
  16.     public void processData(ShardingContext shardingContext, List data) { 
  17.         for (String val : data) { 
  18.             // 处理完数据要移除掉,不然就会一直跑,处理可以在上面的方法里执行。这里采用 flag 
  19.             log.info("开始处理数据:" + val); 
  20.         } 
  21.         flag = true
  22.     } 

接着创建MyDataFlowJob的配置类,将其注入到zookeeper注册中心。

  1. Configuration 
  2. public class MyDataFlowJobConfig { 
  3.  
  4.      
  5.     @Value("${dataflowJob.myDataflowJob.name}"
  6.     private String jobName; 
  7.  
  8.      
  9.     @Value("${dataflowJob.myDataflowJob.cron}"
  10.     private String jobCron; 
  11.  
  12.      
  13.     @Value("${dataflowJob.myDataflowJob.shardingTotalCount}"
  14.     private int jobShardingTotalCount; 
  15.  
  16.      
  17.     @Value("${dataflowJob.myDataflowJob.shardingItemParameters}"
  18.     private String jobShardingItemParameters; 
  19.  
  20.      
  21.     @Value("${dataflowJob.myDataflowJob.jobParameters}"
  22.     private String jobParameters; 
  23.  
  24.     @Autowired 
  25.     private ZookeeperRegistryCenter registryCenter; 
  26.  
  27.  
  28.     @Bean 
  29.     public MyDataFlowJob myDataFlowJob() { 
  30.         return new MyDataFlowJob(); 
  31.     } 
  32.  
  33.     @Bean(initMethod = "init"
  34.     public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) { 
  35.         MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  36.         return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); 
  37.     } 
  38.  
  39.     private LiteJobConfiguration getLiteJobConfiguration() { 
  40.         // 定义作业核心配置 
  41.         JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). 
  42.                 shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); 
  43.         // 定义DATAFLOW类型配置 
  44.         DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false); 
  45.         // 定义Lite作业根配置 
  46.         LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build(); 
  47.         return dataflowJobRootConfig; 
  48.  
  49.     } 

最后,在配置文件application.properties中配置好对应的myDataflowJob参数!

  1. #dataflow类型的job 
  2. dataflowJob.myDataflowJob.name=myDataflowJob 
  3. dataflowJob.myDataflowJob.cron=0/15 * * * * ? 
  4. dataflowJob.myDataflowJob.shardingTotalCount=1 
  5. dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=c 
  6. dataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter 

运行程序,看看效果如何?

 

需要注意的地方是,如果配置的是流式处理类型,它会不停的拉取数据、处理数据,在拉取的时候,如果返回为空,就不会处理数据!

如果配置的是非流式处理类型,和上面介绍的simpleJob类型,处理一样!

3.8、新建 ScriptJob 类型作业

ScriptJob 类型的任务配置和上面类似,主要是用于定时执行某个脚本,一般用的比较少!

因为目标是脚本,没有执行的任务,所以无需编写任务作业类型!

只需要编写一个ScriptJob类型的配置类即可,命令是echo 'Hello World !内容!

  1. @Configuration 
  2. public class MyScriptJobConfig { 
  3.  
  4.      
  5.     @Value("${scriptJob.myScriptJob.name}"
  6.     private String jobName; 
  7.  
  8.      
  9.     @Value("${scriptJob.myScriptJob.cron}"
  10.     private String jobCron; 
  11.  
  12.      
  13.     @Value("${scriptJob.myScriptJob.shardingTotalCount}"
  14.     private int jobShardingTotalCount; 
  15.  
  16.      
  17.     @Value("${scriptJob.myScriptJob.shardingItemParameters}"
  18.     private String jobShardingItemParameters; 
  19.  
  20.      
  21.     @Value("${scriptJob.myScriptJob.jobParameters}"
  22.     private String jobParameters; 
  23.  
  24.     @Autowired 
  25.     private ZookeeperRegistryCenter registryCenter; 
  26.  
  27.  
  28.     @Bean(initMethod = "init"
  29.     public JobScheduler scriptJobScheduler() { 
  30.         MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  31.         return new JobScheduler(registryCenter, getLiteJobConfiguration(), elasticJobListener); 
  32.     } 
  33.  
  34.     private LiteJobConfiguration getLiteJobConfiguration() { 
  35.         // 定义作业核心配置 
  36.         JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). 
  37.                 shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); 
  38.         // 定义SCRIPT类型配置 
  39.         ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "echo 'Hello World !'"); 
  40.         // 定义Lite作业根配置 
  41.         LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build(); 
  42.         return scriptJobRootConfig; 
  43.  
  44.     } 

在配置文件application.properties中配置好对应的myScriptJob参数!

  1. #script类型的job 
  2. scriptJob.myScriptJob.name=myScriptJob 
  3. scriptJob.myScriptJob.cron=0/15 * * * * ? 
  4. scriptJob.myScriptJob.shardingTotalCount=3 
  5. scriptJob.myScriptJob.shardingItemParameters=0=a,1=b,2=c 
  6. scriptJob.myScriptJob.jobParameters=myScriptJobParamter 

运行程序,看看效果如何?

 

3.9、将任务状态持久化到数据库

可能有的人会发出疑问,elastic-job是如何存储数据的,用ZooInspector客户端链接zookeeper注册中心,你发现对应的任务配置被存储到相应的树根上!

 

而具体作业任务执行轨迹和状态结果是不会存储到zookeeper,需要我们在项目中通过数据源方式进行持久化!

将任务状态持久化到数据库配置过程也很简单,只需要在对应的配置类上注入数据源即可,以MySimpleJobConfig为例,代码如下:

  1. @Configuration 
  2. public class MySimpleJobConfig { 
  3.  
  4.      
  5.     @Value("${simpleJob.mySimpleJob.name}"
  6.     private String mySimpleJobName; 
  7.  
  8.      
  9.     @Value("${simpleJob.mySimpleJob.cron}"
  10.     private String mySimpleJobCron; 
  11.  
  12.      
  13.     @Value("${simpleJob.mySimpleJob.shardingTotalCount}"
  14.     private int mySimpleJobShardingTotalCount; 
  15.  
  16.      
  17.     @Value("${simpleJob.mySimpleJob.shardingItemParameters}"
  18.     private String mySimpleJobShardingItemParameters; 
  19.  
  20.      
  21.     @Value("${simpleJob.mySimpleJob.jobParameters}"
  22.     private String mySimpleJobParameters; 
  23.  
  24.     @Autowired 
  25.     private ZookeeperRegistryCenter registryCenter; 
  26.  
  27.     @Autowired 
  28.     private DataSource dataSource;; 
  29.  
  30.  
  31.     @Bean 
  32.     public MySimpleJob stockJob() { 
  33.         return new MySimpleJob(); 
  34.     } 
  35.  
  36.     @Bean(initMethod = "init"
  37.     public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { 
  38.         //添加事件数据源配置 
  39.         JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); 
  40.         MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
  41.         return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), jobEventConfig, elasticJobListener); 
  42.     } 
  43.  
  44.     private LiteJobConfiguration getLiteJobConfiguration() { 
  45.         // 定义作业核心配置 
  46.         JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). 
  47.                 shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); 
  48.         // 定义SIMPLE类型配置 
  49.         SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); 
  50.         // 定义Lite作业根配置 
  51.         LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); 
  52.         return simpleJobRootConfig; 
  53.  
  54.     } 

同时,需要在配置文件application.properties中配置好对应的datasource参数!

  1. spring.datasource.url=jdbc:mysql://127.0.0.1:3306/example-elastic-job-test 
  2. spring.datasource.username=root 
  3. spring.datasource.password=root 
  4. spring.datasource.driver-class-name=com.mysql.jdbc.Driver 

运行程序,然后在elastic-job-lite-console控制台配置对应的数据源!

 

最后,点击【作业轨迹】即可查看对应作业执行情况!

 

四、小结

本文主要围绕elasticjob的使用进行简单介绍,希望大家有所收获!

在分布式环境环境下,elastic-job-lite支持的弹性扩容、任务分片是最大的亮点,在实际使用的时候,任务分片总数尽可能大于服务实例个数,并且是倍数关系,这样任务在分片的时候,会更加均匀!

如果想深入的了解elasticjob,大家可以访问官方文档,获取更加详细的使用教程!

五、参考1、elasticjob - 官方文档

 

博客园 - 吴振照 - 任务调度之 Elastic Job

 

来源:Java极客技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯