
- 本文对源码的学习是基于JDK1.8版本.
- 对于我这几篇ConcurrentHashMap的文章,必须建立在学习过HashMap的源码基础之上.
前言
基于 并发容器:ConcurrentHashMap之putVal()源码分析(一) ,我们大致理解了ConcurrentHashMap的底层实现数据结构,并且对于添加元素、初始化table、根据key值获取value进行了源码分析,并且如果当前存在其他线程处于扩容状态(即存在ForwardingNode结点),则当前线程会执行helpTransfer()执行帮助扩容,即ConcurrentHashMap是支持多线程执行transfer()扩容函数的;.
基于 并发容器:ConcurrentHashMap之addCount()源码分析(二) ,我们大致理解了在成功将元素添加到容器后,需要添加当前容器元素的个数 和 检查是否需要扩容;对于添加元素的个数,是支持多线程操作的,同样的对于扩容函数transfer()也是支持多线程执行的.
本文对于ConcurrentHashMap是如何支持多线程执行扩容的实现展开源码分析.
transfer()
核心作用 : 多线程扩容操作.
实现思路
先用文字简单说明一下transfer()是如何实现多线程扩容的 : 把旧数组的数据迁移到新数组
- ① 分段处理:即每条线程各自迁移自己所分配到的桶数据,例如每条线程迁移各自的16个桶数据
- ② 根据边界值来给每条线程分配各自的桶数据
- ③ 根据分配到的边界值循环遍历把旧数组的数据迁移到新数组中
- ④ 当桶的数据迁移完成后,会把当前桶存储为ForwardingNode结点,表示当前桶已经完成数据的迁移
只有第③步是基于新数组操作,其他都是基于旧数组操作

transfer() --- 全局变量
// 当前服务器的CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 每条线程最少分配16个桶
private static final int MIN_TRANSFER_STRIDE = 16;
// 新数组对象: 旧数组的数据迁移到该数组中
private transient volatile Node<K,V>[] nextTable;
// 多线程分配资源的一个关键字段,通过该值可以知道从哪个索引值开始分配资源
// 从最右边(数组长度-1)开始向左边分配
// 例如旧数组长度为64,每个线程分配16个桶,那么
// 第一条线程执行扩容的时候,transferIndex值为64,
// 基于transferIndex、每条线程分配16个桶数计算出索引范围是 [48,63] 的桶,
// 并且通过CAS设置transferIndex记录为48,告诉其他线程从48开始向右分配.
// 以此类推 :
// 第二条线程将分配到索引为 [32,47] 的桶,此时的transferIndex将会被记录为32
// 第三条线程将分配到索引为 [16,31] 的桶,此时的transferIndex将会被记录为16
// ....
private transient volatile int transferIndex;
transfer() --- 简化分析
简单分为三大步骤来分析 :
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 旧数组长度、每条线程分配的桶数
int n = tab.length, stride;
1. // 计算每条线程分配的桶数
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
1.1 // 保证了每条线程分配的桶数最少是16个
stride = MIN_TRANSFER_STRIDE;
2. // 第一条线程执行扩容操作,需要初始化新数组
if (nextTab == null) {
try {
2.1 // 初始化新数组,扩充到原来的2倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
2.2 // 成功初始化新数组nextTable
nextTable = nextTab;
2.3 // 从最右边开始分配资源
transferIndex = n;
}
3. // 根据transferIndex给各自的线程分配资源
// 做到每条线程迁移各自的桶数据,互不干扰
// 这里代码量比较多,暂时不贴出来,继续往下看
......
}
函数的两个参数说明 :
- Node<K,V>[] tab : 旧数组对象
- Node<K,V>[] nextTab : 新数组对象
三大步骤分析 :
- ① : 计算每条线程分配的桶数
- 如果NCPU为单线程环境下,则直接取数组长度.
- 否则为多线程环境下,数组长度右移3位(除以8),再除以CPU核数;这里为什么要除以8,个人认为可以让更多的线程参与扩容(同一时间线程数最多为当前核数的8倍线程 + 1 条线程并行扩容),假如不除以8而是直接除以核数,那么分配到的桶数会比较大,间接的参与扩容的线程数就减少了.
- 无论是单线程或者多线程环境下,分配的桶数必须最少16个.
- ② : 第一条线程执行扩容操作,需要初始化新数组nextTable : 这里我一开始有个疑惑,transfer()是支持多线程执行的,并且新数组nextTable是全局变量,那么这里的初始化nextTable,就存在多线程操作的可能;但是我想说的是这里并没有并发问题 : 通过全局搜索 transfer() , 发现只有 helpTransfer() 和 addCount() 函数 调用 transfer() :
- helpTransfer() : 该函数要执行 transfer() 的前提条件是 nextTable 不为null,所以函数排除了;
- addCount() : 结合并发容器:ConcurrentHashMap之addCount()源码分析(二) 这篇文章的 检查是否扩容 代码块的第3.1步骤的第四个判断条件 ((nt = nextTable) == null) 和 第4步骤 来分析: 当前线程如果执行到第3步骤,说明当前是多线程扩容状态,如果当前线程执行到3.1步骤的第四个判断条件为true,说明存在第一条线程在初始化新数组nextTable,那么直接break,重新循环执行.这里就可以保证了初始化新数组nextTable是在单条线程下执行的.
- ③ : 根据transferIndex、stride(桶数)来给每条线程分配资源,每个线程把分配到的桶的数据迁移到新数组nextTable中;对每个桶里的数据如果迁移完成,则该桶设置存储为ForwardingNode结点.
第3步骤 : 数据迁移,具体分析在下个知识点,以上三个步骤,我将举了个例子,如下图 :

