8 读写锁
8.1 悲观锁和乐观锁介绍
回顾悲观锁和乐观锁的概念
悲观锁:单独每个人完成事情的时候,执行上锁解锁。解决并发中的问题,不支持并发操作,只能一个一个操作,效率低
顾名思义,就是比较悲观的锁,总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized
和ReentrantLock
等独占锁就是悲观锁思想的实现。
乐观锁:每执行一件事情,都会比较数据版本号,谁先提交,谁先提交版本号
反之,总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic
包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。
8.1.1 悲观锁和乐观锁应用场景
从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。
8.1.2 实现方式
上面其实已经说了悲观锁的实现方式了,即synchronized
和ReentrantLock。所以下面主要说说乐观锁的实现方式。主要有两种方式:
-
1.版本号机制:
一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。就是通过version版本号作为一个标识,标识这个字段所属的数据是否被改变。
-
2.CAS算法:
即compare and swap(比较与交换),是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。CAS算法涉及到三个操作数
- 需要读写的内存值 V
- 进行比较的值 A
- 拟写入的新值 B
当且仅当 V 的值等于 A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作(比较和替换是一个原子操作)。一般情况下是一个自旋操作,即不断的重试。
8.1.3 乐观锁的缺点
-
1 ABA 问题
如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 "ABA"问题。
JDK 1.5 以后的
AtomicStampedReference 类
就提供了此种能力,其中的compareAndSet 方法
就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 -
2 循环时间长开销大
自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
-
3 只能保证一个共享变量的原子操作
CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了
AtomicReference类
来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类
把多个共享变量合并成一个共享变量来操作。
8.2 读写锁
新概念:
表锁:整个表操作,不会发生死锁
行锁:每个表中的单独一行进行加锁,会发生死锁
读锁:共享锁(可以有多个人读),会发生死锁
写锁:独占锁(只能有一个人写),会发生死锁
8.2.1 Volatile
把代码块声明为 synchronized,有两个重要后果,通常是指该代码具有 原子性(atomicity)和 可见性(visibility)。
- 原子性意味着个时刻,只有一个线程能够执行一段代码,这段代码通过一个monitor object保护。从而防止多个线程在更新共享状态时相互冲突。
- 可见性则更为微妙,它必须确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的。 —— 如果没有同步机制提供的这种可见性保证,线程看到的共享变量可能是修改前的值或不一致的值,这将引发许多严重问题。
volatile的使用条件
Volatile 变量具有
synchronized
的可见性特性,但是不具备原子性。这就是说线程能够自动发现 volatile 变量的最新值。
8.2.2 读写锁的使用
读写锁:一个资源可以被多个读线程访问,也可以被一个写线程访问,但不能同时存在读写线程,读写互斥,读读共享
读写锁ReentrantReadWriteLock
读锁为ReentrantReadWriteLock.ReadLock,readLock()
方法
写锁为ReentrantReadWriteLock.WriteLock,writeLock()
方法
创建读写锁对象private ReadWriteLock rwLock = new ReentrantReadWriteLock();
写锁 加锁 rwLock.writeLock().lock();
,解锁为rwLock.writeLock().unlock();
读锁 加锁rwLock.readLock().lock();
,解锁为rwLock.readLock().unlock();
案例分析:
模拟多线程在map中取数据和读数据
完整代码如图
//资源类
class MyCache {
//创建map集合
private volatile Map<String,Object> map = new HashMap<>();
//创建读写锁对象
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
//放数据
public void put(String key,Object value) {
//添加写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" 正在写操作"+key);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
//放数据
map.put(key,value);
System.out.println(Thread.currentThread().getName()+" 写完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放写锁
rwLock.writeLock().unlock();
}
}
//取数据
public Object get(String key) {
//添加读锁
rwLock.readLock().lock();
Object result = null;
try {
System.out.println(Thread.currentThread().getName()+" 正在读取操作"+key);
//暂停一会
TimeUnit.MICROSECONDS.sleep(300);
result = map.get(key);
System.out.println(Thread.currentThread().getName()+" 取完了"+key);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放读锁
rwLock.readLock().unlock();
}
return result;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
MyCache myCache = new MyCache();
//创建线程放数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
TimeUnit.MICROSECONDS.sleep(300);
//创建线程取数据
for (int i = 1; i <=5; i++) {
final int num = i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
通过这一章节和以上文章
总结锁的演变
- 无锁:多线程抢夺资源
- synchronized和ReentrantLock,都是独占,每次只可以一个操作,不能共享
- ReentrantReadWriteLock,读读可以共享,提升性能,但是不能多人写。缺点:造成锁饥饿(一直读,不能写),读进程不能写,写进程可以读。
- 写锁降级为读锁(一般等级写锁高于读锁)
8.2.3 锁的降级
锁降级 : 是指在保持当前拥有的写锁的基础上,再获取读锁,随后释放写锁的过程。
当一个线程获取了写锁,并且又获取了读锁,那么当该线程释放了写锁时,该线程拥有的锁就会进行降级,变为了读锁,其实这个实现从之前看加锁源码就知道了,读锁和写锁的获取都是分开的,所以写锁的释放不会影响到读锁的持有。
8.2.4 锁降级的必要性
只有当一个线程既需要写操作又需要读操作的时候,锁降级才有必要性。
但是如果写完之后,还要继续使用这些状态(状态量可能有多个),如果直接释放写锁,导致其他线程修改了这些状态,那么之后的操作感知不到其他线程的修改。
锁降级过程
获取写锁->获取读锁->释放写锁->释放读锁
//演示读写锁降级
public class Demo1 {
public static void main(String[] args) {
//可重入读写锁对象
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//写锁
//锁降级
//1 获取写锁
writeLock.lock();
System.out.println("cgg");
//2 获取读锁
readLock.lock();
System.out.println("---read");
//3 释放写锁
writeLock.unlock();
//4 释放读锁
readLock.unlock();
}
}
如果是读之后再写,执行不了
因为读锁权限小于写锁
需要读完之后释放读锁,在进行写锁
//2 获取读锁
readLock.lock();
System.out.println("---read");
//1 获取写锁
writeLock.lock();
System.out.println("manongyanjiuseng");
8.2.5 小结(重要)
- 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发 现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
- 在线程持有写锁的情况下,该线程可以继续获取读锁(同一个线程能才能获取成功)(获取读锁时如果发现写 锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。
原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把 获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写 锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释 放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。
9 阻塞队列
9.1 BlockingQueue 简介
阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
- 当队列是空的,从队列中获取元素的操作将会被阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
- 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
常用的队列主要有以下两种:
- 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性
- 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件(栈)
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要 BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue 都给你一手包办了
- 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
- 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒
9.2 常见阻塞队列种类
1.ArrayBlockingQueue
基于数组的阻塞队列
由数组结构组成的有界阻塞队列
-ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,无法并行
2.LinkedBlockingQueue
基于链表的阻塞队列
由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列
- 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
3.DelayQueue
使用优先级队列实现的延迟无界阻塞队列
- DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞
4.PriorityBlockingQueue
基于优先级的阻塞队列
支持优先级排序的无界阻塞队列
不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者
5.SynchronousQueue
一种无缓冲的等待队列
相对于有缓冲的 BlockingQueue 来说,少了一个中间经销商的环节(缓冲区)
不存储元素的阻塞队列,也即单个元素的队列
声明一个SynchronousQueue 有两种不同的方式,它们之间有着不太一样的行为。
公平模式和非公平模式的区别:
- 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
- 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者
而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理
6.LinkedTransferQueue
由链表结构组成的无界阻塞 TransferQueue 队列
由链表组成的无界阻塞队列
- 预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,生成一个节点(节点元素为 null)入队,消费者线程被等待在这个节点上,生产者线程入队时发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回
7.LinkedBlockingDeque
由链表结构组成的双向阻塞队列
阻塞有两种情况
- 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException异常
- 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数
9.3 BlockingQueue 核心方法
创建阻塞队列 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
加入元素System.out.println(blockingQueue.add("a"));
,成功为true,失败为false
检查元素System.out.println(blockingQueue.element());
取出元素System.out.println(blockingQueue.remove());
,先进先出
第二种方法:
加入元素System.out.println(blockingQueue.offer(“a”));
取出元素System.out.println(blockingQueue.poll());
第三种方法:
加入元素blockingQueue.put(“a”);
取出元素System.out.println(blockingQueue.take());
该方法加入元素或者取出元素,如果满了或者空了,还进行下一步加入或者取出操作,会出现阻塞的状态,而第一二种方法是直接抛出异常
第四种方法:
加入元素 System.out.println(blockingQueue.offer("a"));
该方法满了或者空了在进行会有阻塞,但可以加入参数,超时退出System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS));