1. ConcurrentHashMap
为什么要使用ConcurrentHashmap
在多线程的情况下,使用HashMap是线程不安全的。另外可以使用Hashtable,其是线程安全的,但是Hashtable的运行效率很低,之所以效率低下主要是因为其实现使用了synchronized关键字对put等操作进行加锁,而synchronized关键字加锁是对整个对象进行加锁,也就是说在进行put等修改Hash表的操作时,锁住了整个Hash表,从而使得其表现的效率低下。所以最终就诞生了ConcurrentHashMap.
锁分段技术
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么 当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并 发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存 储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数 据也能被其他线程访问。
2. ConcurrentHashMap-JDK1.7
ConcurrentHashMap在对象中保存了一个Segment数组,即将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Seqment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可以实现多线程put操作。接下来分析JDK1.7版本中ConcurrentHashMap的实现原理。
2.1. 数据结构
ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:
通过这样的结构,在同步的过程中,如果是对不同的segment中的数据进行操作,就不涉及到“竞争”关系,只对该元素所在的segment加锁即可,这样在最理想的情况下,ConcurrentHashMap就可以最高支持和segment数量的写操作,因此就提升了并发能力。
2.1.1. segment数据结构
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor;
}
详细解释一下Segment里面的成员变量的意义:
count:Segment中元素的数量
modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)
threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容
table:链表数组,数组中的每一个元素代表了一个链表的头部
loadFactor:负载因子,用于确定threshold
count用来统计该段数据的个数,它是volatile变量,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。这利用了 Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会详述。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的 table值而不需要同步。loadFactor表示负载因子。
2.1.2. HashEntry
Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next 引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
2.2. 初始化
在java7中,初始化ConcurretnHashMap主要根据三个参数,他们分别是
initialCapacity
:初始容量。实际操作的时候需要平均分给每个segment。loadFactor
:负载因子,决定了哈希表的扩容阈值。实对每个segment内部扩容使用的,并不是给segment扩容的。concurrencyLevel
:并发级别,表示希望支持的最大线程并发数(也就是segment的个数)。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 1. 校验参数是否合法
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并发级别不能超过最大分段数 MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 2. 计算 Segment 数量(ssize 为 2 的幂次方,方便位运算定位)
int sshift = 0; // 用于记录位移次数
int ssize = 1; // 初始 Segment 数量
while (ssize < concurrencyLevel) {
++sshift; // 每次位移,表示 ssize 向左移一位,2 倍扩展
ssize <<= 1; // ssize <<= 1 相当于 ssize = ssize * 2
}
// 记录 Segment 的位移偏移量和掩码值
segmentShift = 32 - sshift; // 用于定位键的 Segment
segmentMask = ssize - 1; // 掩码,保证索引不会越界
this.segments = Segment.newArray(ssize); // 创建 Segment 数组
// 3. 如果初始容量大于最大值,则设为最大值
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 4. 计算每个 Segment 的初始容量
int c = initialCapacity / ssize; // 平均分配给每个 Segment 的容量
if (c * ssize < initialCapacity)
++c; // 如果不能整除,需要多分配一点
// 5. 计算每个 Segment 容量的最小 2 的幂次方
int cap = 1;
while (cap < c)
cap <<= 1; // 找到大于等于 c 的最小 2 的幂次方
// 6. 初始化每个 Segment,设置其容量和负载因子
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
这个初始化方法在使用无参初始化的时候会默认被调用,创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) 的空散列映射表。
static final int DEFAULT_INITIAL_CAPACITY= 16;
/**
* 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与
* table 数组长度的比值
* 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时,
* 将触发 再散列
* 在构造函数中没有指定这个参数时,使用本参数
*/
static final float DEFAULT_LOAD_FACTOR= 0.75f;
/**
* 散列表的默认并发级别为 16。该值表示当前更新线程的估计数
* 在构造函数中没有指定这个参数时,使用本参数
*/
static final int DEFAULT_CONCURRENCY_LEVEL= 16;
原文链接:https://blog.csdn.net/dingjianmin/article/details/79776646
/**
* 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16)
* 的空散列映射表。
*/
public ConcurrentHashMap() {
// 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
2.3. 定位segment
前面介绍了concurrenthashmap通过segment分段,并对segment加锁,来提升并发能力。那么在插入数据的时候肯定要先定位到segment。在java7的源码中,ConcurrentHashMap会首先使用变种hash算法对元素的哈市Code进行再散列。进行再散列的目的是减少哈希冲突,使元素能够均匀地分布在不同的segment上,从而提升容器的存取效率。
/**
* Applies a supplemental hash function to a given hashCode, which
* defends against poor quality hash functions. This is critical
* because ConcurrentHashMap uses power-of-two length hash tables,
* that otherwise encounter collisions for hashCodes that do not
* differ in lower bits. Note: Null keys always map to hash 0.
*/
static int hash(int h) {
// 扰动函数:将 hashCode 的高 16 位和低 16 位进行异或运算
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
hash函数的输入是对象原始的hashCode,直接来自于key.hashCode();
返回扰动后的哈希值,这个值会被用来定位到具体地segment。定位具体的segment所使用的函数是segmentFor(int hash)
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
2.4. ConcurrentHashMap的操作
2.4.1. get操作
ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:
1 public V get(Object key) {
2 int hash = hash(key.hashCode());
3 return segmentFor(hash).get(key, hash);
4 }
第二行,对hash值进行了二次hash,之所以要进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。
看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数.
get操作的高效之处在于整个get过程不需要加锁。我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHash-Map的get操作是如何做到不加锁不出问题的呢?原因是它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前 Segement大小的count字段和用于存储值的HashEntry的value。
transient volatile int count;
volatile Vvalue;
2.4.2. put操作
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个 步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位 置,然后将其放在HashEntry数组里。
是否需要扩容:在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈值,则对数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap 是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容 之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
如何扩容:在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
//首先,根据 key 计算出对应的 hash 值:
public V put(K key, V value) {
if (value == null) //ConcurrentHashMap 中不允许用 null 作为映射值
throw new NullPointerException();
int hash = hash(key.hashCode()); // 计算键对应的散列码
// 根据散列码找到对应的 Segment
return segmentFor(hash).put(key, hash, value, false);
}
//根据 hash 值找到对应的 Segment:
/**
* 使用 key 的散列码来得到 segments 数组中对应的 Segment
*/
final Segment<K,V> segmentFor(int hash) {
// 将散列值右移 segmentShift 个位,并在高位填充 0
// 然后把得到的值与 segmentMask 相“与”
// 从而得到 hash 值对应的 segments 数组的下标值
// 最后根据下标值返回散列码对应的 Segment 对象
return segments[(hash >>> segmentShift) & segmentMask];
}
//在这个 Segment 中执行具体的 put 操作:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap
try {
int c = count;
if (c++ > threshold) // 如果超过再散列的阈值
rehash(); // 执行再散列,table 数组的长度将扩充一倍
HashEntry<K,V>[] tab = table;
// 把散列码值与 table 数组的长度减 1 的值相“与”
// 得到该散列码对应的 table 数组的下标值
int index = hash & (tab.length - 1);
// 找到散列码对应的具体的那个桶
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { // 如果键 / 值对以经存在
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value; // 设置 value 值
}
else { // 键 / 值对不存在
oldValue = null;
++modCount; // 要添加新节点到链表中,所以 modCont 要加 1
// 创建新节点,并添加到链表的头部
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // 写 count 变量
}
return oldValue;
} finally {
unlock(); // 解锁
}
`}
3. ConcurrentHashMap-JDK1.8
并发编程在高并发场景下是一个绕不开的话题,ConcurrentHashMap 是 Java 提供的一种线程安全的哈希表实现。与 HashMap 不同,ConcurrentHashMap 能够在多线程环境下保证数据的安全访问。在 JDK1.8 之前,它通过分段锁(Segment)来实现高效并发控制,而在 JDK1.8 中,ConcurrentHashMap 进行了重大改进,摒弃了分段锁,采用更高效的数据结构和锁机制来提升并发性能。
3.1. 简介
不再使用 Segment 分段锁,而是基于Node+ CAS(Compare-And-Swap) + synchronized 的组合进行并发控制。锁粒度更细:在单个桶(Node)上进行同步操作。使用 数组 + 链表 + 红黑树 作为底层数据结构,当链表长度超过 8 时(并且达到一定的容量要求--node数组超过64,否则不会树化,而是进行扩容),链表会转化为红黑树,以提升查询性能。 Java 8 中,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发,就不会影响其他 Node 的读写,效率大幅提升。
数据结构
底层数据结构和java8的hashMap是一样的
3.2. 源码分析
3.2.1. Node节点
static class Node<K,V> {
final int hash; // 哈希值
final K key; // 键
volatile V val; // 值
volatile Node<K,V> next; // 下一个节点
}
每个 Node 里面是 key-value 的形式,并且把 value 用 volatile 修饰,以便保证可见性,同时内部还有
一个指向下一个节点的 next 指针。
3.2.2. put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 不能为空
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f = 目标位置元素
Node<K,V> f; int n, i, fh;// fh 后面存放目标位置的元素 hash 值
if (tab == null || (n = tab.length) == 0)
// 数组桶为空,初始化数组桶(自旋+CAS)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 使用 synchronized 加锁加入节点
synchronized (f) {
if (tabAt(tab, i) == f) {
// 说明是链表
if (fh >= 0) {
binCount = 1;
// 循环加入新的或者覆盖节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
- 根据 key 计算出 hashcode 。
- 判断是否需要进行初始化。
- 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
- 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
- 如果都不满足,则利用 synchronized 锁写入数据。
- 如果数量大于 TREEIFY_THRESHOLD 则要执行树化方法,在 treeifyBin 中会首先判断当前数组长度 ≥64 时才会将链表转换为红黑树。
3.2.3. get
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// key 所在的 hash 位置
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果指定位置元素存在,头结点hash值相同
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// key hash 值相等,key值相同,直接返回元素 value
return e.val;
}
else if (eh < 0)
// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 是链表,遍历查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结一下 get 过程:
- 根据 hash 值计算位置。
- 查找到指定位置,如果头节点就是要找的,直接返回它的 value.
- 如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,查找之。
- 如果是链表,遍历查找之。