9张图深入剖析ConcurrentHashMap

news2024/11/19 11:24:48

前言

在日常的开发中,我们经常使用key-value键值对的HashMap,其使用哈希表实现,用空间换取时间,提升查询性能

但在多线程的并发场景中,HashMap并不是线程安全的

如果想使用线程安全的,可以使用ConcurrentHashMap、HashTable、Collections.synchronizedMap等

但由于后面二者使用synchronized的粒度太大,因此一般不使用,而使用并发包中的ConcurrentHashMap

在ConcurrentHashMap中,使用volatile保证内存可见性,使得读场景下不需要“加锁”保证原子性

在写场景下使用CAS+synchronized,synchronized只锁哈希表某个索引位置上的首节点,相当于细粒度加锁,增大并发性能

本篇文章将从ConcurrentHashMap的使用,读、写、扩容的实现原理,设计思想等方面进行剖析

查看本文前需要了解哈希表、volatile、CAS、synchronized等知识

volatile可以查看这篇文章:5个案例和流程图让你从0到1搞懂volatile关键字

CAS、synchronized可以查看这篇文章:15000字、6个代码案例、5个原理图让你彻底搞懂Synchronized

使用ConcurrentHashMap

ConcurrentHashMap是并发场景下线程安全的Map,可以在并发场景下查询存储K、V键值对

不可变对象是绝对线程安全的,无论外界如何使用,都线程安全

ConcurrentHashMap并不是绝对线程安全的,只提供方法的线程安全,如果在外层使用错误依旧会导致线程不安全

来看下面的案例,使用value存储自增调用次数,开启10个线程每个执行100次,最终结果应该是1000次,但错误的使用导致不足1000


    public void test() {
//        Map<String, Integer> map = new HashMap(16);
        Map<String, Integer> map = new ConcurrentHashMap(16);

        String key = "key";
        CountDownLatch countDownLatch = new CountDownLatch(10);


        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    incr(map, key);
//                    incrCompute(map, key);
                }
                countDownLatch.countDown();
            }).start();
        }

        try {
            //阻塞到线程跑完
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //1000不到
        System.out.println(map.get(key));
    }

    private void incr(Map<String, Integer> map, String key) {
        map.put(key, map.getOrDefault(key, 0) + 1);
    }

在自增方法incr中,先进行读操作,再计算,最后进行写操作,这种复合操作没有保证原子性,导致最终所有结果累加一定不为1000

正确的使用方式是使用JDK8提供的默认方法compute

ConcurrentHashMap实现compute的原理是在put中使用同步手段后再进行计算

    private void incrCompute(Map<String, Integer> map, String key) {
        map.compute(key, (k, v) -> Objects.isNull(v) ? 1 : v + 1);
    }

数据结构

与HashMap类似,使用哈希表+链表/红黑树实现

哈希表

哈希表的实现由数组构成,当发生哈希冲突(哈希算法得到同一索引)时使用链地址法构建成链表

image.png

当链表上的节点太长,遍历寻找开销大,超过阈值时(链表节点超过8个、哈希表长度大于64),树化成红黑树减少遍历寻找开销,时间复杂度从O(n)优化为(log n)

image.png

ConcurrentHashMap由Node数组构成,在扩容时会存在新旧两个哈希表:table、nextTable

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {    
    //哈希表 node数组
    transient volatile Node<K,V>[] table;

    //扩容时为了兼容读写,会存在两个哈希表,这个是新哈希表
    private transient volatile Node<K,V>[] nextTable;

    // 默认为 0
    // 当初始化时, 为 -1
    // 当扩容时, 为 -(1 + 扩容线程数)
    // 当初始化或扩容完成后,为 下一次的扩容的阈值大小
    private transient volatile int sizeCtl;

    //扩容时 用于指定迁移区间的下标
    private transient volatile int transferIndex;

    //统计每个哈希槽中的元素数量
    private transient volatile CounterCell[] counterCells;
}
节点

Node用于实现哈希表数组的节点和发生哈希冲突时,构建成链表的节点

//实现哈希表的节点,数组和链表时使用
static class Node<K,V> implements Map.Entry<K,V> {
    //节点哈希值
    final int hash;
    final K key;
    volatile V val;
    //作为链表时的 后续指针 
    volatile Node<K,V> next;        
}

// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}

// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}

// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}

// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}

节点哈希值

//转发节点
static final int MOVED     = -1;
//红黑树在数组中的节点
static final int TREEBIN   = -2;
//占位节点
static final int RESERVED  = -3;

转发节点:继承Node,用于扩容时设置在旧哈希表某索引的首节点,遇到转发节点要去新哈希表中寻找

static final class ForwardingNode<K,V> extends Node<K,V> {
        //新哈希表
        final Node<K,V>[] nextTable;

        ForwardingNode(Node<K,V>[] tab) {
            //哈希值设置为-1
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
}

红黑树在数组中的节点 TreeBin:继承Node,first指向红黑树的首节点

static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        //红黑树首节点
        volatile TreeNode<K,V> first;
}    

image.png

红黑树节点TreeNode

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev; 
        boolean red;
}

占位节点:继承Node,需要计算时(使用computer方法),先使用占位节点占位,计算完再构建节点取代占位节点

    static final class ReservationNode<K,V> extends Node<K,V> {
        ReservationNode() {
            super(RESERVED, null, null, null);
        }

        Node<K,V> find(int h, Object k) {
            return null;
        }
    }

实现原理

构造

在构造时会检查入参,然后根据需要存储的数据容量、负载因子计算哈希表容量,最后将哈希表容量调整成2次幂

构造时并不会初始化,而是等到使用再进行创建(懒加载)

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //检查负载因子、初始容量
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        //concurrencyLevel:1
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        //计算大小 = 容量/负载因子 向上取整
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        //如果超过最大值就使用最大值 
        //tableSizeFor 将大小调整为2次幂
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);

        //设置容量
        this.sizeCtl = cap;
    }
读-get

读场景使用volatile保证可见性,即使其他线程修改也是可见的,不用使用其他手段保证同步

读操作需要在哈希表中寻找元素,经过扰动算法打乱哈希值,再使用哈希值通过哈希算法得到索引,根据索引上的首节点分为多种情况处理

  1. 扰动算法将哈希值充分打乱(避免造成太多的哈希冲突),符号位&0保证结果正数

    int h = spread(key.hashCode())

    扰动算法:哈希值高低16位异或运算

    经过扰动算法后,&HASH_BITS = 0x7fffffff (011111...),符号位为0保证结果为正数

    负数的哈希值表示特殊的作用,比如:转发节点、树的首节点、占位节点等

        static final int spread(int h) {
            return (h ^ (h >>> 16)) & HASH_BITS;
        }
  2. 使用打乱的哈希值经过哈希算法得到数组中的索引(下标)

    n 为哈希表长度:(n = tab.length)

    (e = tabAt(tab, (n - 1) & h)

    h为计算后的哈希值,哈希值%(哈希表长度-1) 就能求出索引位置

    为了性能提升,规定哈希表长度为2的n次幂,哈希表长度二进制一定是1000....,而(n-1)的二进制一定是0111...

    因此(n - 1) & h计算索引,进行与运算的结果一定在0~n-1之间 使用位运算提升性能

  3. 得到数组上的节点后,需要进行比较

    找到哈希表上的首个节点后,进行比较key 查看是否是当前节点

    比较规则:先对哈希值进行比较,如果对象哈希值相同,那么可能是同一个对象,还需要比较key(==与equals),如果哈希值都不相同,那么肯定不是同一个对象

    先比较哈希值的好处就是提升查找性能,如果直接使用equals 可能时间复杂度会上升(比如String的equals)

  4. 使用链地址法解决哈希冲突,因此找到节点后可能遍历链表或树;由于哈希表存在扩容,也可能要去新节点上寻找

    4.1 首节点比较成功,直接返回

    4.2 首节点哈希值为负,说明该节点是特殊情况的:转发节点、树的首节点 、计算的预订占位节点

    • 如果是转发节点,正在扩容则去新数组上找
    • 如果是TreeBin则去红黑树中寻找
    • 如果是占位节点 直接返回空

    4.3 遍历该链表依次比较

get代码

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //1.spread:扰动算法 + 让key的哈希值不能为负数,因为负数哈希值代表红黑树或ForwardingNode
    int h = spread(key.hashCode());
    //2.(n - 1) & h:下标、索引 实际上就是数组长度模哈希值 位运算效率更高
    //e:哈希表中对应索引位置上的节点
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //3.如果哈希值相等,说明可能找到,再比较key
        if ((eh = e.hash) == h) {
            //4.1 key相等说明找到 返回
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //4.2 首节点哈希值为负,说明该节点是转发节点,当前正在扩容则去新数组上找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;

        //4.3 遍历该链表,能找到就返回值,不能返回null
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
写-put

添加元素时,使用CAS+synchronized(只锁住哈希表中某个首节点)的同步方式保证原子性

  1. 获取哈希值:扰动算法+确保哈希值为正数
  2. 哈希表为空,CAS保证一个线程初始化
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //小于0 说明其他线程在初始化 让出CPU时间片 后续初始化完退出
            if ((sc = sizeCtl) < 0)
                Thread.yield(); 
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                //CAS将SIZECTL设置成-1 (表示有线程在初始化)成功后 初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
  1. 将哈希值通过哈希算法获取索引上的节点 f = tabAt(tab, i = (n - 1) & hash)

  2. 根据不同情况进行处理

    • 4.1 首节点为空时,直接CAS往索引位置添加节点 casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))

      image.png

    • 4.2 首节点hash为MOVED -1时,表示该节点是转发节点,说明正在扩容,帮助扩容

    • 4.3 首节点加锁

      • 4.3.1 遍历链表寻找并添加/覆盖

        image.png

      • 4.3.2 遍历树寻找并添加/覆盖

  3. addCount统计每个节点上的数据,并检查扩容

