
本文对源码的学习是基于JDK1.8版本.
前言
基于并发编程基础:ReentrantLock之AQS独占式源码分析这篇文章,我们学习了AQS的底层数据结构及对同步状态的独占式获取,本文将从Java提供的并发工具 CountDownLatch(闭锁)、Semaphore(信号量) 进行源码分析,从而学习AQS共享式获取锁的实现原理.
共享锁是多个线程可以共享一把锁,如ReentrantReadWriteLock的ReadLock、Semaphore、CountDownLatch 这些都是共享锁.
本文主要角色 : Semaphore(信号量) 、 CountDownLatch(闭锁) 、 AbstractQueuedSynchronizer(简称AQS) .【PS : AQS有两种对同步状态的获取方式 : 独占式(Exclusive,只能一条线程执行) 和 共享式(Share,多个线程可同时执行) , 本文基于CountDownLatch(闭锁)、Semaphore(信号量)的基础上在共享模型下进行源码分析 】
共享式
- 回忆一下ReentrantLock是基于同步状态state来维持只能一条线程持有锁,即state == 0 时是没有线程持有锁的,state == 1 时,存在线程持有锁;获取锁失败的线程将会加入同步队列;当持有锁的线程释放锁时,将会把头结点的后继结点唤醒并尝试获取锁(最少2次,失败了就再次阻塞).
- 而共享模式下也是通过维护同步状态state来维持多条线程可以持有锁,与独占式不同的是,独占式只能一条持有锁的线程,且只能尝试获取一个资源,而共享式可以支持同时多条线程持有锁,且每条线程可以尝试获取多个资源,所以获取共享资源前会判断当前的共享资源够不够用,如果不够用,直接加入同步队列,如果共享资源够用,会通过CAS尝试获取,CAS失败的话会重新自旋,重新计算当前共享资源够不够用,又回到上面那一步;结局就是要么不够用而加入同步队列,要么CAS成功竞争共享资源成功;
- 当持有锁的线程释放锁时,将会把头结点的后继结点唤醒并尝试获取锁(最少2次,失败了就再次阻塞).这个跟独占式一样,不一样的是共享式唤醒头结点的后继结点是实现了传播式唤醒,因为共享式是支持多线程持有锁的,共享资源可能无时无刻在释放资源.传播式唤醒保证了更多线程可以参与到竞争共享资源中.
【PS : AQS主要负责同步队列、条件队列的出入队的维护,不用我们关心,具体对于同步状态state的控制,是我们自己代码来控制,从而实现不同的共享式同步工具 】
Semaphore --- 源码分析
简单例子
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
// 共享资源为3
int count = 3;
// 初始化信号量
Semaphore semaphore = new Semaphore(count);
// 初始化线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(count, count,
60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(count));
// 提前创建核心线程
executor.prestartCoreThread();
for (int i = 0; i < 5; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 尝试获取共享资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " : 成功获取共享资源");
System.out.println(Thread.currentThread().getName() + " : 执行任务开始");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " : 执行任务结束");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放共享资源
semaphore.release();
}
}
});
}
// 关闭线程池
executor.shutdown();
// 线程池是否进入 TIDYING 状态
while (executor.getPoolSize() != 0) {
System.out.println("主线程等待5条线程执行完成后往下执行");
Thread.sleep(2000);
}
System.out.println("5条线程已经执行完成");
System.out.println("主线程继续执行下面的任务...");
}
}
- 执行结果 :

