把学习当成一种习惯
选择往往大于努力,越努力越幸运

  • 本文对源码的学习是基于JDK1.8版本.
  • 对于我这几篇ConcurrentHashMap的文章,必须建立在学习过HashMap的源码基础之上.

前言

  基于 并发容器:ConcurrentHashMap之putVal()源码分析(一) ,我们大致理解了ConcurrentHashMap的底层实现数据结构,并且对于添加元素、初始化table、根据key值获取value进行了源码分析,并且如果当前存在其他线程处于扩容状态(即存在ForwardingNode结点),则当前线程会执行helpTransfer()执行帮助扩容,即ConcurrentHashMap是支持多线程执行transfer()扩容函数的;.
  基于 并发容器:ConcurrentHashMap之addCount()源码分析(二) ,我们大致理解了在成功将元素添加到容器后,需要添加当前容器元素的个数 和 检查是否需要扩容;对于添加元素的个数,是支持多线程操作的,同样的对于扩容函数transfer()也是支持多线程执行的.
  本文对于ConcurrentHashMap是如何支持多线程执行扩容的实现展开源码分析.

transfer()

核心作用 : 多线程扩容操作.

实现思路

先用文字简单说明一下transfer()是如何实现多线程扩容的 : 把旧数组的数据迁移到新数组

  • ① 分段处理:即每条线程各自迁移自己所分配到的桶数据,例如每条线程迁移各自的16个桶数据
  • ② 根据边界值来给每条线程分配各自的桶数据
  • ③ 根据分配到的边界值循环遍历把旧数组的数据迁移到新数组中
  • ④ 当桶的数据迁移完成后,会把当前桶存储为ForwardingNode结点,表示当前桶已经完成数据的迁移

只有第③步是基于新数组操作,其他都是基于旧数组操作

transfer() --- 全局变量


// 当前服务器的CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();

// 每条线程最少分配16个桶
private static final int MIN_TRANSFER_STRIDE = 16;

// 新数组对象: 旧数组的数据迁移到该数组中
private transient volatile Node<K,V>[] nextTable;

// 多线程分配资源的一个关键字段,通过该值可以知道从哪个索引值开始分配资源
// 从最右边(数组长度-1)开始向左边分配
// 例如旧数组长度为64,每个线程分配16个桶,那么
// 第一条线程执行扩容的时候,transferIndex值为64,
// 基于transferIndex、每条线程分配16个桶数计算出索引范围是 [48,63] 的桶,
// 并且通过CAS设置transferIndex记录为48,告诉其他线程从48开始向右分配.
// 以此类推 : 
// 第二条线程将分配到索引为 [32,47] 的桶,此时的transferIndex将会被记录为32
// 第三条线程将分配到索引为 [16,31] 的桶,此时的transferIndex将会被记录为16
// ....
private transient volatile int transferIndex;

transfer() --- 简化分析

简单分为三大步骤来分析 :


      private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
      // 旧数组长度、每条线程分配的桶数
      int n = tab.length, stride;
      1. // 计算每条线程分配的桶数
      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            1.1 // 保证了每条线程分配的桶数最少是16个
            stride = MIN_TRANSFER_STRIDE;
      2. // 第一条线程执行扩容操作,需要初始化新数组        
      if (nextTab == null) {
            try {
                2.1 // 初始化新数组,扩充到原来的2倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            2.2 // 成功初始化新数组nextTable
            nextTable = nextTab;
            2.3 // 从最右边开始分配资源
            transferIndex = n;
      }

      3. // 根据transferIndex给各自的线程分配资源
      // 做到每条线程迁移各自的桶数据,互不干扰
      // 这里代码量比较多,暂时不贴出来,继续往下看
      
      ......
     
     }

函数的两个参数说明 :

  • Node<K,V>[] tab : 旧数组对象
  • Node<K,V>[] nextTab : 新数组对象

