把学习当成一种习惯
选择往往大于努力,越努力越幸运

管程机制

  在学习Synchronized之前,了解一下操作系统提供的一种解决并发编程的思想 --- 管程,英文名称叫"Monitor",在Java领域中一般称之"监视器",操作系统领域称之为"管程".
  所谓管程,指的是管理共享变量以及对共享变量的操作过程,让他们支持并发.翻译为 Java 领域的语言,就是管理类的成员变量和成员方法,让这个类是线程安全的.

管程 --- MEAS 模型

  在管程的发展史上,先后出现过三种不同的管程模型,分别是:Hasen 模型、Hoare 模型和 MESA 模型.其中,现在广泛应用的是 MESA 模型,并且 Java 管程的实现参考的也是 MESA 模型.

  管程解决了并发编程的两大核心问题 :

  • 互斥 : 即同一时刻只允许一个线程访问共享资源
  • 同步 : 即线程之间如何通信、协作(线程的阻塞和唤醒控制)

管程 --- 互斥

  管程解决互斥的方法 : 将共享变量和操作共享变量的操作统一封装起来,如下面抽象出来的代码,如果存在多条线程访问共享变量,只能通过入队、出队的方法来实现,这两方法保证了互斥性,只允许一条线程进入管程.


Monitor m {
  // 共享变量
  var queue;
  // 入队
  function enq();
  // 出队
  function deq();
}

管程 --- 同步

  管程采用了 MESA 模型,如下图 : 入口维护了一个同步队列(图中是等待队列),只能一条线程操作共享变量,即一个线程获取到锁,而其他线程则在入口的同步队列中等待.
  管程引入了条件变量,每个条件变量对应有一个等待队列(也可以叫条件队列),每个条件变量都有自己对应的等待队列,解决了线程的同步问题,即解决了线程间的协调、通信问题.

