先说下线程池中几个参数的含义:
ThreadPoolExecutor初始化的时候一般会有7个参数:
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:非核心线程保活时间
- unit:单位
- workQueue:队列
- Executors.defaultThreadFactory():线程工场
- 拒绝策略
ThreadPoolExecutor的工作原理:
往线程池中提交第一个任务,底层会创建第一个核心线程,将线程和任务封装为一个woker对象放入set集合中,接下来每提交一个的任务都会对应创建一个核心线程和这个任务封装的woker对象放入set集合中,直到核心线程数达到corePoolSize,再次提交到线程池的任务会被放到阻塞队列排队执行,如果放队列的过程中,队列满了,就会创建一个非核心线程和这个任务封装为woker对象放入set集合中,如果最终已经达到最大线程数maximumPoolSize,就采用拒绝策略。如果放入队列过程中发现工作线程数位0,则创建一个空任务的Worker。
再来看下线程池的标识:
线程池的标识有两层含义:
- 一个含义是当前线程池中的线程数量;
- 一个含义是当前线程池的状态;
底层是用按位分隔的设计方式将一个int类型的变量的32位进行分割,用高3位表示线程状态,低29位表示线程数量。
线程池的5个状态:RUNNING= -1,正常运行状态 SHUTDOWN= 0, 表示不接受新任务,只把队列中的任务处理完结束。STOP= 1,表示不接受新任务,也不处理队列中的任务了。IDYING= 2,非正常状态 TERMINATED = 3,死亡状态
按位分割的好处就是用一个变量表示两个状态,在修改的时候可以利用cas保证原子性。
Worker对象创建逻辑是由addWorker方法实现的。
addWorker方法逻辑:
retry;双层循环
第一层循环主要判断:如果当前线程池状态为RUNNING就放行, 如果状态为SHUTDOWN就必须满足传进来的新任务为null,队列中有待处理的任务才会放行(因为SHUTDOWN状态下不接受新任务,只处理队列中的任务);如果状态为STOP,IDYING,TERMINATED就一定不放行;
第二层循环主要是判断线程数,如果是创建核心线程,就判断是否达到corePoolSize,否则就判断是否达到maximumPoolSize,如果达到就返回fasle不放行。
如果未达到就放行,放行的时候会利用cas更新线程数,如果更新成功则两层循环结束,继续下面的逻辑。
因为是cas操作,多线程的情况下可能会有更新线程数量失败的情况,在这种情况下要判断之前获取的线程池状态和现在的线程池状态是否一致,如果不一致那就要重新判断状态,从而进入到外层循环的下一轮循环,如果一致就只需要进入到内层循环的下一轮循环。
创建Worker对象
接下来就是创建Worker(任务),Worker类继承aqs,封装了线程工厂,初始化的时候会利用工厂创建一个线程,并且和传进来的任务封装为worker对象。
获取线程池全局锁(reentrylock作为线程池全局锁),进行上锁操作
将创建的worker加入workers集合,workers是一个hashset集合。
放入集合后就可以解锁了
worker创建完成了,接下来就是启动线程,启动线程后就会执行worker中的run方法
run方法流程
这个方法中主要的逻辑是这段代码
while (task != null || (task = getTask()) != null){
逻辑
}
task是worker对象封装的任务。如果当前worker对象上没有任务就调用getTask去阻塞队列拿任务,如果能拿到就处理任务。如果getTask返回null就跳出循环,进入processWorkerExit方法。
我们知道线程池中的任务是放在队列中的,ThreadPoolExecutor中的队列一般默认是阻塞队列LinkedBlockingQueue,
getTask()方法会在这个队列中拿任务,如果有任务就直接返回任务,如果此时队列中无任务,当前线程会阻塞等待任务到来。
但是如果设置了非核心线程最大空闲时间keepAliveTime,代表非核心线程的worker对象中的线程在拿任务的时候不会用take方法,而是用poll,poll这个方法可以设置阻塞等待时间为keepAliveTime。当超过这个时间还没有任务就会返回null。
- processWorkerExit方法逻辑
上一步中如果没有获取到任务并且返回了null就会进入processWorkerExit方法。这个方法的逻辑就是把当前非核心线程的worker从workers集合中移除。最后会做一个判断:如果此时没有任何工作线程了,并且阻塞队列中还有任务,那就再创建一个不带任务的非核心线程worker。保证有线程去处理队列中的任务。
拒绝策略:
- AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
- CallerRunsPolicy:由调用线程处理该任务。
- DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
- DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。
其他了解:
线程监控API:
while (true) {
System.out.println();
int queueSize = tpe.getQueue().size();
System.out.println("当前排队线程数:" + queueSize);
int activeCount = tpe.getActiveCount();
System.out.println("当前活动线程数:" + activeCount);
long completedTaskCount = tpe.getCompletedTaskCount();
System.out.println("执行完成线程数:" + completedTaskCount);
long taskCount = tpe.getTaskCount();
System.out.println("总线程数:" + taskCount);
Thread.sleep(3000);
}