文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

利用DUCC配置平台实现一个动态化线程池

2024-11-30 17:32

关注

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

setMaximumPoolSize方法:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类


public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)动态线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {

private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();


public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
}

@Override
public void afterPropertiesSet() throws Exception {
this.refresh();
//创建定时任务线程池
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
//延迟1秒执行,每个1分钟check一次
executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
}

private void refresh() {
String dynamicThreadPool = "";
try {
if (DTP_REGISTRY.isEmpty()) {
log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
return;
}
dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
if (StringUtils.isBlank(dynamicThreadPool)) {
log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
return;
}
log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
});
if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
return;
}
for (ThreadPoolProperties properties : threadPoolPropertiesList) {
doRefresh(properties);
}
} catch (Exception e) {
log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
}
}


private void doRefresh(ThreadPoolProperties properties) {
if (StringUtils.isBlank(properties.getThreadPoolBeanName())
|| properties.getCorePoolSize() < 1
|| properties.getMaxPoolSize() < 1
|| properties.getMaxPoolSize() < properties.getCorePoolSize()) {
log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
return;
}
DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
if (Objects.isNull(threadPoolTaskExecutor)) {
log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
return;
}
ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
&& Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
return;
}
if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
}
if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
}

ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
}

private class RefreshThreadPoolConfig extends TimerTask {
private RefreshThreadPoolConfig() {
}

@Override
public void run() {
DynamicThreadPoolRefresh.this.refresh();
}
}

}

线程池配置类

@Data
public class ThreadPoolProperties {

private String threadPoolBeanName;

private int corePoolSize;

private int maxPoolSize;
}

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

配置value:

[
{
"threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
"corePoolSize": 32,
"maxPoolSize": 128
}
]

(4) 应用启动刷新应用本地动态线程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolTaskExecutor) {
DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
}
return bean;
}
}

3.动态线程池应用

动态线程池Bean声明


<bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">

<property name="corePoolSize" value="128"/>

<property name="maxPoolSize" value="512"/>

<property name="queueCapacity" value="500"/>

<property name="keepAliveSeconds" value="60"/>

<property name="rejectedExecutionHandler">




<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
property>
bean>

<bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">

<property name="corePoolSize" value="32"/>

<property name="maxPoolSize" value="128"/>

<property name="queueCapacity" value="500"/>

<property name="keepAliveSeconds" value="60"/>

<property name="rejectedExecutionHandler">




<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
property>
bean>

<bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
<bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

业务类注入Spring Bean后,直接使用即可

@Resource
private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;


Runnable asyncTask = ()->{...};
CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。​

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