
本文对源码的学习是基于JDK1.8版本.
前言
线程池作用 :
- 提高资源利用率 : 通过重复利用已经创建好的线程,减少重复创建、销毁造成的消耗;
- 提高吞吐量 : 异步执行任务,即当任务到达时,任务可以可以直接放入阻塞队列中,等待线程的执行;
- 提高代码可维护性 : 使用线程池可以进行统一分配、调优和监控.
线程池的五种运行状态
几个相关的变量 :
// 运行时状态,默认初始化后为RUNNING状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29bit位
private static final int COUNT_BITS = Integer.SIZE - 3;
1. // 线程池中工作线程的最大容量 : 11111 11111111 11111111 11111111 (29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
2. // 五种运行状态 :
2.1 // 运行状态 : 101 00000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
2.2 // 关闭状态 : 000 00000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
2.3 // 强制关闭 : 001 00000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
2.4 // 结束状态 : 010 00000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
2.5 // 死亡状态 : 011 00000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
// PS :
// ctl的高3bit位 代表着 五种不同运行状态
// ctl的低29bit位 代表着 当前创建的线程数
// 这五种状态大致可以分为 :
// 运行状态(小于0) , 此状态在没饱和状态下会 创建工作线程、接收任务、处理任务、回收工作线程
// 关闭状态(等于0,SHUTDOWN) , 此状态在没饱和状态下会 创建工作线程、处理已接收任务、但是不接收新任务、处理完全部任务最终回收工作线程
// 强制关闭状态(大于0,STOP) : 此状态下不会创建工作线程、已接受的任务也不会处理、新任务也不会接收,而是强制中断运行工作线程、回收工作线程
// 其他状态(大于0,TIDYING \ TERMINATED) 此状态下工作线程已经全部完成回收,进入结束、死亡状态
如下图 : 线程池的五种运行状态