阻塞队列 --- 生产者消费者模式

  Java在代码层次也实现了类似的管程思想 : 例如 AQS \ await() \ signal() \ signalAll() \ Condition对象 等这些结合起来实现的管程思想,其中提供的ReetrantLock\ReetrantReadWriteLock等锁的底层都是基于AQS来实现,但本文不关心AQS,本文核心重点是Synchronized.
  多线程中最常用的一种设计模式 : 生产者消费者模式,而阻塞队列支持生产者消费者模式,Java 提供的阻塞队列中有ArrayBlockingQueue和LinkedBlockingQueue 等.
  阻塞队列 : 阻塞队列支持生产者消费者模式 : 有数据生成时,生产者将数据放入队列,而消费者处理数据时,将从队列中获取数据.生产者不需要知道消费者的标识或者数量,或者他们是否是唯一的消费者,只需要将数据放入队列即可.同样,消费者也不需要知道生产者是谁.
  在有界队列的前提下,生产者生产数据放入队列中,此时队列中存储的数据如果为满,则挂起当前的线程,该线程就会加入到队列不为满的这个条件的Condition的等待队列中;同样的,消费者消费数据需要从队列中拿到数据,如果队列中此时没有存储数据,则挂起当前的线程,该线程就会加入到队列不为空的这个条件的Condition的等待队列中.
  Java提供的Condition对象就是我们所说的条件变量,每一个Condition对象都有相对应的等待队列.其中提供的阻塞队列中例如ArrayBlockingQueue和LinkedBlockingQueue等 就有使用到Condition对象 :


   // 借用了LinkedBlockingQueue 的几个全局变量

   /** 获取并移除元素时使用的锁,如take, poll, etc */
   private final ReentrantLock takeLock = new ReentrantLock();


   /** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程(即取任务的消费者) */
   private final Condition notEmpty = takeLock.newCondition();

   /** 添加元素时使用的锁如 put, offer, etc */
   private final ReentrantLock putLock = new ReentrantLock();

   /** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
   private final Condition notFull = putLock.newCondition();


// 阻塞队列的简单思路

public class BlockedQueue<T>{
  
  final Lock lock = new ReetrantLock();
  // 条件变量:队列不满  
  final Condition notFull = lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty = lock.newCondition();
  
  // 入队
  void enq (T t) {
  // 获取锁
  lock.lock();
  try {
  
    while (队列为满) {
      // 挂起,进入 队列不满 相对应的等待队列等待 
      notFull.await();
     }
  
    // 省略入队操作...
    // 入队后, 通知(唤醒) 队列不空 相对应的等待队列
    notEmpty.signal();
  
  } finally {
    // 释放锁
    lock.unlock();
  }
  
  }
  
  
  // 出队
  void deq (T t) {
  // 获取锁
  lock.lock();
  try {
    
    while (队列为空) {
      // 挂起,进入 队列不空 相对应的等待队列等待
      notEmpty.await();
    }
    
    // 省略出队操作...
    // 出队后,通知(唤醒) 队列不满 相对应的等待队列
    notFull.signal();
    
  } finally {
    // 释放锁
    lock.unlock();
  }
  
  }
  
}

  • 对于入队操作,如果队列已满,就需要等待直到队列不满,所以这里用了notFull.await();
  • 对于出队操作,如果队列为空,就需要等待直到队列不空,所以就用了notEmpty.await();
  • 如果入队成功,那么队列就不空了,就需要通知条件变量:队列不空notEmpty对应的等待队列;
  • 如果出队成功,那就队列就不满了,就需要通知条件变量:队列不满notFull对应的等待队列;

【PS : await() \ signal() \ signalAll() 相对应的 wait() \ notify() \ notifyAll() 语义是一样的】
【PS : 除非经过深思熟虑,否则尽量使用 notifyAll(),因为使用notify()可能会导致线程进入饥饿状态】

小结

  并发编程里两大核心问题——互斥和同步,都可以由管程来帮你解决.学好管程,理论上所有的并发问题你都可以解决,并且很多并发工具类底层都是管程实现的,所以学好管程,就是相当于掌握了一把并发编程的万能钥匙.

Synchronized --- 本文核心

  以上知识点有助于我们对管程初步了解,那么接下来进入Synchronized知识点,Synchronized升级为重量级锁的时候,其底层采用的ObjectMonitor对象实现思路与管程有着很大的相似.
  在Java并发编程中,Synchronized一直都是一个元老级别的重量级锁,随着Java SE 1.6 对Synchronized的优化(锁升级),引入了"偏向锁" 和 "轻量锁",关于"偏向锁" 和 "轻量锁"以及锁升级,可以看这篇文章 : 并发编程基础:Java对象头及锁升级
  由上面文章我们可以知道 : 当锁处于轻量级锁状态的时候,对象头的Mark Word是指向Lock Record(指向第一个,可重入情况下有多个Lock Record),其他获取不到的线程就会进入自旋状态;我们在这里假设自旋中的线程是无数次自旋的,并且不会升级会重量级锁,而且我们知道Lock Record是线程私有的,那么自旋中的线程只能依靠一直自旋而看能不能获取到锁,即轻量级锁由于Lock Record,解决了并发的互斥问题,但还是不能解决并发的同步问题,所以,当锁竞争激烈的时候,轻量级锁升级为重量级锁,重量级锁(Synchronized)解决了互斥、同步问题.
  当锁由轻量级锁升级为重量级锁的时候,JVM就会申请一个锁对象的ObjectMontor对象,锁对象的对象头Mark Word此时存储的指针就是该ObjectMontor.
【PS : Java 参考了 MESA 模型,语言内置的管程(ObjectMonitor)对 MESA 模型进行了精简.MESA 模型中,条件变量可以有多个,Java 语言内置的管程里只有一个条件变量,并且与 wait() \ notify() \ notifyAll() 这三个方法都是管程的组成部分. 】

ObjectMonitor机制

  Synchronized同步锁是在JVM层面上实现的,当Synchronized升级为重量级锁的时候,是基于管程(Monitor)对象来实现.Monitor 是由 ObjectMonitor对象 实现,而 ObjectMonitor对象 是由 C++ 的 objectMonitor.hpp 文件实现.
  ObjectMonitor机制就是Synchronized同步块锁机制升级为"重量级锁"后的工作机制,即当Synchronized同步锁升级为"重量级锁"的时候,会向JVM申请ObjectMonitor对象,直奔主题,看看ObjectMonitor长啥样 :


//ObjectMonitor.hpp

ObjectMonitor() {
    // 记录无锁状态的Mark Word
    _header       = NULL;
    _count        = 0;
    // 等待锁的线程个数
    _waiters      = 0,
    // 线程重入次数
    _recursions   = 0;
    // 指向的对象头
    _object       = NULL;
    // 指向线程或者Lock Record,即指向了当前持有锁线程
    _owner        = NULL;
    // 调用wait()方法后等待锁的队列,即条件变量的等待队列
    _WaitSet      = NULL;
    // 等待队列的锁
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    // 下一个被唤醒的线程
    _succ         = NULL ;
    // ObjectWaiter 队列
    _cxq          = NULL ;
    FreeNext      = NULL ;
    // ObjectWaiter 队列,即同步队列
    _EntryList    = NULL ;
    
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
    _previous_owner_tid = 0;
    
 }
 

可以看出,ObjectMonitor提供的信息量可比线程私有栈信息的Lock Record爆炸多了

  • header : 记录无锁状态的Mark Word,同Lock Record的Displaced Mark Word字段
  • ObjectWaiter : 一个双向链表结构对象,封装了线程的信息,包括当前线程的状态
  • EntryList : 当多个线程同时访问同步代码时,这些因自旋到达次数后还没有竞争到锁的线程就会进入阻塞,将封装成ObjectWaiter结点进入EntrySet双向链表(EntryList);而获取到锁的线程,即获取到锁对象的Monitor,Monitor依赖于操作系统底层的Mutex Lock,当线程获取到Mutex Lock时,其他线程将不能获取到Mutex Lock.
  • WaitSet : 当线程调用wait()方法,该线程将主动释放Mutex Lock,并且进入阻塞,将封装成ObjectWaiter结点进入WaitSet双向链表,等待下一次被其他线程调用notify()或者notifyAll()唤醒开始继续竞争锁;
  • owner : 加锁和释放锁是通过owner,即指向了持有锁的线程的Lock Record或者当前持有锁线程
  • recursions : 线程重入次数,即表示Synchronized支持可重入
  • object : 指向了对象头
  • cxq : cxq队列存储的是指向enter函数的时候因为锁已经被其他线程占有而阻塞的线程(单向链表)
  • succ : 下一个被唤醒线程

【PS : Synchronized同步锁为重量级锁的情况下采用了Monitor,因为Monitor是依赖于操作系统底层的Mutex Lock,所以阻塞、唤醒的时候存在用户态与内核态之间的切换,增加了性能开销】

ObjectWaiter节点 : objectMonitor.hpp 文件的第38行代码.


class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ };

  // 指向下一个节点
  ObjectWaiter * volatile _next;

  // 指向上一个节点
  ObjectWaiter * volatile _prev;

  // 线程
  Thread*   _thread;
  
  // 线程状态
  volatile TStates TState;
  
 public:
  ObjectWaiter(Thread* thread);
  
};

锁膨胀为重量锁的过程

直接看代码, synchronizer.cpp 文件的第1211行代码 --- inflate()函数


// oop : 指的是对象头,根据oop可以拿到Mark Word
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// 自旋获取ObjectMonitor
 for (;;) {
 
 1. 根据对象头oop获取Mark Word
 const markOop mark = object->mark();
 
 2. 判断当前Mark Word存储的是否Monitor指针,如果是,直接返回Monitor对象
 if (mark->has_monitor()) {
    ObjectMonitor * inf = mark->monitor();
    return inf;
 }
 
 3. 当前的锁正在膨胀中,直接跳过,重新循环
 if (mark == markOopDesc::INFLATING()) {
    ReadStableMark(object) ;
    continue ;
 }
 
 4. 当前为轻量级锁
 if (mark->has_locker()) {
 
    4.1 申请ObjectMonitor对象
    ObjectMonitor * m = omAlloc (Self);
 
    4.2 初始化参数
    m->Recycle();
    m->_Responsible  = NULL ;
    m->OwnerIsThread = 0 ;
    m->_recursions   = 0 ;
    m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;
 
    4.3 CAS尝试将Mark Word设置为膨胀状态
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
      if (cmp != mark) {
         4.4 修改失败,重新开始循环
         omRelease (Self, m, true) ;
         continue ; 
      }
 
 4.5 修改成功,即获取到锁,然后去自己之前的线程栈中的Lock Record拿到Mark Word(无锁状态信息)
 markOop dmw = mark->displaced_mark_helper() ;
 
 4.6 将Mark Word(无锁状态信息) 设置到ObjectMonitor的_header字段里
 m->set_header(dmw) ;
 
 4.7 设置_owner字段,即指向了锁的持有者线程栈的Lock Record(因为当前是轻量级锁升级)
 m->set_owner(mark->locker());
 
 4.8 object为oop对象头,即_object指向了对象头
 m->set_object(object);
 
 4.9 对象头oop的Mark Word存储指向ObjectMonitor对象(m)
 object->release_set_mark(markOopDesc::encode(m));
 
 ...
 
 4.2 成功,返回ObjectMonitor
 return m ;
 
 }
 
 5. 无锁状态,首先申请ObjectMonitor
 ObjectMonitor * m = omAlloc (Self) ;
 
 6. 初始化及设置属性值
 m->Recycle();
 m->set_header(mark);
 m->set_owner(NULL);
 m->set_object(object);
 m->OwnerIsThread = 1 ;
 m->_recursions   = 0 ;
 m->_Responsible  = NULL ;
 m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;
 
 7. CAS将Mark Word设置为指向ObjectMonitor的指针
 if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
     7.1 CAS失败,重新循环
     m->set_object (NULL) ;
     m->set_owner  (NULL) ;
     m->OwnerIsThread = 0 ;
     m->Recycle() ;
     omRelease (Self, m, true) ;
     m = NULL ;
     continue ;
         
 }
  
 ...
 8. 成功,返回ObjectMonitor
 return m ;
 
 }
 
}

  • 在Mark Word为轻量级锁的情况下,可能存在多线程膨胀重量级锁,所以只能一条线程CAS设置Mark Word为膨胀状态,其他线程设置失败的,将重新自旋获取ObjectMonitor.
  • 成功升级为重量级锁,即CAS设置Mark Word为膨胀状态成功,对象头oop的Mark Word通过markOopDesc::encode(m)函数设置指向申请的ObjectMonitor对象,其中m为ObjectMonitor对象

  static markOop encode(ObjectMonitor* monitor) {
    intptr_t tmp = (intptr_t) monitor;
    //Mark Word指向ObjectMonitor
    return (markOop) (tmp | monitor_value);
  }
  

【PS : 从图中可以知道,markOopDesc::encode(m)函数可安全的设置,不存在多线程竞争】

重量锁获取锁过程

偏向锁、轻量级锁是围绕Mark Word,而重量级锁是围绕ObjectMonitor : objectMonitor.cpp.

初次尝试获取锁 --- enter()函数

objectMonitor.cpp 文件的第321行代码 --- enter()函数


void ATTR ObjectMonitor::enter(TRAPS) { 

1. 获取当前线程
Thread * const Self = THREAD ;
void * cur ;

2. CAS尝试设置_owner指向当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;

3. CAS修改成功,则获取了重量级锁
if (cur == NULL) {
   return ;
}

---------------CAS修改失败就执行一下操作

4. _owner指向的是当前线程,可重入_recursions加1
if (cur == Self) {
   _recursions ++ ;
   return ;
}

5. _owner指针指向的是一个Lock Recod,说明之前是轻量级锁的持有者,并且Lock Recod是当前线程的
if (Self->is_lock_owned ((address)cur)) {

   5.1 重入次数为1次
   _recursions = 1 ;
   
   5.2 不指向Lock Record了,而是指向当前线程
   _owner = Self ;
   OwnerIsThread = 1 ;
   return ;
   
}

....

------------------- 执行到这里,说明没成功获取到锁,进入自旋

6. 自旋
for (;;) { 

6.1 获取锁函数,下面会细说
EnterI (THREAD) ;

6.2 获取锁成功,设置属性
_recursions = 0 ;
_succ = NULL ;

6.3 释放锁函数,下面会细说
exit (false, Self) ;

jt->java_suspend_self();

}

由上图可清晰知道,ObjectMonitor是围绕_owner字段来竞争锁的,记下来看看进入自旋的EnterI()函数.

自旋尝试获取锁 --- EnterI()函数

同样的还是objectMonitor.cpp文件的第508行代码.


void ATTR ObjectMonitor::EnterI (TRAPS) {

1. 获取当前线程
Thread * Self = THREAD ;

2. 尝试获取锁
if (TryLock (Self) > 0) {
   return ;
}

3. 自旋尝试获取锁
if (TrySpin (Self) > 0) {
   return ;
}

-------------执行到这里获取锁还是失败了,进入同步队列

4. 创建ObjectWaiter对象
ObjectWaiter node(Self) ;

5. 挂起/唤醒线程重置参数,此时的线程还没挂起
Self->_ParkEvent->reset() ;

5.1 前驱节点为无效节点(cxq为单向链表)
node._prev   = (ObjectWaiter *) 0xBAD ;
5.2 当前节点状态为CXQ,也就是说节点在_cxq队列里
node.TState  = ObjectWaiter::TS_CXQ ;


6. 将生成的节点放入_cxq队列头节点中,最后在阻塞前继续尝试还能不能获取到锁
ObjectWaiter * nxt ;

    for (;;) {
    
        node._next = nxt = _cxq ;
        
        6.1 CAS设置节点为_cxq队列的头节点,成功则停止自旋
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        6.2 加入_cxt队列失败,再次尝试获取锁
        if (TryLock (Self) > 0) {
            return ;
        }
        
    }

#start : 自旋开始

7. 成功加入_cxt队列,此时还未阻塞,而是继续尝试获取锁
for (;;) { 
7.1 尝试获取锁
if (TryLock (Self) > 0) break ;


if ((SyncFlags & 2) && _Responsible == NULL) {
   Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}


8. 挂起线程
if (_Responsible == Self || (SyncFlags & 1)) {

   8.1 挂起有超时时间(RecheckInterval)
   Self->_ParkEvent->park ((jlong) RecheckInterval) ;
   
   8.2 设置时间值RecheckInterval
   RecheckInterval *= 8 ;
   if (RecheckInterval > 1000) RecheckInterval = 1000 ;
   
   } else {
   
   8.3 挂起没有超时时间
   Self->_ParkEvent->park() ;
   
}

------------------ 接下来的被其他线程唤醒后的操作

9. 唤醒后继续尝试获取锁
if (TryLock(Self) > 0) break ;
9.1 ...还是一些自旋策略来获取锁
}

#end : 自旋结束

------------------------ 执行到这里说明在上面自旋中获取到锁了

10. 将节点从_cxq或_EntryList里移除
UnlinkAfterAcquire (Self, &node) ;
...

return ;

}

从第3~7步骤,在没有获取到锁的情况下,一直在尝试获取锁,直到第8才挂起线程

尝试获取锁的函数 : TryLock (),第489行代码,该函数只执行1次尝试获取锁


int ObjectMonitor::TryLock (Thread * Self) {

1. 自旋,循环名存实亡
   for (;;) {
   
   1.1 获取_owner
   void * own = _owner ;
   
   1.2 判断_owner是否已经被更改,若是则退出
   if (own != NULL) return 0 ;
   
   1.3 CAS设置_owner,成功则返回1
   if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
         return 1 ;
   }
   
   1.4 获取锁失败   
   if (true) return -1 ;
   
   }
   
}

自旋尝试获取锁的函数 : TrySpin (),第2021行代码,该函数只执行10次尝试获取锁(默认10次)


int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) {

...

1. 循环次数默认10次
for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) {
      if (TryLock(Self) > 0) {
      
       ....
       
       return 1 ;
      }
      1.1 休息一下继续
      SpinPause () ;
    }
    
...

}

  在没有获取到锁的情况下,一直在尝试获取锁,最后成功进_cxq队列后,还是会尝试去获取锁(求生欲真的很强,不想被挂起).
  上面流程,要么是成功获取锁,要么获取不到锁期间一直不断尝试,直到最后实在获取不到(实在尽力了)而进入阻塞状态,等待其他线程的唤醒后继续加入到锁竞争中.

重量锁解锁过程 --- exit()

同样的还是objectMonitor.cpp文件的第962行代码.


void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {

Thread * Self = THREAD ;

1. 当前持有锁的线程的_owner指向的是Lock Record,即之前是轻量锁
if (THREAD != _owner) {
     1.1 判断当前线程是轻量级锁的持有者
     if (THREAD->is_lock_owned((address) _owner)) {
       1.2 _owner指向当前线程
       _owner = THREAD ;
       
       _recursions = 0 ;
       OwnerIsThread = 1 ;
       
     } else {
     
       // 异常情况
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       
       return;
     }
}
   
2. 可重入
if (_recursions != 0) {
  _recursions--;  
  return ;
}

3. 开始释放锁
for (;;) {
    
    if (Knob_ExitPolicy == 0) {
    // 默认走这里
    3.1 释放锁,此时别的线程可以抢占了,这里是导致Synchronized是一个不公平锁因素之一
    OrderAccess::release_store_ptr (&_owner, NULL) ;   
    OrderAccess::storeload() ;      
    
    3.2 没有线程在_cxq/_EntryList等待,则直接退出
    if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
       return ;
    }
    
    3.3 有线程在等待,再把之前释放的锁拿回来
    if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
       3.4 若是失败,说明被人抢占了,直接退出
       return ;
    }
    
    } else {
    ...
    }
    

ObjectWaiter * w = NULL ;
4. 获取模式,默认QMode=0
int QMode = Knob_QMode ;

....
    
5. 继续判断当前_EntryList不为空
 w = _EntryList  ;
 if (w != NULL) { 
 5.1 释放锁
 ExitEpilog (Self, w) ;
  return ;
 }
 
6. entryList指向_cxq
w = _cxq ;
6.1 _cxq也是空的,跳过本次循环,自旋回到3.1步骤的时候再次判断两个列表是否为空
if (w == NULL) continue ;

7. CAS设置_cxq为NULL
for (;;) {
  ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
  if (u == w) break ;
  w = u ;
}

8. 将_cxq转向给entryList
if (QMode == 1) {
// 该模式下,将cxq中的节点按颠倒顺序排到EntryList
   QMode == 1
} else {
// QMode == 0 or QMode == 2

// 默认QMode == 0 所以是走这里

8.1 开始处理_EntryList
_EntryList = w ;

9. 由于_cxq是单向链表,所以这里的_EntryList需要设置成双向链表
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {

    9.1 设置为TS_ENTER状态
    p->TState = ObjectWaiter::TS_ENTER ;
    
    9.2 跟上一节点连接起来
    p->_prev = q ;
    q = p ;
    
}

}

10. 释放锁
w = _EntryList  ;
  if (w != NULL) {
    ExitEpilog (Self, w) ;
    return ;
 }

}
   
}

核心的是从第3步骤的释放锁开始 :

  • cxq和EntruList都为空的情况下,直接释放锁;
  • EntryList不为空的情况下,调用ExitEpilog()函数释放锁;
  • EntryList为空,cxq不为空的情况下,将cxq指针赋值给EntryList,并且将EntryList链表变成双向链表,然后调用ExitEpilog()函数释放锁.
  • 占有锁的线程释放锁后最终会从EntryList中唤醒阻塞等待锁的线程
    • 默认值QMode=0的情况下,如果是EntryList队列不为空,则取出EntryList队头节点并唤醒
    • 默认值QMode=0的情况下,如果是EntryList队列为空,cxq不为空,则EntryList指向cxq,并取出队头节点唤醒

ExitEpilog() 释放锁函数 : 同样的还是objectMonitor.cpp文件的第1333行代码


void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   1. 从队列节点里取出ParkEvent
   ParkEvent * Trigger = Wakee->_event ;
   Wakee  = NULL ;

   2. 释放锁,将_owner置空
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;   
   
   3. 唤醒节点里封装的线程
   Trigger->unpark() ;
   
}

重量级锁小结

  在获取锁的过程中很努力的去尝试获取锁,尽可能的去尝试获取到锁而不用挂起阻塞,即实在没获取到才阻塞,最后成功插入cxq队列头的位置,才挂起自己;
  解锁函数exit()的3.1步骤会直接释放锁,导致此时其他线程可以去抢占锁了,而后期去唤醒entryList的头节点可能竞争不到锁,所以Synchronized是一个不公平锁.
  偏向锁、轻量级锁都是围绕Mark Word,而重量级锁是围绕ObjectMonitor对象.

重量级锁 --- Demo

# Student.java

public class Student {

    private String name;

    private Integer age;

    public Student(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    /** get、set**/

}