- 上面是一个基于信号量实现的一个简单例子 : 信号量初始化了3个共享资源,而线程池开启了5条线程执行任务,前三个线程可以成功竞争到共享资源而执行任务,后面2个线程阻塞于acquire(),等待其他线程调用release()释放共享资源后唤醒竞争共享资源.
介绍
- Semaphore 通常我们叫它信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源.
- 通常用于那些资源有明确访问数量限制的场景,常用于限流、数据库连接池等
构造器
// 默认采用非公平同步器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 根据fair选择公平同步器还是非公平
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync同步器
- Semaphore提供了公平、非公平同步器的实现 : FairSync 、NonfairSync .
private final Sync sync;
// 继承了AQS : 底层的同步队列、条件队列由它来维护
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 设置共享资源
Sync(int permits) {
setState(permits);
}
// 获取共享资源
final int getPermits() {
return getState();
}
// 非公平的方式尝试获取共享资源
// 直到获取到共享资源 或者 没有共享资源可获取,该函数才返回
final int nonfairTryAcquireShared(int acquires) {
...
}
// 释放共享资源(增加共享资源)
protected final boolean tryReleaseShared(int releases) {
...
}
// 减少共享资源
final void reducePermits(int reductions) {
...
}
// 将共享资源设置为0
final int drainPermits() {
...
}
}
- FairSync :
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 公平式获取共享资源
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
- NonfairSync :
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 非公平式获取共享资源
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
acquire() --- 共享式获取资源(响应中断)
- 非公平获取 :
public void acquire() throws InterruptedException {
// 获取1个资源 : Semaphore只允许竞争1个共享资源
sync.acquireSharedInterruptibly(1);
}
// Semaphore调用的acquire()获取锁,真正实现的函数是这个
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 核心在这里
// 共享式获取资源,如果小于0,则表示没有共享资源可以获取了
if (tryAcquireShared(arg) < 0)
// 获取共享资源失败,需要进入同步队列
doAcquireSharedInterruptibly(arg);
}
- tryAcquireShared() : 共享锁获取资源
// 获取共享资源由我们自己来代码实现
// 两种实现,具体如下图
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

