JDK1.8-ConcurrentHashMap详解

共 4866字,需浏览 10分钟

 ·

2022-04-21 03:12

ava JDK升级到1.8后有些集合类的实现有了变化,其中ConcurrentHashMap就有进行结构上的大调整。jdk1.6、1.7实现的共同点主要是通过采用分段锁Segment减少热点域来提高并发效率,1.8版本的实现有哪些变化呢?

重要概念

在正式研究前,我们需要先知道几个重要参数,提前说明其值所代表的意义以便更好的讲解源码实现。

table

所有数据都存在table中,table的容量会根据实际情况进行扩容,table[i]存放的数据类型有以下3种:

  • TreeBin 用于包装红黑树结构的结点类型
  • ForwardingNode 扩容时存放的结点类型,并发扩容的实现关键之一
  • Node 普通结点类型,表示链表头结点

nextTable

扩容时用于存放数据的变量,扩容完成后会置为null。

sizeCtl

以volatile修饰的sizeCtl用于数组初始化与扩容控制,它有以下几个值:

当前未初始化:
 = 0  //未指定初始容量
 > 0  //由指定的初始容量计算而来,再找最近的2的幂次方。
  //比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。
初始化中:
 = -1 //table正在初始化
 = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示
      //并行扩容线程数+1,具体在resizeStamp函数介绍。
初始化完成:
 =table.length * 0.75  //扩容阈值调为table容量大小的0.75倍

其它的分析相应源码时再细说。

put()

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == nullthrow new NullPointerException();
    int hash = spread(key.hashCode());   //@1,讲解见下面小标题。
     //i处结点数量,2: TreeBin或链表结点数, 其它:链表结点数。主要用于每次加入结点后查看是否要由链表转为红黑树
    int binCount = 0
    for (Node[] tab = table;;) {   //CAS经典写法,不成功无限重试,让再次进行循环进行相应操作。
        Node f; int n, i, fh;
        //除非构造时指定初始化集合,否则默认构造不初始化table,所以需要在添加时元素检查是否需要初始化。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  //@2
        //CAS操作得到对应table中元素
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //@3
            if (casTabAt(tab, i, null,
                         new Node(hash, key, value, null)))
                break;                   //null创建Node对象做为链表首结点
        }
        else if ((fh = f.hash) == MOVED)  //当前结点正在扩容
         //让当前线程调用helpTransfer也参与到扩容过程中来,扩容完毕后tab指向新table。
            tab = helpTransfer(tab, f); 
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {  //双重检查i处结点未变化
                    if (fh >= 0) {  //表明是链表结点类型,hash值是大于0的,即spread()方法计算而来
                        binCount = 1;
                        for (Node e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                //onlyIfAbsent表示是新元素才加入,旧值不替换,默认为fase。
                                if (!onlyIfAbsent)  
                                    e.val = value;
                                break;
                            }
                            Node pred = e;
                            if ((e = e.next) == null) {
                                //jdk1.8版本是把新结点加入链表尾部,next由volatile修饰
                                pred.next = new Node(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {  //红黑树结点类型
                        Node p;
                        binCount = 2;
                        if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                       value)) != null) {   //@4
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)  //默认桶中结点数超过8个数据结构会转为红黑树
                    treeifyBin(tab, i);   //@5
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);  //更新size,检测扩容
    return null;
}

注释已说的比较明白,上面的代码中的数字注释再单独细说下:

spread()

jdk1.8的hash策略,与以往版本一样都是为了减少hash冲突:

static final int HASH_BITS = 0x7fffffff// usable bits of normal node hash   //01111111_11111111_11111111_11111111

static final int spread(int h) {
    //无符号右移加入高位影响,与HASH_BITS做与操作保留对hash有用的比特位,有让hash>0的意思
    return (h ^ (h >>> 16)) & HASH_BITS;
}

initTable()

initTable()用于里面table数组的初始化,值得一提的是table初始化是没有加锁的,那么如何处理并发呢?

由下面代码可以看到,当要初始化时会通过CAS操作将sizeCtl置为-1,而sizeCtl由volatile修饰,保证修改对后面线程可见。

这之后如果再有线程执行到此方法时检测到sizeCtl为负数,说明已经有线程在给扩容了,这个线程就会调用Thread.yield()让出一次CPU执行时间。

