把学习当成一种习惯
选择往往大于努力,越努力越幸运

  • 本文对源码的学习是基于JDK1.8版本.
  • 对于我这几篇ConcurrentHashMap的文章,必须建立在学习过HashMap的源码基础之上.
  • 本文只是先暂时大致了解ConcurrentHashMap的底层实现思路及putVal()、initTable()、get() 函数的源码分析 ,先简单的热热身,对于ConcurrentHashMap,里面的精华太多了,一个函数就可以用一篇文章来概述了.

前言

  基于浅谈HashMap(一)再谈HashMap(二)再谈HashMap(三) 这三篇文章的学习,我们对HashMap的底层实现原理有了一个质的提升.对于HashMap容器,它并不是一个线程安全的容器,即在多线程并发环境下会存在并发问题,为了解决并发问题而引入了HashTable,HashTable的底层实现原理跟HashMap有很大的相似之处,只是HashTable引入了synchronized解决了并发问题.HashTable对于synchronized的使用是直接在函数使用,这样就导致了对于临界资源的可伸缩性降低了(例如锁的粒度太大);在并发度较高的环境下,性能就会很低.
  从上面的HashMap到HashTable的介绍,我们可以知道,HashMap是不支持并发环境下使用的,而HashTable在并发度较高的环境下会导致性能下降,而ConcurrentHashMap就是来解决这两个问题.

ConcurrentHashMap

介绍

  ConcurrentHashMap实现的底层数据结构还是同HashMap一样,使用的数组+链表+红黑树,而为了解决并发问题,引入了CAS和synchronized.

三个原子操作的函数 --- 底层数组table有关

这三个函数暂时知道其作用就好,后面会一篇文章来分析.


    // 根据数组索引值获取该索引值存放的元素
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    // CAS设置数组索引值i的元素
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    
    // 直接设置tab数组索引值i的元素
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }


putVal()

作用 : 该函数是将元素添加到容器中,也是我们常用的函数之一.

全局变量

只列举与putVal()函数有关的变量,太多了容易乱.


// 数组的最大容量,同HashMap一样
private static final int MAXIMUM_CAPACITY = 1 << 30;

// 默认数组初始化容量,同HashMap一样
private static final int DEFAULT_CAPACITY = 16;

// 数组最大容量,HashMap可转化为数组对象 : toArray(),跟putVal()函数无关,只是顺便贴出来
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 兼容1.8版本之前,1.8版本已经没有使用了,本文可以忽略,具体可以看看1.7版本的ConcurrentHashMap
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 兼容1.8版本之前,1.8版本已经没有使用了,本文可以忽略,具体可以看看1.7版本的ConcurrentHashMap
private static final float LOAD_FACTOR = 0.75f;

// 转化为红黑树的条件之一,同HashMap一样
static final int TREEIFY_THRESHOLD = 8;

// 红黑树退化为链表的条件,同HashMap一样
static final int UNTREEIFY_THRESHOLD = 6;

// 转化为红黑树的条件之一,同HashMap一样
static final int MIN_TREEIFY_CAPACITY = 64;

// 这个变量很重要,例如小于0、大于0都是分别处于不同的状态,下文再解释
private transient volatile int sizeCtl;

// 底层数组
transient volatile Node<K,V>[] table;

putVal() --- 简化分析

