前言:
为什么需要学习ReentrantLock?
目前项目开发中使用到的几乎都是分布式锁,平时可能很少用到java自带的锁; 但实际在我们java的源码中,随处可见需要使用锁来保证线程安全,所以还是有必要学习下ReentrantLock。
1.管程模型
可以说AQS就是基于管程模型来实现的,所以我们还需要了解;管程是管理共享变量以及对共享变量的操作过程
,让他们支持并发。
在管程的发展史上,先后出现过三种不同的管程模型,分别是Hasen模型、Hoare模型和MESA模型。现在正在广泛使用的是MESA模型
。
管程中引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。条件变量和等待队列的作用是解决线程之间的同步问题。
入口等待队列: 例A和B竞争锁(共享变量),B竞争失败,B就放入该队列中等待。
例子:AQS中所有等待获取锁资源的线程的队列。具体来说,当一个线程尝试获取锁资源失败时,它将会被加入到同步队列的末尾,并进入自旋等待状态,直到获取到锁资源或被唤醒。而当其他线程释放锁资源时,它们会从同步队列的头部选取一个或多个线程,并将其移动到锁资源的拥有者上,从而使其可以继续执行。
条件队列: 例Condition接口 调用了 await 的线程加入条件队列;signal唤醒后的线程最终还是加入到入口等待队列。
例子:AQS中,每个条件变量(Condition)都有一个等待队列,用来存放等待该条件的线程。当一个线程调用条件变量的await()方法时,它将会自动释放当前持有的锁,并加入到对应条件变量的等待队列中。而当其他线程调用条件变量的signal()或signalAll()方法时,它们会从对应条件变量的等待队列中选取一个或多个线程,并将其从等待队列中移动到同步队列(Sync Queue)中,以便重新竞争获取锁资源。
小结:同步队列主要解决了多个线程同时请求同一个锁资源时的竞争问题。条件等待队列主要解决了线程等待特定条件的问题,避免了线程不必要的忙等待和性能损失。
2.先根据代码分析ReentrantLock的实现
ReentrantLock lock = new ReentrantLock(false);
lock.lock() //加锁
=======这里写业务代码=========
lock.unlock() //解锁
从上面代码可以看到,我们开发人员只需要在 lock和 unlock中间写业务代码,就能实现线程安全的业务逻辑,那我们可以猜想到,两个方法之间肯定是有保证同一时间段仅一个线程进入的锁。而java自带的魔法类 UnSafe中提供了CAS来保证原子操作,所以代码变成了下面这样。
ReentrantLock lock = new ReentrantLock(false);
lock.lock() //加锁
while(true){
if(Cas加锁){
break;//走到此说明加锁成功,跳出循环
}
}
=======获取到锁的线程在这里执行业务代码=========
lock.unlock() //解锁
上面已经保证了线程安全,但是也存在很大的问题,假如有1000个线程进来,只有T0拿到了锁,剩余999个线程一直在空转? Cpu不堪重负,因此可以通过一些方案来处理没抢到锁的线程,例如 yeild,sleep。
ReentrantLock lock = new ReentrantLock(false);
lock.lock() //加锁
while(true){
if(Cas加锁){
break;//走到此说明加锁成功,跳出循环
}
//让出CPU使用权,一直让也不合适,999个线程互让没有意义,搞的跟活锁似的,还是只有T0在执行
Thread.yeild()
//睡多久?不好确定,万一T0瞬间就完成了
Thread.sleep(1);
//最终方案:让加锁失败的线程阻塞在这里
LockSupport.park();
}
=======获取到锁的线程在这里执行业务代码=========
lock.unlock() //解锁
上面的代码看着好像没问题了,但没抢到锁的999个线程难道一直阻塞下去??占着茅坑不拉屎,所以需要有人去唤醒他,而java自带的工具 LockSupport有park,自然也有unPark方法。 LockSupport.park(要唤醒的线程)
ReentrantLock lock = new ReentrantLock(false);
lock.lock() //加锁
while(true){
if(Cas加锁){
break;//走到此说明加锁成功,跳出循环
}
//让出CPU使用权,一直让也不合适,999个线程互让没有意义,还是只有T0在执行
Thread.yeild()
//睡多久?不好确定,万一T0瞬间就完成了
Thread.sleep(1);
//将要阻塞的线程保存起来,例如保存到 HashSet(不好处理公平问题) 或 Queue
HashSet.add(Thread) ; LikedQueued.put(Thread)
//最终方案:让加锁失败的线程阻塞在这里
LockSupport.park();
}
=======获取到锁的线程在这里执行业务代码=========
lock.unlock() //解锁
thread = HashSet.get() ; thread = LikedQueued.take()
LockSupport.unpark(thread);
小结:三板斧 【自旋,Cas,LockSupport】+存储容器
至此,我们对实现一把锁有了一定概念,接下来就深入源码来查看JDK8中ReentrantLock的实现。
3. 源码阅读
ReentrantLock的锁分为公平锁和非公平锁,下面简单介绍一下:
公平:排队打饭,新来的人得排在你的后面。
非公平:排队打饭,新来的人可能会插队,排在你前面。
以下面这个demo为例子,来进行源码的调试。
/**
* 模拟抢票场景 3个线程抢2张票
*/
public class ReentrantLockDemo {
private final ReentrantLock lock = new ReentrantLock();//默认非公平
private static volatile int tickets = 2; // 总票数
public void buyTicket() {
lock.lock(); // 获取锁
try {
if (tickets > 0) { // 还有票 读
try {
Thread.sleep(10); // 休眠10ms
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票"); //写
} else {
System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");
}
} finally {
//zheli 唤醒
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) {
ReentrantLockDemo ticketSystem = new ReentrantLockDemo();
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
// 抢票
threads[i] = new Thread(ticketSystem::buyTicket, "线程" + i);
}
// 启动线程
for (Thread thread : threads) {
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("剩余票数:" + tickets);
}
}
进入lock.lock() 的实现,再进去 sync.lock()的实现
此时出现了三个核心方法,我们一个一个讲:
tryAcquire(尝试获取锁)
else if 中代码块说明:
判断当前线程是否等于拿到锁的那个线程, 如果是则将当前 state + acquires(例 state+1),表示这是第N次重入,这里可清晰看出可重入锁的判断依据。 statue 解锁时需要循环调用直到state为0,才表示锁已释放 。
简单来说,tryAcquire方法就是尝试获取锁,获取锁成功方法返回true,当前线程就可以去执行业务代码了。
addWaiter(构建节点并入队)
至此,入队后,无论模式是否公平,都需要按队列的顺序去排队了。
acquireQueued(CLH队列中尝试获取锁,获取不到的阻塞)
predecessor 什么时候获取前节点会为null
到此处得注意下,shouldParkAfterFailedAcquire方法外是一个循环,也就是说节点第一次进来时,大多都会走到设置前置节点的waitStatus由0变为-1的逻辑,当第二次循环时才会执行parkAndCheckInterrupt阻塞线程。
至此线程就被阻塞了,新的线程进来,流程也是大同小异不再赘述;当线程被中断或unpark唤醒时,会走到上图的 return Thread.interrupted(),继续在for中尝试是否满足抢锁的条件,不满足就阻塞,以此类推。
所以接下来就开始讲下解锁逻辑:
当执行完业务代码,他就会执行 unlock()方法释放锁。
从尝试释放锁的逻辑可看出重入锁lock了多少次,就要释放多少次。(即调用多少次unlock)
到这里好像流程就走完了,其实不然,上图只是唤醒了下个节点,但是与头结点还保持着链接!从上面的源码中也并没有看到 lock.unlock方法中将【被唤醒线程与前节点的关联关系解除】。是的,其实他是在唤醒后去做的,就在 acquireQueued方法中的 p.next = null; // help GC
和synchronized的一些区别:
1.sync存在锁升级的过程,通过对象头2bit的标记记录着锁的级别。
2.都是可重入锁,但是reentrantLock加了几次锁,就得手动释放几次锁,否则其他线程也拿不到锁。
3.reentrantLock是可以被interrupt信号和release方法中断唤醒的,sync全靠操作系统,不可控,对于我们开发并不方便。
注:今天讲的源码中并未使用到条件等待队列(Condition接口)
4.其他
自问自答:
waitStatus 类似我们业务的 state字段,只是一个标记,例如 = -1时,表示当前节点的下个节点允许被唤醒,只看reentrantLock的话没必要这个属性,但是AQS是很多工具类的基类,考虑的比较全。
唤醒阻塞线程的2种方式,一种是 lock.unLock,其实就是去调用 LockSupport.unpark(要唤醒的线程),
另一种就是interrupt信号唤醒,不过要区分 park方法
LockSupport.park 和 LockSupport.park(this) 区别:
一旦用 interrupt去唤醒 park,这个线程以后再也不会被 park住了。而park(this)就不会存在该问题。
为什么selfInterrupt方法打了一个中断标记?
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//上面都为true时,这里会打一个中断标记,因为 parkAndCheckInterrupt 被唤醒的其中一种方式,是 interrupt
//且方法中判断完标记后,会清除, 这样我们使用这个工具的人(也就是程序员)不知道他是怎么被唤醒的,所以他这里又打了个标记。其实我感觉框架直接用 isInterrupt就够了,没必要重新去打标记
selfInterrupt();
}
AQS 定义了5个队列中节点状态:
值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
CANCELLED,值为1,表示当前的线程被取消;可能是异常,中断引起之类,需要被废弃结束
SIGNAL,值为-1,表示当前节点的后继节点包含的线程可以运行了,也就是当前可以unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
最后附一份方法解析图