#QueryThreadFactory.java

public class QueryThreadFactory implements ThreadFactory {


    private final String name;

    private final AtomicInteger nextId = new AtomicInteger(1);

    public QueryThreadFactory() {
        this.name = "Thread-pool-";
    }

    @Override
    public Thread newThread(Runnable task) {
        // 设置线程名称
        String index = String.valueOf(nextId.getAndIncrement());
        String newName = name + index;
        return new Thread(task, newName);
    }

}

#QueryThread.java

public class QueryThread implements Runnable {

    private final Student student;

    public QueryThread(Student student) {
        this.student = student;
    }

    @Override
    public void run() {
        String threaName = Thread.currentThread().getName();
        System.out.println(threaName + " : 开始竞争锁");
        synchronized (student) {
            try {
                System.out.println(threaName + " : 竞争锁成功");
                Integer age = student.getAge();
                Thread.sleep(5000);
                System.out.println("学生名称 : " + student.getName());
                System.out.println("学生年龄 : " + age);
                student.setAge(++age);
            } catch (InterruptedException e) {

            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
            System.out.println(threaName + " : 锁释放成功");
        }
    }
    
}

#TestQueryThread.java

public class TestQueryThread {

    private final Student student;

    public TestQueryThread(Student student) {
        this.student = student;
    }

