读写锁
文章目录
- 读写锁
- 1. ReentrantReadWriteLock概述
- 2. 编码演示
- 2.1 ReentrantLock实现
- 2.2 ReentrantReadWriteLock实现
- 3. ReentrantReadWriteLock
- 3.1 锁降级
- 3.2 锁降级的必要性
- 3.3 饥饿问题
- 4. StampedLock(邮戳锁也叫票据锁)
- 4.1 特点
- 4.2 三种访问模式
- 4.3 缺点
- 4.4 编码示例
1. ReentrantReadWriteLock概述
读写锁定义为:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程
大多实际场景是“读/读”线程间并不存在互斥关系,只有"读/写"线程或"写/写"线程间的操作需要互斥的。因此引入ReentrantReadWriteLock。
特点:
- 它只允许读读共存,而读写和写写依然是互斥的
- 一个ReentrantReadWriteLock同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁(切菜还是拍蒜选一个)也即一个资源可以被多个读操作访问或一个写操作访问,但两者不能同时进行
ReentrantReadWriteLock:读写互斥,读读共享,读没有完成时候其它线程无法获得写锁
2. 编码演示
有一个缓存类Cache,需要进行读和写操作,如何使其性能达到最高(StampedLock请看StampedLock章节)?
2.1 ReentrantLock实现
public class Main {
static class Cache {
private final ReentrantLock lock = new ReentrantLock();
Map<String, String> map = new HashMap<>();
public void write(String key, String value){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写入");
map.put(key, value);
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(Thread.currentThread().getName() + "完成写入");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public String read(String key) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "正在读取");
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "完成读取");
return map.get(key);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
long startTime = System.currentTimeMillis();
Cache cache = new Cache();
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
cache.write(String.valueOf(finalI), String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
for (int i = 5; i < 10; i++) {
int finalI = i;
new Thread(() -> {
cache.read(String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("总消耗时间:" + (endTime - startTime) + "ms");
}
}
输出
0正在写入
0完成写入
1正在写入
1完成写入
2正在写入
2完成写入
4正在写入
4完成写入
3正在写入
3完成写入
5正在读取
5完成读取
6正在读取
6完成读取
7正在读取
7完成读取
8正在读取
8完成读取
9正在读取
9完成读取
总消耗时间:3632ms
分析:读写,写写之间确实需要锁进行互斥控制,但是读读之间不需要锁控制,而是可以同时执行的,如何优化读读?引出ReentrantReadWriteLock
2.2 ReentrantReadWriteLock实现
public class Main {
static class Cache {
// private final ReentrantLock lock = new ReentrantLock();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Map<String, String> map = new HashMap<>();
public void write(String key, String value){
// lock.lock();
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写入");
map.put(key, value);
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(Thread.currentThread().getName() + "完成写入");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// lock.unlock();
readWriteLock.writeLock().unlock();
}
}
public String read(String key) {
// lock.lock();
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在读取");
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + "完成读取");
return map.get(key);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// lock.unlock();
readWriteLock.readLock().unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
long startTime = System.currentTimeMillis();
Cache cache = new Cache();
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
cache.write(String.valueOf(finalI), String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
for (int i = 5; i < 10; i++) {
int finalI = i;
new Thread(() -> {
cache.read(String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("总消耗时间:" + (endTime - startTime) + "ms");
}
}
输出
0正在写入
0完成写入
2正在写入
2完成写入
1正在写入
1完成写入
4正在写入
4完成写入
3正在写入
3完成写入
5正在读取
6正在读取
8正在读取
9正在读取
7正在读取
6完成读取
8完成读取
9完成读取
5完成读取
7完成读取
总消耗时间:2798ms
3. ReentrantReadWriteLock
允许多个线程同时读,但是只允许一个线程写,在线程获取到写锁的时候,其他写操作和读操作都会处于阻塞状态,读锁和写锁也是互斥的,所以在读的时候是不允许写的,读写锁比传统的synchronized速度要快很多,原因就是在于ReentrantReadWriteLock支持读并发,读读可以共享
3.1 锁降级
目的:锁降级是为了让当前线程感知到数据的变化,目的是保证数据可见性
ReentrantReadWriteLock锁降级: 将写入锁降级为读锁(类似Linux文件读写权限理解,就像写权限要高于读权限一样),程度变强叫做升级,反之叫做降级
写锁的降级,降级成为了读锁
- 如果同一个线程持有了写锁,在没有释放写锁的情况下,它还可以继续获得读锁。这就是写锁的降级,降级成为了读锁
- 规则惯例,先获取写锁,然后获取读锁,再释放写锁的 次序
- 如果释放了写锁,那么就完全转换为读锁。
public class Main {
static int resource = 0;
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
writeLock.lock();
readLock.lock();
resource++;
writeLock.unlock();
System.out.println("先写锁后读锁且交替:" + resource);
readLock.unlock();
readLock.lock();
writeLock.lock();
System.out.println("先读锁后写锁且交替:" + resource);
resource++;
readLock.unlock();
writeLock.unlock();
}
}
输出且程序锁死,不会终止程序
先写锁后读锁且交替:1
即使用写锁的过程中可以加入读锁,反之则不行;用读锁的过程中,必须读完才能再使用写锁,否则会导致程序卡死
写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性。因为,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作。
综上,
- ReentrantReadWriteLock读的过程不允许写,只有等待读线程都释放了读锁后才能获取写锁,也就是写入必须等待,这是一种悲观的读锁
3.2 锁降级的必要性
CacheData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock(); // R1
if (!cacheValid) {
rwl.readLock().unlock(); // R1
rwl.writeLock().lock(); // W1
try {
data = new Object();
cacheValid = true;
rwl.readLock(); // R2 在释放写锁前立刻抢夺读锁
} finally {
rwl.writeLock().unlock(); // W1
}
}
try {
// use(data);
} finally {
rwl.readLock().unlock(); // R2
}
}
}
-
代码中声明了一个volatile类型的cacheValid变量,保证其可见性
-
首先获取读锁,如果cache不可用,则释放读锁。获取写锁,在更改数据之前,再检查一次cachevalid的值,然后修改数据,将achevalid置为true,然后在释放写锁前立刻抢夺获取读锁;此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性
总结:一句话,同一个线程自己持有写锁时再去拿读锁,其本质相当于重入。
- 如果违背锁降级的步骤
- 如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程D获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误
- 如果遵循锁降级的步骤
- 线租C在释放写锁之前获取读锁,那么线程D在获取写锁时将被阻塞,直到线理C完成数据处理过程。释放读锁。这样可以保证返回的数据是这次更新的数据,该机制是专门为了缓存设计的
3.3 饥饿问题
由上诉可知,当持有读锁时,将无法获取写锁,那么当大量读锁请求竞争资源时,写操作必须等待读锁全部释放才能获取到写锁,导致一直无法写入,这便是写饥饿问题,该问题将由JDK8中的StampedLock解决
4. StampedLock(邮戳锁也叫票据锁)
ReentrantReadWriteLock的读锁被占用的时候,其他线程尝试获取写锁的时候会被阻塞。但是,StampedLock采取乐观获取锁后,其他线程尝试获取写锁时不会被阻塞,这其实是对读锁的优化,所以,在获取乐观读锁后,还需要对结果进行校验
- stamp,戳记,long类型,代表了锁的状态。当stamp返回零时,表示线程获取锁失败。并且当释放锁或转换锁的时候,都要传入最初获取的stamp值
- 对短的只读代码段,使用乐观模式通常可以减少争用并提高吞吐量
- 该锁目前生产上还较少使用
4.1 特点
- 所有获取锁的方法,都返回一个邮戳 (Stamp),Stamp为零表示获取失败,其余都表示成功
- 所有释放锁的方法,都需要一个邮戳 (Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致
- StampedLock是不可重入的,危险(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
4.2 三种访问模式
- Reading(读模式悲观):功能和ReentrantReadWriteLock的读锁类似
- Writing(写模式):功能和ReentrantRerdWriteLock的写锁类似
- Optimistic reading (乐观读模式): 无锁机制,类似于数据库中的乐观锁,支持读写并发,很乐观认为读取时没人修改,假如被修改再实现升级为悲观读模式
4.3 缺点
- StampedLock不支持重入
- StampedLock的悲观读和写锁都不支持条件变量(Condition)
- 使用StampedLock一定不要调用中断操作,即不要调用interrupt()方法
4.4 编码示例
public class Main {
static class Cache {
private final StampedLock stampedLock = new StampedLock();
Map<String, String> map = new HashMap<>();
public void write(String key, String value){
long lockStamped = stampedLock.writeLock();
try {
System.out.println(Thread.currentThread().getName() + "正在写入");
map.put(key, value);
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(Thread.currentThread().getName() + "完成写入");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(lockStamped);
}
}
public String read(String key) {
long optimisticLockStamped;
do {
optimisticLockStamped = stampedLock.tryOptimisticRead();
System.out.println(Thread.currentThread().getName() + "正在读取");
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (stampedLock.validate(optimisticLockStamped)) {
System.out.println(Thread.currentThread().getName() + "完成读取");
return map.get(key);
}
System.out.println(Thread.currentThread().getName() + "读取失败,自旋");
} while (true);
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(15);
long startTime = System.currentTimeMillis();
Cache cache = new Cache();
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
cache.write(String.valueOf(finalI), String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
for (int i = 5; i < 10; i++) {
int finalI = i;
new Thread(() -> {
cache.read(String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
for (int i = 10; i < 15; i++) {
int finalI = i;
new Thread(() -> {
cache.write(String.valueOf(finalI), String.valueOf(finalI));
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("总消耗时间:" + (endTime - startTime) + "ms");
}
}
输出:
0正在写入
5正在读取
7正在读取
8正在读取
6正在读取
9正在读取
0完成写入
4正在写入
4完成写入
3正在写入
3完成写入
2正在写入
2完成写入
7读取失败,自旋
7正在读取
5读取失败,自旋
6读取失败,自旋
6正在读取
8读取失败,自旋
8正在读取
9读取失败,自旋
9正在读取
5正在读取
1正在写入
1完成写入
10正在写入
10完成写入
11正在写入
11完成写入
12正在写入
8读取失败,自旋
8正在读取
7读取失败,自旋
7正在读取
9读取失败,自旋
9正在读取
5读取失败,自旋
5正在读取
6读取失败,自旋
6正在读取
12完成写入
13正在写入
13完成写入
14正在写入
14完成写入
9读取失败,自旋
9正在读取
5读取失败,自旋
7读取失败,自旋
6读取失败,自旋
6正在读取
8读取失败,自旋
8正在读取
7正在读取
5正在读取
5完成读取
7完成读取
8完成读取
6完成读取
9完成读取
总消耗时间:8073ms
读取过程中也可以写入,只要读取过程中存在写入,则该次读取失败,进行自旋,再次读取