put代码

//onlyIfAbsent为true时,如果原来有k,v则这次不会覆盖
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //1.获取哈希值:扰动算法+确保哈希值为正数
    int hash = spread(key.hashCode());
    int binCount = 0;
    //乐观锁思想 CSA+失败重试
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //2.哈希表为空 CAS保证只有一个线程初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //3. 哈希算法求得索引找到索引上的首节点
        //4.1 节点为空时,直接CAS构建节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //4.2 索引首节点hash 为MOVED 说明该节点是转发节点,当前正在扩容,去帮助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //4.3 首节点 加锁
            synchronized (f) {
                //首节点没变
                if (tabAt(tab, i) == f) {
                    //首节点哈希值大于等于0 说明节点是链表上的节点  
                    //4.3.1 遍历链表寻找然后添加/覆盖
                    if (fh >= 0) {
                        //记录链表上有几个节点
                        binCount = 1;
                        //遍历链表找到则替换,如果遍历完了还没找到就添加(尾插)
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //替换
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                //onlyIfAbsent为false允许覆盖(使用xxIfAbsent方法时,有值就不覆盖)
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //添加
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果是红黑树首节点,则找到对应节点再覆盖
                    //4.3.2 遍历树寻找然后添加/覆盖
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //如果是添加返回null,返回不是null则出来添加
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            //覆盖
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    //链表上的节点超过TREEIFY_THRESHOLD 8个(不算首节点) 并且 数组长度超过64才树化,否则扩容
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //5.添加计数,用于统计元素(添加节点的情况)
    addCount(1L, binCount);
    return null;
}
扩容

为了避免频繁发生哈希冲突,当哈希表中的元素数量 / 哈希表长度 超过负载因子时,进行扩容(增大哈希表的长度)

一般来说扩容都是增大哈希表长度的2倍,比如从32到64保证长度是2次幂;如果扩容长度达到整型上限则使用整型最大值

当发生扩容时,需要将数组中每个槽里的链表或树迁移到新数组中

如果处理器是多核,那么这个迁移的操作并不是一个线程单独完成的,而是会让其他线程也来帮助迁移

在迁移时让每个线程从右往左的每次迁移多个槽,迁移完再判断是否全部迁移完,如果没迁移完则继续循环迁移

扩容操作主要在transfer方法中,扩容主要在三个场景下:

  1. addCount:添加完节点增加计数检查扩容
  2. helpTransfer:线程put时发现正在迁移,来帮忙扩容
  3. tryPresize:尝试调整容量(批量添加putAll,树化数组长度没超过64时treeifyBin调用)

分为以下3个步骤

  1. 根据CPU核数、哈希表总长度计算每次迁移多少个槽,最小16个

  2. 新哈希表为空,说明是初始化

  3. 循环迁移

    • 3.1 分配负责迁移的区间 [bround,i](可能存在多线程同时迁移)

      image.png

    • 3.2 迁移:分为链表迁移、树迁移

      链表迁移

      1. 将链表上的节点充分散列到新哈希表的index、index+旧哈希表长度的两个下标中(与HashMap类似)

      2. 将index位置链表中的节点 (hash & 哈希表长度),结果为0的放到新数组的index位置,结果为1放到新数组index+旧哈希表长度的位置

        image.png

        比如旧哈希表长度为16,在索引3的位置上,16的二进制是10000,hash&16 => hash& 10000 ,也就是说节点哈希值第5位是0就放到新哈希表的3位置上,是1就放到新哈希表的3+16下标

      3. 使用头插法将计算结果为0构建成ln链表,为1构建成hn链表,为方便构建链表,会先寻找lastRun节点:lastRun节点及后续节点都为同一链表上的节点,方便迁移

        构建链表前先构建lastRun,比如图中lastRun e->f ,先将lastRun放到ln链表上,在遍历原始链表,遍历到a :a->e->f,遍历到b:b->a->e->f

      image.png

      1. 每迁移完一个索引位置就将转发节点设置到原哈希表对应位置,当其他线程进行读get操作时,根据转发节点来新哈希表中寻找,进行写put操作时,来帮助扩容(其他区间进行迁移)

        image.png

扩容代码

//tab 旧哈希表
//nextTab 新哈希表
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        //1.计算每次迁移多少个槽
        //n:哈希表长度(多少个槽)
        int n = tab.length, stride;
        //stride:每次负责迁移多少个槽
        //NCPU: CPU核数
        //如果是多核,每次迁移槽数 = 总槽数无符号右移3位(n/8)再除CPU核数  
        //每次最小迁移槽数 = MIN_TRANSFER_STRIDE = 16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range

        //2.如果新哈希表为空,说明是初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //transferIndex用于记录 每次负责迁移的槽右区间下标,从右往左分配,起始为最右
            transferIndex = n;
        }
        //新哈希表长度
        int nextn = nextTab.length;
        //创建转发节点,转发节点一般设置在旧哈希表首节点,通过转发节点可以找到新哈希表
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //advance:是否继续循环迁移
        boolean advance = true;
        // 
        boolean finishing = false; // to ensure sweep before committing nextTab
        //3.循环迁移
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //3.1 分配负责迁移的区间
            //bound为左区间 i为右区间
            while (advance) {
                int nextIndex, nextBound;
                //处理完一个槽 右区间 自减
                if (--i >= bound || finishing)
                    advance = false;
                //transferIndex<=0说明 要迁移的区间全分配完
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //CAS设置本次迁移的区间,防止多线程分到相同区间
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }

            //3.2 迁移

            //3.2.1 如果右区间i不再范围,说明迁移完
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //如果完成迁移,设置哈希表、数量
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //CAS 将sizeCtl数量-1 表示 一个线程迁移完成 
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //如果不是最后一条线程直接返回
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //是最后一条线程设置finishing为true  后面再循环 去设置哈希表、数量等操作
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //3.2.2 如果旧哈希表i位置节点为空就CAS设置成转发节点
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //3.2.3 如果旧哈希表该位置首节点是转发节点,说明其他线程已处理,重新循环
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //3.2.4 对首节点加锁 迁移
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //3.2.4.1 链表迁移
                        //首节点哈希值大于等于0 说明 是链表节点
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //寻找lastRun节点 
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //如果最后一次计算值是0
                            //lastRun节点以及后续节点计算值都是0构建成ln链表 否则 都是1构建成hn链表
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }

                            //遍历构建ln、hn链表 (头插)
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //头插:Node构造第四个参数是后继节点
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //设置ln链表到i位置
                            setTabAt(nextTab, i, ln);
                            //设置hn链表到i+n位置
                            setTabAt(nextTab, i + n, hn);
                            //设置转发节点
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //3.2.4.2 树迁移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

