把学习当成一种习惯
选择往往大于努力,越努力越幸运

本文对源码的学习是基于JDK1.8版本.

前言

线程池作用 :

  • 提高资源利用率 : 通过重复利用已经创建好的线程,减少重复创建、销毁造成的消耗;
  • 提高吞吐量 : 异步执行任务,即当任务到达时,任务可以可以直接放入阻塞队列中,等待线程的执行;
  • 提高代码可维护性 : 使用线程池可以进行统一分配、调优和监控.

线程池的五种运行状态

几个相关的变量 :


    // 运行时状态,默认初始化后为RUNNING状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29bit位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    1. // 线程池中工作线程的最大容量 : 11111 11111111 11111111 11111111 (29个1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    2. // 五种运行状态 : 
    2.1 // 运行状态 : 101 00000 00000000 00000000 00000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    2.2 // 关闭状态 : 000 00000 00000000 00000000 00000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    2.3 // 强制关闭 : 001 00000 00000000 00000000 00000000
    private static final int STOP       =  1 << COUNT_BITS;
    2.4 // 结束状态 : 010 00000 00000000 00000000 00000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    2.5 // 死亡状态 : 011 00000 00000000 00000000 00000000
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // PS : 
    // ctl的高3bit位 代表着 五种不同运行状态
    // ctl的低29bit位 代表着 当前创建的线程数
    // 这五种状态大致可以分为 :
    // 运行状态(小于0) , 此状态在没饱和状态下会 创建工作线程、接收任务、处理任务、回收工作线程
    // 关闭状态(等于0,SHUTDOWN) , 此状态在没饱和状态下会 创建工作线程、处理已接收任务、但是不接收新任务、处理完全部任务最终回收工作线程
    // 强制关闭状态(大于0,STOP) : 此状态下不会创建工作线程、已接受的任务也不会处理、新任务也不会接收,而是强制中断运行工作线程、回收工作线程
    // 其他状态(大于0,TIDYING \ TERMINATED) 此状态下工作线程已经全部完成回收,进入结束、死亡状态

如下图 : 线程池的五种运行状态

ctl运行状态的相关计算函数 :


    // 获取当前线程池运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取当前线程池中已创建的工作线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 更换运行状态,并且根据当前线程池中已创建的工作线程数量(workerCountOf())来保留当前的工作线程数量;
    // 这个函数最终的作用就是改变了高3bit为的运行状态,并且低29bit位保留了之前创建的工作线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  • runStateOf(int c) : 根据ctl获取当前线程池运行状态
    • c : 运行期间的ctl状态值
    • ~CAPACITY : 0 , 即29个1全部变为0
    • c & ~CAPACITY : 最终只保留高3bit位,由此可以得到当前线程池的运行状态
  • workerCountOf(int c) : 根据ctl获取当前线程池中已创建的工作线程数量
    • c : 运行期间的ctl状态值
    • CAPACITY : 29个1
    • c & CAPACITY : 最终只保留低29bit位,由此可以得到当前线程池中已创建的工作线程数量
  • ctlOf(int rs, int wc) :
    • rs : 当前需要改变的最新运行状态,例如由RUNNING变为SHUTDOWN的情况下,那么rs传送的就是SHUTDOWN值
    • wc : 当前线程池中已创建的工作线程数量,根据workerCountOf()获取
    • rs | wc : 最终是高3bit位更换了状态,低29bit位保留了当前创建的工作线程数

构造器

 
// 这个使用的默认线程工厂、默认拒绝策略defaultHandler
 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
                           long keepAliveTime,TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, 
        keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), 
        defaultHandler);
 }
                
 // 这个是最终调用的构造器
 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
                           long keepAliveTime,TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
  }
 
 4. // 默认使用的拒绝策略 : 直接抛出异常
 private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