三大步骤分析 :

  • ① : 计算每条线程分配的桶数
    • 如果NCPU为单线程环境下,则直接取数组长度.
    • 否则为多线程环境下,数组长度右移3位(除以8),再除以CPU核数;这里为什么要除以8,个人认为可以让更多的线程参与扩容(同一时间线程数最多为当前核数的8倍线程 + 1 条线程并行扩容),假如不除以8而是直接除以核数,那么分配到的桶数会比较大,间接的参与扩容的线程数就减少了.
    • 无论是单线程或者多线程环境下,分配的桶数必须最少16个.
  • ② : 第一条线程执行扩容操作,需要初始化新数组nextTable : 这里我一开始有个疑惑,transfer()是支持多线程执行的,并且新数组nextTable是全局变量,那么这里的初始化nextTable,就存在多线程操作的可能;但是我想说的是这里并没有并发问题 : 通过全局搜索 transfer() , 发现只有 helpTransfer() 和 addCount() 函数 调用 transfer() :
    • helpTransfer() : 该函数要执行 transfer() 的前提条件是 nextTable 不为null,所以函数排除了;
    • addCount() : 结合并发容器:ConcurrentHashMap之addCount()源码分析(二) 这篇文章的 检查是否扩容 代码块的第3.1步骤的第四个判断条件 ((nt = nextTable) == null) 和 第4步骤 来分析: 当前线程如果执行到第3步骤,说明当前是多线程扩容状态,如果当前线程执行到3.1步骤的第四个判断条件为true,说明存在第一条线程在初始化新数组nextTable,那么直接break,重新循环执行.这里就可以保证了初始化新数组nextTable是在单条线程下执行的.
  • ③ : 根据transferIndex、stride(桶数)来给每条线程分配资源,每个线程把分配到的桶的数据迁移到新数组nextTable中;对每个桶里的数据如果迁移完成,则该桶设置存储为ForwardingNode结点.

第3步骤 : 数据迁移,具体分析在下个知识点,以上三个步骤,我将举了个例子,如下图 :

第3步骤 --- 数据迁移


        int nextn = nextTab.length;
        // 创建ForwardingNode结点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 是否还有资源需要迁移,即是否继续向左边推进
        boolean advance = true;
        // 是否全部线程已完成扩容操作
        boolean finishing = false;
        
        1. // 每个线程都有各自的数组索引边界值 : [bound,i]
        for (int i = 0, bound = 0;;) {
        
            Node<K,V> f; int fh;
            
            # 当前线程查找是否还有资源需要迁移 : 
            # ① 向左推进,确认自己的 [bound,i] 索引值,即自己分配到的资源还没迁移完成 
            # ② 或者 当前已经无需再帮忙扩容了,即没有资源可以分配了 
            # ③ 或者 还存在未迁移的桶数,需基于transferIndex分配 [bound,i]
            
            1.1 // 是否需要继续迁移数据(向左边移动)
            while (advance) {
                int nextIndex, nextBound;
                1.2 // --i : i数组索引值左移一步
                // 如果大于等于bound,表示索引值i还处于[bound,i]之内 则跳出while循环执行下面的第3步骤的迁移数据操作
                // 或者 已完成扩容操作,则跳出while循环,执行下面的第2步骤的代码块
                if (--i >= bound || finishing)
                    advance = false;
                1.3 // 索引值i不处于[bound,i]之内
                // transferIndex <=0 表示当前已经无需再帮忙扩容了
                else if ((nextIndex = transferIndex) <= 0) {
                    // i设置为-1,对应于下面的第2步骤
                    i = -1;
                    // 停止向左推进
                    advance = false;
                }
                1.4 // 当前需要再帮忙扩容,设置transferIndex值
                // 然后给当前线程分配 [bound,i]
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    // 分配 [bound,i]
                    bound = nextBound;
                    i = nextIndex - 1;
                    // 分配完毕,跳出while循环
                    advance = false;
                }
            }
            
            # 查找分配资源结束 : 
            # ① 要么不用当前线程帮忙扩容了,即当前线程没有资源可分配 则 执行 第2步骤代码块
            # ② 要么还在帮忙扩容阶段,即当前线程分配到资源或者之前分配的资源还没迁移完成 :
            # 则迁移数据 即执行第3步骤后面代码块
            
            
            ## 只有当前线程已经无需再帮忙扩容了,才需要检查一下其他线程是否完成扩容
            
            2. // i < 0 ,结合 1.3 步骤 : 表示 当前已经无需再帮忙扩容了,
            // 后面两个判断条件不知道为啥要这么判断
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                2.1 // 全部线程已经完成扩容
                if (finishing) {
                    2.1.1 // GC nextTable
                    nextTable = null;
                    2.1.2 // 新数组赋值给 table
                    table = nextTab;
                    2.1.3 // 设置阈值
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                2.2 // 线程数减1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                     2.2.1 // 判断是否是最后一个扩容线程,如果是,则需要重新扫描一遍桶数组,做二次确认
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        // 当前还存在其他线程扩容中,退出transfer()
                        return;
                      
                    // PS : 这里最后一个扩容线程的二次检查是没有必要的
                    finishing = advance = true;
                    i = n;
                }
            }
            
            ## 检查完毕

            
            3. // 当前迁移的桶没有数据,直接CAS设置为 ForwardingNode结点
            else if ((f = tabAt(tab, i)) == null)
                3.1 // CAS设置成功,回到最开始的while继续向左推进
                advance = casTabAt(tab, i, null, fwd);
            4. // 当前桶是ForwardingNode结点,回到最开始的while继续向左推进
            else if ((fh = f.hash) == MOVED)
                advance = true;
            else {
            
            5. // 当前迁移的桶存在数据,需要迁移
            
            ### 开始迁移数据
            
                synchronized (f) {
                
                    if (tabAt(tab, i) == f) {
                        
                        ### 链表 start
                        
                        5.1 // 当前桶存放的是一条链表
                        // 把链表分为 低位0 和 高位1 两条链表
                        // 低位0的元素的存放位置保持桶位不变
                        // 高位1的元素的存放位置则发生变化 : 
                        // 即在原来桶位索引值基础上再加上旧数组长度
                        // 高位1发生变化的原理我在HashMap第二篇文章已经画图说明过了,忘记的可以回头看看
                        // ln为低位链表
                        // hn为高位链表
                        Node<K,V> ln, hn;
                        
                        if (fh >= 0) {
                        
                            // 我们只需要知道最终的结果就是会拆分成两条链表 : 低位链表(ln)、高位链表(hn)
                            // 我们正常的思路就是定义两个变量(高低位链表),
                            // 然后遍历链表,根据计算来判断结点是存在低位链表还是高位链表
                            // 而作者 Doug Lea 却不是简单的遍历链表,但结果都是一样的
                            
                            #start 花里胡哨、反人类
                            
                            # 寻找lastRun结点 : 不懂的先看下面的图,再回来看代码,文字表达越说越乱
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            
                            # 寻找lastRun结点
                            
                            // 把lastRun根据 runBit设置 ln或者hn
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            
                           
                            #end 花里胡哨、反人类
                            
                            5.2 // 只处理lastRun前的结点
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                5.2.1 // 低位 连接上 ln
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                5.2.2 // 高位 连接上 hn
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            
                            5.3 // ln低位链表保持原来桶位置
                            setTabAt(nextTab, i, ln);
                            5.4 // hn高位链表在原来桶位置基础上再加上旧数组长度
                            setTabAt(nextTab, i + n, hn);
                            5.5 // 当前桶数据迁移完成,直接CAS设置为 ForwardingNode结点
                            setTabAt(tab, i, fwd);
                            5.6 // 该桶位置已经迁移完成,然后回到最开始的while继续向左推进
                            advance = true;
                        }
                        
                        ### 链表 end
                        
                        #### 红黑树 start 红黑树结合一篇文章来更加详细的学习
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        
                        #### 红黑树 end
                        
                    }
                }
            }
        }