核心代码 : 我将putVal()函数分步骤来具体分析,核心代码没贴出来,这样看起来也不会那么乱,每个步骤会在下面分出来说明.


  final V putVal(K key, V value, boolean onlyIfAbsent) {
  
   if (key == null || value == null) throw new NullPointerException();
   // 计算hash值
   int hash = spread(key.hashCode());
   // 这个是跟扩容、转换红黑树有关的
   int binCount = 0;
   
   // 自旋的方式put元素
   for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            1. // 如果table数组为空,则需要初始化 : initTable()
            if (tab == null || (n = tab.length) == 0){
             tab = initTable();
            }
            2. // table数组已经初始化过了
            // 根据计算出来的数组索引值对应的位置是空的,即没有哈希冲突,直接添加进去
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
             ......
             break;  
            }
            3. // 数组索引值位置不为空,存在结点
            // 走到这里如果条件为true,则表示当前容器正处于扩容状态,执行帮助扩容
            // 这个函数会独立一篇文章来讲,现在只知道ConcurrentHashMap的扩容是支持多线程扩容的
            else if ((fh = f.hash) == MOVED){
             ......
            }
            4. // 走到这里,说明已经初始化过了,并且此时是正常状态(不在扩容状态)
            // 那么就是哈希冲突了,哈希冲突依靠 链表+红黑树 解决
            else {
                4.1 // 给要自己加入到对应数组位置加锁,我们这里就说f对象
                synchronized (f) {
                    // 再判断一下对应数组位置的对象有没有被改变过
                    if (tabAt(tab, i) == f) {
                    
                        4.2 // 第3步骤已经把f对象的hash值已经设置给fh了
                        // 这里的fh的hash值大于0,那么就是一个链表
                        if (fh >= 0) {
                          ......
                        }
                        4.3 // f对象的hash值小于0,再判断一下f对象是否属于 TreeBin,即红黑树类型
                        // 3 和 4.2 和 4.3 步骤的这三个判断等下再给个解释
                        else if (f instanceof TreeBin) {
                          ......
                        }
                    }
                }
                
                5. // 判断链表是否大于等于TREEIFY_THRESHOLD,则转化红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        5.1 // 这个函数还有一个转换红黑树的条件,即table的长度大于等于64
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        
        6. // 添加容器的元素个数,会独立一篇文章
        addCount(1L, binCount);
        return null;
  
  }



按步骤梳理一下,通过一个一个步骤代码分析 : 采用自旋的方式添加元素

  • ① 如果table为空,则执行initTable()初始化,该函数等下会具体分析
  • ② 通过hash值找到要添加的table数组索引位置,如果当前数组索引位置是空的,代表没有哈希冲突,直接通过CAS添加到该数组索引位置,否则则表示数组索引位置不为空,往下执行
  • ③ 该数组索引位置的hash值等于MOVED,说明当前处于扩容状态,执行helpTransfer(tab, f)帮忙扩容,从这里可以看出ConcurrentHashMap是支持多线程扩容的,具体内容看这篇文章 : 并发容器:ConcurrentHashMap之transfer()源码分析(三) .
  • ④ 走到这里,说明已经初始化过了,并且此时是正常状态(不在扩容状态),那么就是哈希冲突了,哈希冲突是通过 链表 + 红黑树 解决的 :
    • 4.1 只给自己对应的数组索引位置加锁,这里就减少了锁住的临界资源,提高了并发度
    • 4.2 这里的fh的hash值大于0,那么就是一个链表,则循环遍历这条链表,如果没有相同的key,则将元素从尾部添加进去
    • 4.3 执行这里的话fh的hash值肯定是小于0,并且是TreeBin对象,即是一颗红黑树,将元素添加到红黑树对象中
  • ⑤ 判断链表是否大于等于TREEIFY_THRESHOLD(8),则转化红黑树
    • 5.1 还有一个转换红黑树的条件,即table的长度大于等于64
  • ⑥ 最后的一个步骤,添加容器的元素个数,具体内容看这篇文章 : 并发容器:ConcurrentHashMap之addCount()源码分析(二).

putVal() --- 步骤分析

步骤① --- initTable()初始化table

当第一个元素被添加进来的时候就需要执行initTable()初始化table :


    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        
        1. // table为null 需要初始化 
        while ((tab = table) == null || tab.length == 0) {
            2. // sizeCtl < 0 说明存在其他线程也在初始化table,则让出CPU时间片,本线程不执行初始化操作了
            // 该操作保证了多线程下只能一条线程完成初始化操作
            // sizeCtl不同状态有着不同的作用,这里我们看到的是小于0时存在其他线程在初始化table
            // (其实是sizeCtl == -1时才是处于初始化状态,至于-2 -3等其他负数是其他状态,我们先暂时只知道-1状态就行)
            if ((sc = sizeCtl) < 0)
                Thread.yield();
            3. // CAS 设置 sizeCtl为-1,成功则表示 sizeCtl == -1 ,告诉其他线程有线程在初始化了
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        3.1 // 第2步骤已经把sizeCtl赋值给sc
                        // 当通过构造器初始化自定义数组容量的时候,这个自定义数组容量是赋值给sizeCtl的
                        // 如果是自定义初始化数组容量,则用自定义数组容量否则使用默认16(DEFAULT_CAPACITY)
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        
                        3.2 // 初始化 table
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        
                        3.3 // 数组阈值 等于 数组长度 减去 数组长度右移2位(除以4) 
                        // 这里相当于 16 - (16/4) = 12
                        // 相当于使用默认的负载因子0.75
                        // 这里相结合HashMap的 负载因子默认0.75、阈值 = 负载因子 * 数组长度 就可以懂了
                        sc = n - (n >>> 2);
                        
                    }
                } finally {
                    // 设置数组扩容的阈值
                    sizeCtl = sc;
                }
                4. // 初始化完成  跳出循环
                break;
            }
        }
        5. // 返回tab
        return tab;
    }