6个参数说明 :

  • corePoolSize : 核心线程数,即该值保证了线程池中至少创建的corePoolSize条工作线程
  • workQueue : 阻塞队列(任务队列,生产者-消费者队列),作用于保存等待执行的任务的阻塞队列;该参数可为无界阻塞队列、有界阻塞队列、同步队列(SynchronousQueue)、优先队列、双端队列等等.
  • maximumPoolSize : 最大线程数,即如果阻塞队列满了且当前创建的工作线程数小于maximumPoolSize,则需要创建新的线程来帮忙执行阻塞队列中的任务;对于workQueue参数如果为无界阻塞队列,则maximumPoolSize参数无效.(LinkedBlockingQueue创建的时候如果没有传入容量,则使用的是Integer.MAX_VALUE,理论上来说是无界队列了)
  • keepAliveTime : 当线程池中的工作线程数量大于corePoolSize的时候,如果这时没有新的任务提交(workQueue为空),线程在向workQueue获取任务的时候会进入等待状态,直到等待的时间超过了keepAliveTime,那么非核心线程将会被回收、销毁.说白了就是当线程池的线程大于corePoolSize的时候且阻塞队列没有任务,此时不需要更多的线程占用空间了,那么将回收核心线程数以外的线程,回收的这些线程是因为在keepAliveTime时间内没有获取到任务.
  • threadFactory : 线程工厂.默认使用Executors的静态内部类DefaultThreadFactory
  • handler : 拒绝策略,父接口是RejectedExecutionHandler,其中有四种不同的拒绝策略实现了该接口,默认使用AbortPolicy,即抛出异常。当然,我们也可以根据自己的场景来自定义拒绝策略。

execute()

执行该函数会把一个任务提交给线程池.该函数的代码具体可以分为4大部分,即 :

1.会判断当前创建的工作线程是否已经达到设置的核心线程数,如果没有达到,则会创建工作线程,并把当前任务交给这个新创建的工作线程;如果达到了核心线程数,那么会走第1步骤:
2.把任务放进阻塞队列;如果走到第2步骤,说明当前线程池中已经创建好了核心线程数。当然,添加阻塞队列也有可能失败或者成功,如果添加成功,那么本次的执行就结束,如果添加失败,说明阻塞队列满了,那么会走第3步骤:
3.会判断当前创建的工作线程是否已经达到设置的最大线程数,如果没有达到,则会创建工作线程,并把当前任务交给这个新创建的工作线程;如果达到了最大线程数,那么会走第4步骤:
4.执行拒绝策略,走到这个步骤,说明当前的线程池处于"饱和" 状态。PS:“饱和”:创建的工作线程已经达到了最大线程数 且 阻塞队列满了。


public void execute(Runnable command) {
         
        if (command == null)
            throw new NullPointerException();
        // 获取当前运行状态
        int c = ctl.get();
        1. // 根据workerCountOf()获取当前已创建的工作线程数
        // 如果当前已创建的工作线程数小于核心线程数,则执行addWorker()创建工作线程执行任务
        // addWorker() 下面具体分析
        if (workerCountOf(c) < corePoolSize) {
            1.1 // 创建工作线程并执行任务
            if (addWorker(command, true))
                // 创建工作线程成功,直接return
                return;
            1.2 // 上面的创建工作线程失败,说明已经达到核心线程数(或者处于STOP之后的状态)
            // 重新获取最新的状态并且执行下面第2步骤
            c = ctl.get();
        }
        2. // 来到这里说明当前创建的工作线程已经达到核心线程数了(或者处于STOP之后的状态)
        // 如果处于运行状态,则尝试把任务添加到workQueue阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 任务添加到队列成功
            int recheck = ctl.get();
            2.1 // 当前已经不是运行状态了,删除当前任务,即SHUTDOWN之后的状态不接收新任务
            if (! isRunning(recheck) && remove(command))
                2.3 // 执行拒绝策略
                reject(command);
            2.4 // 当前是运行状态,但是由于已创建的工作线程数如果等于0,池中至少需要保留一条工作线程来执行任务
            else if (workerCountOf(recheck) == 0)
                2.4.1 // 创建工作线程
                addWorker(null, false);
        }
        3. // 来到这里说明 : 
        // 当前线程池已经达到了第1步骤的corePoolSize核心线程数 并且
        // 第2步骤的添加任务队列失败(阻塞队列饱和了)
        // 则执行addWorker()创建工作线程(最大线程数,maximumPoolSize)
        else if (!addWorker(command, false))
            4. // addWorker()创建线程失败:当前处于饱和状态(任务队列满了 且 线程已经是最大线程数 或者 处于STOP状态)
            // 则执行拒绝策略
            reject(command);
    }
    
    // 小于0处于运行状态
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    // 通过handler执行拒绝策略
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

基于上面的代码,重点需要分析addWorker().

addWorker(Runnable firstTask, boolean core)