private final Node[] initTable() {
        Node[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); 
                //正在初始化时将sizeCtl设为-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;  //DEFAULT_CAPACITY为16
                        @SuppressWarnings("unchecked")
                        Node[] nt = (Node[])new Node[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);   //扩容阈值为新容量的0.75倍
                    }
                } finally {
                    sizeCtl = sc;   //扩容保护
                }
                break;
            }
        }
        return tab;
    }

tabAt()/casTabAt()/setTabAt()

ABASE表示table中首个元素的内存偏移地址,所以(long)i << ASHIFT) + ABASE得到table[i]的内存偏移地址:

static final  Node tabAt(Node[] tab, int i) {
    return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final  boolean casTabAt(Node[] tab, int i,
                                    Node c, Node v)
 
{
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final  void setTabAt(Node[] tab, int i, Node v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

对i位置结点的写操作有两个方法,casTabAt()与setTabAt()。源码中有这样一段注释:

* Note that calls to setTabAt always occur within locked regions,
* and so in principle require only release ordering, not
* full volatile semantics, but are currently coded as volatile
* writes to be conservative.

所以要原子语义的写操作需要使用casTabAt(),setTabAt()是在锁定桶的状态下才会被调用,之所以实现成这样只是带保守性的一种写法而已。放松一下继续~

5dc2451b18861972a54b4ed6a63c34b5.webp

TreeBin

注释4、5都是有关TreeBin的操作,为进一步提升性能,ConcurrentHashMap引入了红黑树。引入红黑树是因为链表查询的时间复杂度为O(n),红黑树查询的时间复杂度为O(log(n)),所以在结点比较多的情况下使用红黑树可以大大提升性能。

红黑树是一种自平衡二叉查找树,有如下性质:

  • 每个节点要么是红色,要么是黑色。
  • 根节点永远是黑色的。
  • 所有的叶节点都是空节点(即 null),并且是黑色的。
  • 每个红色节点的两个子节点都是黑色。(从每个叶子到- 根的路径上不会有两个连续的红色节点)
  • 从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点。

图例:

8d2d217402ef8cd57a4008d7a1c3aa92.webp

treeifyBin()

桶内元素超时8个时会调用到此方法。

private final void treeifyBin(Node[] tab, int index) {
        Node b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);  //如果数组整体容量太小则去扩容,放弃转红黑树结构
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode hd = null, tl = null;
                        for (Node e = b; e != null; e = e.next) {
                            TreeNode p =
                                new TreeNode(e.hash, e.key, e.val,
                                                  nullnull);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin(hd));

可以看出按原Node链表顺序转为了TreeNode链表,每个TreeNode的prev、next已完备,传入头结点hd构造红黑树。

TreeBin构造函数

TreeBin(TreeNode b) {
   //Node(int hash, K key, V val, Node next)
            super(TREEBIN, nullnullnull);
            this.first = b;
            TreeNode r = null;
            for (TreeNode x = b, next; x != null; x = next) {  //依次处理每个结点
                next = (TreeNode)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;  //根结点为黑色
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class kc = null;
                    for (TreeNode p = r;;) { //遍历查找新结点存放位置
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        //key有实现Comparable接口则使用compareTo()进行比较,否则采用tieBreakOrder中默认的比较方式,即比较hashCode。
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {  //左子节点或右子节点为空则在p下添加新结点,否则p的值更新为子节点继续查找。红黑树中结点p.left <= p <= p.right
                            x.parent = xp;  //保存新结点的父结点
                            if (dir <= 0)
                                xp.left = x; //排序小的放左边
                            else
                                xp.right = x;  //排序大的放右边
                            r = balanceInsertion(r, x);  //平衡红黑树
                            break;
...
            this.root = r;
...
        }

putTreeVal()与此方法遍历方式类似不再介绍。

扩容实现

写这篇文章主要就是想讲讲扩容,Let’s go!

什么时候会扩容?

使用put()添加元素时会调用addCount(),内部检查sizeCtl看是否需要扩容。

tryPresize()被调用,此方法被调用有两个调用点:

  • 链表转红黑树(put()时检查)时如果table容量小于64(MIN_TREEIFY_CAPACITY),则会触发扩容。
  • 调用putAll()之类一次性加入大量元素,会触发扩容。

addCount()

addCount()与tryPresize()实现很相似,我们先以addCount()分析下扩容逻辑:

private final void addCount(long x, int check) {
    ...
    //check就是结点数量,有新元素加入成功才检查是否要扩容。
    if (check >= 0) {
        Node[] tab, nt; int n, sc;
        //s表示加入新元素后容量大小,计算已省略。
        //新容量大于当前扩容阈值并且小于最大扩容值才扩容,如果tab=null说明正在初始化,死循环等待初始化完成。
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);  //@1
            //sc<0表示已经有线程在进行扩容工作
            if (sc < 0) {
                //条件1:检查是对容量n的扩容,保证sizeCtl与n是一块修改好的
                //条件2与条件3:应该是进行sc的最小值或最大值判断。
                //条件4与条件5: 确保tranfer()中的nextTable相关初始化逻辑已走完。
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))  //有新线程参与扩容则sizeCtl加1
                    transfer(tab, nt);
            }
            //没有线程在进行扩容,将sizeCtl的值改为(rs << RESIZE_STAMP_SHIFT) + 2),原因见下面sizeCtl值的计算分析。
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

resizeStamp()

在上面的代码中首先有调用到这样的一个方法。

/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
*/

private static int RESIZE_STAMP_BITS = 16;

/**
 * The bit shift for recording size stamp in sizeCtl.
 */

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Integer.numberOfLeadingZeros(n)用于计算n转换成二进制后前面有几个0。这个有什么作用呢?

首先ConcurrentHashMap的容量必定是2的幂次方,所以不同的容量n前面0的个数必然不同,这样可以保证是在原容量为n的情况下进行扩容。(搜索公众号Java知音,回复“2021”,送你一份Java面试题宝典)

(1 << (RESIZE_STAMP_BITS - 1)即是1<<15,表示为二进制即是高16位为0,第16位为1:

0000 0000 0000 0000 1000 0000 0000 0000

所以resizeStamp()的返回值(简称为rs) 高16位置0,第16位为1,低15位存放当前容量n扩容标识,用于表示是对n的扩容。rs与RESIZE_STAMP_SHIFT配合可以求出新的sizeCtl的值,分情况如下:

  • sc >= 0表示没有线程在扩容,使用CAS将sizeCtl的值改为(rs << RESIZE_STAMP_SHIFT) + 2)。
  • sc < 0已经有线程在扩容,将sizeCtl+1并调用transfer()让当前线程参与扩容。

rs即resizeStamp(n),如当前容量为8时sc(sizeCtl)的计算过程如下:

//容量n=8
0000 0000 0000 0000 0000 0000 0000 1000
//Integer.numberOfLeadingZeros(8)=28,二进制表示如下:
0000 0000 0000 0000 0000 0000 0001 1100
//rs
0000 0000 0000 0000 1000 0000 0001 1100
//temp = rs << RESIZE_STAMP_SHIFT,即 temp = rs << 16,左移16后temp最高位为1,所以temp成了一个负数。
1000 0000 0001 1100 0000 0000 0000 0000
//第一个线程要扩容时,sc = (rs << RESIZE_STAMP_SHIFT) + 2)
1000 0000 0001 1100 0000 0000 0000 0010

那么在扩容时sizeCtl值的意义便如下图所示:

361cd28d71957f353a8402ad257c89b6.webp

tryPresize()

private final void tryPresize(int size) {
        //根据传入的size计算出真正的新容量,新容量需要是2的幂次方。
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;   //table未初始化则给一个初始容量
                //后面相似代码不再讲解
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node[] nt = (Node[])new Node[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        //传入指定容量
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

transfer()

jdk1.8版本的ConcurrentHashMap支持并发扩容,上面已经分析了一小部分,下面这个方法是真正进行并行扩容的地方。

private final void transfer(Node[] tab, Node[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  //每个线程处理桶的最小数目,可以看出核数越高步长越小,最小16个。
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {
            try {
                @SuppressWarnings("unchecked")
                Node[] nt = (Node[])new Node[n << 1];  //扩容到2倍
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;  //扩容保护
                return;
            }
            nextTable = nextTab;
            transferIndex = n;  //扩容总进度,>=transferIndex的桶都已分配出去。
        }
        int nextn = nextTab.length;
          //扩容时的特殊节点,标明此节点正在进行迁移,扩容期间的元素查找要调用其find()方法在nextTable中查找元素。
        ForwardingNode fwd = new ForwardingNode(nextTab); 
        //当前线程是否需要继续寻找下一个可处理的节点
        boolean advance = true;
        boolean finishing = false//所有桶是否都已迁移完成。
        for (int i = 0, bound = 0;;) {
            Node f; int fh;
            //此循环的作用是确定当前线程要迁移的桶的范围或通过更新i的值确定当前范围内下一个要处理的节点。
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)  //每次循环都检查结束条件
                    advance = false;
                //迁移总进度<=0,表示所有桶都已迁移完成。
                else if ((nextIndex = transferIndex) <= 0) {  
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {  //transferIndex减去已分配出去的桶。
                    //确定当前线程每次分配的待迁移桶的范围为[bound, nextIndex)
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //当前线程自己的活已经做完或所有线程的活都已做完,第二与第三个条件应该是下面让"i = n"后,再次进入循环时要做的边界检查。
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {  //所有线程已干完活,最后才走这里。
                    nextTable = null;
                    table = nextTab;  //替换新table
                    sizeCtl = (n << 1) - (n >>> 1); //调sizeCtl为新容量0.75倍。
                    return;
                }
                //当前线程已结束扩容,sizeCtl-1表示参与扩容线程数-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                 //还记得addCount()处给sizeCtl赋的初值吗?相等时说明没有线程在参与扩容了,置finishing=advance=true,为保险让i=n再检查一次。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)   
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);  //如果i处是ForwardingNode表示第i个桶已经有线程在负责迁移了。
            else if ((fh = f.hash) == MOVED)
                advance = true// already processed
            else {
                synchronized (f) {  //桶内元素迁移需要加锁。
                    if (tabAt(tab, i) == f) {
                        Node ln, hn;
                        if (fh >= 0) {  //>=0表示是链表结点
                            //由于n是2的幂次方(所有二进制位中只有一个1),如n=16(0001 0000),第4位为1,那么hash&n后的值第4位只能为0或1。所以可以根据hash&n的结果将所有结点分为两部分。
                            int runBit = fh & n;
                            Node lastRun = f;
                            //找出最后一段完整的fh&n不变的链表,这样最后这一段链表就不用重新创建新结点了。
                            for (Node p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //lastRun之前的结点因为fh&n不确定,所以全部需要重新迁移。
                            for (Node p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                            //低位链表放在i处
                            setTabAt(nextTab, i, ln);
                            //高位链表放在i+n处
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);  //在原table中设置ForwardingNode节点以提示该桶扩容完成。
                            advance = true;
                        }
                        else if (f instanceof TreeBin) { //红黑树处理。
                            ...

helpTransfer()

添加、删除节点之处都会检测到table的第i个桶是ForwardingNode的话会调用helpTransfer()方法。

final Node[] helpTransfer(Node[] tab, Node f) {
        Node[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

并发扩容总结

  • 单线程新建nextTable,新容量一般为原table容量的两倍。
  • 每个线程想增/删元素时,如果访问的桶是ForwardingNode节点,则表明当前正处于扩容状态,协助一起扩容完成后再完成相应的数据更改操作。
  • 扩容时将原table的所有桶倒序分配,每个线程每次最小分配16个桶,防止资源竞争导致的效率下降。单个桶内元素的迁移是加锁的,但桶范围处理分配可以多线程,在没有迁移完成所有桶之前每个线程需要重复获取迁移桶范围,直至所有桶迁移完成。
  • 一个旧桶内的数据迁移完成但不是所有桶都迁移完成时,查询数据委托给ForwardingNode结点查询nextTable完成(这个后面看find()分析)。
  • 迁移过程中sizeCtl用于记录参与扩容线程的数量,全部迁移完成后sizeCtl更新为新table容量的0.75倍。

扩容节结束!其它常用操作再说下。(搜索公众号Java知音,回复“2021”,送你一份Java面试题宝典)

get()

public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {  //总是检查头结点
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)  //在迁移或都是TreeBin
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {  //链表则直接遍历查询
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

remove()

public V remove(Object key) {
    return replaceNode(key, nullnull);
}

final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node[] tab = table;;) {
        Node f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)  //删除时也需要确实扩容完成后才可以操作。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {  //cv不为null则替换,否则是删除。
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        //没前置节点就是头节点
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        //...
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

主要改进

CAS无锁算法与synchronized保证并发安全,支持并发扩容,数据结构变更为数组+链表+红黑树,提高性能。

jdk1.8版弃用变量

  • Segment只有序列化时会用到。

  • loadFactor仅用于构造函数中设定初始容量,已不能影响扩容阈值,jdk1.8版本中阈值计算基本恒定为0.75。

  • concurrencyLevel只能影响初始容量,table容量大小与它无关。

ava JDK升级到1.8后有些集合类的实现有了变化,其中ConcurrentHashMap就有进行结构上的大调整。jdk1.6、1.7实现的共同点主要是通过采用分段锁Segment减少热点域来提高并发效率,1.8版本的实现有哪些变化呢?

重要概念

在正式研究前,我们需要先知道几个重要参数,提前说明其值所代表的意义以便更好的讲解源码实现。

table

所有数据都存在table中,table的容量会根据实际情况进行扩容,table[i]存放的数据类型有以下3种:

  • TreeBin 用于包装红黑树结构的结点类型
  • ForwardingNode 扩容时存放的结点类型,并发扩容的实现关键之一
  • Node 普通结点类型,表示链表头结点

nextTable

扩容时用于存放数据的变量,扩容完成后会置为null。

sizeCtl

以volatile修饰的sizeCtl用于数组初始化与扩容控制,它有以下几个值:

当前未初始化:
 = 0  //未指定初始容量
 > 0  //由指定的初始容量计算而来,再找最近的2的幂次方。
  //比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。
初始化中:
 = -1 //table正在初始化
 = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示
      //并行扩容线程数+1,具体在resizeStamp函数介绍。
初始化完成:
 =table.length * 0.75  //扩容阈值调为table容量大小的0.75倍

其它的分析相应源码时再细说。

put()

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == nullthrow new NullPointerException();
    int hash = spread(key.hashCode());   //@1,讲解见下面小标题。
     //i处结点数量,2: TreeBin或链表结点数, 其它:链表结点数。主要用于每次加入结点后查看是否要由链表转为红黑树
    int binCount = 0
    for (Node[] tab = table;;) {   //CAS经典写法,不成功无限重试,让再次进行循环进行相应操作。
        Node f; int n, i, fh;
        //除非构造时指定初始化集合,否则默认构造不初始化table,所以需要在添加时元素检查是否需要初始化。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();  //@2
        //CAS操作得到对应table中元素
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //@3
            if (casTabAt(tab, i, null,
                         new Node(hash, key, value, null)))
                break;                   //null创建Node对象做为链表首结点
        }
        else if ((fh = f.hash) == MOVED)  //当前结点正在扩容
         //让当前线程调用helpTransfer也参与到扩容过程中来,扩容完毕后tab指向新table。
            tab = helpTransfer(tab, f); 
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {  //双重检查i处结点未变化
                    if (fh >= 0) {  //表明是链表结点类型,hash值是大于0的,即spread()方法计算而来
                        binCount = 1;
                        for (Node e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                //onlyIfAbsent表示是新元素才加入,旧值不替换,默认为fase。
                                if (!onlyIfAbsent)  
                                    e.val = value;
                                break;
                            }
                            Node pred = e;
                            if ((e = e.next) == null) {
                                //jdk1.8版本是把新结点加入链表尾部,next由volatile修饰
                                pred.next = new Node(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {  //红黑树结点类型
                        Node p;
                        binCount = 2;
                        if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                       value)) != null) {   //@4
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)  //默认桶中结点数超过8个数据结构会转为红黑树
                    treeifyBin(tab, i);   //@5
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);  //更新size,检测扩容
    return null;
}

注释已说的比较明白,上面的代码中的数字注释再单独细说下:

spread()

jdk1.8的hash策略,与以往版本一样都是为了减少hash冲突:

static final int HASH_BITS = 0x7fffffff// usable bits of normal node hash   //01111111_11111111_11111111_11111111

static final int spread(int h) {
    //无符号右移加入高位影响,与HASH_BITS做与操作保留对hash有用的比特位,有让hash>0的意思
    return (h ^ (h >>> 16)) & HASH_BITS;
}

initTable()

initTable()用于里面table数组的初始化,值得一提的是table初始化是没有加锁的,那么如何处理并发呢?

由下面代码可以看到,当要初始化时会通过CAS操作将sizeCtl置为-1,而sizeCtl由volatile修饰,保证修改对后面线程可见。

这之后如果再有线程执行到此方法时检测到sizeCtl为负数,说明已经有线程在给扩容了,这个线程就会调用Thread.yield()让出一次CPU执行时间。

private final Node[] initTable() {
        Node[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); 
                //正在初始化时将sizeCtl设为-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;  //DEFAULT_CAPACITY为16
                        @SuppressWarnings("unchecked")
                        Node[] nt = (Node[])new Node[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);   //扩容阈值为新容量的0.75倍
                    }
                } finally {
                    sizeCtl = sc;   //扩容保护
                }
                break;
            }
        }
        return tab;
    }

tabAt()/casTabAt()/setTabAt()

ABASE表示table中首个元素的内存偏移地址,所以(long)i << ASHIFT) + ABASE得到table[i]的内存偏移地址:

static final  Node tabAt(Node[] tab, int i) {
    return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final  boolean casTabAt(Node[] tab, int i,
                                    Node c, Node v)
 
{
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final  void setTabAt(Node[] tab, int i, Node v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

对i位置结点的写操作有两个方法,casTabAt()与setTabAt()。源码中有这样一段注释:

* Note that calls to setTabAt always occur within locked regions,
* and so in principle require only release ordering, not
* full volatile semantics, but are currently coded as volatile
* writes to be conservative.

所以要原子语义的写操作需要使用casTabAt(),setTabAt()是在锁定桶的状态下才会被调用,之所以实现成这样只是带保守性的一种写法而已。放松一下继续~

5dc2451b18861972a54b4ed6a63c34b5.webp

TreeBin

注释4、5都是有关TreeBin的操作,为进一步提升性能,ConcurrentHashMap引入了红黑树。引入红黑树是因为链表查询的时间复杂度为O(n),红黑树查询的时间复杂度为O(log(n)),所以在结点比较多的情况下使用红黑树可以大大提升性能。

红黑树是一种自平衡二叉查找树,有如下性质:

  • 每个节点要么是红色,要么是黑色。
  • 根节点永远是黑色的。
  • 所有的叶节点都是空节点(即 null),并且是黑色的。
  • 每个红色节点的两个子节点都是黑色。(从每个叶子到- 根的路径上不会有两个连续的红色节点)
  • 从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点。

图例:

8d2d217402ef8cd57a4008d7a1c3aa92.webp

treeifyBin()

桶内元素超时8个时会调用到此方法。

private final void treeifyBin(Node[] tab, int index) {
        Node b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);  //如果数组整体容量太小则去扩容,放弃转红黑树结构
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode hd = null, tl = null;
                        for (Node e = b; e != null; e = e.next) {
                            TreeNode p =
                                new TreeNode(e.hash, e.key, e.val,
                                                  nullnull);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin(hd));

可以看出按原Node链表顺序转为了TreeNode链表,每个TreeNode的prev、next已完备,传入头结点hd构造红黑树。

TreeBin构造函数

TreeBin(TreeNode b) {
   //Node(int hash, K key, V val, Node next)
            super(TREEBIN, nullnullnull);
            this.first = b;
            TreeNode r = null;
            for (TreeNode x = b, next; x != null; x = next) {  //依次处理每个结点
                next = (TreeNode)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;  //根结点为黑色
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class kc = null;
                    for (TreeNode p = r;;) { //遍历查找新结点存放位置
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        //key有实现Comparable接口则使用compareTo()进行比较,否则采用tieBreakOrder中默认的比较方式,即比较hashCode。
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {  //左子节点或右子节点为空则在p下添加新结点,否则p的值更新为子节点继续查找。红黑树中结点p.left <= p <= p.right
                            x.parent = xp;  //保存新结点的父结点
                            if (dir <= 0)
                                xp.left = x; //排序小的放左边
                            else
                                xp.right = x;  //排序大的放右边
                            r = balanceInsertion(r, x);  //平衡红黑树
                            break;
...
            this.root = r;
...
        }

putTreeVal()与此方法遍历方式类似不再介绍。

扩容实现

写这篇文章主要就是想讲讲扩容,Let’s go!

什么时候会扩容?

使用put()添加元素时会调用addCount(),内部检查sizeCtl看是否需要扩容。

tryPresize()被调用,此方法被调用有两个调用点:

  • 链表转红黑树(put()时检查)时如果table容量小于64(MIN_TREEIFY_CAPACITY),则会触发扩容。
  • 调用putAll()之类一次性加入大量元素,会触发扩容。

addCount()

addCount()与tryPresize()实现很相似,我们先以addCount()分析下扩容逻辑:

private final void addCount(long x, int check) {
    ...
    //check就是结点数量,有新元素加入成功才检查是否要扩容。
    if (check >= 0) {
        Node[] tab, nt; int n, sc;
        //s表示加入新元素后容量大小,计算已省略。
        //新容量大于当前扩容阈值并且小于最大扩容值才扩容,如果tab=null说明正在初始化,死循环等待初始化完成。
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);  //@1
            //sc<0表示已经有线程在进行扩容工作
            if (sc < 0) {
                //条件1:检查是对容量n的扩容,保证sizeCtl与n是一块修改好的
                //条件2与条件3:应该是进行sc的最小值或最大值判断。
                //条件4与条件5: 确保tranfer()中的nextTable相关初始化逻辑已走完。
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))  //有新线程参与扩容则sizeCtl加1
                    transfer(tab, nt);
            }
            //没有线程在进行扩容,将sizeCtl的值改为(rs << RESIZE_STAMP_SHIFT) + 2),原因见下面sizeCtl值的计算分析。
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

resizeStamp()

在上面的代码中首先有调用到这样的一个方法。

/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
*/

private static int RESIZE_STAMP_BITS = 16;

/**
 * The bit shift for recording size stamp in sizeCtl.
 */

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

Integer.numberOfLeadingZeros(n)用于计算n转换成二进制后前面有几个0。这个有什么作用呢?

首先ConcurrentHashMap的容量必定是2的幂次方,所以不同的容量n前面0的个数必然不同,这样可以保证是在原容量为n的情况下进行扩容。(搜索公众号Java知音,回复“2021”,送你一份Java面试题宝典)

(1 << (RESIZE_STAMP_BITS - 1)即是1<<15,表示为二进制即是高16位为0,第16位为1:

0000 0000 0000 0000 1000 0000 0000 0000

所以resizeStamp()的返回值(简称为rs) 高16位置0,第16位为1,低15位存放当前容量n扩容标识,用于表示是对n的扩容。rs与RESIZE_STAMP_SHIFT配合可以求出新的sizeCtl的值,分情况如下:

  • sc >= 0表示没有线程在扩容,使用CAS将sizeCtl的值改为(rs << RESIZE_STAMP_SHIFT) + 2)。
  • sc < 0已经有线程在扩容,将sizeCtl+1并调用transfer()让当前线程参与扩容。

rs即resizeStamp(n),如当前容量为8时sc(sizeCtl)的计算过程如下:

//容量n=8
0000 0000 0000 0000 0000 0000 0000 1000
//Integer.numberOfLeadingZeros(8)=28,二进制表示如下:
0000 0000 0000 0000 0000 0000 0001 1100
//rs
0000 0000 0000 0000 1000 0000 0001 1100
//temp = rs << RESIZE_STAMP_SHIFT,即 temp = rs << 16,左移16后temp最高位为1,所以temp成了一个负数。
1000 0000 0001 1100 0000 0000 0000 0000
//第一个线程要扩容时,sc = (rs << RESIZE_STAMP_SHIFT) + 2)
1000 0000 0001 1100 0000 0000 0000 0010

那么在扩容时sizeCtl值的意义便如下图所示:

361cd28d71957f353a8402ad257c89b6.webp

tryPresize()

private final void tryPresize(int size) {
        //根据传入的size计算出真正的新容量,新容量需要是2的幂次方。
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;   //table未初始化则给一个初始容量
                //后面相似代码不再讲解
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node[] nt = (Node[])new Node[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        //传入指定容量
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

transfer()

jdk1.8版本的ConcurrentHashMap支持并发扩容,上面已经分析了一小部分,下面这个方法是真正进行并行扩容的地方。

private final void transfer(Node[] tab, Node[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  //每个线程处理桶的最小数目,可以看出核数越高步长越小,最小16个。
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {
            try {
                @SuppressWarnings("unchecked")
                Node[] nt = (Node[])new Node[n << 1];  //扩容到2倍
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;  //扩容保护
                return;
            }
            nextTable = nextTab;
            transferIndex = n;  //扩容总进度,>=transferIndex的桶都已分配出去。
        }
        int nextn = nextTab.length;
          //扩容时的特殊节点,标明此节点正在进行迁移,扩容期间的元素查找要调用其find()方法在nextTable中查找元素。
        ForwardingNode fwd = new ForwardingNode(nextTab); 
        //当前线程是否需要继续寻找下一个可处理的节点
        boolean advance = true;
        boolean finishing = false//所有桶是否都已迁移完成。
        for (int i = 0, bound = 0;;) {
            Node f; int fh;
            //此循环的作用是确定当前线程要迁移的桶的范围或通过更新i的值确定当前范围内下一个要处理的节点。
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)  //每次循环都检查结束条件
                    advance = false;
                //迁移总进度<=0,表示所有桶都已迁移完成。
                else if ((nextIndex = transferIndex) <= 0) {  
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {  //transferIndex减去已分配出去的桶。
                    //确定当前线程每次分配的待迁移桶的范围为[bound, nextIndex)
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //当前线程自己的活已经做完或所有线程的活都已做完,第二与第三个条件应该是下面让"i = n"后,再次进入循环时要做的边界检查。
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {  //所有线程已干完活,最后才走这里。
                    nextTable = null;
                    table = nextTab;  //替换新table
                    sizeCtl = (n << 1) - (n >>> 1); //调sizeCtl为新容量0.75倍。
                    return;
                }
                //当前线程已结束扩容,sizeCtl-1表示参与扩容线程数-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                 //还记得addCount()处给sizeCtl赋的初值吗?相等时说明没有线程在参与扩容了,置finishing=advance=true,为保险让i=n再检查一次。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)   
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);  //如果i处是ForwardingNode表示第i个桶已经有线程在负责迁移了。
            else if ((fh = f.hash) == MOVED)
                advance = true// already processed
            else {
                synchronized (f) {  //桶内元素迁移需要加锁。
                    if (tabAt(tab, i) == f) {
                        Node ln, hn;
                        if (fh >= 0) {  //>=0表示是链表结点
                            //由于n是2的幂次方(所有二进制位中只有一个1),如n=16(0001 0000),第4位为1,那么hash&n后的值第4位只能为0或1。所以可以根据hash&n的结果将所有结点分为两部分。
                            int runBit = fh & n;
                            Node lastRun = f;
                            //找出最后一段完整的fh&n不变的链表,这样最后这一段链表就不用重新创建新结点了。
                            for (Node p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //lastRun之前的结点因为fh&n不确定,所以全部需要重新迁移。
                            for (Node p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                            //低位链表放在i处
                            setTabAt(nextTab, i, ln);
                            //高位链表放在i+n处
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);  //在原table中设置ForwardingNode节点以提示该桶扩容完成。
                            advance = true;
                        }
                        else if (f instanceof TreeBin) { //红黑树处理。
                            ...

helpTransfer()

添加、删除节点之处都会检测到table的第i个桶是ForwardingNode的话会调用helpTransfer()方法。

final Node[] helpTransfer(Node[] tab, Node f) {
        Node[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

并发扩容总结

  • 单线程新建nextTable,新容量一般为原table容量的两倍。
  • 每个线程想增/删元素时,如果访问的桶是ForwardingNode节点,则表明当前正处于扩容状态,协助一起扩容完成后再完成相应的数据更改操作。
  • 扩容时将原table的所有桶倒序分配,每个线程每次最小分配16个桶,防止资源竞争导致的效率下降。单个桶内元素的迁移是加锁的,但桶范围处理分配可以多线程,在没有迁移完成所有桶之前每个线程需要重复获取迁移桶范围,直至所有桶迁移完成。
  • 一个旧桶内的数据迁移完成但不是所有桶都迁移完成时,查询数据委托给ForwardingNode结点查询nextTable完成(这个后面看find()分析)。
  • 迁移过程中sizeCtl用于记录参与扩容线程的数量,全部迁移完成后sizeCtl更新为新table容量的0.75倍。

扩容节结束!其它常用操作再说下。(搜索公众号Java知音,回复“2021”,送你一份Java面试题宝典)

get()

public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {  //总是检查头结点
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)  //在迁移或都是TreeBin
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {  //链表则直接遍历查询
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

remove()

public V remove(Object key) {
    return replaceNode(key, nullnull);
}

final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node[] tab = table;;) {
        Node f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)  //删除时也需要确实扩容完成后才可以操作。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {  //cv不为null则替换,否则是删除。
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        //没前置节点就是头节点
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        //...
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

主要改进

CAS无锁算法与synchronized保证并发安全,支持并发扩容,数据结构变更为数组+链表+红黑树,提高性能。

jdk1.8版弃用变量

  • Segment只有序列化时会用到。

  • loadFactor仅用于构造函数中设定初始容量,已不能影响扩容阈值,jdk1.8版本中阈值计算基本恒定为0.75。

  • concurrencyLevel只能影响初始容量,table容量大小与它无关。


浏览 45
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报