- 我们先看非公平获取共享资源 :
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// 最终调用这个函数
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
// 获取当前共享资源
int available = getState();
// 计算共享资源够不够当前线程使用
// 小于0 : 不够
// 等于0 : 刚刚好
// 大于0 :足够
int remaining = available - acquires;
// 如果小于0 则表示 不够 直接返回
// 如果大于等于0,则尝试CAS竞争共享资源
// 失败的话自旋重新获取
// 成功的话 直接返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- tryAcquireShared() : 这个函数的返回结果,小于0表示获取共享资源失败、大于等于0表示获取共享资源成功;回到 acquireSharedInterruptibly()
- acquireSharedInterruptibly() : 如果 tryAcquireShared() 返回小于0,那么将执行 doAcquireSharedInterruptibly()
doAcquireSharedInterruptibly()
- doAcquireSharedInterruptibly() : 如果tryAcquireShared()竞争共享资源失败,需要进入同步队列
- 该函数由AQS自己实现,因为同步队列由AQS自己来维护,不需要我们关注.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 入队,该函数在独占式已经分析过了,本文不具体分析了
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 自旋 : 2次机会
for (;;) {
// 获取前继结点 : 前继结点保存了后继结点的waitStatue
final Node p = node.predecessor();
1. // 如果前继结点是head,那么尝试竞争共享资源
if (p == head) {
// 尝试竞争共享资源
int r = tryAcquireShared(arg);
// 竞争成功
if (r >= 0) {
// 设置为head,并唤醒后继结点
// (唤醒后继结点并不是一定的,具体下面会分析)
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
2. // 前继结点不是head 或者 竞争共享资源失败了
// 那么将设置为SIGNAL状态后挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- shouldParkAfterFailedAcquire() : 设置SIGNAL状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
1. // SIGNAL 不需要设置了
if (ws == Node.SIGNAL)
return true;
2. // 大于0,那么前继结点是取消状态,跳过前继结点,继续向前找(最靠前的那一个结点)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
3. // 设置SIGNAL,记录在前继结点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
【PS : 从上面shouldParkAfterFailedAcquire()代码可知,要设置结点为SIGNAL,必须执行最少2次才可执行parkAndCheckInterrupt()进入阻塞状态】
doAcquireSharedInterruptibly() --- 小结
- 当头结点的后继节点竞争共享资源成功,会调用的setHeadAndPropagate(),该函数下面再继续分析;
- 结点进入阻塞状态前肯定要调用shouldParkAfterFailedAcquire()来设置为SIGNAL状态,且必须经过至少2次的调用,才会调用parkAndCheckInterrupt()进入阻塞状态;
- 头结点的后继结点即使被唤醒过来竞争共享资源,如果竞争共享资源失败也会再次进入阻塞状态 : 第一次竞争失败,会执行第一次shouldParkAfterFailedAcquire()来设置为SIGNAL状态,然后继续自旋第二次,如果第二次还是失败,那么此时将进入阻塞状态;
- 结点的状态保存在前继结点中;
【PS : 以下 setHeadAndPropagate() 、doReleaseShared()、unparkSuccessor() 需要配合理解才能把这三个函数的代码判断串联起来 】
setHeadAndPropagate() --- 设置head
- setHeadAndPropagate() : 设置head,并且判断是否需要唤醒后继结点.
private void setHeadAndPropagate(Node node, int propagate) {
// 保留head
Node h = head;
// 设置head
setHead(node);
// 很多条件,具体下面分析,目前我们知道 propagate是大于等于0的
// 大于0表示还有共享资源(直接短路后面全部判断),需要执行唤醒后继结点竞争共享资源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 唤醒下一结点
Node s = node.next;
// 如果为null是为了防止空指针 : next指针并不是百分百可靠的 或者 node是尾结点
// 如果 s 不为null 继续判断是共享式 需要执行唤醒操作
if (s == null || s.isShared())
// 执行唤醒操作
doReleaseShared();
}
}
- 执行doReleaseShared()唤醒后继结点,为什么propagate条件的后面还需要那么多条件来继续判断?
- doReleaseShared() : 自旋的方式唤醒后继结点
private void doReleaseShared() {
for (;;) {
1. // 获取最新的head
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
2. // 处于SIGNAL状态
if (ws == Node.SIGNAL) {
// 尝试更新为 0 ,即默认正常状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 成功更新为正常状态,执行唤醒操作
unparkSuccessor(h);
}
3. // 处于默认初始化状态,设置为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
4. // head并没有变更,终止唤醒操作
if (h == head)
break;
}
}
- 自旋的方式唤醒后继结点,为什么需要重新获取head、最终终止自旋的前提条件是head没有改变了?
- 为什么会出现第3步骤的 ws == 0 状态,并且设置为PROPAGATE?
- unparkSuccessor() : 唤醒head的后继结点
private void unparkSuccessor(Node node) {
// 获取head记录的状态
int ws = node.waitStatus;
// 可能小于0,设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取head的下一结点
Node s = node.next;
// 如果s为null,防止空指针
// 如果s不为null 则唤醒结点的后继结点不能是取消状态
// 那么从tail往前查找最靠前的第一个waitStatus <= 0状态的结点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒操作
LockSupport.unpark(s.thread);
}
- setHeadAndPropagate() 会改变最新的head,doReleaseShared()的终止执行依赖于head,所以这两函数明显有很大的关联,上面提出的几个问题,需要这两个函数重点分析.
doAcquireSharedInterruptibly() --- 详解
- 要弄清楚以上的问题,我们需要以两种场景来分析 : 单线程场景 、 多线程场景 ;
- 单线程场景 : 执行setHeadAndPropagate(),不会有其他线程执行doReleaseShared(),即两个函数全程只出现一条线程在执行,也就算说单线程串行的执行setHeadAndPropagate() 、doReleaseShared()
- 多线程场景 : 多个线程并行的执行 setHeadAndPropagate() 、doReleaseShared()
回顾一下,从doAcquireSharedInterruptibly()我们可以知道 :
因为头结点的后继结点唤醒后有可能竞争不到资源,所以头结点的后继结点可能是阻塞状态,且头结点的状态保存了后继结点的状态(结点的状态保存在前继结点中)
头结点的后继结点竞争到共享资源,会调用setHeadAndPropagate()设置自己为head结点
如果有共享资源,会调用doReleaseShared()唤醒后继结点竞争共享资源
调用 doReleaseShared()的判断条件有很多,其中第一个条件就是当前存在共享资源,直接短路后面所有判断条件;后面很多判断条件,需要结合多线程场景来分析;
doReleaseShared()是一个可能出现多线程并发执行的自旋函数,因为存在多线程并行的释放共享资源;具体需要结合多线程发场景来解析代码细节 :
- 如果head结点记录的是SIGNAL,那么需要唤醒头结点,通过CAS设置为0状态,如果设置失败,那么表示被其他线程改变了状态,重新自旋获取最新的head;如果设置成功,那么执行unparkSuccessor()唤醒操作;
- 当执行到 ws == 0 时需要尝试设置为PROPAGATE;
- 自旋期间head可能会改变,自旋终止于head没有被其他线程改变;
PS : doAcquireSharedInterruptibly() : 被唤醒的结点竞争共享资源会遇到这种情况 : 第一次竞争失败,会调用shouldParkAfterFailedAcquire() 从状态0再次设置为 -1 , 然后再次自旋继续尝试竞争,如果第二次竞争成功,那么就会进入setHeadAndPropagate()设置头结点,此时头结点的waitStatus因为第一次竞争失败而被设置为-1 , 执行到下面多条件判断会出现不必要的唤醒操作;也就是说 头结点的waitStatus 可能会有 -1 这种状态,这种状态在我下面分析的单线程场景、多线程场景,我们假设没遇到这种情况,另外做分析.
单线程场景
- 单线程场景 : 我们现在假设全程只有一条线程在执行 setHeadAndPropagate() 、doReleaseShared() :
- 线程1执行顺序 : releaseShared() --> doReleaseShared() --> unparkSuccessor()
- 被线程1唤醒的结点执行顺序 : doAcquireSharedInterruptibly() 唤醒后继续自旋竞争资源,如果竞争资源成功,那么执行顺序是 doAcquireSharedInterruptibly() --> setHeadAndPropagate() ,如果存在共享资源,那么执行顺序是 doAcquireSharedInterruptibly() --> setHeadAndPropagate() --> doReleaseShared() --> unparkSuccessor() ;
- 当线程调用 doReleaseShared() :
for (;;) {
// 获取head
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 这里肯定成功,因为在此场景没有其他线程参与这个函数的执行
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒的结点,状态是0
// 肯定是走这里,唤醒结点
unparkSuccessor(h);
}
// 这里可以得出总结 : 多条线程执行这个函数才可能走这个判断
// 具体下面再分析
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 直接退出,因为在此场景没有其他线程参与setHeadAndPropagate()的执行
// 所以head并没有改变
if (h == head)
break;
}
}
- unparkSuccessor()唤醒结点,结点在 doAcquireSharedInterruptibly() 中继续自旋竞争共享资源;
- doAcquireSharedInterruptibly() : 被唤醒后如果竞争到资源,那么执行setHeadAndPropagate()设置新的head,如果存在共享资源,那么将会调用 doReleaseShared() 来唤醒后继结点;
- 在线程执行 setHeadAndPropagate() 的时候,且没有其他线程执行 doReleaseShared()的这种情况下 : 头结点的状态不会出现被其他线程执行doReleaseShared()改变为PROPAGATE;
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// propagate > 0 存在共享资源则需要执行唤醒操作,后面的条件不用看
// propagate <= 0 不存在共享资源,需要判断后面的条件
// PS : 在线程执行 setHeadAndPropagate() 的时候,且没有其他线程执行 doReleaseShared() 的这种情况下
// h.waitStatus 是 0 ,所以后面的 h.waitStatus条件都是false(判断null是防止空指针)
// 如果是 h.waitStatus < 0 是true的, 那么是出现多条线程同时执行了
// 即setHeadAndPropagate() 和 doReleaseShared() 分别有线程在执行
// h.waitStatus < 0 为true时
// 对应的其实就是 另一条线程在执行doReleaseShared()的CAS PROPAGATE状态
// PROPAGATE状态不懂的可以暂不纠结,下一个场景会说明
// 这里暂时知道 :
// 单条线程在执行 setHeadAndPropagate() 的时候,且没有其他线程在执行 doReleaseShared() 情况下 ,h.waitStatus 肯定是 0
// 多条线程同时在执行 setHeadAndPropagate()、doReleaseShared() 情况下 如果h.waitStatus < 0 那么就会出现PROPAGATE状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如下图 :

