
管程机制
在学习Synchronized之前,了解一下操作系统提供的一种解决并发编程的思想 --- 管程,英文名称叫"Monitor",在Java领域中一般称之"监视器",操作系统领域称之为"管程".
所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发.翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的.
管程 --- MEAS 模型
在管程的发展史上,先后出现过三种不同的管程模型,分别是:Hasen 模型、Hoare 模型和 MESA 模型.其中,现在广泛应用的是 MESA 模型,并且 Java 管程的实现参考的也是 MESA 模型.
管程解决了并发编程的两大核心问题 :
- 互斥 : 即同一时刻只允许一个线程访问共享资源
- 同步 : 即线程之间如何通信、协作(线程的阻塞和唤醒控制)
管程 --- 互斥
管程解决互斥的方法 : 将共享变量和操作共享变量的操作统一封装起来,如下面抽象出来的代码,如果存在多条线程访问共享变量,只能通过入队、出队的方法来实现,这两方法保证了互斥性,只允许一条线程进入管程.
Monitor m {
// 共享变量
var queue;
// 入队
function enq();
// 出队
function deq();
}
管程 --- 同步
管程采用了 MESA 模型,如下图 : 入口维护了一个同步队列(图中是等待队列),只能一条线程操作共享变量,即一个线程获取到锁,而其他线程则在入口的同步队列中等待.
管程引入了条件变量,每个条件变量对应有一个等待队列(也可以叫条件队列),每个条件变量都有自己对应的等待队列,解决了线程的同步问题,即解决了线程间的协调、通信问题.

