把学习当成一种习惯
选择往往大于努力,越努力越幸运

本文对源码的学习是基于JDK1.8版本.

前言

  基于并发编程基础:ReentrantLock之AQS独占式源码分析这篇文章,我们学习了AQS的底层数据结构及对同步状态的独占式获取,本文将从Java提供的并发工具 CountDownLatch(闭锁)、Semaphore(信号量) 进行源码分析,从而学习AQS共享式获取锁的实现原理.
  共享锁是多个线程可以共享一把锁,如ReentrantReadWriteLock的ReadLock、Semaphore、CountDownLatch 这些都是共享锁.
  本文主要角色 : Semaphore(信号量) 、 CountDownLatch(闭锁) 、 AbstractQueuedSynchronizer(简称AQS) .

【PS : AQS有两种对同步状态的获取方式 : 独占式(Exclusive,只能一条线程执行) 和 共享式(Share,多个线程可同时执行) , 本文基于CountDownLatch(闭锁)、Semaphore(信号量)的基础上在共享模型下进行源码分析 】

共享式

  • 回忆一下ReentrantLock是基于同步状态state来维持只能一条线程持有锁,即state == 0 时是没有线程持有锁的,state == 1 时,存在线程持有锁;获取锁失败的线程将会加入同步队列;当持有锁的线程释放锁时,将会把头结点的后继结点唤醒并尝试获取锁(最少2次,失败了就再次阻塞).
  • 而共享模式下也是通过维护同步状态state来维持多条线程可以持有锁,与独占式不同的是,独占式只能一条持有锁的线程,且只能尝试获取一个资源,而共享式可以支持同时多条线程持有锁,且每条线程可以尝试获取多个资源,所以获取共享资源前会判断当前的共享资源够不够用,如果不够用,直接加入同步队列,如果共享资源够用,会通过CAS尝试获取,CAS失败的话会重新自旋,重新计算当前共享资源够不够用,又回到上面那一步;结局就是要么不够用而加入同步队列,要么CAS成功竞争共享资源成功;
    • 当持有锁的线程释放锁时,将会把头结点的后继结点唤醒并尝试获取锁(最少2次,失败了就再次阻塞).这个跟独占式一样,不一样的是共享式唤醒头结点的后继结点是实现了传播式唤醒,因为共享式是支持多线程持有锁的,共享资源可能无时无刻在释放资源.传播式唤醒保证了更多线程可以参与到竞争共享资源中.

【PS : AQS主要负责同步队列、条件队列的出入队的维护,不用我们关心,具体对于同步状态state的控制,是我们自己代码来控制,从而实现不同的共享式同步工具 】

Semaphore --- 源码分析

简单例子


public class SemaphoreTest {
    public static void main(String[] args) throws InterruptedException {
        // 共享资源为3
        int count = 3;
        // 初始化信号量
        Semaphore semaphore = new Semaphore(count);
        // 初始化线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(count, count,
                60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(count));
        // 提前创建核心线程
        executor.prestartCoreThread();

        for (int i = 0; i < 5; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 尝试获取共享资源
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + " : 成功获取共享资源");
                        System.out.println(Thread.currentThread().getName() + " : 执行任务开始");
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + " : 执行任务结束");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 释放共享资源
                        semaphore.release();
                    }
                }
            });
        }

        // 关闭线程池
        executor.shutdown();

        // 线程池是否进入 TIDYING 状态
        while (executor.getPoolSize() != 0) {
            System.out.println("主线程等待5条线程执行完成后往下执行");
            Thread.sleep(2000);
        }
        System.out.println("5条线程已经执行完成");
        System.out.println("主线程继续执行下面的任务...");

    }
}

  • 执行结果 :
  • 上面是一个基于信号量实现的一个简单例子 : 信号量初始化了3个共享资源,而线程池开启了5条线程执行任务,前三个线程可以成功竞争到共享资源而执行任务,后面2个线程阻塞于acquire(),等待其他线程调用release()释放共享资源后唤醒竞争共享资源.