步骤① --- 小结

  • 通过sizeCtl保证了多线程下只能一个线程进行初始化table.
  • sizeCtl 变量 :
    • -1 : 当前存在线程正在初始化table.
    • 大于0 : 前期还没初始化的情况下,通过构造器自定义数组长度,那么sizeCtl就为初始化数组的长度,后期初始化完成后,sizeCtl为扩容数组的阈值
    • 小于-1 : 本文暂时没用到,跳过

步骤② --- 不存在哈希冲突


    1. // 不存在哈希冲突,则直接添加到该数组索引值位置
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        2. // CAS设置
        if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                2.1 // 跳出自旋循环
                break;  
    }

步骤③ --- 当前是否处于扩容阶段

当前容器正处于扩容状态(其他线程在扩容),执行帮助扩容;这个函数独立一篇文章来讲(并发容器:ConcurrentHashMap之transfer()源码分析(三)),现在只知道ConcurrentHashMap的扩容是支持多线程扩容的


    1. // 当前容器正处于扩容状态,执行帮助扩容
    else if ((fh = f.hash) == MOVED) {
        2. // 帮忙扩容
        tab = helpTransfer(tab, f);
    }

步骤④ --- 哈希冲突、步骤⑤ --- 是否需要链表转化为红黑树

走到这里,说明已经初始化过了,并且此时是正常状态(不处于扩容状态);那么就是哈希冲突了,哈希冲突依靠 链表+红黑树 解决.

    
    else {
    
        V oldVal = null;
        
        ### 获取锁
        
        1. // 锁住对应的数组索引位置
        synchronized (f) {
        
            if (tabAt(tab, i) == f) {
            
                2. // 对应的数组索引位置的hash值为大于等于0,表示链表
                if (fh >= 0) {
                    binCount = 1;
                    3. // 遍历循环链表
                    for (Node<K,V> e = f;; ++binCount) {
                        K ek;
                        3.1 // 存在相同的key
                        if (e.hash == hash &&
                            ((ek = e.key) == key ||
                             (ek != null && key.equals(ek)))) {
                            oldVal = e.val;
                            3.2 // 是否替换 默认false,即替换
                            if (!onlyIfAbsent)
                                e.val = value;
                            break;
                        }
                        Node<K,V> pred = e;
                        3.3 // 已经遍历到尾结点
                        if ((e = e.next) == null) {
                            3.4 // 从尾结点插入
                            pred.next = new Node<K,V>(hash, key, value, null);
                            break;
                        }
                    }
                    
                }
                4. // 对应的数组索引位置的hash值小于0 且 是 红黑树
                else if (f instanceof TreeBin) {
                    Node<K,V> p;
                    binCount = 2;
                    4.1 // 添加到红黑树中
                    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                        oldVal = p.val;
                        4.2 // 是否替换 默认false,即替换
                        if (!onlyIfAbsent)
                            p.val = value;
                    }
                    
                }
                
            }
            
        }
        
        ### 释放锁
        
        5. // 是否需要转化为红黑树,同HashMap一个道理
        if (binCount != 0) {
            if (binCount >= TREEIFY_THRESHOLD)
                // 转化红黑树
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
        
    }

步骤④:哈希冲突 --- 小结

  • 解决哈希冲突的方法还是跟HashMap一样.
  • 锁住的是自己添加到相对应数组索引的位置,锁的临界资源减少了,并发度提高了.