    public static void main(String[] args) {
        // 锁对象
        Student student = new Student("QRB", 23);
        TestQueryThread queryThread = new TestQueryThread(student);

        // 初始化三条线程
        int count = 3;
        // 有界队列
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(count);
        // 线程池
        ExecutorService exec = new ThreadPoolExecutor(count, count, 60,
                TimeUnit.SECONDS, queue, new QueryThreadFactory());
        for (int i = 0; i < count; i++) {
            // 执行任务
            exec.execute(new QueryThread(queryThread.student));
        }
        // 关闭线程池
        exec.shutdown();

    }

}

运行结果

等待队列 --- WaitSet

  我们知道,当线程调用wait()方法,该线程将主动释放持有的Mutex Lock,并且进入阻塞状态,将封装成ObjectWaiter结点进入WaitSet双向链表中,等待下一次被其他线程调用notify()或者notifyAll()唤醒并开始继续竞争锁,也就是说必须是持有锁线程才会调用wait() \ notify() \ notifyAll() .
  Java 的Object 类提供了基于 native 实现的 wait() 、 notify() 、 notifyAll() ,这三个方法起始就是解决了同步锁的同步问题,即支持了线程间通讯、协调的方式,这三个函数也是在JVM层面实现.

挂起阻塞 --- wait()

JVM实现的wait()代码是在 synchronizer.cpp 的第379行代码.


void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) { 

1. 如果是偏向锁,则撤销
 if (UseBiasedLocking) {
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
}
 
2. 膨胀为重量级锁,也可以理解为获取ObjectMonitor对象
 ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
 DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
 
 2.1 调用ObjectMonitor的wait函数
 monitor->wait(millis, true, THREAD);

....
}