介绍

  • Semaphore 通常我们叫它信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源.
  • 通常用于那些资源有明确访问数量限制的场景,常用于限流、数据库连接池等

构造器


 // 默认采用非公平同步器
 public Semaphore(int permits) {
     sync = new NonfairSync(permits);
 }
 // 根据fair选择公平同步器还是非公平
 public Semaphore(int permits, boolean fair) {
     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
 }

Sync同步器

  • Semaphore提供了公平、非公平同步器的实现 : FairSync 、NonfairSync .

    private final Sync sync;
    // 继承了AQS : 底层的同步队列、条件队列由它来维护
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        // 设置共享资源
        Sync(int permits) {
            setState(permits);
        }

        // 获取共享资源
        final int getPermits() {
            return getState();
        }

        // 非公平的方式尝试获取共享资源
        // 直到获取到共享资源 或者 没有共享资源可获取,该函数才返回
        final int nonfairTryAcquireShared(int acquires) {
           ...
        }

        // 释放共享资源(增加共享资源)
        protected final boolean tryReleaseShared(int releases) {
           ...
        }
        
        // 减少共享资源
        final void reducePermits(int reductions) {
           ...
        }

        // 将共享资源设置为0
        final int drainPermits() {
           ...
        }
        
    }

  • FairSync :

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        // 公平式获取共享资源
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
        
    }

  • NonfairSync :

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        // 非公平式获取共享资源
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

acquire() --- 共享式获取资源(响应中断)

  • 非公平获取 :

    public void acquire() throws InterruptedException {
        // 获取1个资源 : Semaphore只允许竞争1个共享资源
        sync.acquireSharedInterruptibly(1);
    }
    
    // Semaphore调用的acquire()获取锁,真正实现的函数是这个
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 核心在这里
        // 共享式获取资源,如果小于0,则表示没有共享资源可以获取了
        if (tryAcquireShared(arg) < 0)
            // 获取共享资源失败,需要进入同步队列
            doAcquireSharedInterruptibly(arg);
    }

  • tryAcquireShared() : 共享锁获取资源

    // 获取共享资源由我们自己来代码实现
    // 两种实现,具体如下图
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

  • 我们先看非公平获取共享资源 :

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
    
    // 最终调用这个函数
    final int nonfairTryAcquireShared(int acquires) {
         // 自旋
         for (;;) {
             // 获取当前共享资源
             int available = getState();
             // 计算共享资源够不够当前线程使用
             // 小于0 : 不够
             // 等于0 : 刚刚好
             // 大于0 :足够
             int remaining = available - acquires;
             // 如果小于0 则表示 不够 直接返回
             // 如果大于等于0,则尝试CAS竞争共享资源
             // 失败的话自旋重新获取
             // 成功的话 直接返回
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }

  • tryAcquireShared() : 这个函数的返回结果,小于0表示获取共享资源失败、大于等于0表示获取共享资源成功;回到 acquireSharedInterruptibly()
  • acquireSharedInterruptibly() : 如果 tryAcquireShared() 返回小于0,那么将执行 doAcquireSharedInterruptibly()

doAcquireSharedInterruptibly()

  • doAcquireSharedInterruptibly() : 如果tryAcquireShared()竞争共享资源失败,需要进入同步队列
  • 该函数由AQS自己实现,因为同步队列由AQS自己来维护,不需要我们关注.
    
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 入队,该函数在独占式已经分析过了,本文不具体分析了
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 自旋 : 2次机会
            for (;;) {
                // 获取前继结点 : 前继结点保存了后继结点的waitStatue
                final Node p = node.predecessor();
                1. // 如果前继结点是head,那么尝试竞争共享资源
                if (p == head) {
                    // 尝试竞争共享资源
                    int r = tryAcquireShared(arg);
                    // 竞争成功
                    if (r >= 0) {
                        // 设置为head,并唤醒后继结点
                        // (唤醒后继结点并不是一定的,具体下面会分析)
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                2. // 前继结点不是head 或者 竞争共享资源失败了
                // 那么将设置为SIGNAL状态后挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  • shouldParkAfterFailedAcquire() : 设置SIGNAL状态

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        1. // SIGNAL 不需要设置了
        if (ws == Node.SIGNAL)
            return true;
        2. // 大于0,那么前继结点是取消状态,跳过前继结点,继续向前找(最靠前的那一个结点)  
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            3. // 设置SIGNAL,记录在前继结点
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

【PS : 从上面shouldParkAfterFailedAcquire()代码可知,要设置结点为SIGNAL,必须执行最少2次才可执行parkAndCheckInterrupt()进入阻塞状态】

doAcquireSharedInterruptibly() --- 小结

  • 当头结点的后继节点竞争共享资源成功,会调用的setHeadAndPropagate(),该函数下面再继续分析;
  • 结点进入阻塞状态前肯定要调用shouldParkAfterFailedAcquire()来设置为SIGNAL状态,且必须经过至少2次的调用,才会调用parkAndCheckInterrupt()进入阻塞状态;
  • 头结点的后继结点即使被唤醒过来竞争共享资源,如果竞争共享资源失败也会再次进入阻塞状态 : 第一次竞争失败,会执行第一次shouldParkAfterFailedAcquire()来设置为SIGNAL状态,然后继续自旋第二次,如果第二次还是失败,那么此时将进入阻塞状态;
  • 结点的状态保存在前继结点中;

【PS : 以下 setHeadAndPropagate() 、doReleaseShared()、unparkSuccessor() 需要配合理解才能把这三个函数的代码判断串联起来 】

setHeadAndPropagate() --- 设置head

  • setHeadAndPropagate() : 设置head,并且判断是否需要唤醒后继结点.

    private void setHeadAndPropagate(Node node, int propagate) {
        // 保留head
        Node h = head;
        // 设置head
        setHead(node);
        
        // 很多条件,具体下面分析,目前我们知道 propagate是大于等于0的
        // 大于0表示还有共享资源(直接短路后面全部判断),需要执行唤醒后继结点竞争共享资源
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 唤醒下一结点
            Node s = node.next;
            // 如果为null是为了防止空指针 : next指针并不是百分百可靠的 或者 node是尾结点
            // 如果 s 不为null 继续判断是共享式 需要执行唤醒操作
            if (s == null || s.isShared())
                // 执行唤醒操作
                doReleaseShared();
        }
    }

  • 执行doReleaseShared()唤醒后继结点,为什么propagate条件的后面还需要那么多条件来继续判断?
  • doReleaseShared() : 自旋的方式唤醒后继结点

    private void doReleaseShared() {
       
        for (;;) {
            1. // 获取最新的head
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                2. // 处于SIGNAL状态
                if (ws == Node.SIGNAL) {
                    // 尝试更新为 0 ,即默认正常状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    // 成功更新为正常状态,执行唤醒操作
                    unparkSuccessor(h);
                }
                3. // 处于默认初始化状态,设置为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            4. // head并没有变更,终止唤醒操作
            if (h == head)
                break;
        }
    }

  • 自旋的方式唤醒后继结点,为什么需要重新获取head、最终终止自旋的前提条件是head没有改变了?
  • 为什么会出现第3步骤的 ws == 0 状态,并且设置为PROPAGATE?
  • unparkSuccessor() : 唤醒head的后继结点

    private void unparkSuccessor(Node node) {
        
        // 获取head记录的状态
        int ws = node.waitStatus;
        // 可能小于0,设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 获取head的下一结点
        Node s = node.next;
        // 如果s为null,防止空指针
        // 如果s不为null 则唤醒结点的后继结点不能是取消状态
        // 那么从tail往前查找最靠前的第一个waitStatus <= 0状态的结点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒操作
            LockSupport.unpark(s.thread);
    }


  • setHeadAndPropagate() 会改变最新的head,doReleaseShared()的终止执行依赖于head,所以这两函数明显有很大的关联,上面提出的几个问题,需要这两个函数重点分析.

doAcquireSharedInterruptibly() --- 详解

  • 要弄清楚以上的问题,我们需要以两种场景来分析 : 单线程场景 、 多线程场景 ;
  • 单线程场景 : 执行setHeadAndPropagate(),不会有其他线程执行doReleaseShared(),即两个函数全程只出现一条线程在执行,也就算说单线程串行的执行setHeadAndPropagate() 、doReleaseShared()
  • 多线程场景 : 多个线程并行的执行 setHeadAndPropagate() 、doReleaseShared()

回顾一下,从doAcquireSharedInterruptibly()我们可以知道 :

  • 因为头结点的后继结点唤醒后有可能竞争不到资源,所以头结点的后继结点可能是阻塞状态,且头结点的状态保存了后继结点的状态(结点的状态保存在前继结点中)

  • 头结点的后继结点竞争到共享资源,会调用setHeadAndPropagate()设置自己为head结点

  • 如果有共享资源,会调用doReleaseShared()唤醒后继结点竞争共享资源

  • 调用 doReleaseShared()的判断条件有很多,其中第一个条件就是当前存在共享资源,直接短路后面所有判断条件;后面很多判断条件,需要结合多线程场景来分析;

  • doReleaseShared()是一个可能出现多线程并发执行的自旋函数,因为存在多线程并行的释放共享资源;具体需要结合多线程发场景来解析代码细节 :

    • 如果head结点记录的是SIGNAL,那么需要唤醒头结点,通过CAS设置为0状态,如果设置失败,那么表示被其他线程改变了状态,重新自旋获取最新的head;如果设置成功,那么执行unparkSuccessor()唤醒操作;
    • 当执行到 ws == 0 时需要尝试设置为PROPAGATE;
    • 自旋期间head可能会改变,自旋终止于head没有被其他线程改变;
  • PS : doAcquireSharedInterruptibly() : 被唤醒的结点竞争共享资源会遇到这种情况 : 第一次竞争失败,会调用shouldParkAfterFailedAcquire() 从状态0再次设置为 -1 , 然后再次自旋继续尝试竞争,如果第二次竞争成功,那么就会进入setHeadAndPropagate()设置头结点,此时头结点的waitStatus因为第一次竞争失败而被设置为-1 , 执行到下面多条件判断会出现不必要的唤醒操作;也就是说 头结点的waitStatus 可能会有 -1 这种状态,这种状态在我下面分析的单线程场景、多线程场景,我们假设没遇到这种情况,另外做分析.

单线程场景

  • 单线程场景 : 我们现在假设全程只有一条线程在执行 setHeadAndPropagate() 、doReleaseShared() :
    • 线程1执行顺序 : releaseShared() --> doReleaseShared() --> unparkSuccessor()
    • 被线程1唤醒的结点执行顺序 : doAcquireSharedInterruptibly() 唤醒后继续自旋竞争资源,如果竞争资源成功,那么执行顺序是 doAcquireSharedInterruptibly() --> setHeadAndPropagate() ,如果存在共享资源,那么执行顺序是 doAcquireSharedInterruptibly() --> setHeadAndPropagate() --> doReleaseShared() --> unparkSuccessor() ;
  • 当线程调用 doReleaseShared() :

     for (;;) {
            // 获取head
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 这里肯定成功,因为在此场景没有其他线程参与这个函数的执行
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    // 唤醒的结点,状态是0
                    // 肯定是走这里,唤醒结点
                    unparkSuccessor(h);
                }
                // 这里可以得出总结 : 多条线程执行这个函数才可能走这个判断
                // 具体下面再分析
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            // 直接退出,因为在此场景没有其他线程参与setHeadAndPropagate()的执行
            // 所以head并没有改变
            if (h == head)
                break;
        }
    }

  • unparkSuccessor()唤醒结点,结点在 doAcquireSharedInterruptibly() 中继续自旋竞争共享资源;
  • doAcquireSharedInterruptibly() : 被唤醒后如果竞争到资源,那么执行setHeadAndPropagate()设置新的head,如果存在共享资源,那么将会调用 doReleaseShared() 来唤醒后继结点;
  • 在线程执行 setHeadAndPropagate() 的时候,且没有其他线程执行 doReleaseShared()的这种情况下 : 头结点的状态不会出现被其他线程执行doReleaseShared()改变为PROPAGATE;

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        setHead(node);
        
        // propagate > 0 存在共享资源则需要执行唤醒操作,后面的条件不用看
        // propagate <= 0 不存在共享资源,需要判断后面的条件
         // PS : 在线程执行 setHeadAndPropagate() 的时候,且没有其他线程执行 doReleaseShared() 的这种情况下 
        // h.waitStatus 是 0 ,所以后面的 h.waitStatus条件都是false(判断null是防止空指针)
        // 如果是 h.waitStatus < 0 是true的, 那么是出现多条线程同时执行了
        // 即setHeadAndPropagate() 和 doReleaseShared() 分别有线程在执行
        // h.waitStatus < 0 为true时
        // 对应的其实就是 另一条线程在执行doReleaseShared()的CAS PROPAGATE状态
        // PROPAGATE状态不懂的可以暂不纠结,下一个场景会说明
        // 这里暂时知道 :
        // 单条线程在执行 setHeadAndPropagate() 的时候,且没有其他线程在执行 doReleaseShared() 情况下 ,h.waitStatus 肯定是 0
        // 多条线程同时在执行 setHeadAndPropagate()、doReleaseShared() 情况下 如果h.waitStatus < 0 那么就会出现PROPAGATE状态
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

如下图 :

  • 只有一条线程在执行 setHeadAndPropagate() 和 doReleaseShared()的情况下,很好理解,上面基于单条线程的执行场景只是为了更好的理解多线程场景的PROPAGATE状态 、 setHeadAndPropagate() 的多判断条件;因为只有多条线程执行这两个函数才会真正触发设置 PROPAGATE状态,紧接着相关联的setHeadAndPropagate() 的多条件状态判断会变为true;
  • 由于是共享式,所以会存在多线程同时释放共享资源,即多线程执行doReleaseShared(), 那么setHeadAndPropagate() 和 doReleaseShared() 就会存在并发执行,接下来分析这种多线程并发执行setHeadAndPropagate() 和 doReleaseShared()的场景.

多线程场景

  • 当只有一条线程在执行 setHeadAndPropagate() 和 doReleaseShared() 时,不会出现 PROPAGATE状态,setHeadAndPropagate() 的多条件判断状态(h.waitStatus)判断肯定是false.
  • 当多条线程并行执行 setHeadAndPropagate() 和 doReleaseShared() 时, 会出现 PROPAGATE状态,则setHeadAndPropagate() 的多条件判断状态(h.waitStatus)判断会出现是true.
    • doReleaseShared()的自旋终止取决于head,即setHeadAndPropagate()的setHead(node)
    • setHeadAndPropagate()的setHead(node)只会存在一条线程执行,即头结点的后继结点

setHeadAndPropagate() 和 doReleaseShared() 并发执行时 :

  • 当线程释放资源后执行 doReleaseShared() 来唤醒后继结点 :
    • 如果头结点状态是SIGNAL : 尝试CAS设置为0状态(这里保证了只有一条线程来唤醒后继结点),如果成功则执行 unparkSuccessor(h) 唤醒这个后继结点,如果失败,那么表示这个后继结点的唤醒由其他线程完成了;
    • 如果头结点状态是0,并不是SIGNAL,那么尝试CAS 设置为 PROPAGATE状态 , 失败的话则重新自旋,重新获取最新的head; 成功的话,看看head是不是变更了, 此时的head如果没变更,那么就会退出本次自旋;如果变更了,说明是其他线程在他之前执行过 setHead(node) 了, 那么可能需要继续唤醒后继结点,继续重新自旋 .
  • 当线程执行setHeadAndPropagate(),此时的head的状态要么是0 或者 -3 :
    • 执行 Node h = head 保存head,然后执行 setHead(node); 设置最新的head结点.
    • 多条件判断 :
    • propagate > 0 : 表示存在共享资源,需要唤醒操作;
    • h == null || h.waitStatus < 0 : 第一个条件是防止空指针,然后判断是否为 PROPAGATE状态,如果是 肯定需要唤醒操作
    • (h = head) == null || h.waitStatus < 0 : 再次重新获取最新的head,并且防止空指针,然后判断是否为 PROPAGATE状态,如果是 肯定需要唤醒操作;(PS : 这里为什么要重新获取最新的head,下面会画图分析)
  • propagate > 0 为false , 而 第一个 h.waitStatus < 0 为true 如下图 :
  • propagate > 0 为false , 第一个 h.waitStatus < 0 为false, 而第二个 h.waitStatus < 0 为true 如下图 :
  • PROPAGATE状态其实就是潜意识的告诉正在执行 setHeadAndPropagate() 的线程 :即使你获取共享资源成功后没有更多的共享资源(propagate<=0),那么也有可能有共享资源,需要你来执行doReleaseShared().
  • 当 多线程并发执行 setHeadAndPropagate() 和 doReleaseShared() 时, 通过PROPAGATE状态来保证共享资源获取、释放的连续性.
  • doReleaseShared() 保证了只有一条线程成功释放head的后继结点,且自旋终止于head是否变更,即setHeadAndPropagate()的 setHead();
  • setHeadAndPropagate() 核心任务的设置最新的head,调用 setHead() 设置head后,正在执行 doReleaseShared() 的线程中只有一条线程会去执行唤醒最新head的后继结点.
  • 当 propagate > 0 为false 时, 说明该线程在成功获取到共享资源后,没有剩余的共享资源了,是不需要执行 doReleaseShared() 来唤醒后继结点的, 但是由于是共享资源,所以无时不刻有其他线程在释放资源,所以 propagate 并不是实时、正确的,当线程在执行doReleaseShared()时,发现要唤醒的后继结点是已经被唤醒状态了(h.waitStatus == 0),那么需要设置为PROPAGATE状态,通过该状态来告诉head的后继结点当前有可能有共享资源,需要你来执行doReleaseShared().

不必要的唤醒

  • 头结点的waitStatus 可能会有 -1 这种状态,但是头结点的后继结点其实是唤醒状态的,不需要再被唤醒,这样导致的结果就是执行了不必要的唤醒.
  • doAcquireSharedInterruptibly() :
    • 被唤醒的结点竞争共享资源会遇到这种情况 : 第一次竞争失败,会调用shouldParkAfterFailedAcquire() 从状态0再次设置为 -1 , 然后再次自旋继续尝试竞争,如果第二次竞争成功,那么就会进入setHeadAndPropagate()设置头结点,那么此时头结点的waitStatus == -1 , 执行到下面多条件判断会出现不必要的唤醒操作;
    • 或者自旋第一次的时候,前继结点并非head,所以此时会调用shouldParkAfterFailedAcquire() 设置为 -1,但是自旋第二次,此时的前继结点的head,会尝试竞争共享资源,如果竞争成功,那么就会进入setHeadAndPropagate()设置头结点,那么此时头结点的waitStatus == -1 , 执行到下面多条件判断会出现不必要的唤醒操作;

小结

  • PROPAGATE状态实现的意义是 传播式的唤醒后继结点,当前head的status是0,所以把0变成PROPAGATE,好让被唤醒线程可以检测到有共享资源可以竞争;
  • setHeadAndPropagate函数用来设置新head,并在一定情况下调用doReleaseShared;
  • head的后继结点在-1状态下可能是唤醒了的;

release() --- 共享式释放资源

  • 当持有共享资源的线程执行完临界资源后将调用 release() 释放共享资源给其他线程获取

    public void release() {
        // 释放一个共享资源
        sync.releaseShared(1);
    }

    
    public final boolean releaseShared(int arg) {
        // 释放共享资源
        if (tryReleaseShared(arg)) {
            // 唤醒头结点的后继结点
            doReleaseShared();
            return true;
        }
        return false;
    }

  • tryReleaseShared() : 释放共享资源,由我们自己实现

    protected final boolean tryReleaseShared(int releases) {
         // 自旋
         for (;;) {
             int current = getState();
             // 增加资源
             int next = current + releases;
             if (next < current)
                 throw new Error("Maximum permit count exceeded");
             // CAS设置
             if (compareAndSetState(current, next))
                 return true;
         }
     }

相关API

  • 共享式获取资源 :
    • acquire() : 该函数上面源码已经分析了,阻塞式调用,只需调用一次即可,如果获取不到共享资源,那么将进入阻塞,直到返回(成功竞争到共享资源).是一个响应中断的共享式获取资源,.
    • acquireUninterruptibly : 该函数是阻塞式调用,只需调用一次即可,如果获取不到共享资源,那么将进入阻塞,直到返回(成功竞争到共享资源).是一个非响应中断的共享式获取资源,与上面不同的是即使被中断了也不会立马抛出中断异常,而是竞争到共享资源后,发现被中断过,才抛出中断异常
    • tryAcquire() : 该函数是非阻塞式调用,调用后立马返回竞争共享资源的结果
    • tryAcquire(long timeout, TimeUnit unit) : 该函数是阻塞式且响应中断调用,根据timeout唤醒竞争共享资源的同步队列结点,即竞争共享资源失败后加入同步队列,timeout时间后唤醒继续竞争共享资源.
  • 共享式释放资源 :
    • release() : 该函数上面源码已经分析了,释放共享资源后唤醒头结点的后继节点.

CountDownLatch --- 源码分析

简单例子


    public class CountDownLatchTest {
        public static void main(String[] args) {
            int count = 3;
            // 初始化闭锁
            CountDownLatch countDownLatch = new CountDownLatch(count);

            ThreadPoolExecutor executor = new ThreadPoolExecutor(count, count,
                    60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(count));
            // 提前创建核心线程
            executor.prestartCoreThread();

            for (int i = 0; i < 3; i++) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println(Thread.currentThread().getName() + " : 执行任务开始");
                            Thread.sleep(2000);
                            System.out.println(Thread.currentThread().getName() + " : 执行任务结束");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            // 线程任务执行完毕,调用countDown()减一
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            // 关闭线程池
            executor.shutdown();

            try {
                System.out.println("主线程阻塞,等待上面三条线程执行完成后执行");
                countDownLatch.await();
                System.out.println("主线程被唤醒,上面三条线程已经执行完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("主线程继续执行下面的任务...");

        }
    }

  • 上面是一个简单的使用例子 : 初始化三条线程来并行的执行任务,每个线程执行完任务后将调用countDown()减一(state 减一);而主线程将调用await()而阻塞进入等待队列,等待最后一条线程执行完任务后的唤醒;当最后一条线程完成任务后调用countDown()减一,此时的state == 0 ,表示三条线程已执行完成,那么将唤醒等待队列中的主线程,主线程从await()唤醒过来,继续向下执行任务.

介绍

  • CountDownLatch又叫闭锁,在JDK1.5被引入,允许一个或多个线程等待其他线程完成操作后再执行.
  • CountDownLatch内部会维护一个初始值为线程数量的计数器,主线程执行await方法,如果计数器大于0,则阻塞等待.当一个线程完成任务后,计数器值减1.当计数器为0时,表示所有的线程已经完成任务,等待的主线程被唤醒继续执行.
  • 应用场景 : 多个线程(任务)完成后,进行汇总合并
  • 不足 : CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用.
  • CountDownLatch的底层实现也是AQS的共享式

CountDownLatch构造器

通过构造器初始化state , 代码如下 :


    // count为初始化AQS的state
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 同步器
        this.sync = new Sync(count);
    }

还是熟悉的自定义Sync同步器 , 代码如下 :


    // 同步器
    private static final class Sync extends AbstractQueuedSynchronizer {

        // 设置计数,即设置AQS的同步状态state为count
        Sync(int count) {
            setState(count);
        }

        // 获取计数state
        int getCount() {
            return getState();
        }

        // 该函数的作用是访问同步状态state是否为0
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 共享式释放锁,即state减一,countDown()调用的就是这个函数
        protected boolean tryReleaseShared(int releases) {
         ......
        }
    }


countDown() 和 await()

CountDownLatch常用的 countDown() 和 await() 这两个函数 :


    // state减1(同步状态state减少1)
    public void countDown() {
        sync.releaseShared(1);
    }
    
    // state不为0时,是在等待队列(阻塞状态)
    // state为0时,被唤醒
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

countDown()

共享式释放锁 --- releaseShared()

  • CountDownLatch调用countDown(),即调用sync.releaseShared(1),该函数对应的实现代码如下 : 总共有两个函数 : tryReleaseShared(arg) \ doReleaseShared()

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

看看第一个函数 : tryReleaseShared(arg) :


    // 释放锁操作由我们自己实现
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

具体看子类,如下图 :

点击CountDownLatch跳转到实际的实现代码如下 :

       // 共享式释放锁,在Sync中
       protected boolean tryReleaseShared(int releases) {
            // 1. 自旋方式释放
            for (;;) {
                // 2. 获取当前的同步状态state
                int c = getState();
                // 3. 计数为0,表示没有共享资源了,可能出现这种情况 :
                // 执行线程数大于state,防止state减少到小于0,即初始化的时候执行线程数应该跟state相等
                // 当然有一些特殊场景支持执行线程数大于state
                if (c == 0)
                    return false;
                // 4. state减1
                int nextc = c-1;
                // 5. CAS设置,失败则自旋继续,成功则判断是否最后一个资源被我释放了,是则执行唤醒操作
                if (compareAndSetState(c, nextc))
                    // 6. 计数为0,即共享资源释放完成,需要执行doReleaseShared()唤醒操作,
                    // 唤醒的线程会在await()中唤醒,继续往下执行
                    return nextc == 0;
            }
        }

当最后一个资源成功释放后,该线程需要执行唤醒操作,即执行doReleaseShared(),该函数在信号量中已经分析过了,就不具体分析了.

await()

  • await() : 当共享资源不等于0时,说明还存在线程未完成任务,那么将加入到同步队列

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            // 存在共享资源,加入到同步队列,该函数已经分析过了,这里就不分析了
            doAcquireSharedInterruptibly(arg);
    }
    
    // 存在共享资源,返回-1
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

小结

  • 线程调用await(),如果此时还有共享资源,说明还有线程未完成任务,那么将加入同步队列;
  • 线程调用countDown()对共享资源减1,当共享资源减少为0时,执行doReleaseShared()唤醒头结点的后继结点;
  • 头结点的后继节点从await()中唤醒过来,继续往下执行;

相关API

  • countDown() : 共享资源减1,直到共享资源减少到为0,就会执行doReleaseShared()唤醒头结点的后继结点.
  • await() : 阻塞式且响应中断调用,只需调用一次,直到共享资源为0时被唤醒.

结束语

  • 信号量实现了公平、非公平两种方式的同步器,非公平方式可以极大的提高吞吐量,基于并发编程基础:ReentrantLock之AQS独占式源码分析这篇文章有说到公平方式的竞争锁,两者是类似的,本文就不详细叙述了.
  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料

  • JDK1.8

目录