科普文: jdk 1.7和 jdk 1.8 中ConcurrentHashMap 原理浅析

news2025/1/24 11:35:33

1. 前言

为什么要使用 ConcurrentHashMap

主要基于两个原因:

  1. 在并发编程中使用 HashMap 可能造成死循环(jdk1.7,jdk1.8 中会造成数据丢失)
  2. HashTable 效率非常低下

2. ConcurrentHashMap 结构

jdk 1.7 和 jdk 1.8 中,ConcurrentHashMap 的结构有着很大的变化,后面会讲解。

2.1 jdk 1.7 中结构

jdk1.7 ConcurrentHashMap结构

 

在 jdk 1.7 中,ConcurrentHashMap 是由 Segment 数据结构和 HashEntry 数组结构构成。采取分段锁来保证安全性。

Segment 是 ReentrantLock 重入锁,在 ConcurrentHashMap 中扮演锁的角色;HashEntry 则用于存储键值对数据。

一个 ConcurrentHashMap 里包含一个 Segment 数组,一个 Segment 里包含一个 HashEntry 数组,Segment 的结构和 HashMap 类似,是一个数组和链表结构。

针对HashTable会锁整个hash表的问题,ConcurrentHashMap提出了分段锁的解决方案。

分段锁的思想就是:锁的时候不锁整个hash表,而是只锁一部分。

如何实现呢?这就用到了ConcurrentHashMap中最关键的Segment。

ConcurrentHashMap中维护着一个Segment数组,每个Segment可以看做是一个HashMap。

而Segment本身继承了ReentrantLock,它本身就是一个锁。

在Segment中通过HashEntry数组来维护其内部的hash表。

每个HashEntry就代表了map中的一个K-V,用HashEntry可以组成一个链表结构,通过next字段引用到其下一个元素。