第3步骤 --- 数据迁移
int nextn = nextTab.length;
// 创建ForwardingNode结点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 是否还有资源需要迁移,即是否继续向左边推进
boolean advance = true;
// 是否全部线程已完成扩容操作
boolean finishing = false;
1. // 每个线程都有各自的数组索引边界值 : [bound,i]
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
# 当前线程查找是否还有资源需要迁移 :
# ① 向左推进,确认自己的 [bound,i] 索引值,即自己分配到的资源还没迁移完成
# ② 或者 当前已经无需再帮忙扩容了,即没有资源可以分配了
# ③ 或者 还存在未迁移的桶数,需基于transferIndex分配 [bound,i]
1.1 // 是否需要继续迁移数据(向左边移动)
while (advance) {
int nextIndex, nextBound;
1.2 // --i : i数组索引值左移一步
// 如果大于等于bound,表示索引值i还处于[bound,i]之内 则跳出while循环执行下面的第3步骤的迁移数据操作
// 或者 已完成扩容操作,则跳出while循环,执行下面的第2步骤的代码块
if (--i >= bound || finishing)
advance = false;
1.3 // 索引值i不处于[bound,i]之内
// transferIndex <=0 表示当前已经无需再帮忙扩容了
else if ((nextIndex = transferIndex) <= 0) {
// i设置为-1,对应于下面的第2步骤
i = -1;
// 停止向左推进
advance = false;
}
1.4 // 当前需要再帮忙扩容,设置transferIndex值
// 然后给当前线程分配 [bound,i]
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 分配 [bound,i]
bound = nextBound;
i = nextIndex - 1;
// 分配完毕,跳出while循环
advance = false;
}
}
# 查找分配资源结束 :
# ① 要么不用当前线程帮忙扩容了,即当前线程没有资源可分配 则 执行 第2步骤代码块
# ② 要么还在帮忙扩容阶段,即当前线程分配到资源或者之前分配的资源还没迁移完成 :
# 则迁移数据 即执行第3步骤后面代码块
## 只有当前线程已经无需再帮忙扩容了,才需要检查一下其他线程是否完成扩容
2. // i < 0 ,结合 1.3 步骤 : 表示 当前已经无需再帮忙扩容了,
// 后面两个判断条件不知道为啥要这么判断
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
2.1 // 全部线程已经完成扩容
if (finishing) {
2.1.1 // GC nextTable
nextTable = null;
2.1.2 // 新数组赋值给 table
table = nextTab;
2.1.3 // 设置阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
2.2 // 线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
2.2.1 // 判断是否是最后一个扩容线程,如果是,则需要重新扫描一遍桶数组,做二次确认
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 当前还存在其他线程扩容中,退出transfer()
return;
// PS : 这里最后一个扩容线程的二次检查是没有必要的
finishing = advance = true;
i = n;
}
}
## 检查完毕
3. // 当前迁移的桶没有数据,直接CAS设置为 ForwardingNode结点
else if ((f = tabAt(tab, i)) == null)
3.1 // CAS设置成功,回到最开始的while继续向左推进
advance = casTabAt(tab, i, null, fwd);
4. // 当前桶是ForwardingNode结点,回到最开始的while继续向左推进
else if ((fh = f.hash) == MOVED)
advance = true;
else {
5. // 当前迁移的桶存在数据,需要迁移
### 开始迁移数据
synchronized (f) {
if (tabAt(tab, i) == f) {
### 链表 start
5.1 // 当前桶存放的是一条链表
// 把链表分为 低位0 和 高位1 两条链表
// 低位0的元素的存放位置保持桶位不变
// 高位1的元素的存放位置则发生变化 :
// 即在原来桶位索引值基础上再加上旧数组长度
// 高位1发生变化的原理我在HashMap第二篇文章已经画图说明过了,忘记的可以回头看看
// ln为低位链表
// hn为高位链表
Node<K,V> ln, hn;
if (fh >= 0) {
// 我们只需要知道最终的结果就是会拆分成两条链表 : 低位链表(ln)、高位链表(hn)
// 我们正常的思路就是定义两个变量(高低位链表),
// 然后遍历链表,根据计算来判断结点是存在低位链表还是高位链表
// 而作者 Doug Lea 却不是简单的遍历链表,但结果都是一样的
#start 花里胡哨、反人类
# 寻找lastRun结点 : 不懂的先看下面的图,再回来看代码,文字表达越说越乱
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
# 寻找lastRun结点
// 把lastRun根据 runBit设置 ln或者hn
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
#end 花里胡哨、反人类
5.2 // 只处理lastRun前的结点
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
5.2.1 // 低位 连接上 ln
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
5.2.2 // 高位 连接上 hn
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
5.3 // ln低位链表保持原来桶位置
setTabAt(nextTab, i, ln);
5.4 // hn高位链表在原来桶位置基础上再加上旧数组长度
setTabAt(nextTab, i + n, hn);
5.5 // 当前桶数据迁移完成,直接CAS设置为 ForwardingNode结点
setTabAt(tab, i, fwd);
5.6 // 该桶位置已经迁移完成,然后回到最开始的while继续向左推进
advance = true;
}
### 链表 end
#### 红黑树 start 红黑树结合一篇文章来更加详细的学习
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
#### 红黑树 end
}
}
}
}
lastRun结点