- 只有一条线程在执行 setHeadAndPropagate() 和 doReleaseShared()的情况下,很好理解,上面基于单条线程的执行场景只是为了更好的理解多线程场景的PROPAGATE状态 、 setHeadAndPropagate() 的多判断条件;因为只有多条线程执行这两个函数才会真正触发设置 PROPAGATE状态,紧接着相关联的setHeadAndPropagate() 的多条件状态判断会变为true;
- 由于是共享式,所以会存在多线程同时释放共享资源,即多线程执行doReleaseShared(), 那么setHeadAndPropagate() 和 doReleaseShared() 就会存在并发执行,接下来分析这种多线程并发执行setHeadAndPropagate() 和 doReleaseShared()的场景.
多线程场景
- 当只有一条线程在执行 setHeadAndPropagate() 和 doReleaseShared() 时,不会出现 PROPAGATE状态,setHeadAndPropagate() 的多条件判断状态(h.waitStatus)判断肯定是false.
- 当多条线程并行执行 setHeadAndPropagate() 和 doReleaseShared() 时, 会出现 PROPAGATE状态,则setHeadAndPropagate() 的多条件判断状态(h.waitStatus)判断会出现是true.
- doReleaseShared()的自旋终止取决于head,即setHeadAndPropagate()的setHead(node)
- setHeadAndPropagate()的setHead(node)只会存在一条线程执行,即头结点的后继结点
setHeadAndPropagate() 和 doReleaseShared() 并发执行时 :
- 当线程释放资源后执行 doReleaseShared() 来唤醒后继结点 :
- 如果头结点状态是SIGNAL : 尝试CAS设置为0状态(这里保证了只有一条线程来唤醒后继结点),如果成功则执行 unparkSuccessor(h) 唤醒这个后继结点,如果失败,那么表示这个后继结点的唤醒由其他线程完成了;
- 如果头结点状态是0,并不是SIGNAL,那么尝试CAS 设置为 PROPAGATE状态 , 失败的话则重新自旋,重新获取最新的head; 成功的话,看看head是不是变更了, 此时的head如果没变更,那么就会退出本次自旋;如果变更了,说明是其他线程在他之前执行过 setHead(node) 了, 那么可能需要继续唤醒后继结点,继续重新自旋 .
- 当线程执行setHeadAndPropagate(),此时的head的状态要么是0 或者 -3 :
- 执行 Node h = head 保存head,然后执行 setHead(node); 设置最新的head结点.
- 多条件判断 :
- propagate > 0 : 表示存在共享资源,需要唤醒操作;
- h == null || h.waitStatus < 0 : 第一个条件是防止空指针,然后判断是否为 PROPAGATE状态,如果是 肯定需要唤醒操作
- (h = head) == null || h.waitStatus < 0 : 再次重新获取最新的head,并且防止空指针,然后判断是否为 PROPAGATE状态,如果是 肯定需要唤醒操作;(PS : 这里为什么要重新获取最新的head,下面会画图分析)
- propagate > 0 为false , 而 第一个 h.waitStatus < 0 为true 如下图 :