实现原理并没有对红黑树进行太多描述,一方面是红黑树的概念太多,另一方面是我忘的差不多了(已经老了,不像大学那样可以手写红黑树了)

还有一方面是:我认为只需要知道使用红黑树的好处就足够,而且工作中也不常用,就算死扣红黑树要怎么变色、左旋、右旋去满足红黑树的条件也没什么意义,感兴趣的同学去学习就好了

迭代器

ConcurrentHashMap中的迭代器是弱一致性,在获取时使用记录的哈希表重新构建新对象

Entry迭代器:

public Iterator<Map.Entry<K,V>> iterator() {
    ConcurrentHashMap<K,V> m = map;
    Node<K,V>[] t;
    int f = (t = m.table) == null ? 0 : t.length;
    return new EntryIterator<K,V>(t, f, 0, f, m);
}

key迭代器

public Enumeration<K> keys() {
    Node<K,V>[] t;
    int f = (t = table) == null ? 0 : t.length;
    return new KeyIterator<K,V>(t, f, 0, f, this);
}

value迭代器

public Enumeration<V> elements() {
    Node<K,V>[] t;
    int f = (t = table) == null ? 0 : t.length;
    return new ValueIterator<K,V>(t, f, 0, f, this);
}

总结

ConcurrentHashMap使用哈希表的数据结构,当发生哈希冲突时,使用链地址法解决,将哈希到同一索引的节点构建成链表,当数据量达到一定阈值,会将链表转化为红黑树

