文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java 线程池 ThreadPoolExecutor 八种拒绝策略浅析

2024-12-11 19:42

关注

前言

谈到java的线程池最熟悉的莫过于ExecutorService接口了,jdk1.5新增的java.util.concurrent包下的这个api,大大的简化了多线程代码的开发。而不论你用FixedThreadPool还是CachedThreadPool其背后实现都是ThreadPoolExecutor。ThreadPoolExecutor是一个典型的缓存池化设计的产物,因为池子有大小,当池子体积不够承载时,就涉及到拒绝策略。JDK中已经预设了4种线程池拒绝策略,下面结合场景详细聊聊这些策略的使用场景,以及我们还能扩展哪些拒绝策略。

 

池化设计思想

池话设计应该不是一个新名词。我们常见的如java线程池、jdbc连接池、redis连接池等就是这类设计的代表实现。这种设计会初始预设资源,解决的问题就是抵消每次获取资源的消耗,如创建线程的开销,获取远程连接的开销等。就好比你去食堂打饭,打饭的大妈会先把饭盛好几份放那里,你来了就直接拿着饭盒加菜即可,不用再临时又盛饭又打菜,效率就高了。除了初始化资源,池化设计还包括如下这些特征:池子的初始值、池子的活跃值、池子的最大值等,这些特征可以直接映射到java线程池和数据库连接池的成员属性中。

线程池触发拒绝策略的时机

和数据源连接池不一样,线程池除了初始大小和池子最大值,还多了一个阻塞队列来缓冲。数据源连接池一般请求的连接数超过连接池的最大值的时候就会触发拒绝策略,策略一般是阻塞等待设置的时间或者直接抛异常。而线程池的触发时机如下图:

 

 

如图,想要了解线程池什么时候触发拒绝粗略,需要明确上面三个参数的具体含义,是这三个参数总体协调的结果,而不是简单的超过最大线程数就会触发线程拒绝粗略,当提交的任务数大于corePoolSize时,会优先放到队列缓冲区,只有填满了缓冲区后,才会判断当前运行的任务是否大于maxPoolSize,小于时会新建线程处理。大于时就触发了拒绝策略,总结就是:当前提交任务数大于(maxPoolSize + queueCapacity)时就会触发线程池的拒绝策略了。

JDK内置4种线程池拒绝策略

拒绝策略接口定义

在分析JDK自带的线程池拒绝策略前,先看下JDK定义的 拒绝策略接口,如下:

  1. public interface RejectedExecutionHandler { 
  2.  void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 

接口定义很明确,当触发拒绝策略时,线程池会调用你设置的具体的策略,将当前提交的任务以及线程池实例本身传递给你处理,具体作何处理,不同场景会有不同的考虑,下面看JDK为我们内置了哪些实现:

CallerRunsPolicy(调用者运行策略)

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler { 
  2.  public CallerRunsPolicy() { } 
  3.  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  4.  if (!e.isShutdown()) { 
  5.  r.run(); 
  6.  } 
  7.  } 
  8.  } 

AbortPolicy(中止策略)

  1. public static class AbortPolicy implements RejectedExecutionHandler { 
  2.  public AbortPolicy() { } 
  3.  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  4.  throw new RejectedExecutionException("Task " + r.toString() + 
  5.  " rejected from " + 
  6.  e.toString()); 
  7.  } 
  8.  } 

DiscardPolicy(丢弃策略)

  1. public static class DiscardPolicy implements RejectedExecutionHandler { 
  2.  public DiscardPolicy() { } 
  3.  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  4.  } 
  5.  } 

DiscardOldestPolicy(弃老策略)

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler { 
  2.  public DiscardOldestPolicy() { } 
  3.  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  4.  if (!e.isShutdown()) { 
  5.  e.getQueue().poll(); 
  6.  e.execute(r); 
  7.  } 
  8.  } 
  9.  } 

第三方实现的拒绝策略

dubbo中的线程拒绝策略

  1. public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { 
  2.  protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); 
  3.  private final String threadName; 
  4.  private final URL url; 
  5.  private static volatile long lastPrintTime = 0; 
  6.  private static Semaphore guard = new Semaphore(1); 
  7.  public AbortPolicyWithReport(String threadName, URL url) { 
  8.  this.threadName = threadName; 
  9.  this.url = url; 
  10.  } 
  11.  @Override 
  12.  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
  13.  String msg = String.format("Thread pool is EXHAUSTED!" + 
  14.  " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + 
  15.  " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!"
  16.  threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), 
  17.  e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), 
  18.  url.getProtocol(), url.getIp(), url.getPort()); 
  19.  logger.warn(msg); 
  20.  dumpJStack(); 
  21.  throw new RejectedExecutionException(msg); 
  22.  } 
  23.  private void dumpJStack() { 
  24.  //省略实现 
  25.  } 

可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因

Netty中的线程池拒绝策略

  1. private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { 
  2.  NewThreadRunsPolicy() { 
  3.  super(); 
  4.  } 
  5.  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  6.  try { 
  7.  final Thread t = new Thread(r, "Temporary task executor"); 
  8.  t.start(); 
  9.  } catch (Throwable e) { 
  10.  throw new RejectedExecutionException( 
  11.  "Failed to start a new thread", e); 
  12.  } 
  13.  } 
  14.  } 

Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常

activeMq中的线程池拒绝策略

  1. new RejectedExecutionHandler() { 
  2.  @Override 
  3.  public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { 
  4.  try { 
  5.  executor.getQueue().offer(r, 60, TimeUnit.SECONDS); 
  6.  } catch (InterruptedException e) { 
  7.  throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); 
  8.  } 
  9.  throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); 
  10.  } 
  11.  }); 

activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常

pinpoint中的线程池拒绝策略

  1. public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { 
  2.  private final RejectedExecutionHandler[] handlerChain; 
  3.  public static RejectedExecutionHandler build(List chain) { 
  4.  Objects.requireNonNull(chain, "handlerChain must not be null"); 
  5.  RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); 
  6.  return new RejectedExecutionHandlerChain(handlerChain); 
  7.  } 
  8.  private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { 
  9.  this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); 
  10.  } 
  11.  @Override 
  12.  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
  13.  for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { 
  14.  rejectedExecutionHandler.rejectedExecution(r, executor); 
  15.  } 
  16.  } 

 

pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍

结语

前文从线程池设计思想,以及线程池触发拒绝策略的时机引出java线程池拒绝策略接口的定义。并辅以JDK内置4种以及四个第三方开源软件的拒绝策略定义描述了线程池拒绝策略实现的各种思路和使用场景。希望阅读此文后能让你对java线程池拒绝策略有更加深刻的认识,能够根据不同的使用场景更加灵活的应用。

来源:今日头条内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