调用了wait方法后将膨胀为重量级锁了,所以最终还是调用ObjectMonitor的wait() : objectMonitor.cpp 的第1471行代码.


void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {

1. 获取当前线程
Thread * const Self = THREAD ;
JavaThread *jt = (JavaThread *)THREAD;

2. 如果线程中断,则抛出异常
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { 

 2.1 抛出异常
 THROW(vmSymbols::java_lang_InterruptedException());
 return ;

}

Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);


3. 构造ObjectWaiter结点
ObjectWaiter node(Self);
3.1 设置状态为TS_WAIT
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;


4. 连接结点到WaitSet
4.1 对WaitSet操作需要获取锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
4.2 对WaitSet操作释放锁
Thread::SpinRelease (&_WaitSetLock) ;

5.设置ObjectMonitor属性

5.1 记录旧的递归计数
intptr_t save = _recursions;
5.2 等待锁的线程个数加1
_waiters++;
5.3 初始化可重入数为0
_recursions = 0; 


6. 释放锁,唤醒其他线程
exit (true, Self) ;


7. 挂起
int ret = OS_OK ;
int WasNotified = 0 ;
   {
     OSThread* osthread = Self->osthread();
     OSThreadWaitState osts(osthread, true);
     {
     
       ThreadBlockInVM tbivm(jt);
       jt->set_suspend_equivalent();

       7.1 线程中断,不做任何处理
       if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
           
       } else
       if (node._notified == 0) {
       
       7.2 根据时间,调用park(),阻塞当前线程
         if (millis <= 0) {
            Self->_ParkEvent->park () ;
         } else {
            ret = Self->_ParkEvent->park (millis) ;
         }
         
       }

       7.3 阻塞状态被中断
       if (ExitSuspendEquivalent (jt)) {
          jt->java_suspend_self();
       }

   }