ConcurrentHashMap使用volatile修饰存储数据,使得在读场景下对其他线程的修改可见,不需要使用同步机制,使用CAS与synchronzied保证写场景下的原子性

在get查询数据时,先将key的哈希值通过扰动算法(高低16位异或)并保证结果为正数(与上符号位0),再与上哈希表长度-1求出索引值,找到索引后再根据不同情况查找(比较先判断哈希值,相等再判断key)

在put添加/覆盖数据时,也是先通过扰动算法和哈希求出索引位置,在根据不同情况查找,找到则覆盖,找不到则替换

在需要扩容时,会为线程安排需要迁移的槽区间,当其他线程进行put时也会来帮忙迁移,每次线程迁移完一个槽,会设置转发节点到原哈希表中,这样有线程查询就可以通过转发节点来新哈希表中查找,当迁移完所有槽时留一个线程来设置哈希表、数量等

迭代器使用的是弱一致性,在获取迭代器时通过哈希表去构建新的对象

ConcurrentHashMap 只保证相对线程安全,不能保证绝对线程安全,如果需要进行一系列操作时,要正确的去使用

本文由博客一文多发平台 OpenWrite 发布!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1062517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Swing程序设计(5)绝对布局,流布局

文章目录 前言一、布局管理器二、介绍 1.绝对布局2.流布局总结 前言 Swing窗体中&#xff0c;每一个组件都有大小和具体的位置。而在容器中摆放各种组件时&#xff0c;很难判断其组件的具体位置和大小。即一个完整的界面中&#xff0c;往往有多个组件&#xff0c;那么如何将这…

R函数optim()最小化或者最大化多参数函数

一、optimize()最小化或者最大化单参数函数 1.1函数介绍 函数功能描述&#xff1a;给定一个单参数函数f&#xff0c;需要找到使得f达到其最小值或者最大值的点。 使用optimize()函数最小化单参数函数时&#xff0c;需要指定最小化的函数f及其定义域&#xff08;x的上界和下界…

Seata 源码篇之AT模式启动流程 - 中 - 03

Seata 源码篇之AT模式启动流程 - 中 - 03 数据源代理会话代理锁定查询执行器本地事务提交本地事务回滚 更新执行器删除执行器插入执行器 小节 本系列文章: Seata 源码篇之核心思想 - 01Seata 源码篇之AT模式启动流程 - 上 - 02 数据源代理 当我们的数据源被代理后&#xff0c…

[C]嵌入式中变量存储方案

#include<stdio.h>#define uint8_t unsigned char #define uint16_t unsigned short #define uint24_t unsigned int #define uint32_t unsigned int #define uint64_t unsigned long long//用户自定义变量名字&#xff0c;用于存储 typedef enum {first_run 0,//…

Linux: tcpdump抓包示例

文章目录 1. 前言2. TCP 状态机3. tcpdump 抓包示例3.1 抓连接握手包&#xff1a;三次握手3.2 抓数据包示例3.3 抓终结连接&#xff1a;四次挥手 4. 参考资料 1. 前言 限于作者能力水平&#xff0c;本文可能存在谬误&#xff0c;因此而给读者带来的损失&#xff0c;作者不做任…

【面试HOT100】哈希双指针滑动窗口

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于LeetCodeHot100进行的&#xff0c;每个知识点的修正和深入主要参考…

创建vue3工程

一、新建工程目录E:\vue\projectCode\npm-demo用Visual Studio Code 打开目录 二、点击新建文件夹按钮&#xff0c;新建vue3-01-core文件夹 三、右键vue3-01-core文件夹点击在集成终端中打开 四、初始化项目&#xff0c;输入npm init 一直敲回车直到创建成功如下图 npm init 五…

The directory ‘*‘ or its parent directory is not owned by the current user

python安装编译时出现如下错误 The directory /home/admin/.cache/pip/http or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may …

【办公软件】案例:电路中计算出的电阻值为5欧,怎么通过Excel匹配到仓库里最接近的电阻值?

在实际工作中&#xff0c;比如我们计算出一个电阻值为46欧&#xff0c;那么我们的库里到底是有哪个电阻值最接近呢&#xff1f;可能有一些有经验的工程师会说当然是47欧呀。 但是如果我们计算出来的是80.2欧呢&#xff1f;是不是得去查一下表格看看到底哪个最接近&#xff0c;…