- propagate > 0 为false , 第一个 h.waitStatus < 0 为false, 而第二个 h.waitStatus < 0 为true 如下图 :

- PROPAGATE状态其实就是潜意识的告诉正在执行 setHeadAndPropagate() 的线程 :即使你获取共享资源成功后没有更多的共享资源(propagate<=0),那么也有可能有共享资源,需要你来执行doReleaseShared().
- 当 多线程并发执行 setHeadAndPropagate() 和 doReleaseShared() 时, 通过PROPAGATE状态来保证共享资源获取、释放的连续性.
- doReleaseShared() 保证了只有一条线程成功释放head的后继结点,且自旋终止于head是否变更,即setHeadAndPropagate()的 setHead();
- setHeadAndPropagate() 核心任务的设置最新的head,调用 setHead() 设置head后,正在执行 doReleaseShared() 的线程中只有一条线程会去执行唤醒最新head的后继结点.
- 当 propagate > 0 为false 时, 说明该线程在成功获取到共享资源后,没有剩余的共享资源了,是不需要执行 doReleaseShared() 来唤醒后继结点的, 但是由于是共享资源,所以无时不刻有其他线程在释放资源,所以 propagate 并不是实时、正确的,当线程在执行doReleaseShared()时,发现要唤醒的后继结点是已经被唤醒状态了(h.waitStatus == 0),那么需要设置为PROPAGATE状态,通过该状态来告诉head的后继结点当前有可能有共享资源,需要你来执行doReleaseShared().
不必要的唤醒
- 头结点的waitStatus 可能会有 -1 这种状态,但是头结点的后继结点其实是唤醒状态的,不需要再被唤醒,这样导致的结果就是执行了不必要的唤醒.
- doAcquireSharedInterruptibly() :
- 被唤醒的结点竞争共享资源会遇到这种情况 : 第一次竞争失败,会调用shouldParkAfterFailedAcquire() 从状态0再次设置为 -1 , 然后再次自旋继续尝试竞争,如果第二次竞争成功,那么就会进入setHeadAndPropagate()设置头结点,那么此时头结点的waitStatus == -1 , 执行到下面多条件判断会出现不必要的唤醒操作;
- 或者自旋第一次的时候,前继结点并非head,所以此时会调用shouldParkAfterFailedAcquire() 设置为 -1,但是自旋第二次,此时的前继结点的head,会尝试竞争共享资源,如果竞争成功,那么就会进入setHeadAndPropagate()设置头结点,那么此时头结点的waitStatus == -1 , 执行到下面多条件判断会出现不必要的唤醒操作;

