继承体系
HashMap是我们用得非常频繁的一个集合,但是由于它是非线程安全的,在多线程环境下,put操作是有可能产生死循环的,导致CPU利用率接近100%。为了解决该问题,提供了Hashtable和Collections.synchronizedMap(hashMap) 两种解决方案,但是这两种方案都是对读写加锁,独占式,一个线程在读时其他线程必须等待,吞吐量较低,性能较为低下。因此ConcurrentHashMap诞生了。
ConcurrentHashMap继承自AbstractMap,具有map的特征,实现ConcurrentMap接口。ConcurrentHashMap不支持key是null的,因此ConcurrentMap重写了很多Map接口的方法,变更了部分逻辑。对比HashMap,ConcurrentHashMap实现了并发安全。在1.8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,但是1.8已经改变了这种思路,而是利用CAS+Synchronized来保证并发更新的安全,其底层采用数组+链表+红黑树的存储结构。
重要变量
ConcurrentHashMap定义了如下几个常量:
// 最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是2的幕数
private static final int DEFAULT_CAPACITY = 16;
// 最大桶数量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 表示默认并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 加载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
// 最小树化数量
static final int MIN_TREEIFY_CAPACITY = 64;
// 进行一次transfer操作中一个工作线程的最小任务量
private static final int MIN_TRANSFER_STRIDE = 16;
// 用于生成在每次扩容中都唯一的生成戳
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED = -1;
// 树根节点的hash值
static final int TREEBIN = -2;
// ReservationNode的hash值
static final int RESERVED = -3;
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
上面是ConcurrentHashMap定义的常量,下面介绍ConcurrentHashMap几个很重要的概念。
- table:用来存放Node节点数据的,默认为null,默认大小为16的数组,每次扩容时大小总是2的幂次方;
- nextTable:扩容时新生成的数据,数组为table的两倍;
- Node:节点,保存key-value的数据结构;
- ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动
- sizeCtl :控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同
- 负数代表正在进行初始化或扩容操作
- -1代表正在初始化
- -N 表示有N-1个线程正在进行扩容操作
- 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
重要内部类
ConcurrentHashMap内部存在非常多的内部类,这里我们仅列举几个较为重要的内部类。
Node
作为ConcurrentHashMap中最核心、最重要的内部类,Node担负着重要角色:key-value键值对。所有插入ConCurrentHashMap的中数据都将会包装在Node中。定义如下:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; //带有volatile,保证可见性
volatile Node<K,V> next; //下一个节点的指针
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
/** 不允许修改value的值 */
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/** 赋值get()方法 */
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
在Node内部类中,其属性value、next都是带有volatile的。同时其对value的setter方法进行了特殊处理,不允许直接调用其setter方法来修改value的值。最后Node还提供了find方法来赋值map.get()。
TreeNode
在ConcurrentHashMap中,如果链表的数据过长是会转换为红黑树来处理。红黑树的节点就是TreeNode:
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
//查找hash为h,key为k的节点
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
TreeBin
该类并不负责key-value的键值对包装,它用于在链表转换为红黑树时包装TreeNode节点,也就是说ConcurrentHashMap红黑树存放是TreeBin,不是TreeNode。该类封装了一系列的方法,包括putTreeVal、lookRoot、UNlookRoot、remove、balanceInsetion、balanceDeletion。由于TreeBin的代码太长我们这里只展示构造方法(构造方法就是构造红黑树的过程):
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K, V> root;
volatile TreeNode<K, V> first;
volatile Thread waiter;
volatile int lockState;
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
TreeBin(TreeNode<K, V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K, V> r = null;
for (TreeNode<K, V> x = b, next; x != null; x = next) {
next = (TreeNode<K, V>) x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
} else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K, V> p = r; ; ) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K, V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
...
}
通过构造方法是不是发现了部分端倪,构造方法就是在构造一个红黑树的过程。
ForwardingNode
ForwardingNode仅仅只存活在ConcurrentHashMap扩容操作时。只是一个标志节点,并且指向nextTable,它提供find方法而已。该类也是集成Node节点,其hash为-1,key、value、next均为null。如下:
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
构造函数
public ConcurrentHashMap() {}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
构造函数基本上和HashMap一致,但是在ConcurrentHashMap中使用sizeCtl代替hashmap中的threshold和loadFactor来控制扩容。而sizeCtl给的解释有:
(1)-1,表示有线程正在进行初始化操作
(2)-(1 + nThreads),表示有n个线程正在一起扩容
(3)0,默认值,后续在真正初始化的时候使用默认容量
(4)> 0,初始化或扩容完成后下一次的扩容门槛
重要方法
新增键值对
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 检验是否为空, 需要注意 ConcurrentHashMap的key、value都不允许为null
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 计算hash值
int binCount = 0; // 要插入的的桶位置上的元素个数,会在循环内部进行更新,用来判断是否需要
// 自旋,失败就重新来
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 如果链表还没有初始化,是空的情况,则进行初始化
tab = initTable();
// 计算i对应的桶的位置,并对f赋值,如果获得的元素是null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 使用cas对节点进行赋值,如果失败则意味着存在并发则重新进入循环
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
// 如果要插入的元素所在的桶的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素
else if ((fh = f.hash) == MOVED)
// 线程添加元素时发现正在扩容且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
tab = helpTransfer(tab, f);
// 没有发生奇奇怪怪的事情情况下,开始针对这个桶本身进行遍历插入
else {
// 如果找到同样的key的情况下,可能会被替换,这个代表被替换之后的值
V oldVal = null;
// 锁住桶
synchronized (f) {
// 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过
if (tabAt(tab, i) == f) {
// 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树)
// 那就是桶中的元素使用的是链表方式存储
if (fh >= 0) {
binCount = 1;
// 常规操作,遍历,插值
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果binCount不为0,说明成功插入了元素或者寻找到了元素
if (binCount != 0) {
// 如果链表元素个数达到了8,则尝试树化
// 如果元素插入到树中时,binCount只赋值了2,不知道总元素个数,不会重复树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果要插入的元素已经存在,则返回旧值
if (oldVal != null)
return oldVal;
// 退出外层大循环,流程结束
break;
}
}
}
// 每次添加元素后,元素数量加1,并判断是否达到扩容门槛,达到了则进行扩容或协助扩容
addCount(1L, binCount);
return null;
}
initTable方法
当put元素的时候,感知到桶没有初始化将会执行initTable方法。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// table为null或者没有元素
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl<0说明正在初始化或者扩容,让出CPU
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 主动把sizeCtl原子更新为-1成功,则当前线程进入初始化
// 如果失败则表达已经有现成进入初始化,进入下一次循环后会进入第一个分支,让出CPU
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再次检查table是否为空
if ((tab = table) == null || tab.length == 0) {
// 如果sc为0则使用默认值16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 新建数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 赋值给table桶数组
table = tab = nt;
// 设置sc为数组长度的0.75倍
// n - (n >>> 2) = n - n/4 = 0.75n 扩容门槛为容量的0.75倍
sc = n - (n >>> 2);
}
} finally {
// 把sc赋值给sizeCtl,这时存储的是扩容门槛
sizeCtl = sc;
}
break;
}
}
return tab;
}
tabAt方法
tabAt方法的作用是寻找指定数组在内存中i位置的数据,其内部使用CAS的方式从主内存中获得数据
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
上述代码中的U是unsafe的实例,存在以下常用的方法:
// 获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义。
public native Object getObjectVolatile(Object obj, long offset);
//获取数组中第一个元素的偏移量(get offset of a first element in the array)
public native int arrayBaseOffset(java.lang.Class aClass);
//获取数组中一个元素的大小(get size of an element in the array)
public native int arrayIndexScale(java.lang.Class aClass);
上述中ASHIFT与ABASE的解释如下:
Class<?> ak = Node[].class;
// 获取数组的起始位置
ABASE = U.arrayBaseOffset(ak);
// 获取数组的数据的大小,比如byte类型的数据为1,int类型的为4,long为8
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// Integer.numberOfLeadingZeros(scale) 获得指定数字前面的0的个数
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
当我们需要进行数组的随机访问的时候,可以通过寻址地址进行获取,寻址地址的计算公式如下:
// base_address是起始地址,i是数组索引,data_type_size是数组元素的大小长度
a[i]_address = base_address + i * data_type_size
这样一来就有 i << 2 + ABASE
等价于 i * 4 + ABASE
符合寻址地址公式:
Class ak = int[].class;
// 起始位置:16,
int ABASE = unsafe.arrayBaseOffset(ak);
// 数据长度:4,int的长度为4
int scale = unsafe.arrayIndexScale(ak);
long ASHIFT;
// 2 = 31 - 29(4转化为二进制为100,由于是32位的,所以前面有29位0补位)
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
helpTransfer方法
helpTransfer()方法为协助扩容方法,当调用该方法的时候,nextTable一定已经创建了,所以该方法主要则是进行复制工作。如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空
// 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素
// 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// sizeCtl<0,说明正在扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 扩容线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 当前线程帮忙迁移元素
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
treeifyBin
在put操作是,如果发现链表结构中的元素超过了TREEIFY_THRESHOLD(默认为8),则会把链表转换为红黑树,已便于提高查询效率。如下:
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
调用treeifyBin方法用与将链表转换为红黑树。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//如果table.length<64 就扩大一倍 返回
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//构造了一个TreeBin对象 把所有Node节点包装成TreeNode放进去
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);//这里只是利用了TreeNode封装 而没有利用TreeNode的next域和parent域
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
//在原来index的位置 用TreeBin替换掉原来的Node对象
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
从上面源码可以看出,构建红黑树的过程是同步的,进入同步后过程如下:
- 根据table中index位置Node链表,重新生成一个hd为头结点的TreeNode
- 根据hd头结点,生成TreeBin树结构,并用TreeBin替换掉原来的Node对象。
addCount方法
该方法主要做两个工作:1.更新baseCount;2.检测是否需要扩容操作
// 这里使用的思想跟LongAdder类是一模一样的
// 把数组的大小存储根据不同的线程存储到不同的段上
// 有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段
// 这样可以保证尽量小的减少冲突
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果as为空
// 或者长度为0
// 或者当前线程所在的段为null
// 或者在当前线程的段上加数量失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋)
// 不同线程对应不同的段都更新失败了
// 说明已经发生冲突了,那么就对counterCells进行扩容
// 以减少多个线程hash到同一个段的概率
// 这里的代码逻辑与LongAdder一致可以切换到该源码分析处
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 计算元素个数
s = sumCount();
}
//check >= 0 :则需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果元素个数达到了扩容门槛,则进行扩容
// 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// rs是扩容时的一个邮戳标识
int rs = resizeStamp(n);
if (sc < 0) {
// sc<0说明正在扩容中
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 扩容已经完成了,退出循环
// 正常应该只会触发nextTable==null这个条件,其它条件没看出来何时触发
break;
// 扩容未完成,则当前线程加入迁移元素中
// 并把扩容线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 这里是触发扩容的那个线程进入的地方
// sizeCtl的高16位存储着rs这个扩容邮戳
// sizeCtl的低16位存储着扩容线程数加1,即(1+nThreads)
// 进入迁移元素
transfer(tab, null);
// 重新计算元素个数
s = sumCount();
}
}
}
- 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
- 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗
删除键值对
删除元素跟添加元素一样,都是先找到元素所在的桶,然后采用分段锁的思想锁住整个桶,再进行操作。
public V remove(Object key) {
// 调用替换节点方法
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
// 计算hash
int hash = spread(key.hashCode());
// 自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
// 如果目标key所在的桶不存在,跳出循环返回null
break;
else if ((fh = f.hash) == MOVED)
// 如果正在扩容中,协助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 标记是否处理过
boolean validated = false;
synchronized (f) {
// 再次验证当前桶第一个元素是否被修改过
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// fh>=0表示是链表节点
validated = true;
// 遍历链表寻找目标节点
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 找到了目标节点
V ev = e.val;
// 检查目标节点旧value是否等于cv
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
// 如果value不为空则替换旧值
e.val = value;
else if (pred != null)
// 如果前置节点不为空
// 删除当前节点
pred.next = e.next;
else
// 如果前置节点为空
// 说明是桶中第一个元素,删除之
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
// 遍历到链表尾部还没找到元素,跳出循环
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
// 如果是树节点
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
// 遍历树找到了目标节点
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
// 检查目标节点旧value是否等于cv
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
// 如果value不为空则替换旧值
p.val = value;
else if (t.removeTreeNode(p))
// 如果value为空则删除元素
// 如果删除后树的元素个数较少则退化成链表
// t.removeTreeNode(p)这个方法返回true表示删除节点后树的元素个数较少
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
// 如果处理过,不管有没有找到元素都返回
if (validated) {
// 如果找到了元素,返回其旧值
if (oldVal != null) {
// 如果要替换的值为空,元素个数减1
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
// 没找到元素返回空
return null;
}
获得键值
获取元素,根据目标key所在桶的第一个元素的不同采用不同的方式获取元素,关键点在于find()方法的重写。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算hash
int h = spread(key.hashCode());
// 如果元素所在的桶存在且里面有元素
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果第一个元素就是要找的元素,直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// hash小于0,说明是树或者正在扩容
// 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历整个链表寻找元素
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
(1)hash到元素所在的桶;
(2)如果桶中第一个元素就是该找的元素,直接返回;
(3)如果是树或者正在迁移元素,则调用各自Node子类的find()方法寻找元素;
(4)如果是链表,遍历整个链表寻找元素;
(5)获取元素没有加锁;
扩容方法
ConcurrentHashMap的扩容逻辑相对复杂,这里先将大概逻辑说明白。满足扩容的条件下,会先进行新数组的创建,是旧数组的两倍,使用ForwardingNode进行封装。这个时候需要进行搬迁元素了,如果是单线程的场合下,使用CPU与列表长度计算出搬迁的范围,最小一个线程负担16个桶的搬迁。搬迁从尾部开始,如果列表长度大于16则依靠外部循环不断往前进行搬迁。如果是多线程场合下,会将搬迁任务分散到多个线程上,每个线程搬迁自己负责的部分。当没有轮到搬迁的桶可以继续get/put,如果是正在搬迁,则等待搬迁结束。已经结束的则会请求到ForwardingNode内部,操作新列表。具体的源码如下:
//调用该扩容方法的地方有:
//java.util.concurrent.ConcurrentHashMap#addCount 向集合中插入新数据后更新容量计数时发现到达扩容阈值而触发的扩容
//java.util.concurrent.ConcurrentHashMap#helpTransfer 扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点时触发的扩容
//java.util.concurrent.ConcurrentHashMap#tryPresize putAll批量插入或者插入后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算每条线程处理的桶个数,每条线程处理的桶数量一样,如果CPU为单核,则使用一条线程处理所有桶
//每条线程至少处理16个桶,如果计算出来的结果少于16,则一条线程处理16个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // 初始化新数组(原数组长度的2倍)
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//将 transferIndex 指向最右边的桶,也就是数组索引下标最大的位置
transferIndex = n;
}
int nextn = nextTab.length;
//新建一个占位对象,该占位对象的 hash 值为 -1 该占位对象存在时表示集合正在扩容状态,key、value、next 属性均为 null ,nextTable 属性指向扩容后的数组
//该占位对象主要有两个用途:
// 1、占位作用,用于标识数组该位置的桶已经迁移完毕,处于扩容中的状态。
// 2、作为一个转发的作用,扩容期间如果遇到查询操作,遇到转发节点,会把该查询操作转发到新的数组上去,不会阻塞查询操作。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
boolean advance = true;
//该标识用于控制扩容何时结束,该标识还有一个用途是最后一个扩容线程会负责重新检查一遍数组查看是否有遗漏的桶
boolean finishing = false; // to ensure sweep before committing nextTab
//这个循环用于处理一个 stride 长度的任务,i 后面会被赋值为该 stride 内最大的下标,而 bound 后面会被赋值为该 stride 内最小的下标
//通过循环不断减小 i 的值,从右往左依次迁移桶上面的数据,直到 i 小于 bound 时结束该次长度为 stride 的迁移任务
//结束这次的任务后会通过外层 addCount、helpTransfer、tryPresize 方法的 while 循环达到继续领取其他任务的效果
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//每处理完一个hash桶就将 bound 进行减 1 操作
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex <= 0 说明数组的hash桶已被线程分配完毕,没有了待分配的hash桶,将 i 设置为 -1 ,后面的代码根据这个数值退出当前线的扩容操作
i = -1;
advance = false;
}
//只有首次进入for循环才会进入这个判断里面去,设置 bound 和 i 的值,也就是领取到的迁移任务的数组区间
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//扩容结束后做后续工作,将 nextTable 设置为 null,表示扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 成立,说明该线程不是扩容大军里面的最后一条线程,直接return回到上层while循环
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 说明这条线程是最后一条扩容线程
//之所以能用这个来判断是否是最后一条线程,因为第一条扩容线程进行了如下操作:
// U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
//除了修改结束标识之外,还得设置 i = n; 以便重新检查一遍数组,防止有遗漏未成功迁移的桶
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
//遇到数组上空的位置直接放置一个占位对象,以便查询操作的转发和标识当前处于扩容状态
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//数组上遇到hash值为MOVED,也就是 -1 的位置,说明该位置已经被其他线程迁移过了,将 advance 设置为 true ,以便继续往下一个桶检查并进行迁移操作
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//该节点为链表结构
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//遍历整条链表,找出 lastRun 节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//根据 lastRun 节点的高位标识(0 或 1),首先将 lastRun设置为 ln 或者 hn 链的末尾部分节点,后续的节点使用头插法拼接
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//使用高位和低位两条链表进行迁移,使用头插法拼接链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
//使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
//使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
setTabAt(nextTab, i, ln);
//使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
setTabAt(nextTab, i + n, hn);
//迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
setTabAt(tab, i, fwd);
//advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
advance = true;
}
//该节点为红黑树结构
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
//lo 为低位链表头结点,loTail 为低位链表尾结点,hi 和 hiTail 为高位链表头尾结点
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//同样也是使用高位和低位两条链表进行迁移
//使用for循环以链表方式遍历整棵红黑树,使用尾插法拼接 ln 和 hn 链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//这里面形成的是以 TreeNode 为节点的链表
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//形成中间链表后会先判断是否需要转换为红黑树:
//1、如果符合条件则直接将 TreeNode 链表转为红黑树,再设置到新数组中去
//2、如果不符合条件则将 TreeNode 转换为普通的 Node 节点,再将该普通链表设置到新数组中去
//(hc != 0) ? new TreeBin<K,V>(lo) : t 这行代码的用意在于,如果原来的红黑树没有被拆分成两份,那么迁移后它依旧是红黑树,可以直接使用原来的 TreeBin 对象
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
//使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
//使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
setTabAt(nextTab, i, ln);
//使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
setTabAt(nextTab, i + n, hn);
//迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
setTabAt(tab, i, fwd);
//advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
advance = true;
}
}
}
}
}
}
自描述统计方法
public int size() {
// 调用sumCount()计算元素个数
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
// 计算CounterCell所有段及baseCount的数量之和
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
需要注意的是,size可能并不精确,因为在统计的中途是有可能发生数据变更的。
总结
ConcurrentHashMap是HashMap的线程安全版本,数据结构与HashMap一致。针对桶数据的put将会使用cas进行预先占位,存在桶数据则使用sychronized锁住。在扩容阶段,并发线程会帮助一起搬迁数据,该逻辑较为复杂。