阻塞队列 --- 生产者消费者模式
Java在代码层次也实现了类似的管程思想 : 例如 AQS \ await() \ signal() \ signalAll() \ Condition对象 等这些结合起来实现的管程思想,其中提供的ReetrantLock\ReetrantReadWriteLock等锁的底层都是基于AQS来实现,但本文不关心AQS,本文核心重点是Synchronized.
多线程中最常用的一种设计模式 : 生产者消费者模式,而阻塞队列支持生产者消费者模式,Java 提供的阻塞队列中有ArrayBlockingQueue和LinkedBlockingQueue 等.
阻塞队列 : 阻塞队列支持生产者消费者模式 : 有数据生成时,生产者将数据放入队列,而消费者处理数据时,将从队列中获取数据.生产者不需要知道消费者的标识或者数量,或者他们是否是唯一的消费者,只需要将数据放入队列即可.同样,消费者也不需要知道生产者是谁.
在有界队列的前提下,生产者生产数据放入队列中,此时队列中存储的数据如果为满,则挂起当前的线程,该线程就会加入到队列不为满的这个条件的Condition的等待队列中;同样的,消费者消费数据需要从队列中拿到数据,如果队列中此时没有存储数据,则挂起当前的线程,该线程就会加入到队列不为空的这个条件的Condition的等待队列中.
Java提供的Condition对象就是我们所说的条件变量,每一个Condition对象都有相对应的等待队列.其中提供的阻塞队列中例如ArrayBlockingQueue和LinkedBlockingQueue等 就有使用到Condition对象 :
// 借用了LinkedBlockingQueue 的几个全局变量
/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程(即取任务的消费者) */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();
// 阻塞队列的简单思路
public class BlockedQueue<T>{
final Lock lock = new ReetrantLock();
// 条件变量:队列不满
final Condition notFull = lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty = lock.newCondition();
// 入队
void enq (T t) {
// 获取锁
lock.lock();
try {
while (队列为满) {
// 挂起,进入 队列不满 相对应的等待队列等待
notFull.await();
}
// 省略入队操作...
// 入队后, 通知(唤醒) 队列不空 相对应的等待队列
notEmpty.signal();
} finally {
// 释放锁
lock.unlock();
}
}
// 出队
void deq (T t) {
// 获取锁
lock.lock();
try {
while (队列为空) {
// 挂起,进入 队列不空 相对应的等待队列等待
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知(唤醒) 队列不满 相对应的等待队列
notFull.signal();
} finally {
// 释放锁
lock.unlock();
}
}
}
- 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();
- 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();
- 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列;
- 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列;
【PS : await() \ signal() \ signalAll() 相对应的 wait() \ notify() \ notifyAll() 语义是一样的】
【PS : 除非经过深思熟虑,否则尽量使用 notifyAll(),因为使用notify()可能会导致线程进入饥饿状态】
小结
并发编程里两大核心问题——互斥和同步,都可以由管程来帮你解决.学好管程,理论上所有的并发问题你都可以解决,并且很多并发工具类底层都是管程实现的,所以学好管程,就是相当于掌握了一把并发编程的万能钥匙.
Synchronized --- 本文核心
以上知识点有助于我们对管程初步了解,那么接下来进入Synchronized知识点,Synchronized升级为重量级锁的时候,其底层采用的ObjectMonitor对象实现思路与管程有着很大的相似.
在Java并发编程中,Synchronized一直都是一个元老级别的重量级锁,随着Java SE 1.6 对Synchronized的优化(锁升级),引入了"偏向锁" 和 "轻量锁",关于"偏向锁" 和 "轻量锁"以及锁升级,可以看这篇文章 : 并发编程基础:Java对象头及锁升级
由上面文章我们可以知道 : 当锁处于轻量级锁状态的时候,对象头的Mark Word是指向Lock Record(指向第一个,可重入情况下有多个Lock Record),其他获取不到的线程就会进入自旋状态;我们在这里假设自旋中的线程是无数次自旋的,并且不会升级会重量级锁,而且我们知道Lock Record是线程私有的,那么自旋中的线程只能依靠一直自旋而看能不能获取到锁,即轻量级锁由于Lock Record,解决了并发的互斥问题,但还是不能解决并发的同步问题,所以,当锁竞争激烈的时候,轻量级锁升级为重量级锁,重量级锁(Synchronized)解决了互斥、同步问题.
当锁由轻量级锁升级为重量级锁的时候,JVM就会申请一个锁对象的ObjectMontor对象,锁对象的对象头Mark Word此时存储的指针就是该ObjectMontor.
【PS : Java 参考了 MESA 模型,语言内置的管程(ObjectMonitor)对 MESA 模型进行了精简.MESA 模型中,条件变量可以有多个,Java 语言内置的管程里只有一个条件变量,并且与 wait() \ notify() \ notifyAll() 这三个方法都是管程的组成部分. 】

ObjectMonitor机制
Synchronized同步锁是在JVM层面上实现的,当Synchronized升级为重量级锁的时候,是基于管程(Monitor)对象来实现.Monitor 是由 ObjectMonitor对象 实现,而 ObjectMonitor对象 是由 C++ 的 objectMonitor.hpp 文件实现.
ObjectMonitor机制就是Synchronized同步块锁机制升级为"重量级锁"后的工作机制,即当Synchronized同步锁升级为"重量级锁"的时候,会向JVM申请ObjectMonitor对象,直奔主题,看看ObjectMonitor长啥样 :
//ObjectMonitor.hpp
ObjectMonitor() {
// 记录无锁状态的Mark Word
_header = NULL;
_count = 0;
// 等待锁的线程个数
_waiters = 0,
// 线程重入次数
_recursions = 0;
// 指向的对象头
_object = NULL;
// 指向线程或者Lock Record,即指向了当前持有锁线程
_owner = NULL;
// 调用wait()方法后等待锁的队列,即条件变量的等待队列
_WaitSet = NULL;
// 等待队列的锁
_WaitSetLock = 0 ;
_Responsible = NULL ;
// 下一个被唤醒的线程
_succ = NULL ;
// ObjectWaiter 队列
_cxq = NULL ;
FreeNext = NULL ;
// ObjectWaiter 队列,即同步队列
_EntryList = NULL ;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
可以看出,ObjectMonitor提供的信息量可比线程私有栈信息的Lock Record爆炸多了
- header : 记录无锁状态的Mark Word,同Lock Record的Displaced Mark Word字段
- ObjectWaiter : 一个双向链表结构对象,封装了线程的信息,包括当前线程的状态
- EntryList : 当多个线程同时访问同步代码时,这些因自旋到达次数后还没有竞争到锁的线程就会进入阻塞,将封装成ObjectWaiter结点进入EntrySet双向链表(EntryList);而获取到锁的线程,即获取到锁对象的Monitor,Monitor依赖于操作系统底层的Mutex Lock,当线程获取到Mutex Lock时,其他线程将不能获取到Mutex Lock.
- WaitSet : 当线程调用wait()方法,该线程将主动释放Mutex Lock,并且进入阻塞,将封装成ObjectWaiter结点进入WaitSet双向链表,等待下一次被其他线程调用notify()或者notifyAll()唤醒开始继续竞争锁;
- owner : 加锁和释放锁是通过owner,即指向了持有锁的线程的Lock Record或者当前持有锁线程
- recursions : 线程重入次数,即表示Synchronized支持可重入
- object : 指向了对象头
- cxq : cxq队列存储的是指向enter函数的时候因为锁已经被其他线程占有而阻塞的线程(单向链表)
- succ : 下一个被唤醒线程
【PS : Synchronized同步锁为重量级锁的情况下采用了Monitor,因为Monitor是依赖于操作系统底层的Mutex Lock,所以阻塞、唤醒的时候存在用户态与内核态之间的切换,增加了性能开销】
ObjectWaiter节点 : objectMonitor.hpp 文件的第38行代码.
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };
// 指向下一个节点
ObjectWaiter * volatile _next;
// 指向上一个节点
ObjectWaiter * volatile _prev;
// 线程
Thread* _thread;
// 线程状态
volatile TStates TState;
public:
ObjectWaiter(Thread* thread);
};
锁膨胀为重量锁的过程
直接看代码, synchronizer.cpp 文件的第1211行代码 --- inflate()函数
// oop : 指的是对象头,根据oop可以拿到Mark Word
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// 自旋获取ObjectMonitor
for (;;) {
1. 根据对象头oop获取Mark Word
const markOop mark = object->mark();
2. 判断当前Mark Word存储的是否Monitor指针,如果是,直接返回Monitor对象
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor();
return inf;
}
3. 当前的锁正在膨胀中,直接跳过,重新循环
if (mark == markOopDesc::INFLATING()) {
ReadStableMark(object) ;
continue ;
}
4. 当前为轻量级锁
if (mark->has_locker()) {
4.1 申请ObjectMonitor对象
ObjectMonitor * m = omAlloc (Self);
4.2 初始化参数
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;
4.3 CAS尝试将Mark Word设置为膨胀状态
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
4.4 修改失败,重新开始循环
omRelease (Self, m, true) ;
continue ;
}
4.5 修改成功,即获取到锁,然后去自己之前的线程栈中的Lock Record拿到Mark Word(无锁状态信息)
markOop dmw = mark->displaced_mark_helper() ;
4.6 将Mark Word(无锁状态信息) 设置到ObjectMonitor的_header字段里
m->set_header(dmw) ;
4.7 设置_owner字段,即指向了锁的持有者线程栈的Lock Record(因为当前是轻量级锁升级)
m->set_owner(mark->locker());
4.8 object为oop对象头,即_object指向了对象头
m->set_object(object);
4.9 对象头oop的Mark Word存储指向ObjectMonitor对象(m)
object->release_set_mark(markOopDesc::encode(m));
...
4.2 成功,返回ObjectMonitor
return m ;
}
5. 无锁状态,首先申请ObjectMonitor
ObjectMonitor * m = omAlloc (Self) ;
6. 初始化及设置属性值
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;
7. CAS将Mark Word设置为指向ObjectMonitor的指针
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
7.1 CAS失败,重新循环
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
}
...
8. 成功,返回ObjectMonitor
return m ;
}
}
- 在Mark Word为轻量级锁的情况下,可能存在多线程膨胀重量级锁,所以只能一条线程CAS设置Mark Word为膨胀状态,其他线程设置失败的,将重新自旋获取ObjectMonitor.
- 成功升级为重量级锁,即CAS设置Mark Word为膨胀状态成功,对象头oop的Mark Word通过markOopDesc::encode(m)函数设置指向申请的ObjectMonitor对象,其中m为ObjectMonitor对象
static markOop encode(ObjectMonitor* monitor) {
intptr_t tmp = (intptr_t) monitor;
//Mark Word指向ObjectMonitor
return (markOop) (tmp | monitor_value);
}

【PS : 从图中可以知道,markOopDesc::encode(m)函数可安全的设置,不存在多线程竞争】
重量锁获取锁过程
偏向锁、轻量级锁是围绕Mark Word,而重量级锁是围绕ObjectMonitor : objectMonitor.cpp.
初次尝试获取锁 --- enter()函数
objectMonitor.cpp 文件的第321行代码 --- enter()函数
void ATTR ObjectMonitor::enter(TRAPS) {
1. 获取当前线程
Thread * const Self = THREAD ;
void * cur ;
2. CAS尝试设置_owner指向当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
3. CAS修改成功,则获取了重量级锁
if (cur == NULL) {
return ;
}
---------------CAS修改失败就执行一下操作
4. _owner指向的是当前线程,可重入_recursions加1
if (cur == Self) {
_recursions ++ ;
return ;
}
5. _owner指针指向的是一个Lock Recod,说明之前是轻量级锁的持有者,并且Lock Recod是当前线程的
if (Self->is_lock_owned ((address)cur)) {
5.1 重入次数为1次
_recursions = 1 ;
5.2 不指向Lock Record了,而是指向当前线程
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
....
------------------- 执行到这里,说明没成功获取到锁,进入自旋
6. 自旋
for (;;) {
6.1 获取锁函数,下面会细说
EnterI (THREAD) ;
6.2 获取锁成功,设置属性
_recursions = 0 ;
_succ = NULL ;
6.3 释放锁函数,下面会细说
exit (false, Self) ;
jt->java_suspend_self();
}

由上图可清晰知道,ObjectMonitor是围绕_owner字段来竞争锁的,记下来看看进入自旋的EnterI()函数.
自旋尝试获取锁 --- EnterI()函数
同样的还是objectMonitor.cpp文件的第508行代码.
void ATTR ObjectMonitor::EnterI (TRAPS) {
1. 获取当前线程
Thread * Self = THREAD ;
2. 尝试获取锁
if (TryLock (Self) > 0) {
return ;
}
3. 自旋尝试获取锁
if (TrySpin (Self) > 0) {
return ;
}
-------------执行到这里获取锁还是失败了,进入同步队列
4. 创建ObjectWaiter对象
ObjectWaiter node(Self) ;
5. 挂起/唤醒线程重置参数,此时的线程还没挂起
Self->_ParkEvent->reset() ;
5.1 前驱节点为无效节点(cxq为单向链表)
node._prev = (ObjectWaiter *) 0xBAD ;
5.2 当前节点状态为CXQ,也就是说节点在_cxq队列里
node.TState = ObjectWaiter::TS_CXQ ;
6. 将生成的节点放入_cxq队列头节点中,最后在阻塞前继续尝试还能不能获取到锁
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
6.1 CAS设置节点为_cxq队列的头节点,成功则停止自旋
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
6.2 加入_cxt队列失败,再次尝试获取锁
if (TryLock (Self) > 0) {
return ;
}
}
#start : 自旋开始
7. 成功加入_cxt队列,此时还未阻塞,而是继续尝试获取锁
for (;;) {
7.1 尝试获取锁
if (TryLock (Self) > 0) break ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
8. 挂起线程
if (_Responsible == Self || (SyncFlags & 1)) {
8.1 挂起有超时时间(RecheckInterval)
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
8.2 设置时间值RecheckInterval
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
8.3 挂起没有超时时间
Self->_ParkEvent->park() ;
}
------------------ 接下来的被其他线程唤醒后的操作
9. 唤醒后继续尝试获取锁
if (TryLock(Self) > 0) break ;
9.1 ...还是一些自旋策略来获取锁
}
#end : 自旋结束
------------------------ 执行到这里说明在上面自旋中获取到锁了
10. 将节点从_cxq或_EntryList里移除
UnlinkAfterAcquire (Self, &node) ;
...
return ;
}
从第3~7步骤,在没有获取到锁的情况下,一直在尝试获取锁,直到第8才挂起线程
尝试获取锁的函数 : TryLock (),第489行代码,该函数只执行1次尝试获取锁
int ObjectMonitor::TryLock (Thread * Self) {
1. 自旋,循环名存实亡
for (;;) {
1.1 获取_owner
void * own = _owner ;
1.2 判断_owner是否已经被更改,若是则退出
if (own != NULL) return 0 ;
1.3 CAS设置_owner,成功则返回1
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
return 1 ;
}
1.4 获取锁失败
if (true) return -1 ;
}
}
自旋尝试获取锁的函数 : TrySpin (),第2021行代码,该函数只执行10次尝试获取锁(默认10次)
int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) {
...
1. 循环次数默认10次
for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) {
if (TryLock(Self) > 0) {
....
return 1 ;
}
1.1 休息一下继续
SpinPause () ;
}
...
}

