这篇文章主要介绍了SpringBoot Schedule调度任务的动态管理方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot Schedule调度任务的动态管理方法是什么文章都会有所收获,下面我们一起来看看吧。
前言
定时任务动态管理分为两种方式:
方式一:Web前台配置Trigger触发器(关联Cron)、ThreadPoolTaskScheduler类创建Scheduler方式下进行Schedule调度任务的动态管理
方式二:基于已创建的Schedule调度任务的动态管理,即以组件类 @Scheduled注解声明Schedule调度,在启动程序前一次性初始化,如:
@Componentpublic class TestTask { private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Scheduled(cron = "0/2 * * * * ?") public void robReceiveExpireTask() { System.out.println(df.format(LocalDateTime.now()) + "测试测试"); }}
缺陷:目前无法在运行期间增加Schedule以及stop、Start、Reset等管理。
一、架构流程图
二、代码实现流程
架构为SpringBoot + Spring + mybatis-plus
1.引入库
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>merak-hyper-automation-boot</artifactId> <groupId>com.merak.automation</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>automation-quartz</artifactId> <packaging>jar</packaging> <repositories> <repository> <id>aliyun</id> <name>aliyun Repository</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <dependencies> <!-- Spring框架基本的核心工具 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <!-- SpringWeb模块 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> </dependency> <!-- mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- druid数据连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>jakarta.validation</groupId> <artifactId>jakarta.validation-api</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> <!--引入quartz定时框架--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> <version>2.2.5.RELEASE</version> </dependency> </dependencies> <build> <plugins> <!-- 打包跳过测试 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
resources目录下文件/application.yml:
spring:
profiles:
active: dev
resources目录下文件/application-dev.yml:
server:
port: 12105
servlet:
context-path: /automation-quartzmanagement:
endpoints:
web:
exposure:
include: '*'# Spring配置
spring:
resources:
static-locations: classpath:/static/,classpath:/templates/
mvc:
throw-exception-if-no-handler-found: true
static-path-pattern: xml@EnableScheduling@EnableAsync@MapperScan(basePackages = {"com.merak.hyper.automation.persist.**.mapper"})@SpringBootApplication(scanBasePackages = {"com.merak.hyper.automation.**"}, exclude = {SecurityAutoConfiguration.class})public class MerakQuartzApplication { public static final Logger log = LoggerFactory.getLogger(MerakQuartzApplication.class); public static void main(String[] args) { SpringApplication.run(MerakQuartzApplication.class, args); } private int taskSchedulerCorePoolSize = 15; private int awaitTerminationSeconds = 60; private String threadNamePrefix = "taskExecutor-"; @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(taskSchedulerCorePoolSize); taskScheduler.setThreadNamePrefix(threadNamePrefix); taskScheduler.setWaitForTasksToCompleteOnShutdown(false); taskScheduler.setAwaitTerminationSeconds(awaitTerminationSeconds); taskScheduler.initialize();// isinitialized = true; log.info("初始化ThreadPoolTaskScheduler ThreadNamePrefix=" + threadNamePrefix + ",PoolSize=" + taskSchedulerCorePoolSize + ",awaitTerminationSeconds=" + awaitTerminationSeconds); return taskScheduler; } @Bean("asyncTaskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("asyncTaskExecutor-"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; }}一、启动时项目启动时,加载任务关联的触发器,并全量执行流程。
initLineRunner类:
package com.merak.hyper.automation.Scheduling;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;import com.merak.hyper.automation.persist.entity.BusWorkflow;import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;import com.merak.hyper.automation.persist.service.IBusWorkflowService;import com.merak.hyper.automation.util.CommonUtil;import com.merak.hyper.automation.util.ScheduleUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.core.annotation.Order;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.Iterator;import java.util.List;import java.util.Map;@Component@Order(1)public class initLineRunner implements CommandLineRunner { public static final Logger log = LoggerFactory.getLogger(initLineRunner.class); private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private TaskService taskService; @Autowired private IAutoTriggerInfoService triggerInfoService; @Autowired private IBusWorkflowService workflowService; @Override public void run(String... args) { log.info("项目启动:加载数字员工关联的触发器信息并全量执行," + df.format(LocalDateTime.now())); QueryWrapper<BusWorkflow> wrapper = new QueryWrapper<>(); wrapper.eq("wf_type", "3");//3:云托管 wrapper.eq("wf_state", "1"); List<BusWorkflow> busWorkflows = workflowService.list(wrapper); List<AutoTriggerInfo> triggerInfos = triggerInfoService.list(); if( 0 == busWorkflows.size() || 0 == triggerInfos.size() ){ log.info("数字员工关联的触发器信息不正确,员工记录数:"+busWorkflows.size()+",触发器记录数:"+triggerInfos.size()); } else{ //数字员工关联的触发器信息 Map<String,AutoTriggerInfo> loadWfidAndTriggerInfo = CommonUtil.loadWfidAndTriggerInfo(busWorkflows,triggerInfos); Iterator<Map.Entry<String, AutoTriggerInfo>> entries = loadWfidAndTriggerInfo.entrySet().iterator(); while (entries.hasNext()) { Map.Entry<String, AutoTriggerInfo> entry = entries.next(); String wfId = entry.getKey(); BusWorkflow workflow = busWorkflows.stream().filter( t -> wfId.equals(t.getWfId()) ).findAny().orElse(null); if( null != workflow ){ ScheduleUtil.start(new ScheduleTask(wfId,String.valueOf(workflow.getWfCreateuserId()),taskService), entry.getValue()); } } log.info("数字员工关联的触发器信息全量执行完成,数字员工定时个数:"+loadWfidAndTriggerInfo.size()+","+df.format(LocalDateTime.now())); } }}核心代码:```java ScheduleUtil.start(new ScheduleTask(wfId,String.valueOf(workflow.getWfCreateuserId()),taskService), entry.getValue());
Scheduler管理工具类:启动、取消、修改等管理
package com.merak.hyper.automation.util;import com.merak.hyper.automation.Scheduling.ScheduleTask;import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.scheduling.support.CronTrigger;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ScheduledFuture;public class ScheduleUtil { public static final Logger log = LoggerFactory.getLogger(ScheduleUtil.class); private static ThreadPoolTaskScheduler threadPoolTaskScheduler = SpringContextUtils.getBean(ThreadPoolTaskScheduler.class); //存储[数字员工wfI,dScheduledFuture]集合 private static Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>(); public static boolean start(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) { String wfId = scheduleTask.getId(); log.info("启动数字员工"+wfId+"定时任务线程" + scheduleTask.getId()); ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(triggerInfo.getLogicConfig())); scheduledFutureMap.put(wfId, scheduledFuture); return true; } public static boolean cancel(ScheduleTask scheduleTask) { log.info("关闭定时任务线程 taskId " + scheduleTask.getId()); ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(scheduleTask.getId()); if (scheduledFuture != null && !scheduledFuture.isCancelled()) { scheduledFuture.cancel(false); } scheduledFutureMap.remove(scheduleTask.getId()); return true; } public static boolean reset(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) { //先取消定时任务 cancel(scheduleTask); //然后启动新的定时任务 start(scheduleTask, triggerInfo); return true; }}
ScheduleTask类:ScheduleTask任务类
package com.merak.hyper.automation.Scheduling;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ScheduleTask implements Runnable { private static final int TIMEOUT = 30000; private String id; private String userId; private TaskService service; public static final Logger log = LoggerFactory.getLogger(ScheduleTask.class); public String getId() { return id; } public ScheduleTask(String id, String userId, TaskService service) { this.id = id; this.userId = userId; this.service = service; } @Override public void run() { log.info("ScheduleTask-执行数字员工消息的发送,id:"+ this.id + ",用户id:"+userId); service.work(this.id,this.userId); }}
public interface TaskService { void work(String keyword,String userId);}@Servicepublic class TaskServiceImpl implements TaskService { public static final Logger log = LoggerFactory.getLogger(TaskServiceImpl.class); @Autowired private IAutoDeviceInfoService deviceInfoService; @Override public void work(String wfId,String userId) { try { log.info("定时任务:根据数字员工wfId"+ wfId +",用户id:"+userId+",发送消息..."); //sendRobotMsg(wfId,userId); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } }
二、通过WEB配置的变更,动态管理定时任务
ScheduledController类:scheduled Web业务层:启动、取消、修改等管理schedule
调度任务信息变更(如1:Trigger Cron变更 2:任务停止 3:任务新增加等)
package com.merak.hyper.automation.controller;import com.merak.hyper.automation.common.core.domain.AjaxResult;import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;import com.merak.hyper.automation.util.ScheduledHelper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;@RestController@RequestMapping("/api/scheduled")public class ScheduledController { public static final Logger log = LoggerFactory.getLogger(ScheduledController.class); @Autowired private IAutoTriggerInfoService triggerInfoService; @Autowired private ScheduledHelper scheduledHelper; @PostMapping("/add") public AjaxResult addScheduleds(@RequestBody ScheduledApiVo scheduledApiVo){ AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId()); scheduledHelper.addScheduleds(scheduledApiVo,autoTriggerInfo); return AjaxResult.success(); } @PostMapping("/reset") public AjaxResult resetScheduleds(@RequestBody ScheduledApiVo scheduledApiVo){ AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId()); scheduledHelper.resetScheduleds(scheduledApiVo,autoTriggerInfo); return AjaxResult.success(); } @PostMapping("/stop") public AjaxResult stopScheduleds(@RequestBody ScheduledApiVo scheduledApiVo){ AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId()); scheduledHelper.stopScheduleds(scheduledApiVo); return AjaxResult.success(); }}ScheduledHelper类:对外提供ScheduledHelper管理:创建、变更、停止```javapackage com.merak.hyper.automation.util;import com.merak.hyper.automation.Scheduling.ScheduleTask;import com.merak.hyper.automation.Scheduling.TaskService;import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Component;@Componentpublic class ScheduledHelper { public static final Logger log = LoggerFactory.getLogger(ScheduledHelper.class); @Async("asyncTaskExecutor") public void addScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) { //addSchedule任务 log.warn("创建原数字员工["+scheduledApiVo.getWfId()+"],同步启动Schedule任务"); TaskService taskService = SpringContextUtils.getBean(TaskService.class); ScheduleUtil.start(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo); } @Async("asyncTaskExecutor") public void resetScheduleds(ScheduledApiVo scheduledApiVo,AutoTriggerInfo triggerInfo) { //cron值改变,变更Schedule任务 log.warn("数字员工["+scheduledApiVo.getWfId()+"]关联的触发器信息cron值改变,变更Schedule任务"); TaskService taskService = SpringContextUtils.getBean(TaskService.class); ScheduleUtil.reset(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo); } @Async("asyncTaskExecutor") public void stopScheduleds(ScheduledApiVo scheduledApiVo) { //移除Wfid,停止原Schedule任务 log.warn("原数字员工["+scheduledApiVo.getWfId()+"]无效,同步停止Schedule任务"); TaskService taskService = SpringContextUtils.getBean(TaskService.class); ScheduleUtil.cancel(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService)); }}
SpringContextUtils类:
package com.merak.hyper.automation.util;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Component;@Componentpublic class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtils.applicationContext = applicationContext; } public static Object getBean(String name) { return applicationContext.getBean(name); } public static <T> T getBean(Class<T> requiredType) { return applicationContext.getBean(requiredType); } public static <T> T getBean(String name, Class<T> requiredType) { return applicationContext.getBean(name, requiredType); } public static boolean containsBean(String name) { return applicationContext.containsBean(name); } public static boolean isSingleton(String name) { return applicationContext.isSingleton(name); } public static Class<? extends Object> getType(String name) { return applicationContext.getType(name); }}
ScheduledApiVo类:
import java.io.Serializable;public class ScheduledApiVo implements Serializable { private String wfId; private String userId; private String triggerId; //set get 略}
最终:Web端通过发送Http请求 ,调用ScheduledHelper管理类接口,实现Scheduled创建、变更、停止操作
log.info("3:云托管更新启动数字员工操作"); ScheduledApiVo scheduledApiVo = new ScheduledApiVo(); scheduledApiVo.setWfId(wfId); scheduledApiVo.setUserId(String.valueOf(updateUserId)); scheduledApiVo.setTriggerId(newTriggerInfo.getId()); String webHookBody = JSON.toJSONString(scheduledApiVo); EmsApiUtil.SendQuartzMessage(url, "add", webHookBody); ******************** 分隔 ************************ public static boolean SendQuartzMessage(String quartzUrl, String method, String webHookBody){ boolean result = false; try{ //org.apache.httpcomponents.httpclient sendPost,pom依赖如下dependency String resp = HttpClientUtil.sendPostByJson(quartzUrl+"/"+method, webHookBody,0); if( "error".equals(resp) || resp.contains("405 Not Allowed")){ log.error("调用任务调度中心消息发送失败,地址:"+quartzUrl); } else { JSONObject jsonObject = JSON.parseObject(resp); if( "200".equals(String.valueOf(jsonObject.get("code"))) ){ result = true; } else{ log.error("调用任务调度中心失败,msg:"+String.valueOf(jsonObject.get("msg"))); } } }catch (Exception e){ log.error("调用任务调度中心失败,msg:"+e.getMessage()); } return result; }
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency>
关于“SpringBoot Schedule调度任务的动态管理方法是什么”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“SpringBoot Schedule调度任务的动态管理方法是什么”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网行业资讯频道。