该函数的作用是创建工作线程 :

  • firstTask: 任务对象
  • core : true为判断corePoolSize,false为判断maximumPoolSize

    private boolean addWorker(Runnable firstTask, boolean core) {
    
        1. // 检查是否需要创建线程
        retry:
        for (;;) {
            1.1 // 当前运行状态
            int c = ctl.get();
            1.2 // 获取当前运行状态(高3bit)
            int rs = runStateOf(c);

            # 检查 start
            
            SHUTDOWN状态 :不会接收新任务,如果阻塞队列有任务,会创建线程帮忙执行
            STOP之后的状态(包括STOP):不会接受新任务,也不会创建线程帮忙执行
            
            1.3 // 如果当前运行状态不是处于 RUNNING,需要进行检查,看看是否需要走1.4创建线程
            // 这里注意一下 前面有个 !
            // 如果当前处于 SHUTDOWN 并且 firstTask == null 都为true的情况下,说明 SHUTDOWN状态下不接收任务了;
            // 并且 任务队列不为空 为true的情况下,说明任务队列还存在任务需要处理;
            // 虽然目前处于SHUTDOWN,但还是会继续处理阻塞队列的任务,只是不接收任务了
            // 如果当前处于处于 STOP 之后,直接不创建线程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            ## 根据 corePoolSize 或者 maximumPoolSize 继续检查是否需要创建线程来执行任务
            
            1.4 // 通过上面1.3的检查,如果执行到这里说明当前需要创建线程来
            for (;;) {
                // 获取当前已创建的线程数
                int wc = workerCountOf(c);
                1.4.1 // 如果wc已经达到了创建的最大的容量的线程数 或者 判断以达到创建的线程数 ,则 不执行第 2 步骤 创建线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 线程创建失败
                    return false;
                // 否则需要创建线程
                1.4.2 // CAS设置ctl+1
                if (compareAndIncrementWorkerCount(c))
                    // 跳出第1步骤的循环,执行第2步骤创建线程
                    break retry;
                1.4.3 // 1.4.2 出现竞争了,重新获取当前运行状态
                c = ctl.get();
                // 当前线程数改变了,重新跳回第1步骤继续自旋
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        
        ## 检查 end

        2. // 如果执行到这里则代表上面1.4.2步骤竞争成功,需要创建工作线程执行任务
        
        // 创建的工作线程是否成功启动
        boolean workerStarted = false;
        // 是否成功创建了工作线程
        boolean workerAdded = false;
        
        2.1 // 开始创建线程对象,Worker是工作线程对象,具体下面再分析
        Worker w = null;
        try {
            2.2 // 创建线程对象并分配当前任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                2.3 // 获取操作workers的锁(workers存放着已创建的工作线程对象)
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    
                    2.3.1 // 获取当前运行状态
                    int rs = runStateOf(ctl.get());
                    
                    2.3.2 // 如果处于RUNNING 或者 
                    // 处于SHUTDOWN并且firstTask == null 这个上面分析过了
                    // 则把创建的线程添加到workers
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 线程是否处于活动状态
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 执行添加,HashSet容器
                        workers.add(w);
                        int s = workers.size();
                        // largestPoolSize记录着线程池中出现过的最大工作线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 添加成功标识
                        workerAdded = true;
                    }
                } finally {
                    // 释放操作workers的锁
                    mainLock.unlock();
                }
                2.4 // 添加成功标识
                if (workerAdded) {
                    // 2.4.1 启动线程
                    t.start();
                    // 启动成功标识
                    workerStarted = true;
                }
            }
        } finally {
            3. // 没启动成功,做一个删除收尾的操作
            if (! workerStarted)
                addWorkerFailed(w);
        }
        4. // 最终根据是否启动成功这个标识来反映是否创建线程成功
        return workerStarted;
    }

具体分为以下步骤 :

  • 1.检查是否需要创建线程来执行任务
    • 1.3 : 如果当前处于SHUTDOWN状态,但是firstTask != null,那么不会执行下面的创建工作线程,此时的firstTask会被抛弃掉,表示不接收新任务,但是firstTask == null则还会往下执行判断当前阻塞队列中是否还存在任务,存在则需要创建工作线程来执行任务队列中存在的任务;
    • 1.4 : 1.4.1步骤,根据corePoolSize 或者 maximumPoolSize 来判断需不需要创建工作线程,如果需要则尝试竞争(1.4.2),竞争成功则执行第2步骤.
  • 2.如果执行到这里则代表上面1.4.2步骤成功,需要创建工作线程执行任务。