在没有获取到锁的情况下,一直在尝试获取锁,最后成功进_cxq队列后,还是会尝试去获取锁(求生欲真的很强,不想被挂起).
上面流程,要么是成功获取锁,要么获取不到锁期间一直不断尝试,直到最后实在获取不到(实在尽力了)而进入阻塞状态,等待其他线程的唤醒后继续加入到锁竞争中.
重量锁解锁过程 --- exit()
同样的还是objectMonitor.cpp文件的第962行代码.
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
1. 当前持有锁的线程的_owner指向的是Lock Record,即之前是轻量锁
if (THREAD != _owner) {
1.1 判断当前线程是轻量级锁的持有者
if (THREAD->is_lock_owned((address) _owner)) {
1.2 _owner指向当前线程
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
// 异常情况
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
2. 可重入
if (_recursions != 0) {
_recursions--;
return ;
}
3. 开始释放锁
for (;;) {
if (Knob_ExitPolicy == 0) {
// 默认走这里
3.1 释放锁,此时别的线程可以抢占了,这里是导致Synchronized是一个不公平锁因素之一
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::storeload() ;
3.2 没有线程在_cxq/_EntryList等待,则直接退出
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
return ;
}
3.3 有线程在等待,再把之前释放的锁拿回来
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
3.4 若是失败,说明被人抢占了,直接退出
return ;
}
} else {
...
}
ObjectWaiter * w = NULL ;
4. 获取模式,默认QMode=0
int QMode = Knob_QMode ;
....
5. 继续判断当前_EntryList不为空
w = _EntryList ;
if (w != NULL) {
5.1 释放锁
ExitEpilog (Self, w) ;
return ;
}
6. entryList指向_cxq
w = _cxq ;
6.1 _cxq也是空的,跳过本次循环,自旋回到3.1步骤的时候再次判断两个列表是否为空
if (w == NULL) continue ;
7. CAS设置_cxq为NULL
for (;;) {
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
8. 将_cxq转向给entryList
if (QMode == 1) {
// 该模式下,将cxq中的节点按颠倒顺序排到EntryList
QMode == 1
} else {
// QMode == 0 or QMode == 2
// 默认QMode == 0 所以是走这里
8.1 开始处理_EntryList
_EntryList = w ;
9. 由于_cxq是单向链表,所以这里的_EntryList需要设置成双向链表
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
9.1 设置为TS_ENTER状态
p->TState = ObjectWaiter::TS_ENTER ;
9.2 跟上一节点连接起来
p->_prev = q ;
q = p ;
}
}
10. 释放锁
w = _EntryList ;
if (w != NULL) {
ExitEpilog (Self, w) ;
return ;
}
}
}
核心的是从第3步骤的释放锁开始 :
- cxq和EntruList都为空的情况下,直接释放锁;
- EntryList不为空的情况下,调用ExitEpilog()函数释放锁;
- EntryList为空,cxq不为空的情况下,将cxq指针赋值给EntryList,并且将EntryList链表变成双向链表,然后调用ExitEpilog()函数释放锁.
- 占有锁的线程释放锁后最终会从EntryList中唤醒阻塞等待锁的线程
- 默认值QMode=0的情况下,如果是EntryList队列不为空,则取出EntryList队头节点并唤醒
- 默认值QMode=0的情况下,如果是EntryList队列为空,cxq不为空,则EntryList指向cxq,并取出队头节点唤醒
ExitEpilog() 释放锁函数 : 同样的还是objectMonitor.cpp文件的第1333行代码
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
1. 从队列节点里取出ParkEvent
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
2. 释放锁,将_owner置空
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ;
3. 唤醒节点里封装的线程
Trigger->unpark() ;
}