lastRun结点

小结

总体分为三大步骤 :

  • ① 继续在自己分配到的资源向左推进迁移数据或者基于transferIndex向左推进分配资源,这个步骤可以确认线程是继续迁移数据或者没有分配到资源.
  • ② 如果线程没有分配到资源,则检查是否全部线程已经完成扩容,如果是则设置标识和把新数组赋值给table;否则直接return退出扩容函数.
  • ③ 如果线程分配到资源(或者说分配到的资源还没处理完),则需要继续迁移数据,迁移完当前桶的数据后将继续回到第①步继续向左推进.

helpTransfer()

  上面的transfer()函数能够理解的情况下,helpTransfer()理解起来也会容易多了.
  并发容器:ConcurrentHashMap之putVal()源码分析(一) 这篇文章的 putVal() 函数中,当(fh = f.hash) == MOVED 这个条件为true时,表示当前桶位置存放的是一个 ForwardingNode 结点 , 即当前容器处于 正在扩容阶段 , 所以执行 helpTransfer() 帮忙扩容; 我们再看看下面 ForwardingNode 结点的构造器 , 再结合transfer()函数中对于 ForwardingNode 结点 的CAS设置,是不是这几个知识点就关联起来了?


    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

helpTransfer() : 帮忙扩容


    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        1. // 当前桶位置存放的是一个 ForwardingNode
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            2. // 基于数组长度计算一个16bit位的高位标识
            int rs = resizeStamp(tab.length);
            3. // sizeCtl < 0 表示正处于多线程扩容阶段
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                3.1 // 下面这几个判断忘记的可以看看第二篇文章的 addCount() 中的 检查是否扩容 这一代码块 
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                3.2 // sizeCtl 低16bit位表示当前正在扩容的线程数
                // 并且 等于2时表示当前只有一条线程正在扩容
                
                // sizeCtl + 1 , 新增一条线程帮忙扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    // 执行扩容函数
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

  helpTransfer()还是挺简单的,前提是这篇并发容器:ConcurrentHashMap之addCount()源码分析(二) 文章的 检查是否扩容 代码块 和本文的 transfer() 能否理解.

总结

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!
  • 红黑树会独立一篇文章

相关参考资料

  • JDK1.8

目录