PyTorch入门之【tensor】

目录 tensor的创建tensor的相关信息tensor的运算 tensor的创建 1.手动创建 import torch test1torch.tensor([1,2,3])#一维时为向量 test2torch.tensor([[1,2,3]])#二维时为矩阵 test3torch.tensor([[[1,2,3]]])#三维及以上统称为tensor print(test1) print(test2) print(tes…

C++ 类和对象篇(四) 构造函数

目录 一、概念 1. 构造函数是什么&#xff1f; 2. 为什么C要引入构造函数&#xff1f; 3. 怎么用构造函数&#xff1f; 3.1 创建构造函数 3.2 调用构造函数 二、构造函数的特性 三、构造函数对成员变量初始化 0. 对构造函数和成员变量分类 1. 带参构造函数对成员变量初始化 2. …

云存储解决方案-阿里云OSS

1. 阿里云OSS简介 阿里云对象存储服务&#xff08;Object Storage Service&#xff0c;简称OSS&#xff09;为用户提供基于网络的数据存取服务。使用OSS&#xff0c;用户可以通过网络随时存储和调用包括文本、图片、音频和视频等在内的各种非结构化数据文件。 阿里云OSS将数据…

练[BJDCTF2020]Easy MD5

[BJDCTF2020]Easy MD5 文章目录 [BJDCTF2020]Easy MD5掌握知识解题思路关键paylaod 掌握知识 ​ 强等于和弱等于的MD5绕过&#xff0c;数据库查询的MD5加密绕过&#xff0c;代码审计 解题思路 打开题目链接&#xff0c;发现是一个post提交框&#xff0c;提交完了也就是url发…

自然语言处理 | WordNet

WordNet是词汇数据库,即英语词典,专为自然语言处理而设计。 Synset是一种特殊的简单接口,存在于 NLTK 中, 用于在 WordNet 中查找单词。同义词集实例是表达相同概念的同义词的分组。有些单词只有一个同义词集,有些则有多个。

【kubernetes】使用helm部署redis

1 什么是helm 在学习使用k8s进行应用的部署时&#xff0c;或者从github上下载一些组件进行部署时&#xff0c;通常是直接用yaml的方式部署&#xff0c;用这种方式部署时&#xff0c;有个比较大的问题是&#xff0c;当参数需要调整时&#xff0c;就需要阅读整个yaml文件&#x…

UG\NX CAM二次开发 加工模块获取 UF _ask_application_module

文章作者:代工 来源网站:NX CAM二次开发专栏 简介: UG\NX CAM二次开发 加工模块获取 UF _ask_application_module 代码: void MyClass::do_it() { // TODO: add your code here // 获取NX当前所在的模块 int module_id = 0; // UF_ask_application_module(&…

Android改造CardView为圆形View,Kotlin

Android改造CardView为圆形View&#xff0c;Kotlin 可以利用androidx.cardview.widget.CardView的cardCornerRadius特性&#xff0c;将CardView改造成一个圆形的View&#xff0c;技术实现的关键首先设定CardView为一个宽高相等的View&#xff08;正方形&#xff09;&#xff0c…

在PHP8中使用instanceof操作符检测对象类型-PHP8知识详解

在PHP8中使用instanceof操作符可以检测当前对象属于哪个类。语法格式如下&#xff1a; objectName instanceof classname下面我们用一个实例来讲解使用instanceof操作符检测对象类型。 本实例将将创建3个类&#xff0c;其中有两个类是父类和子类的关系&#xff0c;然后实例化…

时序预测 | MATLAB实现EMD-iCHOA+GRU基于经验模态分解-改进黑猩猩算法优化门控循环单元的时间序列预测

时序预测 | MATLAB实现EMD-iCHOAGRU基于经验模态分解-改进黑猩猩算法优化门控循环单元的时间序列预测 目录 时序预测 | MATLAB实现EMD-iCHOAGRU基于经验模态分解-改进黑猩猩算法优化门控循环单元的时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 EMD-iCHOAGR…

第一百六十四回 如何实现NumberPicker

文章目录 1.概念介绍2.使用方法2.1 NumberPicker2.2 CupertinoPicker 3.示例代码4.内容总结 我们在上一章回中介绍了"如何在任意位置显示PopupMenu"相关的内容&#xff0c;本章回中将介绍如何实现NumberPicker.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1.概…