上述内容在源码中的表示如下:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {

    // ... 省略 ...
    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;

    // ... 省略 ...

    /**
     * Segment是ConcurrentHashMap的静态内部类
     * 
     * Segments are specialized versions of hash tables.  This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
    	// ... 省略 ...
    	/**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;
        // ... 省略 ...
    }
    // ... 省略 ...

    /**
     * ConcurrentHashMap list entry. Note that this is never exported
     * out as a user-visible Map.Entry.
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        // ... 省略 ...
    }
}

2.2 jdk 1.8 中结构

jdk1.8 ConcurrentHashMap结构

 

JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。

先来从源码角度看一下JDK8中是怎么定义的存储结构。

/**
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 * 
 * hash表,在第一次put数据的时候才初始化,他的大小总是2的倍数。
 */
transient volatile Node<K,V>[] table;

/**
 * 用来存储一个键值对
 * 
 * Key-value entry.  This class is never exported out as a
 * user-mutable Map.Entry (i.e., one supporting setValue; see
 * MapEntry below), but can be used for read-only traversals used
 * in bulk tasks.  Subclasses of Node with a negative hash field
 * are special, and contain null keys and values (but are never
 * exported).  Otherwise, keys and vals are never null.
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}

可以发现,JDK8与JDK7的实现由较大的不同,JDK8中不在使用Segment的概念,他更像HashMap的实现方式。 

3. 实现

3.1 JDK 1.7 中的实现

3.1.1 初始化

ConcurrentHashMap 的初始化是通过位运算来初始化 Segment 的大小的(ssize 表示),通过concurrentLevel 计算得出。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}

ssize 用位于运算来计算(ssize <<=1),所以 Segment 的大小取值都是以2的N次方,Segment 的大小 ssize 默认为16.

每一个 Segment 元素下的 HashEntry 的初始化也是按照位于运算来计算,用 cap 来表示

int cap = 1; while (cap < c) cap <<= 1; 

HashEntry 大小的计算也是2的N次方(cap <<=1), cap 的初始值为1,所以 HashEntry 最小的容量为2.

3.1.2 get 操作

Segment 的 get 操作实现非常简单和高效,先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。

public V get(Object key){
 int hash = hash(key.hashCode());
 return segmentFor(hash).get(key,hash);
 } 

get 操作的高效之处在于整个 get 过程都不需要加锁,除非读到空的值才会加锁重读。原因就是将使用的共享变量定义成 volatile 类型。

transient volatile int count; 
volatile V value; 

3.1.3 put 操作

对于 ConcurrentHashMap 的数据插入,这里要进行两次 Hash 去定位数据的存储位置

static class Segment<K,V> extends ReentrantLock implements Serializable { //省略 } 
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;

    // 根据key的hash定位出一个segment,如果指定index的segment还没初始化,则调用ensureSegment方法初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 调用segment的put方法
    return s.put(key, hash, value, false);
}

最终会调用segment的put方法,将元素put到HashEntry数组中,这里的注释中只给出锁相关的说明

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 因为segment本身就是一个锁
    // 这里调用tryLock尝试获取锁
    // 如果获取成功,那么其他线程都无法再修改这个segment
    // 如果获取失败,会调用scanAndLockForPut方法根据key和hash尝试找到这个node,如果不存在,则创建一个node并返回,如果存在则返回null
    // 查看scanAndLockForPut源码会发现他在查找的过程中会尝试获取锁,在多核CPU环境下,会尝试64次tryLock(),如果64次还没获取到,会直接调用lock()
    // 也就是说这一步一定会获取到锁
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // 扩容
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 释放锁
        unlock();
    }
    return oldValue;
}

当执行put操作时,会经历两个步骤:

  1. 判断是否需要扩容
  2. 定位到添加元素的位置,将其放入 HashEntry 数组中
  3. Segment段继承了ReentrantLock

插入过程会进行第一次 key 的 hash 来定位 Segment 的位置,如果该 Segment 还没有初始化,即通过 CAS 操作进行赋值,

然后进行第二次 hash 操作,找到相应的 HashEntry 的位置,这里会利用继承过来的锁的特性,

在将数据插入指定的 HashEntry 位置时(尾插法),会通过继承 ReentrantLock 的 tryLock() 方法尝试去获取锁,如果获取成功就直接插入相应的位置,

如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用 tryLock() 方法去获取锁,超过指定次数就挂起,等待唤醒。

3.1.4 size 操作

计算 ConcurrentHashMap 的元素大小是并发操作的,就是在你计算 size 的时候,他还在并发的插入数据,这就可能会导致你计算出来的 size 和你实际的 size 有相差。

ConcurrentHashMap 采取的解决方法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,统计过程中如果 count 发生变化,则再采用加锁的方式来统计所有 Segment 的大小。

try {
    for (; ; ) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation  
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K, V> seg = segmentAt(segments, j);
            if (seg != null) {
                /* 在put、remove、clean方法里操作
                * 元素都会将变量modCount进行加一,
                * 统计也是依靠这个变量的前后变化来进行的 */
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0) overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}

3.1.5 线程安全的扩容(Rehash)

HashMap的线程安全问题大部分出在扩容(rehash)的过程中。

ConcurrentHashMap的扩容只针对每个segment中的HashEntry数组进行扩容。

由上述put的源码可知,ConcurrentHashMap在rehash的时候是有锁的,所以在rehash的过程中,其他线程无法对segment的hash表做操作,这就保证了线程安全。

3.2 JDK 1.8 中的实现

3.2.1 基本属性及概念

看一下基本属性

//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
    *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
    *当为0时:代表当时的table还没有被初始化
    *当为正数时:表示初始化或者下一次进行扩容的大小
    */
private transient volatile int sizeCtl;

重要概念:

  1. table: 默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。
  2. nextTable: 默认为null,扩容时新生成的数组,其大小为原数组的两倍
  3. Node :保存 key,value 及 key 的 hash 值的数据结构。
class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    //省略部分代码
}

其中 value 和 next 都用 volatile 修饰,保证并发的可见性。

  1. ForwardingNode: 一个特殊的 Node 节点,hash 值为 -1,其中存储 nextTable 的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

只有table发生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在table中表示当前节点为 null 或则已经被移动。

  1. TreeNode类和TreeBin类:  TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。

3.2.2 初始化

以无参数构造函数为例,来看一下ConcurrentHashMap类初始化的时候会做些什么。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

首先会执行静态代码块和初始化类变量。 主要会初始化以下这些类变量:

// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}

实例化 ConcurrentHashMap 时带参数时,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是2的幂次方.

table 初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0) 
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

成员变量sizeCtl在ConcurrentHashMap中的其中一个作用相当于HashMap中的threshold,当hash表中元素个数超过sizeCtl时,触发扩容; 他的另一个作用类似于一个标识,例如,当他等于-1的时候,说明已经有某一线程在执行hash表的初始化了,一个小于-1的值表示某一线程正在对hash表执行resize。

这个方法首先判断sizeCtl是否小于0,如果小于0,直接将当前线程变为就绪状态的线程。

当sizeCtl大于等于0时,当前线程会尝试通过CAS的方式将sizeCtl的值修改为-1。修改失败的线程会进入下一轮循环,判断sizeCtl<0了,被yield住;修改成功的线程会继续执行下面的初始化代码。

在new Node[]之前,要再检查一遍table是否为空,这里做双重检查的原因在于,如果另一个线程执行完#1代码后挂起,此时另一个初始化的线程执行完了#6的代码,此时sizeCtl是一个大于0的值,那么再切回这个线程执行的时候,是有可能重复初始化的。关于这个问题会在下图的并发场景中说明。

然后初始化hash表,并重新计算sizeCtl的值,最终返回初始化好的hash表。

下图详细说明了几种可能导致重复初始化hash表的并发场景,我们假设Thread2最终成功初始化hash表。

  • Thread1模拟的是CAS更新sizeCtl变量的并发场景
  • Thread2模拟的是table的双重检查的必要性

由上图可以看出,在Thread1中如果不对sizeCtl的值更新做并发控制,Thread1是有可能走到new Node[]这一步的。 在Thread3中,如果不做双重判断,Thread3也会走到new Node[]这一步。

3.2.3 put 操作

假设 table 已经初始化完成,put 操作采用 CAS + synchronized 实现并发插入或更新操作。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        ...省略部分代码
    }
    addCount(1L, binCount);
    return null;
}

hash算法

static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } 

table 中定位索引位置,n 是 table 的大小

int index = (n - 1) & hash

获取 table 中对应索引的元素f

Unsafe.getObjectVolatile 可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

如果 f 为 null,说明 table 中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject 方法插入 Node 节点。

如果 CAS 成功,说明 Node 节点已经插入,随后 addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。

如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。

如果f的 hash 值为 -1,说明当前 f 是 ForwardingNode 节点,意味有其它线程正在扩容,则一起进行扩容操作。

其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这��过程采用同步内置锁实现并发,代码如下:

synchronized (f) {
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                                              value, null);
                    break;
                }
            }
        }
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                           value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}

在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。

如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。 如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。 如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。

table扩容

当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,需要对 table 进行扩容。

整个扩容分为两部分:

构建一个 nextTable,大小为 table 的两倍。 把 table 的数据复制到 nextTable 中。

这两个过程在单线程下实现很简单,但是 ConcurrentHashMap 是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。

先看第一步,构建nextTable,毫无疑问,这个过程只能只有单个线程进行 nextTable 的初始化,具体实现如下:

private final void addCount(long x, int check) {
    ... 省略部分代码
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

通过 Unsafe.compareAndSwapInt 修改 sizeCtl 值,保证只有一个线程能够初始化 nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的 1.5。

节点从 table 移动到 nextTable,大体思想是遍历、复制的过程。

首先根据运算得到需要遍历的次数i,然后利用 tabAt 方法获得 i 位置的元素 f,初始化一个 forwardNode 实例 fwd。

如果 f == null,则在 table 中的 i 位置放入 fwd,这个过程是采用 Unsafe.compareAndSwapObjectf 方法实现的,很巧妙的实现了节点的并发移动。

如果 f 是链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 如果 f 是 TreeBin 节点,也做一个反序处理,并判断是否需要 untreeify,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,同样采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 遍历过所有的节点以后就完成了复制工作,把 table 指向 nextTable,并更新 sizeCtl 为新数组大小的 0.75 倍 ,扩容完成。

红黑树构造

注意:如果链表结构中元素超过 TREEIFY_THRESHOLD 阈值,默认为 8 个,则把链表转化为红黑树,提高遍历查询效率。

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

接下来我们看看如何构造树结构,代码如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证 table 中 index 位置元素是否被修改过。

  1. 根据 table 中 index 位置 Node 链表,重新生成一个 hd 为头结点的 TreeNode 链表。
  2. 根据 hd 头结点,生成 TreeBin 树结构,并把树结构的root节点写到 table 的 index 位置的内存中,具体实现如下:
TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);

主要根据 Node 节点的 hash 值大小构建二叉树。

3.2.4 get 操作

get操作和put操作相比,显得简单了许多。

public V get(Object key) {
    Node<K,V>[] tab; 
    Node<K,V> e, p;
    int n, eh; 
    K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

  1. 判断table是否为空,如果为空,直接返回null。
  2. 计算key的hash值,并获取指定table中指定位置的Node节点,通过遍历链表或则树结构找到对应的节点,返回value值。

3.2.4 size 操作

JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。 具体参考:ConcurrentHashMap 的size方法原理分析

4. JDK 1.8 中为什么要摒弃分段锁

很多人不明白为什么Doug Lea在JDK1.8为什么要做这么大变动,使用重级锁synchronized,性能反而更高,原因如下:

  1. jdk1.8中锁的粒度更细了。jdk1.7中ConcurrentHashMap 的concurrentLevel(并发数)基本上是固定的。jdk1.8中的concurrentLevel是和数组大小保持一致的,每次扩容,并发度扩大一倍.
  2. 红黑树的引入,对链表的优化使得 hash 冲突时的 put 和 get 效率更高
  3. 获得JVM的支持 ,ReentrantLock 毕竟是 API 这个级别的,后续的性能优化空间很小。 synchronized 则是 JVM 直接支持的, JVM 能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得 synchronized 能够随着 JDK 版本的升级而不改动代码的前提下获得性能上的提升。

5. 小结&参考资料

小结

可以看出 JDK1.8 版本的 ConcurrentHashMap 的数据结构已经接近 HashMap,相对而言,ConcurrentHashMap 只是增加了同步的操作来控制并发,从 JDK1.7 版本的 ReentrantLock+Segment+HashEntry,到 JDK1.8 版本中synchronized+CAS+HashEntry+红黑树,优化确实很大。

参考资料

  • HashMap1.8中多线程扩容引起的死循环问题
  • ConcurrentHashMap底层原理
  • 深入浅出ConcurrentHashMap1.8
  • ConcurrentHashMap 的size方法原理分析
  • Java并发编程的艺术
  • ConcurrentHashMap性能在jdk1.8大于jdk1.7

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1969973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软件产品测试报告包括哪些内容?专业软件测试服务供应商推荐

在当今快速发展的软件行业中&#xff0c;软件产品测试报告的重要性愈加凸显。卓码软件测评作为专业的软件测试服务供应商&#xff0c;深知一份高质量的测试报告对于开发团队、管理层以及客户的重要性。 软件产品测试报告是对软件产品在测试过程中所表现出来的各项指标和特性的…

【架构师进阶必备】Spring - 运行时以四种方式动态注册 bean

Spring — 运行时以四种方式动态注册 bean 1. 概述 在本教程中&#xff0c;我们将学习“使用 spring 动态注册 bean”或“在运行时动态将 bean 添加到 spring-context”。在Spring 应用程序中加载和删除 bean 时&#xff0c;无需在运行时重新启动应用程序即可完成此操作。 如…

先用先发!小样本故障诊断新思路!Transformer-SVM组合模型多特征分类预测/故障诊断(Matlab)

先用先发&#xff01;小样本故障诊断新思路&#xff01;Transformer-SVM组合模型多特征分类预测/故障诊断&#xff08;Matlab&#xff09; 目录 先用先发&#xff01;小样本故障诊断新思路&#xff01;Transformer-SVM组合模型多特征分类预测/故障诊断&#xff08;Matlab&#…

【RabbitMQ】路由模式(Routing)

一、基本概念 生产者&#xff08;Producer&#xff09;&#xff1a;发送消息到交换机的程序。在发送消息时&#xff0c;需要指定一个路由键。交换机&#xff08;Exchange&#xff09;&#xff1a;接收生产者发送的消息&#xff0c;并根据路由键将消息路由到与之匹配的队列。在…

【C++BFS】1466. 重新规划路线

本文涉及知识点 CBFS算法 LeetCode1466. 重新规划路线 n 座城市&#xff0c;从 0 到 n-1 编号&#xff0c;其间共有 n-1 条路线。因此&#xff0c;要想在两座不同城市之间旅行只有唯一一条路线可供选择&#xff08;路线网形成一颗树&#xff09;。去年&#xff0c;交通运输部…

软件测试--易用性测试

人体工程学这是一门将日常使用的东西设计为易于使用何实用性强的科学。因此人体工程学的主要目标是达到易用性。 用户界面测试 用于与软件程序交互的方式称为用户界面或UI。大家都熟悉的计算机UI随着时间推移发生了变化。早期的计算机有触发开关和发光管。纸带、穿孔卡和电传打…

AIGC技术的未来航向:深度解析与Java实践

摘要&#xff1a; 本文深入探讨了人工智能生成内容&#xff08;AIGC&#xff09;技术的未来发展方向&#xff0c;从技术创新、可持续可拓展性、用户体验、应用场景、政府赋能等多维度进行分析&#xff0c;并结合Java技术实践&#xff0c;提供具体的实现策略和代码示例。 引言…

PDF翻译神器:这四款可以实现一键搞定,留学党必备!

外文的阅读还是需要一定的语言功底&#xff0c;现在大家也对外文越来越重视起来了&#xff0c;但是借助一些翻译工具进行翻译可以很大程度地提升工作的效率&#xff0c;就算是遇到批量的文件处理也可以一键翻译出来&#xff0c;所以今天借此文章整理了四款好用的pdf翻译工具&am…

计算机基础(Windows 10+Office 2016)教程 —— 第3章 操作系统基础(下)

操作系统基础 Windows 10的系统管理3.5.1 设置日期和时间3.5.2 Windows 10 个性化设置3.5.3 安装和卸载应用程序3.5.4 分区管理3.5.5 格式化磁盘3.5.6 清理磁盘 3.6 Windows 10的网络功能3.6.1 网络软硬件的安装3.6.2 查看网络中其他计算机3.6.3 资源共享 3.7 Windows 10系统的…

数据灾备及时恢复应急预案

第一节总则 1&#xff0c;灾难备份的目的 为了规范本所重要数据备份清单的建立&#xff0c;备份的职责&#xff0c;备份的检查。以及系统受到破坏后的恢复工作&#xff0c;合理防范计算机及信息系统使用过程中的风险&#xff0c;特制定本预案。 2&#xff0c;灾难恢复的定义 灾…

1.kafka面试题之零拷贝

1. 写在前面 Kafka 是一个高性能的分布式消息系统&#xff0c;它使用了多种优化技术来提高数据传输效率&#xff0c;其中之一就是 “零拷贝”&#xff08;Zero Copy&#xff09;。零拷贝技术可以显著减少数据在内存中的复制次数&#xff0c;从而提高 I/O 操作的效率&#xff0…

模拟栈解决表达式求值-java

主要讲述了通过栈来解决后缀表达式&#xff0c;来计算出表达式的结果&#xff0c;可以好好熟悉一下思路。 目录 前言 一、表达式求值问题 二、栈模拟计算表达式 1.算法思路 2.代码解释 三、代码实现 1.代码如下&#xff1a; 2.测试样例如下&#xff1a; 3.运行结果如下…

【轨物推荐】经济长波:创新周期的历史

原创 丑丑姐姐 专利分析可视化 2021年08月01日 21:18 图片来源&#xff1a;Visual Capitalist 在开始本文之前&#xff0c;我们先来学习两个概念&#xff1a; 经济长波&#xff08;Long Waves&#xff09;&#xff0c;亦称“大循环理论”、“康德拉季耶夫周期”。经济长波理论…

redis持久化存储,rdb快照文件,aof文件

redis作为内存数据库&#xff0c;在内存中进行读写操作&#xff0c;将读写操作从毫秒级别降为纳秒级别&#xff0c;得到极大的性能提升&#xff0c;与此同时&#xff0c;作为内存数据库其也有致命缺陷&#xff0c;一旦redis发生意外宕机&#xff0c;那么内存中的数据将全部消失…

智慧医院临床检验管理系统源码(LIS),全套LIS系统源码交付,商业源码,自主版权,支持二次开发

实验室信息系统是集申请、采样、核收、计费、检验、审核、发布、质控、查询、耗材控制等检验科工作为一体的网络管理系统。它的开发和应用将加快检验科管理的统一化、网络化、标准化的进程。一体化设计&#xff0c;与其他系统无缝连接&#xff0c;全程化条码管理。支持危机值管…

如何手动修复DLL丢失?2种手动修复dll文件方法

DLL&#xff08;动态链接库&#xff09;文件是Windows操作系统中非常重要的组成部分&#xff0c;它们包含了程序运行所需的代码和数据。然而&#xff0c;由于各种原因&#xff0c;如系统更新、软件卸载不当或病毒感染&#xff0c;DLL文件有时会丢失或损坏&#xff0c;导致程序无…

Python pyautogui 自动控制 MDK Keil_v5 Pack Installer 的 Packs 安装过程

MDK Keil_v5 安装完成后&#xff0c;会自动进行 Pack Installer 的 Packs 安装&#xff0c;安装过程中首先 install 需要一行行用鼠标点&#xff0c;然后每一行的 Pack 都会出现同意安装或连接超时的弹窗&#xff0c;需要鼠标操作确认。 pyautogui 可以帮助自动控制鼠标完成确…

【C++】关于仿函数Functor 的理解和应用

C中的仿函数&#xff08;Functor&#xff09;&#xff1a;深入理解与应用 仿函数的基本概念仿函数在STL中的应用仿函数的分类STL中的常见仿函数 仿函数的优势结论 在C编程中&#xff0c;仿函数&#xff08;Functor&#xff09;是一种特殊的类&#xff0c;它通过重载函数调用运算…

【RabbitMQ】通配符模式(Topics)

一、基本概念 生产者&#xff08;Producer&#xff09;&#xff1a;发送消息到RabbitMQ交换机的程序。生产者定义消息的路由键&#xff0c;用于标识消息的目的地。交换机&#xff08;Exchange&#xff09;&#xff1a;接收生产者发送的消息&#xff0c;并根据路由键和绑定规则…

IT运维中,如何快速进行故障排查?(以银行APP交易故障为例)

一、事件背景 正值"五一"黄金周旅游高峰期&#xff0c;某城商行的手机APP突然出现大面积交易失败和严重卡顿现象。据初步统计&#xff0c;从上午10点开始APP的交易成功率从正常的99%骤降至75%左右&#xff0c;用户反馈的交易失败投诉量在短短2小时内激增了500%。与此…