小结
- PROPAGATE状态实现的意义是 传播式的唤醒后继结点,当前head的status是0,所以把0变成PROPAGATE,好让被唤醒线程可以检测到有共享资源可以竞争;
- setHeadAndPropagate函数用来设置新head,并在一定情况下调用doReleaseShared;
- head的后继结点在-1状态下可能是唤醒了的;
release() --- 共享式释放资源
- 当持有共享资源的线程执行完临界资源后将调用 release() 释放共享资源给其他线程获取
public void release() {
// 释放一个共享资源
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 释放共享资源
if (tryReleaseShared(arg)) {
// 唤醒头结点的后继结点
doReleaseShared();
return true;
}
return false;
}
- tryReleaseShared() : 释放共享资源,由我们自己实现
protected final boolean tryReleaseShared(int releases) {
// 自旋
for (;;) {
int current = getState();
// 增加资源
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS设置
if (compareAndSetState(current, next))
return true;
}
}
相关API
- 共享式获取资源 :
- acquire() : 该函数上面源码已经分析了,阻塞式调用,只需调用一次即可,如果获取不到共享资源,那么将进入阻塞,直到返回(成功竞争到共享资源).是一个响应中断的共享式获取资源,.
- acquireUninterruptibly : 该函数是阻塞式调用,只需调用一次即可,如果获取不到共享资源,那么将进入阻塞,直到返回(成功竞争到共享资源).是一个非响应中断的共享式获取资源,与上面不同的是即使被中断了也不会立马抛出中断异常,而是竞争到共享资源后,发现被中断过,才抛出中断异常
- tryAcquire() : 该函数是非阻塞式调用,调用后立马返回竞争共享资源的结果
- tryAcquire(long timeout, TimeUnit unit) : 该函数是阻塞式且响应中断调用,根据timeout唤醒竞争共享资源的同步队列结点,即竞争共享资源失败后加入同步队列,timeout时间后唤醒继续竞争共享资源.
- 共享式释放资源 :
- release() : 该函数上面源码已经分析了,释放共享资源后唤醒头结点的后继节点.
CountDownLatch --- 源码分析
简单例子
public class CountDownLatchTest {
public static void main(String[] args) {
int count = 3;
// 初始化闭锁
CountDownLatch countDownLatch = new CountDownLatch(count);
ThreadPoolExecutor executor = new ThreadPoolExecutor(count, count,
60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(count));
// 提前创建核心线程
executor.prestartCoreThread();
for (int i = 0; i < 3; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " : 执行任务开始");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " : 执行任务结束");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程任务执行完毕,调用countDown()减一
countDownLatch.countDown();
}
}
});
}
// 关闭线程池
executor.shutdown();
try {
System.out.println("主线程阻塞,等待上面三条线程执行完成后执行");
countDownLatch.await();
System.out.println("主线程被唤醒,上面三条线程已经执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程继续执行下面的任务...");
}
}
- 上面是一个简单的使用例子 : 初始化三条线程来并行的执行任务,每个线程执行完任务后将调用countDown()减一(state 减一);而主线程将调用await()而阻塞进入等待队列,等待最后一条线程执行完任务后的唤醒;当最后一条线程完成任务后调用countDown()减一,此时的state == 0 ,表示三条线程已执行完成,那么将唤醒等待队列中的主线程,主线程从await()唤醒过来,继续向下执行任务.
介绍
- CountDownLatch又叫闭锁,在JDK1.5被引入,允许一个或多个线程等待其他线程完成操作后再执行.
- CountDownLatch内部会维护一个初始值为线程数量的计数器,主线程执行await方法,如果计数器大于0,则阻塞等待.当一个线程完成任务后,计数器值减1.当计数器为0时,表示所有的线程已经完成任务,等待的主线程被唤醒继续执行.
- 应用场景 : 多个线程(任务)完成后,进行汇总合并
- 不足 : CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用.
- CountDownLatch的底层实现也是AQS的共享式
CountDownLatch构造器
通过构造器初始化state , 代码如下 :
// count为初始化AQS的state
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 同步器
this.sync = new Sync(count);
}
还是熟悉的自定义Sync同步器 , 代码如下 :
// 同步器
private static final class Sync extends AbstractQueuedSynchronizer {
// 设置计数,即设置AQS的同步状态state为count
Sync(int count) {
setState(count);
}
// 获取计数state
int getCount() {
return getState();
}
// 该函数的作用是访问同步状态state是否为0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 共享式释放锁,即state减一,countDown()调用的就是这个函数
protected boolean tryReleaseShared(int releases) {
......
}
}
countDown() 和 await()
CountDownLatch常用的 countDown() 和 await() 这两个函数 :
// state减1(同步状态state减少1)
public void countDown() {
sync.releaseShared(1);
}
// state不为0时,是在等待队列(阻塞状态)
// state为0时,被唤醒
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
countDown()
共享式释放锁 --- releaseShared()
- CountDownLatch调用countDown(),即调用sync.releaseShared(1),该函数对应的实现代码如下 : 总共有两个函数 : tryReleaseShared(arg) \ doReleaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
看看第一个函数 : tryReleaseShared(arg) :
// 释放锁操作由我们自己实现
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
具体看子类,如下图 :

