1. 写在前面
CopyOnWriteArraySet 是 Java 中一个线程安全的 Set 实现,它的底层是基于 CopyOnWriteArrayList 实现的。这种数据结构在并发编程中非常有用,因为它在写操作时会创建一个新的数组副本,从而避免了并发修改问题。不知道大家对它的底层实现有没有研究过,比如下面几个问题:
- CopyOnWriteArraySet 适用于哪些场景?
- CopyOnWriteArraySet 的优缺点是什么?
- CopyOnWriteArraySet 如何保证线程安全?
- CopyOnWriteArraySet 的迭代器是线程安全的吗?
- CopyOnWriteArraySet 如何实现添加元素的?
- CopyOnWriteArraySet 如何实现删除元素的?
- CopyOnWriteArraySet 与 HashSet 的区别是什么?
- CopyOnWriteArraySet 如何应对高频写操作
2. 从使用说起
2.1 基础使用
import java.util.concurrent.CopyOnWriteArraySet;
public class BasicUsage {
public static void main(String[] args) {
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
// 添加元素
set.add("A");
set.add("B");
set.add("C");
// 迭代元素
for (String s : set) {
System.out.println(s);
}
// 删除元素
set.remove("B");
// 再次迭代元素
for (String s : set) {
System.out.println(s);
}
}
}
2.2 多线程读操作
在多线程环境下,CopyOnWriteArraySet 可以高效地进行读操作,因为读操作不会被写操作阻塞。
import java.util.concurrent.CopyOnWriteArraySet;
public class MultiThreadedRead {
public static void main(String[] args) {
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("A");
set.add("B");
set.add("C");
Runnable readTask = () -> {
for (String s : set) {
System.out.println(Thread.currentThread().getName() + " - " + s);
}
};
Thread t1 = new Thread(readTask);
Thread t2 = new Thread(readTask);
t1.start();
t2.start();
}
}
2.3 读多写少的场景
CopyOnWriteArraySet 非常适合读多写少的场景,例如缓存、配置数据等。
import java.util.concurrent.CopyOnWriteArraySet;
public class ReadMostly {
private static CopyOnWriteArraySet<String> cache = new CopyOnWriteArraySet<>();
public static void main(String[] args) {
cache.add("Config1");
cache.add("Config2");
// 读操作
System.out.println(getConfig("Config1"));
// 写操作
updateConfig("Config3");
// 再次读操作
System.out.println(getConfig("Config3"));
}
public static String getConfig(String config) {
for (String s : cache) {
if (s.equals(config)) {
return s;
}
}
return null;
}
public static void updateConfig(String config) {
cache.add(config);
}
}
2.4 并发迭代
由于 CopyOnWriteArraySet 的迭代器是基于快照的,因此在迭代过程中,可以进行安全的读写操作,迭代器不会抛出 ConcurrentModificationException。
import java.util.concurrent.CopyOnWriteArraySet;
public class ConcurrentIteration {
public static void main(String[] args) {
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("A");
set.add("B");
set.add("C");
Runnable readTask = () -> {
for (String s : set) {
System.out.println(Thread.currentThread().getName() + " - " + s);
}
};
Runnable writeTask = () -> {
set.add("D");
set.remove("A");
};
Thread t1 = new Thread(readTask);
Thread t2 = new Thread(writeTask);
t1.start();
t2.start();
}
}
2.5 高频写操作的替代方案
虽然 CopyOnWriteArraySet 不适合高频写操作,但在这种场景下,可以考虑使用其他线程安全的集合类,如 ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
public class HighFrequencyWrite {
public static void main(String[] args) {
ConcurrentHashMap<String, Boolean> map = new ConcurrentHashMap<>();
// 添加元素
map.put("A", true);
map.put("B", true);
map.put("C", true);
// 迭代元素
for (String key : map.keySet()) {
System.out.println(key);
}
// 删除元素
map.remove("B");
// 再次迭代元素
for (String key : map.keySet()) {
System.out.println(key);
}
}
}
3. add(E e)的底层实现
这段代码展示了 CopyOnWriteArraySet 的核心方法之一:add 方法及其辅助方法 addIfAbsent。CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的,al 就是 CopyOnWriteArrayList 的实例。下面我们逐行分析这段代码,解释其工作原理。
public boolean add(E e) {
return al.addIfAbsent(e);
}
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
3.1 addIfAbsent 方法
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
addIfAbsent 方法用于在集合中添加元素 e,如果元素已经存在,则不添加。它的实现分为两个步骤:
- 获取当前数组的快照。
Object[] snapshot = getArray();
getArray 方法返回当前 CopyOnWriteArrayList 的底层数组。这是一个快照,保证了在迭代过程中数组不会被修改。
2. 检查元素是否已经存在,如果不存在则添加。
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
- indexOf(e, snapshot, 0, snapshot.length) >= 0:检查元素 e 是否已经存在于数组中。如果存在,返回 false 表示添加失败。
- addIfAbsent(e, snapshot):如果元素 e 不存在,则调用 addIfAbsent(e, snapshot) 方法将其添加到数组中。
3.2 辅助方法 addIfAbsent
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++) {
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
}
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newArray = Arrays.copyOf(current, len + 1);
newArray[len] = e;
setArray(newArray);
return true;
} finally {
lock.unlock();
}
}
addIfAbsent 方法在持有锁的情况下执行,以确保线程安全。
3.2.1 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
获取锁以确保线程安全。
3.2.2 获取当前数组
Object[] current = getArray();
int len = current.length;
获取当前数组并记录其长度。
3.2.3 检查快照和当前数组是否一致
if (snapshot != current) {
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++) {
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
}
if (indexOf(e, current, common, len) >= 0)
return false;
}
如果快照和当前数组不一致,说明在获取快照后,数组可能已经被其他线程修改。需要再次检查元素是否已经存在。
3.2.4 创建新数组并添加元素
Object[] newArray = Arrays.copyOf(current, len + 1);
newArray[len] = e;
setArray(newArray);
创建一个新数组,将当前数组的元素复制到新数组中,并在新数组的末尾添加新元素 e。
3.2.5 释放锁并返回结果
return true;
返回 true 表示添加成功,并在 finally 块中释放锁。
4. set(int index, E element) 的底层实现
4.1 方法签名
public E set(int index, E element) {
set 方法用于替换指定索引 index 处的元素为 element,并返回旧值。
4.2 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
获取 ReentrantLock 锁以确保线程安全。所有的修改操作都在持有锁的情况下进行。
4.3 主体逻辑
try {
Object[] elements = getArray();
E oldValue = get(elements, index);
4.3.1 获取当前数组
Object[] elements = getArray();
调用 getArray 方法获取当前 CopyOnWriteArrayList 的底层数组。
4.3.2 获取旧值
E oldValue = get(elements, index);
调用 get 方法获取指定索引 index 处的旧值。
4.4 检查并替换元素
if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
4.4.1 检查旧值和新值是否相同
if (oldValue != element) {
如果旧值和新值不同,则进行替换操作。
4.4.2 创建新数组并替换元素
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
- Arrays.copyOf(elements, len):创建一个新数组 newElements,将当前数组 elements 的所有元素复制到新数组中。
- newElements[index] = element:将新值 element 替换到指定索引 index 处。
- setArray(newElements):将新数组设置为 CopyOnWriteArrayList 的底层数组。
4.4.3 处理相同值的情况
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
如果旧值和新值相同,虽然不需要实际替换元素,但仍然调用 setArray 方法来确保 volatile 写语义(即确保内存可见性)。
4.5 返回旧值并释放锁
return oldValue;
} finally {
lock.unlock();
}
- return oldValue;:返回旧值。
- finally 块:在 finally 块中释放锁,以确保锁在任何情况下都能被释放,避免死锁。
5. remove(int index)的底层实现
这段代码展示了 CopyOnWriteArrayList 类中的 remove 方法,它用于移除指定索引处的元素并返回被移除的值。这是一个线程安全的方法,通过使用 ReentrantLock 来确保操作的原子性。下面我们逐行分析这段代码,解释其工作原理。
public E remove(int index) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
lock.unlock();
}
}
5.1 方法签名
public E remove(int index) {
remove 方法用于移除指定索引 index 处的元素,并返回被移除的元素。
5.2 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
获取 ReentrantLock 锁以确保线程安全。所有的修改操作都在持有锁的情况下进行。
5.3 主体逻辑
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
5.3.1 获取当前数组
Object[] elements = getArray();
调用 getArray 方法获取当前 CopyOnWriteArrayList 的底层数组。
5.3.2 获取数组长度
int len = elements.length;
获取当前数组的长度。
5.3.3 获取旧值
E oldValue = get(elements, index);
调用 get 方法获取指定索引 index 处的旧值。
5.4 移除元素
int numMoved = len - index - 1;
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index, numMoved);
setArray(newElements);
}
5.4.1 计算需要移动的元素数量
int numMoved = len - index - 1;
计算从 index + 1 到数组末尾的元素数量,这些元素需要向前移动一个位置。
5.4.2 处理无需移动的情况
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
如果 numMoved 为 0,表示移除的是最后一个元素。直接创建一个新数组,长度为 len - 1,并将前 len - 1 个元素复制到新数组中。
5.4.3 处理需要移动的情况
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index, numMoved);
setArray(newElements);
}
- 创建新数组:Object[] newElements = new Object[len - 1]; 创建一个新数组,长度为 len - 1。
- 复制前半部分:System.arraycopy(elements, 0, newElements, 0, index); 将原数组 elements 中从索引 0 到 index - 1 的元素复制到新数组 newElements 中。
- 复制后半部分:System.arraycopy(elements, index + 1, newElements, index, numMoved); 将原数组 elements 中从索引 index + 1 到末尾的元素复制到新数组 newElements 中,从索引 index 开始。
- 设置新数组:setArray(newElements); 将新数组设置为 CopyOnWriteArrayList 的底层数组。
5.5 返回旧值并释放锁
return oldValue;
} finally {
lock.unlock();
}
- return oldValue;:返回被移除的旧值。
- finally 块:在 finally 块中释放锁,以确保锁在任何情况下都能被释放,避免死锁。
系列文章
1.JDK源码阅读之环境搭建
2.JDK源码阅读之目录介绍
3.jdk源码阅读之ArrayList(上)
4.jdk源码阅读之ArrayList(下)
5.jdk源码阅读之HashMap
6.jdk源码阅读之HashMap(下)
7.jdk源码阅读之ConcurrentHashMap(上)
8.jdk源码阅读之ConcurrentHashMap(下)
9.jdk源码阅读之ThreadLocal
10.jdk源码阅读之ReentrantLock
11.jdk源码阅读之CountDownLatch
12.jdk源码阅读之CyclicBarrier
13.jdk源码阅读之Semaphore
14.jdk源码阅读之线程池(上)
15.jdk源码阅读之线程池(下)
16.jdk源码阅读之ArrayBlockingQueue
17.jdk源码阅读之LinkedBlockingQueue
18.jdk源码阅读之CopyOnWriteArrayList
19.jdk源码阅读之FutureTask
20.jdk源码阅读之CompletableFuture
21.jdk源码阅读之AtomicLong
22.jdk源码阅读之Thread(上)
23.jdk源码阅读之Thread(下)
24.jdk源码阅读之ExecutorService
25.jdk源码阅读之Executors
26.jdk源码阅读之ConcurrentLinkedQueue
27.jdk源码阅读之ConcurrentLinkedDeque