重量级锁小结
在获取锁的过程中很努力的去尝试获取锁,尽可能的去尝试获取到锁而不用挂起阻塞,即实在没获取到才阻塞,最后成功插入cxq队列头的位置,才挂起自己;
解锁函数exit()的3.1步骤会直接释放锁,导致此时其他线程可以去抢占锁了,而后期去唤醒entryList的头节点可能竞争不到锁,所以Synchronized是一个不公平锁.
偏向锁、轻量级锁都是围绕Mark Word,而重量级锁是围绕ObjectMonitor对象.

重量级锁 --- Demo
# Student.java
public class Student {
private String name;
private Integer age;
public Student(String name, Integer age) {
this.name = name;
this.age = age;
}
/** get、set**/
}
#QueryThreadFactory.java
public class QueryThreadFactory implements ThreadFactory {
private final String name;
private final AtomicInteger nextId = new AtomicInteger(1);
public QueryThreadFactory() {
this.name = "Thread-pool-";
}
@Override
public Thread newThread(Runnable task) {
// 设置线程名称
String index = String.valueOf(nextId.getAndIncrement());
String newName = name + index;
return new Thread(task, newName);
}
}
#QueryThread.java
public class QueryThread implements Runnable {
private final Student student;
public QueryThread(Student student) {
this.student = student;
}
@Override
public void run() {
String threaName = Thread.currentThread().getName();
System.out.println(threaName + " : 开始竞争锁");
synchronized (student) {
try {
System.out.println(threaName + " : 竞争锁成功");
Integer age = student.getAge();
Thread.sleep(5000);
System.out.println("学生名称 : " + student.getName());
System.out.println("学生年龄 : " + age);
student.setAge(++age);
} catch (InterruptedException e) {
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
System.out.println(threaName + " : 锁释放成功");
}
}
}
#TestQueryThread.java
public class TestQueryThread {
private final Student student;
public TestQueryThread(Student student) {
this.student = student;
}
public static void main(String[] args) {
// 锁对象
Student student = new Student("QRB", 23);
TestQueryThread queryThread = new TestQueryThread(student);
// 初始化三条线程
int count = 3;
// 有界队列
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(count);
// 线程池
ExecutorService exec = new ThreadPoolExecutor(count, count, 60,
TimeUnit.SECONDS, queue, new QueryThreadFactory());
for (int i = 0; i < count; i++) {
// 执行任务
exec.execute(new QueryThread(queryThread.student));
}
// 关闭线程池
exec.shutdown();
}
}

等待队列 --- WaitSet
我们知道,当线程调用wait()方法,该线程将主动释放持有的Mutex Lock,并且进入阻塞状态,将封装成ObjectWaiter结点进入WaitSet双向链表中,等待下一次被其他线程调用notify()或者notifyAll()唤醒并开始继续竞争锁,也就是说必须是持有锁线程才会调用wait() \ notify() \ notifyAll() .
Java 的Object 类提供了基于 native 实现的 wait() 、 notify() 、 notifyAll() ,这三个方法起始就是解决了同步锁的同步问题,即支持了线程间通讯、协调的方式,这三个函数也是在JVM层面实现.
挂起阻塞 --- wait()
JVM实现的wait()代码是在 synchronizer.cpp 的第379行代码.
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
1. 如果是偏向锁,则撤销
if (UseBiasedLocking) {
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
}
2. 膨胀为重量级锁,也可以理解为获取ObjectMonitor对象
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
2.1 调用ObjectMonitor的wait函数
monitor->wait(millis, true, THREAD);
....
}
调用了wait方法后将膨胀为重量级锁了,所以最终还是调用ObjectMonitor的wait() : objectMonitor.cpp 的第1471行代码.
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
1. 获取当前线程
Thread * const Self = THREAD ;
JavaThread *jt = (JavaThread *)THREAD;
2. 如果线程中断,则抛出异常
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
2.1 抛出异常
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
3. 构造ObjectWaiter结点
ObjectWaiter node(Self);
3.1 设置状态为TS_WAIT
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
4. 连接结点到WaitSet
4.1 对WaitSet操作需要获取锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
4.2 对WaitSet操作释放锁
Thread::SpinRelease (&_WaitSetLock) ;
5.设置ObjectMonitor属性
5.1 记录旧的递归计数
intptr_t save = _recursions;
5.2 等待锁的线程个数加1
_waiters++;
5.3 初始化可重入数为0
_recursions = 0;
6. 释放锁,唤醒其他线程
exit (true, Self) ;
7. 挂起
int ret = OS_OK ;
int WasNotified = 0 ;
{
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
jt->set_suspend_equivalent();
7.1 线程中断,不做任何处理
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
} else
if (node._notified == 0) {
7.2 根据时间,调用park(),阻塞当前线程
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
7.3 阻塞状态被中断
if (ExitSuspendEquivalent (jt)) {
jt->java_suspend_self();
}
}
------------------接下来是线程被唤醒后的操作
8. 当前线程状态为TS_WAIT
if (node.TState == ObjectWaiter::TS_WAIT) {
8.1 获取操作WaitSet的锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
8.2 状态为TS_WAIT
if (node.TState == ObjectWaiter::TS_WAIT) {
8.3 从WaitSet中删除
DequeueSpecificWaiter (&node) ;
8.4 设置状态为TS_RUN
node.TState = ObjectWaiter::TS_RUN ;
}
8.5 释放操作WaitSet的锁
Thread::SpinRelease (&_WaitSetLock) ;
}
9. 开始竞争锁
ObjectWaiter::TStates v = node.TState ;
9.1 如果状态为TS_RUN,则当前竞争锁的线程不在同步队列中
if (v == ObjectWaiter::TS_RUN) {
9.2 开始加入竞争锁,
enter (Self) ;
} else {
9.3 也是竞争锁,只是当前线程在cxq或者entryList队列,由于是在同步队列中,所以竞争锁失败不用再次进入同步队列(notify唤醒后是移到cxq或者entryList)
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
10. 走到这,表示获取锁成功,同步队列里移出节点
UnlinkAfterAcquire (Self, SelfNode) ;
}
添加到等待队列函数 : AddWaiter (&node),第2337行代码
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
由此可知WaitSet是一个双向链表.
监视调用wait()的重入函数 : ReenterI(),第757行代码.该函数表示当前线程状态不是TS_RUN,即当前线程存在同步队列中
void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {
for (;;) {
ObjectWaiter::TStates v = SelfNode->TState ;
// 当前线程存在cxq或entryList队列
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant");
1. 尝试一次获取锁
if (TryLock (Self) > 0) break ;
2. 自旋获取锁
if (TrySpin (Self) > 0) break ;
----------------- 获取锁失败,挂起
3. 挂起
if (SyncFlags & 1) {
Self->_ParkEvent->park ((jlong)1000) ;
} else {
Self->_ParkEvent->park () ;
}
3.1 被唤醒后继续获取锁
if (TryLock(Self) > 0) break ;
....
}
}
wait()小结
- 当前线程肯定是在持有锁状态下才调用wait()
- 构建ObjectWaiter结点进入WaitSet,并且WaitSet为双向链表结构
- 释放锁并唤醒同步队列里的头部结点(enter()函数)
- 挂起自己,等待被其他线程唤醒
- 被唤醒,继续竞争锁
唤醒 --- notify()
synchronizer.cpp : 第410行代码.
void ObjectSynchronizer::notify(Handle obj, TRAPS) {
1. 如果是偏向锁,则撤销
if (UseBiasedLocking) {
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
}
2. 获取Mark Word
markOop mark = obj->mark();
2.1 当前是轻量级锁,并且是当前线程,则返回(轻量级锁没有处于阻塞状态的线程)
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
return;
}
3. 膨胀为重量级锁,并调用ObjectMonitor.notify()函数
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
最终还是调用ObjectMonitor的notify(),objectMonitor.cpp,第1706行代码.
void ObjectMonitor::notify(TRAPS) {
1. WaitSet为空,直接返回
if (_WaitSet == NULL) {
return ;
}
2.获取操作WaitSet的锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
3. notify 策略,默认是2
int Policy = Knob_MoveNotifyee ;
4. 获取并将队头节点移出队列,即唤醒的是WaitSet的头节点
ObjectWaiter * iterator = DequeueWaiter() ;
5. 开始处理WaitSet的头节点
#if :: iterator != NULL : start
if (iterator != NULL) {
5.1 设置一些,例如线程信息
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = JFR_THREAD_ID(Self);
ObjectWaiter * List = _EntryList ;
5.2 _EntryList同步队列不为空,则将WaitSet的头节点加入到同步队列中,并设置为TS_ENTER状态
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
// notify 策略,默认Policy=2
5.3 根据不同的策略不同的处理
if (Policy == 0) {
// 该策略是添加到entryList同步队列的头部
} else if (Policy == 1) {
// 该策略是添加到entryList同步队列的尾部
} else if (Policy == 2) {
// 该策略尾默认策略,所以是走这里
// 添加到cxq同步队列的头部
5.4 如果entryList同步队列为空,则直接添加到_EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
5.4 设置状态为TS_CXQ
iterator->TState = ObjectWaiter::TS_CXQ ;
5.5 添加到cxq队列头部
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else if (Policy == 3) {
// 添加到cxq同步队列的尾部
} else {
5.6 都不满足,直接唤醒线程
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
}
#if :: iterator != NULL : end
}
6.释放操作WaitSet链表的锁
Thread::SpinRelease (&_WaitSetLock) ;
....
}
获取WaitSet头节点,即头节点出队 --- DequeueWaiter() objectMonitor.cpp,第2357行代码.
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
...
1. 头节点
ObjectWaiter* next = node->_next;
if (next == node) {
1.1 相等,说明WaitSet只有一个结点
_WaitSet = NULL;
} else {
1.2 删除头节点
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
1.3 将_WaitSet往后移动,指向下一个节点
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}
notify()小结
- 在默认策略下,对于操作WaitSet需要获取锁,并且在获取操作WaitSet锁成功后,通过DequeueWaiter()函数获取头部结点,所以可以知道notify()唤醒的是WaitSet链表的头节点
- 根据策略不同,对于唤醒的结点处理的方式不同,以默认Policy=2为例子 :
- 如果entryList同步队列为空,则直接添加到_EntryList
- 否则entryList不为空,则添加到cxq队列头部
- notify()干的活就是将WaitSet头节点从等待队列里移出并加入到同步队列里开始新的一轮锁竞争
唤醒全部 --- notifyAll()
只要理解了上面所说的notify()函数,就可以很快的理解了notifyAll()函数,它们两的区别在于,notifyAll()是唤醒WaitSet链表的全部结点,所以我们直接看默认策略下的代码 :
#ObjectMonitor.cpp
if (Policy == 2) {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
与notify不同的是,此处是直接插入到_cxq头部了,也就是说原本在等待队列末尾的节点反而排在了同步队列的前面.
一图胜千言

总结
- 重量级锁获取对象是通过CAS设置_owner字段来获取锁.
- 围绕同步队列(EntryList)、等待队列(WaitSet)、owner字段等,ObjectMonitor跟管程思想有很大的相似度.
- 根据不同策略,notify可以有不同的方式将等待队列的结点添加到同步队列,而默认模式是取头节点,即默认模式下,notify唤醒的并不是随机唤醒某个结点.
结束语
- 原创不易
- 希望看完这篇文章的你有所收获!
相关参考资料
- Java并发编程艺术【书籍】
- Java并发编程实战【书籍】
- Java Synchronized 重量级锁原理深入剖析上(互斥篇)【链接】
- Java Synchronized 重量级锁原理深入剖析下(同步篇)【链接】
- Java精通并发-通过openjdk源码分析ObjectMonitor底层实现【链接】