小结
总体分为三大步骤 :
- ① 继续在自己分配到的资源向左推进迁移数据或者基于transferIndex向左推进分配资源,这个步骤可以确认线程是继续迁移数据或者没有分配到资源.
- ② 如果线程没有分配到资源,则检查是否全部线程已经完成扩容,如果是则设置标识和把新数组赋值给table;否则直接return退出扩容函数.
- ③ 如果线程分配到资源(或者说分配到的资源还没处理完),则需要继续迁移数据,迁移完当前桶的数据后将继续回到第①步继续向左推进.
helpTransfer()
上面的transfer()函数能够理解的情况下,helpTransfer()理解起来也会容易多了.
并发容器:ConcurrentHashMap之putVal()源码分析(一) 这篇文章的 putVal() 函数中,当(fh = f.hash) == MOVED 这个条件为true时,表示当前桶位置存放的是一个 ForwardingNode 结点 , 即当前容器处于 正在扩容阶段 , 所以执行 helpTransfer() 帮忙扩容; 我们再看看下面 ForwardingNode 结点的构造器 , 再结合transfer()函数中对于 ForwardingNode 结点 的CAS设置,是不是这几个知识点就关联起来了?
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
helpTransfer() : 帮忙扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
1. // 当前桶位置存放的是一个 ForwardingNode
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
2. // 基于数组长度计算一个16bit位的高位标识
int rs = resizeStamp(tab.length);
3. // sizeCtl < 0 表示正处于多线程扩容阶段
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
3.1 // 下面这几个判断忘记的可以看看第二篇文章的 addCount() 中的 检查是否扩容 这一代码块
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
3.2 // sizeCtl 低16bit位表示当前正在扩容的线程数
// 并且 等于2时表示当前只有一条线程正在扩容
// sizeCtl + 1 , 新增一条线程帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 执行扩容函数
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
helpTransfer()还是挺简单的,前提是这篇并发容器:ConcurrentHashMap之addCount()源码分析(二) 文章的 检查是否扩容 代码块 和本文的 transfer() 能否理解.
总结
- 并发容器:ConcurrentHashMap之putVal()源码分析(一)、并发容器:ConcurrentHashMap之addCount()源码分析(二) 和 本文的知识点,可以说由点到线连接起来了,后面还有一篇查漏补缺及总结的文章,这三篇文章如果能够串联起来,对ConcurrentHashMap底层的实现也有一个深刻的理解了.
结束语
- 原创不易
- 希望看完这篇文章的你有所收获!
- 红黑树会独立一篇文章
相关参考资料
- JDK1.8