Java的线程池类ThreadPoolExecutor位于java.util.concurrent 包中,它是一个灵活且广泛使用的线程池实现。线程池通过重用线程来减少线程创建和销毁的开销,提高应用程序的性能,线程池的基本组成如下:
- 核心线程数 (corePoolSize): 核心线程数是线程池在空闲时仍保留的线程数。
- 最大线程数 (maximumPoolSize): 线程池中允许的最大线程数。
- 任务队列 (workQueue): 用于保存等待执行任务的阻塞队列。
- 线程工厂 (ThreadFactory): 用于创建新线程的工厂。
- 拒绝策略 (RejectedExecutionHandler): 当任务无法提交到线程池时,如何处理任务的策略。
拒绝策略的类型
ThreadPoolExecutor 提供了四种内置的拒绝策略:
- AbortPolicy: 默认策略。直接抛出 RejectedExecutionException,阻止系统正常工作。
- CallerRunsPolicy: 提交任务的线程自己执行该任务。
- DiscardPolicy: 直接丢弃任务,不予任何处理。
- DiscardOldestPolicy: 丢弃队列中最旧的任务,然后尝试重新提交当前任务。
AbortPolicy
AbortPolicy策略是直接抛出 RejectedExecutionException,不执行任务。适合在需要明确知道任务被拒绝时使用。
下面是AbortPolicy的源码实现:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
使用场景:
- 在高可靠性系统中,AbortPolicy 可用于快速发现问题并进行处理。
- 当任务提交失败后需要立即采取补救措施时。
CallerRunsPolicy
CallerRunsPolicy策略由提交任务的线程(通常是主线程)来执行该任务,通过降低任务提交速率来缓解压力。
下面是CallerRunsPolicy的源码实现:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
使用场景:
- 适用于不希望丢弃任务且能接受任务执行延迟的场景。
- 可用于削峰填谷,防止任务过快提交。
DiscardPolicy
DiscardPolicy策略是指直接丢弃无法执行的任务,不抛异常,也就是不对被丢弃的任务进行任何处理。
下面是DiscardPolicy的源码实现:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Do nothing
}
}
使用场景:
- 适用于不关心单个任务被丢弃的场景。
- 在负载极高且系统能容忍数据丢失的情况下。
DiscardOldestPolicy
DiscardOldestPolicy策略会丢弃队列中最旧的任务,然后尝试重新提交当前任务,这种策略通常用于保证新任务有机会被执行。
下面是DiscardOldestPolicy的源码实现:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // discard oldest
e.execute(r); // retry
}
}
}
使用场景:
- 适用于需要保证最新任务的优先级高于旧任务的场景。
- 在新任务更重要的实时系统中。
自定义拒绝策略
除了内置策略,开发者可以实现 RejectedExecutionHandler 接口来定义自己的拒绝策略,通过这种方式,开发者可以根据具体需求来处理被拒绝的任务。下面是实现自定义策略的步骤:
- 实现RejectedExecutionHandler接口。
- 覆盖rejectedExecution方法,定义拒绝策略。
- 在ThreadPoolExecutor的构造函数中传入自定义策略。
代码示例如下:
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义拒绝逻辑,例如日志记录或重新尝试
log.warn("This is custom rejected: " + r.toString());
// 可以选择重新提交任务或其他处理
}
}
最后,我们再通过代码来展示如何创建一个线程池以及如何使用拒绝策略:
import java.util.concurrent.*;
public class ThreadPoolExample {
public static void main(String[] args) {
// 定义线程池的参数
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue workQueue = new ArrayBlockingQueue<>(2);
// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
new ThreadPoolExecutor.AbortPolicy() // 默认策略
// new ThreadPoolExecutor.CallerRunsPolicy()
// new ThreadPoolExecutor.DiscardPolicy()
// new ThreadPoolExecutor.DiscardOldestPolicy()
// new CustomRejectedExecutionHandler()
);
// 关闭线程池
threadPool.shutdown();
}
}
使用场景分析
不同的拒绝策略适合不同场景,下面是选择拒绝策略的一些参考因素:
- 实时性要求高: 如果系统不能接受任务被长时间阻塞或丢弃,可以选择 CallerRunsPolicy 或自定义策略,以确保任务被及时处理。
- 任务重要性不同: 对于有些场景,新任务比旧任务更重要,可以选择 DiscardOldestPolicy。
- 任务丢失可接受: 在任务丢失对系统影响较小的情况下,可以选择 DiscardPolicy,以保证系统整体的吞吐量。
- 系统可靠性: 在系统需要对任务被拒绝进行明确处理时,AbortPolicy 可以帮助快速发现和响应。
总结
本文,我们通过源码分析了Java 线程池提供的拒绝策略,整体来说拒绝策略是比较简单的一个知识点,如果业务代码中使用了线程池,拒绝策略是必须掌握的一个知识点,开发者可以根据具体的场景选择合适的策略,甚至可以设计自定义策略来满足特定需求,避免因过载导致的系统崩溃。