Worker --- 线程对象


    // 继承了AQS、实现了Runnable接口
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    
    # 属性
    // 线程对象
    final Thread thread;
    // 任务对象
    Runnable firstTask;
    // 每条线程的执行任务计数器
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {
        // -1表示初始化状态
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    // t.start()执行的是runWorker(),具体下面分析
    public void run() {
        runWorker(this);
    }
    
    // 当前Worker对象的锁状态
    protected boolean isHeldExclusively() {
            return getState() != 0;
    }
    
    // 独占式获取锁,并且是不可重入的
    protected boolean tryAcquire(int unused) {
        // CAS设置为1,成功则表示获取到当前Worker对象
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    // 独占式释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        // 0表示空闲状态
        setState(0);
        return true;
    }
    
    // 阻塞的方式获取锁
    public void lock()        { acquire(1); }
    // 非阻塞的方式获取锁
    public boolean tryLock()  { return tryAcquire(1); }
    // 释放锁
    public void unlock()      { release(1); }
    // 获取当前Worker对象的锁状态
    public boolean isLocked() { return isHeldExclusively(); }
    
    // 中断当前Worker线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
    
    }

  • Worker实现了Runnable接口,当创建的Worker对象,并且调用该对象封装的thread.start()时,最终执行的是 runWorker(),具体代码下面分析.
  • Worker继承了AQS,使用了独占式获取锁的方式,并且是不可重入的:
    • 防止同一时刻同一工作线程执行多个任务;
    • 防止一些工作线程在执行当前任务的过程中被中断,保证正在执行的任务可以顺利执行完;
    • 不可重入:可以获取到当前活跃状态(空闲或者执行)的工作线程;
    • AQS相关文章 : 并发编程基础:ReentrantLock之AQS独占式源码分析

runWorker(Worker w) --- run()

当Thread.start()时则会运行该函数.


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 将-1初始化状态设置为0
        w.unlock();
        // 是否因为异常退出循环
        boolean completedAbruptly = true;
        
        # 开始获取任务执行
        
        try {
            
            ## while循环获取Task start
            
            1. // 通过getTask()获取任务,如果获取为null则退出循环,线程也被回收
            while (task != null || (task = getTask()) != null) {
                1.1 // 加锁,即当前Worker在执行任务
                w.lock();
                
                ## 是否需要中断当前线程 start
                
                // 检查是否需要中断线程 :
                #1 // 当前运行状态处于 STOP 之后的状态(包括STOP),那么是需要中断当前线程的,则走#3条件的判断;
                // 否则,当前运行状态处于 RUNNING 或者 SHUTDOWN,不需要中断线程。那么需要判断#2条件的另一种可能;
                #2 // 根据interrupted()判断一下当前线程是否中断状态,并且清除中断标识(设置为false),
                //如果是中断状态,继续判断 当前运行状态处于 STOP 之后的状态(包括STOP),如果是,那么是需要中断当前线程,则走#3条件的判断;
                #3 // isInterrupted()会判断一下当前线程的中断标识,如果为true,说明是中断状态,不需要执行1.2步骤,
                // 如果是false,说明不是处于中断状态,需要执行1.2步骤中断线程。
                
                #小结一下
                #如果#1为true,那么肯定是要中断线程的,会根据#3来判断是否执行中断线程
                #如果#1为false,但是#2可能为ture,即当前线程是中断状态(可能是线程池执行的业务代码设置的)
                #并且 当前运行状态处于 STOP 之后的状态(包括STOP),那么也是要中断线程,会根据#3来判断是否执行中断线程
                #从这里可以看出,即使我们在业务代码去中断线程,线程池如果不是STOP之后的状态,线程也不会被设置中断,
                #而是通过Thread.interrupted()重置了中断标识,
                #只有通过shutdownNow()强制关闭 (该函数等下具体分析)才能真正中断并回收线程。
                #当然我们也可以在task中抛出异常,让task.run()捕获到异常,这样会使这个线程被回收,不建议这么做。
                
                if (
                ( #1 runStateAtLeast(ctl.get(), STOP) ||
                  #2 ( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) ) &&
                  #3!wt.isInterrupted())
                    1.2 // 中断线程,此时的中断标识变为true
                    wt.interrupt();
                
                ## 是否需要中断当前线程 end
                
                // 值得注意的是,Java的中断线程并不是直接中断线程,而是一种协调机制,
                // 所以即使如1.2步骤执行了中断线程,但还是会继续往下执行,但是task对象可以根据这些isInterrupted()来判断当前线程是中断状态
                
                1.3 // 开始执行
                try {
                    // 执行任务前的函数,该函数为空,可以子类来实现,这是线程池的一个扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //当我们在task执行出现异常时,Runnable.run()不能捕获到,
                        //需要做一层处理,这里才会走catch,这种做法不建议,因为这里走catch,那么该线程会直接跳出循环,最终被回收、重新创建线程
                        1.4 // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务后的函数,该函数为空,可以子类来实现,这是线程池的一个扩展
                        afterExecute(task, thrown);
                    }
                } finally {
                    // GC
                    task = null;
                    // 统计次数
                    w.completedTasks++;
                    // 释放
                    w.unlock();
                }
            }
            
            // 如果没有获取到任务,那么while循环就结束了
            ## while循环获取Task end
            
            // 不是因为异常退出,是正常退出
            // 当我们在task类处理成抛出异常能被task.run()捕获时,这个值才会是true
            completedAbruptly = false;
        } finally {
            // 退出当前线程后需要清理当前线程,具体下面分析
            processWorkerExit(w, completedAbruptly);
        }
    }