ctl运行状态的相关计算函数 :
// 获取当前线程池运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取当前线程池中已创建的工作线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 更换运行状态,并且根据当前线程池中已创建的工作线程数量(workerCountOf())来保留当前的工作线程数量;
// 这个函数最终的作用就是改变了高3bit为的运行状态,并且低29bit位保留了之前创建的工作线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
- runStateOf(int c) : 根据ctl获取当前线程池运行状态
- c : 运行期间的ctl状态值
- ~CAPACITY : 0 , 即29个1全部变为0
- c & ~CAPACITY : 最终只保留高3bit位,由此可以得到当前线程池的运行状态
- workerCountOf(int c) : 根据ctl获取当前线程池中已创建的工作线程数量
- c : 运行期间的ctl状态值
- CAPACITY : 29个1
- c & CAPACITY : 最终只保留低29bit位,由此可以得到当前线程池中已创建的工作线程数量
- ctlOf(int rs, int wc) :
- rs : 当前需要改变的最新运行状态,例如由RUNNING变为SHUTDOWN的情况下,那么rs传送的就是SHUTDOWN值
- wc : 当前线程池中已创建的工作线程数量,根据workerCountOf()获取
- rs | wc : 最终是高3bit位更换了状态,低29bit位保留了当前创建的工作线程数
构造器
// 这个使用的默认线程工厂、默认拒绝策略defaultHandler
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(),
defaultHandler);
}
// 这个是最终调用的构造器
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
4. // 默认使用的拒绝策略 : 直接抛出异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
6个参数说明 :
- corePoolSize : 核心线程数,即该值保证了线程池中至少创建的corePoolSize条工作线程
- workQueue : 阻塞队列(任务队列,生产者-消费者队列),作用于保存等待执行的任务的阻塞队列;该参数可为无界阻塞队列、有界阻塞队列、同步队列(SynchronousQueue)、优先队列、双端队列等等.
- maximumPoolSize : 最大线程数,即如果阻塞队列满了且当前创建的工作线程数小于maximumPoolSize,则需要创建新的线程来帮忙执行阻塞队列中的任务;对于workQueue参数如果为无界阻塞队列,则maximumPoolSize参数无效.(LinkedBlockingQueue创建的时候如果没有传入容量,则使用的是Integer.MAX_VALUE,理论上来说是无界队列了)
- keepAliveTime : 当线程池中的工作线程数量大于corePoolSize的时候,如果这时没有新的任务提交(workQueue为空),线程在向workQueue获取任务的时候会进入等待状态,直到等待的时间超过了keepAliveTime,那么非核心线程将会被回收、销毁.说白了就是当线程池的线程大于corePoolSize的时候且阻塞队列没有任务,此时不需要更多的线程占用空间了,那么将回收核心线程数以外的线程,回收的这些线程是因为在keepAliveTime时间内没有获取到任务.
- threadFactory : 线程工厂.默认使用Executors的静态内部类DefaultThreadFactory
- handler : 拒绝策略,父接口是RejectedExecutionHandler,其中有四种不同的拒绝策略实现了该接口,默认使用AbortPolicy,即抛出异常。当然,我们也可以根据自己的场景来自定义拒绝策略。
execute()
执行该函数会把一个任务提交给线程池.该函数的代码具体可以分为4大部分,即 :
1.会判断当前创建的工作线程是否已经达到设置的核心线程数,如果没有达到,则会创建工作线程,并把当前任务交给这个新创建的工作线程;如果达到了核心线程数,那么会走第1步骤:
2.把任务放进阻塞队列;如果走到第2步骤,说明当前线程池中已经创建好了核心线程数。当然,添加阻塞队列也有可能失败或者成功,如果添加成功,那么本次的执行就结束,如果添加失败,说明阻塞队列满了,那么会走第3步骤:
3.会判断当前创建的工作线程是否已经达到设置的最大线程数,如果没有达到,则会创建工作线程,并把当前任务交给这个新创建的工作线程;如果达到了最大线程数,那么会走第4步骤:
4.执行拒绝策略,走到这个步骤,说明当前的线程池处于"饱和" 状态。PS:“饱和”:创建的工作线程已经达到了最大线程数 且 阻塞队列满了。

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取当前运行状态
int c = ctl.get();
1. // 根据workerCountOf()获取当前已创建的工作线程数
// 如果当前已创建的工作线程数小于核心线程数,则执行addWorker()创建工作线程执行任务
// addWorker() 下面具体分析
if (workerCountOf(c) < corePoolSize) {
1.1 // 创建工作线程并执行任务
if (addWorker(command, true))
// 创建工作线程成功,直接return
return;
1.2 // 上面的创建工作线程失败,说明已经达到核心线程数(或者处于STOP之后的状态)
// 重新获取最新的状态并且执行下面第2步骤
c = ctl.get();
}
2. // 来到这里说明当前创建的工作线程已经达到核心线程数了(或者处于STOP之后的状态)
// 如果处于运行状态,则尝试把任务添加到workQueue阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 任务添加到队列成功
int recheck = ctl.get();
2.1 // 当前已经不是运行状态了,删除当前任务,即SHUTDOWN之后的状态不接收新任务
if (! isRunning(recheck) && remove(command))
2.3 // 执行拒绝策略
reject(command);
2.4 // 当前是运行状态,但是由于已创建的工作线程数如果等于0,池中至少需要保留一条工作线程来执行任务
else if (workerCountOf(recheck) == 0)
2.4.1 // 创建工作线程
addWorker(null, false);
}
3. // 来到这里说明 :
// 当前线程池已经达到了第1步骤的corePoolSize核心线程数 并且
// 第2步骤的添加任务队列失败(阻塞队列饱和了)
// 则执行addWorker()创建工作线程(最大线程数,maximumPoolSize)
else if (!addWorker(command, false))
4. // addWorker()创建线程失败:当前处于饱和状态(任务队列满了 且 线程已经是最大线程数 或者 处于STOP状态)
// 则执行拒绝策略
reject(command);
}
// 小于0处于运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 通过handler执行拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
基于上面的代码,重点需要分析addWorker().
addWorker(Runnable firstTask, boolean core)
该函数的作用是创建工作线程 :
- firstTask: 任务对象
- core : true为判断corePoolSize,false为判断maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
1. // 检查是否需要创建线程
retry:
for (;;) {
1.1 // 当前运行状态
int c = ctl.get();
1.2 // 获取当前运行状态(高3bit)
int rs = runStateOf(c);
# 检查 start
SHUTDOWN状态 :不会接收新任务,如果阻塞队列有任务,会创建线程帮忙执行
STOP之后的状态(包括STOP):不会接受新任务,也不会创建线程帮忙执行
1.3 // 如果当前运行状态不是处于 RUNNING,需要进行检查,看看是否需要走1.4创建线程
// 这里注意一下 前面有个 !
// 如果当前处于 SHUTDOWN 并且 firstTask == null 都为true的情况下,说明 SHUTDOWN状态下不接收任务了;
// 并且 任务队列不为空 为true的情况下,说明任务队列还存在任务需要处理;
// 虽然目前处于SHUTDOWN,但还是会继续处理阻塞队列的任务,只是不接收任务了
// 如果当前处于处于 STOP 之后,直接不创建线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
## 根据 corePoolSize 或者 maximumPoolSize 继续检查是否需要创建线程来执行任务
1.4 // 通过上面1.3的检查,如果执行到这里说明当前需要创建线程来
for (;;) {
// 获取当前已创建的线程数
int wc = workerCountOf(c);
1.4.1 // 如果wc已经达到了创建的最大的容量的线程数 或者 判断以达到创建的线程数 ,则 不执行第 2 步骤 创建线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 线程创建失败
return false;
// 否则需要创建线程
1.4.2 // CAS设置ctl+1
if (compareAndIncrementWorkerCount(c))
// 跳出第1步骤的循环,执行第2步骤创建线程
break retry;
1.4.3 // 1.4.2 出现竞争了,重新获取当前运行状态
c = ctl.get();
// 当前线程数改变了,重新跳回第1步骤继续自旋
if (runStateOf(c) != rs)
continue retry;
}
}
## 检查 end
2. // 如果执行到这里则代表上面1.4.2步骤竞争成功,需要创建工作线程执行任务
// 创建的工作线程是否成功启动
boolean workerStarted = false;
// 是否成功创建了工作线程
boolean workerAdded = false;
2.1 // 开始创建线程对象,Worker是工作线程对象,具体下面再分析
Worker w = null;
try {
2.2 // 创建线程对象并分配当前任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
2.3 // 获取操作workers的锁(workers存放着已创建的工作线程对象)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
2.3.1 // 获取当前运行状态
int rs = runStateOf(ctl.get());
2.3.2 // 如果处于RUNNING 或者
// 处于SHUTDOWN并且firstTask == null 这个上面分析过了
// 则把创建的线程添加到workers
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程是否处于活动状态
if (t.isAlive())
throw new IllegalThreadStateException();
// 执行添加,HashSet容器
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大工作线程数量
if (s > largestPoolSize)
largestPoolSize = s;
// 添加成功标识
workerAdded = true;
}
} finally {
// 释放操作workers的锁
mainLock.unlock();
}
2.4 // 添加成功标识
if (workerAdded) {
// 2.4.1 启动线程
t.start();
// 启动成功标识
workerStarted = true;
}
}
} finally {
3. // 没启动成功,做一个删除收尾的操作
if (! workerStarted)
addWorkerFailed(w);
}
4. // 最终根据是否启动成功这个标识来反映是否创建线程成功
return workerStarted;
}
具体分为以下步骤 :
- 1.检查是否需要创建线程来执行任务
- 1.3 : 如果当前处于SHUTDOWN状态,但是firstTask != null,那么不会执行下面的创建工作线程,此时的firstTask会被抛弃掉,表示不接收新任务,但是firstTask == null则还会往下执行判断当前阻塞队列中是否还存在任务,存在则需要创建工作线程来执行任务队列中存在的任务;
- 1.4 : 1.4.1步骤,根据corePoolSize 或者 maximumPoolSize 来判断需不需要创建工作线程,如果需要则尝试竞争(1.4.2),竞争成功则执行第2步骤.
- 2.如果执行到这里则代表上面1.4.2步骤成功,需要创建工作线程执行任务。
Worker --- 线程对象
// 继承了AQS、实现了Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
# 属性
// 线程对象
final Thread thread;
// 任务对象
Runnable firstTask;
// 每条线程的执行任务计数器
volatile long completedTasks;
Worker(Runnable firstTask) {
// -1表示初始化状态
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// t.start()执行的是runWorker(),具体下面分析
public void run() {
runWorker(this);
}
// 当前Worker对象的锁状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 独占式获取锁,并且是不可重入的
protected boolean tryAcquire(int unused) {
// CAS设置为1,成功则表示获取到当前Worker对象
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 独占式释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
// 0表示空闲状态
setState(0);
return true;
}
// 阻塞的方式获取锁
public void lock() { acquire(1); }
// 非阻塞的方式获取锁
public boolean tryLock() { return tryAcquire(1); }
// 释放锁
public void unlock() { release(1); }
// 获取当前Worker对象的锁状态
public boolean isLocked() { return isHeldExclusively(); }
// 中断当前Worker线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
- Worker实现了Runnable接口,当创建的Worker对象,并且调用该对象封装的thread.start()时,最终执行的是 runWorker(),具体代码下面分析.
- Worker继承了AQS,使用了独占式获取锁的方式,并且是不可重入的:
- 防止同一时刻同一工作线程执行多个任务;
- 防止一些工作线程在执行当前任务的过程中被中断,保证正在执行的任务可以顺利执行完;
- 不可重入:可以获取到当前活跃状态(空闲或者执行)的工作线程;
- AQS相关文章 : 并发编程基础:ReentrantLock之AQS独占式源码分析
runWorker(Worker w) --- run()
当Thread.start()时则会运行该函数.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 将-1初始化状态设置为0
w.unlock();
// 是否因为异常退出循环
boolean completedAbruptly = true;
# 开始获取任务执行
try {
## while循环获取Task start
1. // 通过getTask()获取任务,如果获取为null则退出循环,线程也被回收
while (task != null || (task = getTask()) != null) {
1.1 // 加锁,即当前Worker在执行任务
w.lock();
## 是否需要中断当前线程 start
// 检查是否需要中断线程 :
#1 // 当前运行状态处于 STOP 之后的状态(包括STOP),那么是需要中断当前线程的,则走#3条件的判断;
// 否则,当前运行状态处于 RUNNING 或者 SHUTDOWN,不需要中断线程。那么需要判断#2条件的另一种可能;
#2 // 根据interrupted()判断一下当前线程是否中断状态,并且清除中断标识(设置为false),
//如果是中断状态,继续判断 当前运行状态处于 STOP 之后的状态(包括STOP),如果是,那么是需要中断当前线程,则走#3条件的判断;
#3 // isInterrupted()会判断一下当前线程的中断标识,如果为true,说明是中断状态,不需要执行1.2步骤,
// 如果是false,说明不是处于中断状态,需要执行1.2步骤中断线程。
#小结一下
#如果#1为true,那么肯定是要中断线程的,会根据#3来判断是否执行中断线程
#如果#1为false,但是#2可能为ture,即当前线程是中断状态(可能是线程池执行的业务代码设置的)
#并且 当前运行状态处于 STOP 之后的状态(包括STOP),那么也是要中断线程,会根据#3来判断是否执行中断线程
#从这里可以看出,即使我们在业务代码去中断线程,线程池如果不是STOP之后的状态,线程也不会被设置中断,
#而是通过Thread.interrupted()重置了中断标识,
#只有通过shutdownNow()强制关闭 (该函数等下具体分析)才能真正中断并回收线程。
#当然我们也可以在task中抛出异常,让task.run()捕获到异常,这样会使这个线程被回收,不建议这么做。
if (
( #1 runStateAtLeast(ctl.get(), STOP) ||
#2 ( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) ) &&
#3!wt.isInterrupted())
1.2 // 中断线程,此时的中断标识变为true
wt.interrupt();
## 是否需要中断当前线程 end
// 值得注意的是,Java的中断线程并不是直接中断线程,而是一种协调机制,
// 所以即使如1.2步骤执行了中断线程,但还是会继续往下执行,但是task对象可以根据这些isInterrupted()来判断当前线程是中断状态
1.3 // 开始执行
try {
// 执行任务前的函数,该函数为空,可以子类来实现,这是线程池的一个扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//当我们在task执行出现异常时,Runnable.run()不能捕获到,
//需要做一层处理,这里才会走catch,这种做法不建议,因为这里走catch,那么该线程会直接跳出循环,最终被回收、重新创建线程
1.4 // 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后的函数,该函数为空,可以子类来实现,这是线程池的一个扩展
afterExecute(task, thrown);
}
} finally {
// GC
task = null;
// 统计次数
w.completedTasks++;
// 释放
w.unlock();
}
}
// 如果没有获取到任务,那么while循环就结束了
## while循环获取Task end
// 不是因为异常退出,是正常退出
// 当我们在task类处理成抛出异常能被task.run()捕获时,这个值才会是true
completedAbruptly = false;
} finally {
// 退出当前线程后需要清理当前线程,具体下面分析
processWorkerExit(w, completedAbruptly);
}
}
while循环获取task :
- 根据getTask()获取任务,该函数下面再具体分析;
- 如果通过getTask()获取到任务,则开始执行,否则可能是阻塞队列没有任务,那么会阻塞,返回null的话是退出,那么该线程会被剔除(processWorkerExit());
- 根据 当前线程池运行状态 判断是否需要中断线程;
- Java的中断机制具体可以看看这篇文章 : Thread的中断机制(interrupt)
- 无论有没有执行wt.interrupt()来中断线程,都会继续往下执行代码;(Java的中断机制只是协调,并不会真的强制中断,最终中不中断我们可以在task的业务代码来控制);例如我们可以在task中通过中断标识或者InterruptedException来检测中断进行抛出异常,这样1.4步骤task.run();就可以捕获到,那么当前的线程就会异常退出、回收、重新创建,但是不建议这么做;
- 最终调用1.4步骤的 task.run() , 这里才是真正的执行任务;
- 获取task为空时,退出while循环,执行processWorkerExit()回收线程资源。
- 【PS : beforeExecute() 、 afterExecute() 是ThreadPoolExecutor扩展的函数】
getTask() --- 获取任务
从阻塞队列中获取任务.
private Runnable getTask() {
boolean timedOut = false;
* //自旋
for (;;) {
// 获取当前运行状态
int c = ctl.get();
// 保留高3bit位
int rs = runStateOf(c);
1. // 如果当前运行状态处于SHUTDOWN 之后(包括SHUTDOWN) ,可能需要回收线程, 继续判断 :
// 处于 STOP 之后,不管队列的任务了,直接回收线程;
// 或者 处于SHUTDOWN 并且阻塞队列为空的话,也回收线程 :
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程数减1
decrementWorkerCount();
//返回null,退出上一次的while循环
return null;
}
// 获取当前线程数
int wc = workerCountOf(c);
2. // allowCoreThreadTimeOut:是否需要回收核心线程,
// true表示没有核心线程的概念,全部都是非核心线程,默认为false
// wc > corePoolSize : 当前创建的工作线程是否大于核心线程数
//两个条件只要符合其中一个,那么当前存在非核心线程需要回收,
//非核心线程从阻塞队列获取任务时,会有个时间限制,超过这个时间没有新任务的到来,那么将会被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法重置了
// timed && timedOut 如果为true,表示当前存在需要回收的非核心线程,
// 并且上次从阻塞队列中获取任务发生了超时,需要回收非核心线程
// 接下来判断,如果当前工作线程数量至少2条时,或者阻塞队列是空的,那么尝试将workerCount减1,即回收非核心线程
// 如果wc == 1时,并且阻塞队列还有任务,那么需要保留至少一条线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//CAS设置线程数减1
if (compareAndDecrementWorkerCount(c))
return null;
// 如果减1失败,则返回重试
continue;
}
try {
3. // 根据timed判断是否从workQueue超时阻塞式获取任务
// workQueue是生产者-消费者模式,所以队列为null的时候,获取任务会进入阻塞状态
// 如果 timed 为true : 则表示需要回收非核心线程,超时阻塞式获取任务
// 如果 timed 为false : 则表示不需要回收非核心线程,非超时阻塞式获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//成功获取到任务
if (r != null)
return r;
// 在规定的时间里没有获取到任务
timedOut = true;
} catch (InterruptedException retry) {
// 被中断唤醒,没有超时
timedOut = false;
}
}
}
- 1.处于SHUTDOWN状态会继续执行阻塞队列中的任务,而处于STOP之后的状态,即使阻塞队列还存在任务也不处理了,直接获取任务失败,回收线程.
- 2.超时控制 : 如果希望核心线程也被回收(allowCoreThreadTimeOut为true)或者 创建的线程数 大于 corePoolSize时(存在非核心线程) ,则存在非核心线程,那么非核心线程在获取任务时,需要根据 keepAliveTime 来超时阻塞式获取任务,如果在这段时间没有新任务的到来,那么这些工作线程就会被回收;如果核心线程不被回收(allowCoreThreadTimeOut为false)或者当前创建的线程数 小于等于 corePoolSize时 , 并且此时阻塞队列没有任务,那么这些线程将阻塞在workQueue.take()中,等待新的任务到来并唤醒.
以下的情况,getTask()获取为null :
- 当前运行状态处于 STOP 之后(包括STOP),即使阻塞队列存在任务;
- 当前运行状态处于 SHUTDOWN,并且阻塞队列没有任务了;
- 当前allowCoreThreadTimeOut为true,即没有核心线程的概念,或者 创建的线程数 大于 corePoolSize,并且阻塞队列没有任务了,在限定的时间内没有等到阻塞队列新任务的到来,那么非核心线程会被回收;
当出现以上三种情况时,那么将跳出runWorker()的while循环,执行processWorkerExit()清理工作线程.
processWorkerExit() --- 清理工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 是否为异常而退出
if (completedAbruptly)
// 是,则需要设置ctl线程数减1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计任务完成次数
completedTaskCount += w.completedTasks;
// 删除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 判断当前线程池是否设置为TIDYING、TERMINATED状态,具体代码等下再分析
tryTerminate();
// 获取当前运行状态
int c = ctl.get();
// 如果 当前运行状态是 RUNNING 或者 SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 工作线程是正常退出
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// workQueue存在任务,至少保留1个工作线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 当前工作线程数大于等于1条,不创建了
if (workerCountOf(c) >= min)
return;
}
// 创建线程 :
//2种情况:异常退出,重新创建线程 或者
//当前运行状态是 RUNNING 或者 SHUTDOWN,线程池中还存在任务,需要至少保留1条线程
addWorker(null, false);
}
}
到目前为止,线程池的生命周期还没说完,但是工作线程从创建到死亡的生命周期已经可以做一下总结了.
- 1.当工作线程数 小于 corePoolSize 时, 我们需要调用 addWorker() 创建线程来执行任务;
- 2.当线程数 大于等于 corePoolSize 时,此时不是优先考虑创建工作线程,而是将任务添加到阻塞队列中,如果添加失败,说明阻塞队列已经放满了任务,那么此时需要调用 addWorker() 创建线程(maximumPoolSize) .
- 3.此时的阻塞队列满了 并且 工作线程数 已经达到了 maximumPoolSize , 属于饱和状态,那么执行拒绝策略
- 4.通过 addWorker() 成功创建的线程(Worker)并分配任务(Runneble),然后通过调用 start() 启动执行任务,最终调用的是 runWorker();
- 5.每条线程执行runWorker()是在一个while循环,即线程每处理完一个任务,都会通过getTask()继续从阻塞队列中获取任务
- 6.getTask() 控制着工作线程是否会被销毁 :
- 当前运行状态处于 STOP 之后的状态(包括STOP),即使阻塞队列存在任务,也会返回给工作线程会null,跳出while循环;
- 当前运行状态处于 SHUTDOWN 的状态,同时阻塞队列没有任务,也会返回给工作线程会null,跳出while循环;
- 当前allowCoreThreadTimeOut为true,即没有核心线程的概念,或者 创建的线程数 大于 corePoolSize,并且阻塞队列没有任务了,在限定的时间内没有等到阻塞队列新任务的到来,也会返回给工作线程会null,跳出while循环;
- 当前创建的线程数 小于等于 corePoolSize,并且阻塞队列没有任务了,那么这些核心线程将被阻塞在workQueue.take()中,等待任务的到来唤醒.
- 7.跳出runWorker()的while循环后,将执行processWorkerExit()销毁工作线程.
线程池的生命周期
前面说到线程池的五种运行状态,代表着线程池的生命周期,从运行状态到死亡状态.
execute()
这个函数我们分析过了;当通过构造器创建线程池后,线程池处于运行状态(RUNNING),对应的如下代码 :
// 默认为 RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
shutdown() --- 温和的关闭线程池
当线程池处于运行状态时,如果调用shutdown(),则线程池进入SHUTDOWN状态,结合上面execute()代码分析,我们可以知道 : 当处于 SHUTDOWN状态时, 是不接收新来的任务,但是会处理已经开始运行的任务 和 阻塞队列的任务, 处理完之后,全部已经创建的线程交由给JVM清理回收.
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查
checkShutdownAccess();
// 设置线程池的运行状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 这个是其他类的实现,可以忽略
onShutdown();
} finally {
mainLock.unlock();
}
// 该函数下面分析
tryTerminate();
}
# 设置线程池的运行状态为 SHUTDOWN
private void advanceRunState(int targetState) {
for (;;) {
// 获取当前运行状态
int c = ctl.get();
// 如果当前运行状态处于 SHUTDOWN 之后,那么不需要设置了
// 否则 高3bit位设置为 SHUTDOWN状态 , 低29bit位 保留线程数
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
// 退出
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
#最终调用 interruptIdleWorkers() : 中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// tryLock()是不可重入的,所以如果处于空闲状态则返回true
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断空闲线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 是否只中断一条空闲线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
当调用shutdown()时,是线程池是处于SHUTDOWN状态,从execute()分析的线程的生命周期可以知道,该状态下是不接受新来的任务,但是会处理剩下的任务.
shutdownNow() --- 暴力强制的关闭线程池
相比shutdown(),shutdownNow()是非常暴力的;当调用shutdownNow()时,线程池会处于STOP状态,结合上面execute()代码分析,我们可以知道 : 处于此状态下,不接收新任务,并且阻塞队列的任务也不处理了,但是这些阻塞队列的任务会被保留返回.具体可以看看 drainQueue().
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 更改位STOP状态,上面代码分析了
advanceRunState(STOP);
// 中断线程
interruptWorkers();
// 返回阻塞队列中没有处理的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 该函数下面分析
tryTerminate();
return tasks;
}
# 中断线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
# 返回阻塞队列中没有处理的任务
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
值得注意的是,Java的中断机制只是一种抛出异常的协调机制,并不是调用中断函数就会立马中断当前线程,也就是说如果调用中断函数后,当前线程的Task中如果没有对抛出的异常采用处理中断异常,那么当前的任务也会继续执行完的;
tryTerminate()
当线程退出时,会调用tryTerminate() 进行收尾操作 :
final void tryTerminate() {
for (;;) {
// 获取当前运行状态
int c = ctl.get();
// 1. 当前运行状态是 RUNNING 暂时不设置TIDYING、TERMINATED
// 2. 或者 当前运行状态 处于 TIDYING 或者 TERMINATED, 那么也是暂时不设置TIDYING、TERMINATED
// 3. 或者 就是在 SHUTDOWN ,并且存在任务需要处理,那么也是暂时不设置TIDYING、TERMINATED
// 4. 否则 就是STOP,那么需要往下走
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 执行到这里 说明处于 STOP状态, 但是还存在工作线程没处理完,帮忙中断线程
// 如果线程数量不为0,则中断一个空闲的工作线程,并返回
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 执行到这里 说明处于 SHUTDOWN(阻塞队列没有任务了) 或者 STOP
// 那么设置 TIDYING、TERMINATED
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated方法默认什么都不做,留给子类实现
terminated();
} finally {
// 设置 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
至此,线程池的生命周期就结束了,如下图 :

PS:上图中强制关闭,正在执行任务中的线程由于锁的原因不会被中断到,但是会唤醒阻塞在阻塞队列的线程。
结束语
- 原创不易
- 希望看完这篇文章的你有所收获!
相关参考资料
- JDK1.8