------------------接下来是线程被唤醒后的操作

8. 当前线程状态为TS_WAIT
if (node.TState == ObjectWaiter::TS_WAIT) {
         8.1 获取操作WaitSet的锁
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         
         8.2 状态为TS_WAIT
         if (node.TState == ObjectWaiter::TS_WAIT) {
 
         8.3 从WaitSet中删除
         DequeueSpecificWaiter (&node) ; 

         8.4 设置状态为TS_RUN
         node.TState = ObjectWaiter::TS_RUN ;
         
         }
         
         8.5 释放操作WaitSet的锁
         Thread::SpinRelease (&_WaitSetLock) ;
         
  }
  
9. 开始竞争锁
 ObjectWaiter::TStates v = node.TState ;
    9.1 如果状态为TS_RUN,则当前竞争锁的线程不在同步队列中
    if (v == ObjectWaiter::TS_RUN) {
         9.2 开始加入竞争锁,
         enter (Self) ;
     } else {
         9.3 也是竞争锁,只是当前线程在cxq或者entryList队列,由于是在同步队列中,所以竞争锁失败不用再次进入同步队列(notify唤醒后是移到cxq或者entryList)
         ReenterI (Self, &node) ;
         node.wait_reenter_end(this);
     }
     
10. 走到这,表示获取锁成功,同步队列里移出节点
UnlinkAfterAcquire (Self, SelfNode) ;

}