点击CountDownLatch跳转到实际的实现代码如下 :
// 共享式释放锁,在Sync中
protected boolean tryReleaseShared(int releases) {
// 1. 自旋方式释放
for (;;) {
// 2. 获取当前的同步状态state
int c = getState();
// 3. 计数为0,表示没有共享资源了,可能出现这种情况 :
// 执行线程数大于state,防止state减少到小于0,即初始化的时候执行线程数应该跟state相等
// 当然有一些特殊场景支持执行线程数大于state
if (c == 0)
return false;
// 4. state减1
int nextc = c-1;
// 5. CAS设置,失败则自旋继续,成功则判断是否最后一个资源被我释放了,是则执行唤醒操作
if (compareAndSetState(c, nextc))
// 6. 计数为0,即共享资源释放完成,需要执行doReleaseShared()唤醒操作,
// 唤醒的线程会在await()中唤醒,继续往下执行
return nextc == 0;
}
}
当最后一个资源成功释放后,该线程需要执行唤醒操作,即执行doReleaseShared(),该函数在信号量中已经分析过了,就不具体分析了.
await()
- await() : 当共享资源不等于0时,说明还存在线程未完成任务,那么将加入到同步队列
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 存在共享资源,加入到同步队列,该函数已经分析过了,这里就不分析了
doAcquireSharedInterruptibly(arg);
}
// 存在共享资源,返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
小结
- 线程调用await(),如果此时还有共享资源,说明还有线程未完成任务,那么将加入同步队列;
- 线程调用countDown()对共享资源减1,当共享资源减少为0时,执行doReleaseShared()唤醒头结点的后继结点;
- 头结点的后继节点从await()中唤醒过来,继续往下执行;
相关API
- countDown() : 共享资源减1,直到共享资源减少到为0,就会执行doReleaseShared()唤醒头结点的后继结点.
- await() : 阻塞式且响应中断调用,只需调用一次,直到共享资源为0时被唤醒.
结束语
- 信号量实现了公平、非公平两种方式的同步器,非公平方式可以极大的提高吞吐量,基于并发编程基础:ReentrantLock之AQS独占式源码分析这篇文章有说到公平方式的竞争锁,两者是类似的,本文就不详细叙述了.
- 原创不易
- 希望看完这篇文章的你有所收获!
相关参考资料
- JDK1.8