while循环获取task :

  • 根据getTask()获取任务,该函数下面再具体分析;
  • 如果通过getTask()获取到任务,则开始执行,否则可能是阻塞队列没有任务,那么会阻塞,返回null的话是退出,那么该线程会被剔除(processWorkerExit());
    • 根据 当前线程池运行状态 判断是否需要中断线程;
    • Java的中断机制具体可以看看这篇文章 : Thread的中断机制(interrupt)
    • 无论有没有执行wt.interrupt()来中断线程,都会继续往下执行代码;(Java的中断机制只是协调,并不会真的强制中断,最终中不中断我们可以在task的业务代码来控制);例如我们可以在task中通过中断标识或者InterruptedException来检测中断进行抛出异常,这样1.4步骤task.run();就可以捕获到,那么当前的线程就会异常退出、回收、重新创建,但是不建议这么做;
  • 最终调用1.4步骤的 task.run() , 这里才是真正的执行任务;
  • 获取task为空时,退出while循环,执行processWorkerExit()回收线程资源。
  • 【PS : beforeExecute() 、 afterExecute() 是ThreadPoolExecutor扩展的函数】

getTask() --- 获取任务

从阻塞队列中获取任务.


    private Runnable getTask() {
        
        boolean timedOut = false;
        
*         //自旋
        for (;;) {
            // 获取当前运行状态
            int c = ctl.get();
            // 保留高3bit位
            int rs = runStateOf(c);

            1. // 如果当前运行状态处于SHUTDOWN 之后(包括SHUTDOWN) ,可能需要回收线程, 继续判断 :
            // 处于 STOP 之后,不管队列的任务了,直接回收线程; 
            // 或者 处于SHUTDOWN 并且阻塞队列为空的话,也回收线程 :
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 线程数减1
                decrementWorkerCount();
                //返回null,退出上一次的while循环
                return null;
            }

            // 获取当前线程数
            int wc = workerCountOf(c);

            2. // allowCoreThreadTimeOut:是否需要回收核心线程,
            // true表示没有核心线程的概念,全部都是非核心线程,默认为false
            // wc > corePoolSize : 当前创建的工作线程是否大于核心线程数
            //两个条件只要符合其中一个,那么当前存在非核心线程需要回收,
            //非核心线程从阻塞队列获取任务时,会有个时间限制,超过这个时间没有新任务的到来,那么将会被回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法重置了
            // timed && timedOut 如果为true,表示当前存在需要回收的非核心线程,
            // 并且上次从阻塞队列中获取任务发生了超时,需要回收非核心线程
            // 接下来判断,如果当前工作线程数量至少2条时,或者阻塞队列是空的,那么尝试将workerCount减1,即回收非核心线程
            // 如果wc == 1时,并且阻塞队列还有任务,那么需要保留至少一条线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //CAS设置线程数减1
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // 如果减1失败,则返回重试
                continue;
            }

            try {
                3. // 根据timed判断是否从workQueue超时阻塞式获取任务
                // workQueue是生产者-消费者模式,所以队列为null的时候,获取任务会进入阻塞状态
                // 如果 timed 为true : 则表示需要回收非核心线程,超时阻塞式获取任务
                // 如果 timed 为false : 则表示不需要回收非核心线程,非超时阻塞式获取
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //成功获取到任务
                if (r != null)
                    return r;
                // 在规定的时间里没有获取到任务
                timedOut = true;
            } catch (InterruptedException retry) {
                // 被中断唤醒,没有超时
                timedOut = false;
            }
        }
    }

  • 1.处于SHUTDOWN状态会继续执行阻塞队列中的任务,而处于STOP之后的状态,即使阻塞队列还存在任务也不处理了,直接获取任务失败,回收线程.
  • 2.超时控制 : 如果希望核心线程也被回收(allowCoreThreadTimeOut为true)或者 创建的线程数 大于 corePoolSize时(存在非核心线程) ,则存在非核心线程,那么非核心线程在获取任务时,需要根据 keepAliveTime 来超时阻塞式获取任务,如果在这段时间没有新任务的到来,那么这些工作线程就会被回收;如果核心线程不被回收(allowCoreThreadTimeOut为false)或者当前创建的线程数 小于等于 corePoolSize时 , 并且此时阻塞队列没有任务,那么这些线程将阻塞在workQueue.take()中,等待新的任务到来并唤醒.