添加到等待队列函数 : AddWaiter (&node),第2337行代码


inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
 
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
  
}

由此可知WaitSet是一个双向链表.

监视调用wait()的重入函数 : ReenterI(),第757行代码.该函数表示当前线程状态不是TS_RUN,即当前线程存在同步队列中


void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {

 for (;;) {
 
 ObjectWaiter::TStates v = SelfNode->TState ;
 // 当前线程存在cxq或entryList队列
 guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant");
 
 1. 尝试一次获取锁
 if (TryLock (Self) > 0) break ;
 2. 自旋获取锁
 if (TrySpin (Self) > 0) break ;
 
 
 ----------------- 获取锁失败,挂起
 
 3. 挂起
 if (SyncFlags & 1) {
        Self->_ParkEvent->park ((jlong)1000) ;
  } else {
        Self->_ParkEvent->park () ;
 }
 
 3.1 被唤醒后继续获取锁
 if (TryLock(Self) > 0) break ;
 
 ....
 
 }

}

wait()小结

  • 当前线程肯定是在持有锁状态下才调用wait()
  • 构建ObjectWaiter结点进入WaitSet,并且WaitSet为双向链表结构
  • 释放锁并唤醒同步队列里的头部结点(enter()函数)
  • 挂起自己,等待被其他线程唤醒
  • 被唤醒,继续竞争锁

唤醒 --- notify()

synchronizer.cpp : 第410行代码.


void ObjectSynchronizer::notify(Handle obj, TRAPS) {

1. 如果是偏向锁,则撤销
if (UseBiasedLocking) {
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
  }
  
2. 获取Mark Word
 markOop mark = obj->mark();
 2.1 当前是轻量级锁,并且是当前线程,则返回(轻量级锁没有处于阻塞状态的线程)
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    return;
  }

3. 膨胀为重量级锁,并调用ObjectMonitor.notify()函数
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);

}

最终还是调用ObjectMonitor的notify(),objectMonitor.cpp,第1706行代码.