步骤⑤:转换红黑树 --- 小结

  • 转换红黑树条件跟HashMap是一样的.
  • 为什么第⑤步骤的转化红黑树代码不放在链表一起执行 : 个人认为是减少锁的粒度,假如当前线程在第1步骤执行后达到了转化红黑树的条件,释放锁后准备转换红黑树,而此时其他线程哈希冲突到该数组索引位置,先获取到该数组索引位置的锁,进而继续执行插入链表操作(此时链表不止8个元素了),这个时候可能会存在多个线程执行treeifyBin()转化红黑树,但是该函数保证了多线程条件下只能一条线程执行转换红黑树,从而不会导致多个线程对同一个数组索引位置进行多次红黑树转换.综合可以知道 : 在此种高并发插入情况下,即使链表长度大于8了,在保证了多线程情况下不会导致多个线程对同一个数组索引位置进行多次红黑树转换,是没有关系的.
  • 转换红黑树还有另外一个判断条件 : (n = tab.length) < MIN_TREEIFY_CAPACITY,即数组长度没达到64是不转换红黑树的,而是执行扩容函数,此时多线程执行扩容函数也没事,因为ConcurrentHashMap支持多线程扩容.

红黑树会独立一篇文章来学习,这里只是顺便提及一下.

步骤⑥ --- 添加容器元素个数

addCount() : 主要做两件事:

  • 添加容器元素的长度(remove的时候参数传的是-1)
  • 检查是否需要扩容
  • 这个函数里面也用到了很多精髓,会独立一篇文章学习

数组存储的不同结点类型

  • 解释简化分析中存在的疑惑,即3 和 4.2 和 4.3 步骤的这三个判断 : 都代表着找到了数组索引位置,并且该位置存放的不同类型由hash来分类
    • 第③步骤 : (fh = f.hash) == MOVED,MOVED的值为-1 , 即 当前数组索引值位置存放的是一个hash == -1 的结点,那么表示其他线程在扩容,则执行帮忙扩容;(剧透 扩容函数transfer()创建的ForwardingNode结点就是使用了MOVED作为hash值,该结点继承了Node类).现在我们只需要知道如果发现数组索引位置的结点的hash == -1,那么肯定就是一个ForwardingNode结点,具体的会独立一篇扩容函数transfer()的文章,到时候就知道了,现在可暂时跳过.
    • 第4.2步骤 : fh >= 0,哈希值大于等于0,即代表着一条链表,即使用的是Node结点(spread()函数保证了hash值大于等于0)
    • 第4.3步骤 : f instanceof TreeBin,该数组索引值存储的是红黑树对象(TreeBin对象的哈希值是小于0的,可以看看构造器对应使用的hash值为TREEBIN(-2))

  可以总结为 : 数组存储的可能为null、链表(Node结点)、红黑树(TreeBin对象,对应的TreeNode结点)、ForwardingNode结点(多线程帮助扩容).

get()

get() : 根据key值获取value值


    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        
        1. // table不为null 且 根据hash值计算出对应的数组索引值位置不为null,说明该索引位置存在元素
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            2. // 判断当前数组索引位置只存储一个元素,
            // 如果是则判断是否相同的key
            // 否则存在多个元素(可能是链表或者红黑树或者ForwardingNode结点)
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            3. // 可能是红黑树或者ForwardingNode结点,执行各自结点实现的find()
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            4. // 不是红黑树、ForwardingNode结点,则是链表了
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
            
        }
        return null;
    }

  结合上面的知识点,get()函数理解起来很简单,不同的结点(Node\TreeNode\ForwardingNode)实现的不同find(),有兴趣的可以自己去看,本文就不展开学习了.

总结

  • 数组存储的可能为null、链表(Node结点)、红黑树(TreeBin对象)、ForwardingNode结点(多线程帮助扩容),相比HashMap多了ForwardingNode结点,即支持多线程帮助扩容.
  • sizeCtl : 该变量可以表示多种状态 :
    • -1 : 存在线程正在初始化table;
    • 大于0 : 前期还没初始化的情况下,通过构造器自定义数组长度,那么sizeCtl就为初始化数组的长度,后期初始化完成后,sizeCtl为扩容数组的阈值;
    • 小于0 : 本文暂时没用到,跳过

结束语

  • 原创不易
  • 希望看完这篇文章的你有所收获!

相关参考资料

  • JDK1.8

目录