以下的情况,getTask()获取为null :

  • 当前运行状态处于 STOP 之后(包括STOP),即使阻塞队列存在任务;
  • 当前运行状态处于 SHUTDOWN,并且阻塞队列没有任务了;
  • 当前allowCoreThreadTimeOut为true,即没有核心线程的概念,或者 创建的线程数 大于 corePoolSize,并且阻塞队列没有任务了,在限定的时间内没有等到阻塞队列新任务的到来,那么非核心线程会被回收;

当出现以上三种情况时,那么将跳出runWorker()的while循环,执行processWorkerExit()清理工作线程.

processWorkerExit() --- 清理工作线程


    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 是否为异常而退出
        if (completedAbruptly)
            // 是,则需要设置ctl线程数减1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计任务完成次数
            completedTaskCount += w.completedTasks;
            // 删除工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 判断当前线程池是否设置为TIDYING、TERMINATED状态,具体代码等下再分析
        tryTerminate();

        // 获取当前运行状态
        int c = ctl.get();
        // 如果 当前运行状态是 RUNNING 或者 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 工作线程是正常退出
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // workQueue存在任务,至少保留1个工作线程
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 当前工作线程数大于等于1条,不创建了
                if (workerCountOf(c) >= min)
                    return;
            }
    
            // 创建线程 :
            //2种情况:异常退出,重新创建线程 或者 
            //当前运行状态是 RUNNING 或者 SHUTDOWN,线程池中还存在任务,需要至少保留1条线程
            addWorker(null, false);
        }
        
    }


到目前为止,线程池的生命周期还没说完,但是工作线程从创建到死亡的生命周期已经可以做一下总结了.

  • 1.当工作线程数 小于 corePoolSize 时, 我们需要调用 addWorker() 创建线程来执行任务;
  • 2.当线程数 大于等于 corePoolSize 时,此时不是优先考虑创建工作线程,而是将任务添加到阻塞队列中,如果添加失败,说明阻塞队列已经放满了任务,那么此时需要调用 addWorker() 创建线程(maximumPoolSize) .
  • 3.此时的阻塞队列满了 并且 工作线程数 已经达到了 maximumPoolSize , 属于饱和状态,那么执行拒绝策略
  • 4.通过 addWorker() 成功创建的线程(Worker)并分配任务(Runneble),然后通过调用 start() 启动执行任务,最终调用的是 runWorker();
  • 5.每条线程执行runWorker()是在一个while循环,即线程每处理完一个任务,都会通过getTask()继续从阻塞队列中获取任务
  • 6.getTask() 控制着工作线程是否会被销毁 :
    • 当前运行状态处于 STOP 之后的状态(包括STOP),即使阻塞队列存在任务,也会返回给工作线程会null,跳出while循环;
    • 当前运行状态处于 SHUTDOWN 的状态,同时阻塞队列没有任务,也会返回给工作线程会null,跳出while循环;
    • 当前allowCoreThreadTimeOut为true,即没有核心线程的概念,或者 创建的线程数 大于 corePoolSize,并且阻塞队列没有任务了,在限定的时间内没有等到阻塞队列新任务的到来,也会返回给工作线程会null,跳出while循环;
    • 当前创建的线程数 小于等于 corePoolSize,并且阻塞队列没有任务了,那么这些核心线程将被阻塞在workQueue.take()中,等待任务的到来唤醒.
  • 7.跳出runWorker()的while循环后,将执行processWorkerExit()销毁工作线程.