void ObjectMonitor::notify(TRAPS) {

1. WaitSet为空,直接返回
if (_WaitSet == NULL) {
     return ;
  }

2.获取操作WaitSet的锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;

3. notify 策略,默认是2
int Policy = Knob_MoveNotifyee ;

4. 获取并将队头节点移出队列,即唤醒的是WaitSet的头节点
ObjectWaiter * iterator = DequeueWaiter() ;

5. 开始处理WaitSet的头节点
#if :: iterator != NULL : start
if (iterator != NULL) {
  
5.1 设置一些,例如线程信息
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = JFR_THREAD_ID(Self);

ObjectWaiter * List = _EntryList ;

  5.2 _EntryList同步队列不为空,则将WaitSet的头节点加入到同步队列中,并设置为TS_ENTER状态
  if (List != NULL) {
     
     assert (List->_prev == NULL, "invariant") ;
     assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
     assert (List != iterator, "invariant") ;
     
  }
  
  // notify 策略,默认Policy=2
  
  5.3 根据不同的策略不同的处理
  if (Policy == 0) {  
      // 该策略是添加到entryList同步队列的头部
  } else if (Policy == 1) {
      // 该策略是添加到entryList同步队列的尾部
  } else if (Policy == 2) {
      // 该策略尾默认策略,所以是走这里
      // 添加到cxq同步队列的头部
      
      5.4 如果entryList同步队列为空,则直接添加到_EntryList
      if (List == NULL) {
      
         iterator->_next = iterator->_prev = NULL ;
         _EntryList = iterator ;
         
      } else {
         
         5.4 设置状态为TS_CXQ
         iterator->TState = ObjectWaiter::TS_CXQ ;
         
         5.5 添加到cxq队列头部
         for (;;) {
           ObjectWaiter * Front = _cxq ;
           iterator->_next = Front ;
           if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
              break ;
         }
          
      } 
  } else if (Policy == 3) {
    // 添加到cxq同步队列的尾部
  } else {
  
    5.6 都不满足,直接唤醒线程
    ParkEvent * ev = iterator->_event ;
        
    iterator->TState = ObjectWaiter::TS_RUN ;
    OrderAccess::fence() ;
    ev->unpark() ;
    
  }
         
}
#if :: iterator != NULL : end  

}

6.释放操作WaitSet链表的锁
Thread::SpinRelease (&_WaitSetLock) ;

....

}

获取WaitSet头节点,即头节点出队 --- DequeueWaiter() objectMonitor.cpp,第2357行代码.


inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
 
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  
  return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {

  ...
  
  1. 头节点
  ObjectWaiter* next = node->_next;
  if (next == node) {
    1.1 相等,说明WaitSet只有一个结点
    _WaitSet = NULL;
  } else {
  
    1.2 删除头节点
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    
    if (_WaitSet == node) {
      1.3 将_WaitSet往后移动,指向下一个节点
      _WaitSet = next;
    }
    
  }
  
  node->_next = NULL;
  node->_prev = NULL;
  
}

notify()小结

  • 在默认策略下,对于操作WaitSet需要获取锁,并且在获取操作WaitSet锁成功后,通过DequeueWaiter()函数获取头部结点,所以可以知道notify()唤醒的是WaitSet链表的头节点
  • 根据策略不同,对于唤醒的结点处理的方式不同,以默认Policy=2为例子 :
    • 如果entryList同步队列为空,则直接添加到_EntryList
    • 否则entryList不为空,则添加到cxq队列头部
  • notify()干的活就是将WaitSet头节点从等待队列里移出并加入到同步队列里开始新的一轮锁竞争

唤醒全部 --- notifyAll()

  只要理解了上面所说的notify()函数,就可以很快的理解了notifyAll()函数,它们两的区别在于,notifyAll()是唤醒WaitSet链表的全部结点,所以我们直接看默认策略下的代码 :


#ObjectMonitor.cpp

 if (Policy == 2) { 
    
    iterator->TState = ObjectWaiter::TS_CXQ ;
      for (;;) {
      
        ObjectWaiter * Front = _cxq ;
        iterator->_next = Front ;
        
        if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
           break ;
        }
        
    }
    
 }
 

与notify不同的是,此处是直接插入到_cxq头部了,也就是说原本在等待队列末尾的节点反而排在了同步队列的前面.

一图胜千言

总结

  • 重量级锁获取对象是通过CAS设置_owner字段来获取锁.
  • 围绕同步队列(EntryList)、等待队列(WaitSet)、owner字段等,ObjectMonitor跟管程思想有很大的相似度.
  • 根据不同策略,notify可以有不同的方式将等待队列的结点添加到同步队列,而默认模式是取头节点,即默认模式下,notify唤醒的并不是随机唤醒某个结点.

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料


目录