- 数据结构
- 锁
- sizeCtl
- concurrencyLevel
- ForwardingNode、ReservationNode
- 扩容
- get、put、remove
hashmap:线程不安全
hashtable:通过synchronized保证线程安全但效率低。强一致性
ConcurrentHashMap:弱一致性
数据结构
ConcurrentHashMap为node数组+链表+红黑树。约等于hashmap
//第一次使用时初始化,大小2^n
transient volatile Node<K,V>[] table;
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;//不允许value改变值,避免加锁
volatile Node<K,V> next;//不能改变next,只能头插
}
锁
JDK1.8 中采用CAS + synchronized,锁首节点,粒度更小
// jdk1.7分段segment加锁,跨段操作时,按顺序锁定全部段,按顺序解锁。
//继承ReentrantLock加锁解锁,保证每个 Segment 是线程安全
static class Segment<K,V> extends ReentrantLock implements Serializable
sizeCtl
/**
-N:有N-1个线程正在扩容
-1:正在初始化(初始化只能由一个线程完成)
0:未初始化
N:下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
concurrencyLevel
/**
* @param concurrencyLevel 估计的并发线程数
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
ForwardingNode、ReservationNode
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
/**
* 扩容迁移时插到链表头部的节点
* Hash地址为-1,存储nextTable的引用,作为一个占位符放在table中表示当前节点为null或者已被移动
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {}
}
/**
* computeIfAbsent and compute时的占位节点
*/
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null);
}
Node<K,V> find(int h, Object k) {}
}
扩容
将table分成n份 ,每份长度为stride,table大小除以CPU数量,平分给各个线程,每个线程对当前旧table中[transferIndex-stride, transferIndex-1]位置的结点进行迁移。某一位置链表/树全部迁移结束,使用ForwardingNode占据该位置。迁移时不需要rehash,同hashmap一样计算新下标即可
//扩容时使用 nextTable数组变成table的2倍。
private transient volatile Node<K,V>[] nextTable;
//扩容迁移时nextTable 切割的下标 (加一)
private transient volatile int transferIndex;
//扩容迁移时最小步长,避免太小耗费内存
private static final int MIN_TRANSFER_STRIDE = 16;
- 当成功插入节点时,如果链表长度>= 8,则对数组长度进行判断,实现如下:如果数组长度<MIN_TREEIFY_CAPACITY(默认64),,则数组长度扩大两倍,重新调整节点的位置。不会将链表转为红黑树.
- 当成功插入节点时达到阈值,触发扩容
- 扩容状态下其他线程进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点会触发扩容 。helpTransfer
get、put、remove
-
remove
hash定位到node,复制节点前的链与节点后的链相连,跳过节点,即为删除
∵使用volatile修饰next,所以next值具有不变性,需要头插
∴节点前的链逆置 -
get
- 计算hash,定位,如果该节点存在,判断hash=-1则在新表中查询(帮助扩容),hash=-2按红黑树查询,否则在当前位置按链表查询
- 不允许Key或Value为null,因为 ConcurrentHashMap 是用于多线程的 ,如果get(key)得到了 null ,不能确定是value为null,还是没有找到对应的key,存在二义性。 HashMap 可以通过containsKey() 去判断。
- get无需加锁。因为 Node 的元素 value 和指针 next 是用 volatile。table用 volatile 修饰是保证扩容的时候保证可见性。
- put
- 第一次插入 初始化table
- 计算hash定位,CAS插入
- 需要插入的位置上有数据&是forward节点(hash=-1),则表示其他线程正在对table进行扩容,参与一起扩容helpTransfer
- 需要插入的位置上有数据&不是forward节点,对首节点加synchronized锁,插入(若此时需要扩容,扩容阻塞等待,先插入)
- addCount()计算数量,扩容/链表调整为红黑树