线程池的生命周期

前面说到线程池的五种运行状态,代表着线程池的生命周期,从运行状态到死亡状态.

execute()

这个函数我们分析过了;当通过构造器创建线程池后,线程池处于运行状态(RUNNING),对应的如下代码 :


    // 默认为 RUNNING
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

shutdown() --- 温和的关闭线程池

当线程池处于运行状态时,如果调用shutdown(),则线程池进入SHUTDOWN状态,结合上面execute()代码分析,我们可以知道 : 当处于 SHUTDOWN状态时, 是不接收新来的任务,但是会处理已经开始运行的任务 和 阻塞队列的任务, 处理完之后,全部已经创建的线程交由给JVM清理回收.


    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查
            checkShutdownAccess();
            // 设置线程池的运行状态为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲线程
            interruptIdleWorkers();
            // 这个是其他类的实现,可以忽略
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        // 该函数下面分析
        tryTerminate();
    }

    
    # 设置线程池的运行状态为 SHUTDOWN
    private void advanceRunState(int targetState) {
        for (;;) {
            // 获取当前运行状态
            int c = ctl.get();
            // 如果当前运行状态处于 SHUTDOWN 之后,那么不需要设置了
            // 否则 高3bit位设置为 SHUTDOWN状态 , 低29bit位 保留线程数
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                // 退出
                break;
        }
    }


    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    #最终调用 interruptIdleWorkers() : 中断空闲线程
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // tryLock()是不可重入的,所以如果处于空闲状态则返回true
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 中断空闲线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 是否只中断一条空闲线程
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

当调用shutdown()时,是线程池是处于SHUTDOWN状态,从execute()分析的线程的生命周期可以知道,该状态下是不接受新来的任务,但是会处理剩下的任务.

shutdownNow() --- 暴力强制的关闭线程池

相比shutdown(),shutdownNow()是非常暴力的;当调用shutdownNow()时,线程池会处于STOP状态,结合上面execute()代码分析,我们可以知道 : 处于此状态下,不接收新任务,并且阻塞队列的任务也不处理了,但是这些阻塞队列的任务会被保留返回.具体可以看看 drainQueue().


    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 更改位STOP状态,上面代码分析了
            advanceRunState(STOP);
            // 中断线程
            interruptWorkers();
            // 返回阻塞队列中没有处理的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 该函数下面分析
        tryTerminate();
        return tasks;
    }


    # 中断线程
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }


    # 返回阻塞队列中没有处理的任务
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

值得注意的是,Java的中断机制只是一种抛出异常的协调机制,并不是调用中断函数就会立马中断当前线程,也就是说如果调用中断函数后,当前线程的Task中如果没有对抛出的异常采用处理中断异常,那么当前的任务也会继续执行完的;

tryTerminate()

当线程退出时,会调用tryTerminate() 进行收尾操作 :

    
    final void tryTerminate() {
        for (;;) {
            // 获取当前运行状态
            int c = ctl.get();
            // 1. 当前运行状态是 RUNNING  暂时不设置TIDYING、TERMINATED 
            // 2. 或者 当前运行状态 处于 TIDYING 或者 TERMINATED, 那么也是暂时不设置TIDYING、TERMINATED
            // 3. 或者 就是在 SHUTDOWN ,并且存在任务需要处理,那么也是暂时不设置TIDYING、TERMINATED
            // 4. 否则 就是STOP,那么需要往下走
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 执行到这里 说明处于 STOP状态, 但是还存在工作线程没处理完,帮忙中断线程
            // 如果线程数量不为0,则中断一个空闲的工作线程,并返回
            if (workerCountOf(c) != 0) { 
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            // 执行到这里 说明处于 SHUTDOWN(阻塞队列没有任务了) 或者 STOP
            // 那么设置 TIDYING、TERMINATED
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 设置 TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // terminated方法默认什么都不做,留给子类实现
                        terminated();
                    } finally {
                        // 设置 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            
        }
    }

至此,线程池的生命周期就结束了,如下图 :

PS:上图中强制关闭,正在执行任务中的线程由于锁的原因不会被中断到,但是会唤醒阻塞在阻塞队列的线程。

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料

  • JDK1.8

目录