目录
ReentrantLock 概述
ReentrantLock 的原理
什么是 AQS ?
获取锁资源(以⾮公平锁为例)
tryAcquire
addWaiter
acquireQueued
释放锁资源
⾮公平锁体现在哪⾥?
调试代码
总结
ReentrantLock 概述
ReentrantLock是Lock接⼝的默认实现,是⼀种独占锁。相对synchronized⽽⾔,ReentrantLock 提供了更多的操作⽅式以及更细粒度的加锁⽅式。
主要特性:
- 可重⼊。ReentrantLock是可重⼊锁,因为它会记录之前获得锁线程对象,保存在 exclusiveOwenerThread变量中,当⼀个线程要获取锁时,会先判断当前线程是不是已经获取锁的线程。synchronized也是可重⼊锁。
- 可中断。ReentrantLock是可中断锁,它提供了lockInterruptibly这种可中断的加锁⽅式,可以有效的避免线程之间因为互相持续占有资源⽽导致阻塞。synchronized⽆法实现可中断。
- 公平锁与⾮公平锁可选。ReentrantLock默认是⾮公平锁,但是也可以通过构造⽅法选择⾮公平锁。公平锁是指当多个线程尝试获取同⼀个锁时,获取锁的顺序按照到达的时间顺序排序。
ReentrantLock 的原理
ReentrantLock 是继承了队列同步器 AQS 来实现锁功能的。我们先了解 ReentranctLock 的两个构造⽅法,ReentrantLock 通过构造器参数选择到底是公平模式还是⾮公平模式。
public ReentrantLock() {
sync = new NonfairSync();//默认为不公平锁
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();//若为true,则是公平
锁
}
再通过⼀张类关系图看⼀下NonfairSync和FairSync这两个内部类的来头。
可以看到NonfairSync和FairSync都是继承了Sync这个抽象类,⽽Sync则继承了AQS。Sync、
NonfairSync、FairSync都是ReentrantLock的静态内部类,ReentrantLock的许多⽅法都是Sync类代为实现。
接下来,我们就按照公平锁和⾮公平锁来看看 ReentrantLock 是如何实现锁资源的获取与释放的。
什么是 AQS ?
AQS的全称是AbstractQueuedSynchronizer,即抽象队列同步器,其底层是 volatile 与 CAS,⽽其上层则是基于该抽象类构建的许多并发组件,如ReentrantLock、Semaphore等。AQS⾃身实现了⼀些基
本⽅法,还剩余⼀些⾯向上层的⽅法,这些⽅法需要继承该抽象类的同步组件去实现。
AQS最核⼼的数据结构是⼀个 volatile int state 和⼀条双向链表。
- state:代表可共享资源的数量,如果是互斥访问,⼀般设置为1,⽽如果是共享访问,可以设置为 N(N为可共享线程的个数);
- 双向链表:线程等待队列是⼀个双向链表,⽆法⽴即获得锁⽽进⼊阻塞状态的线程会加⼊队列的尾部。当然对state以及队列的操作都是采⽤了volatile + CAS + ⾃旋的操作⽅式,采⽤的是乐观锁的概念。
AQS有两种资源共享的实现⽅式
1.
独占式:每次只能有⼀个线程持有锁,例如ReentrantLock实现的就是独占式的锁资源。
2.
共享式:允许多个线程同时获取锁,并发访问共享资源,ReentrantWriteLock和CountDownLatch
等就是实现的这种模式。
在AbstractQueuedSynchronizer的类⾥⾯有⼀个静态内部类Node。它代表的是队列中的每⼀个节点。
我们看看具体的代码
// 节点的状态
volatile int waitStatus;
// 当前节点的前⼀个节点
volatile Node prev;
// 当前节点的后⼀个节点
volatile Node next;
// 当前节点中所包含的线程对象
volatile Thread thread;
// 等待队列中的下⼀个节点
Node nextWaiter;
图形化出来,如下所示
获取锁资源(以⾮公平锁为例)
最为关键的lock⽅法
public void lock() {
sync.lock();//sync可能是NonfairSync或者FairSync
}
很简单的⼀个⽅法实现,sync代为实现lock的逻辑,⽽sync是Sync的实例,对于ReentrantLock来说,它的代码不需要知道Sync到底是NonfariSync还是FairSync,在运⾏时才会知道。这就是设计模式中的
策略模式。
接下来先来看NonfairSync的lock⽅法。state = 0 的意思是,当前这个锁资源,还没有⼈拥有。
final void lock() {
if (compareAndSetState(0, 1)) //1.
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 2.
}
- 先进⾏ CAS 看锁资源是否没给占⽤(也就是 state 是否为 0),如果没有给占⽤,那我设置当前线程为锁资源拥有者,并 state 赋值 1。
- 如果锁给占⽤了,那执⾏ acquire ⽅法。
acquire⽅法在AQS中已经提供实现不需要重写。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //中断线程
}
tryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 1.
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 2.
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 3.
}
我们看⾮公平锁的 tryAcquire 实现
- 同样 CAS 判断锁资源是否给占⽤,没有的话,设置当前线程为所资源拥有者,state=1
- 如果发现当前锁资源已经给占⽤了,且锁资源和当前线程⼀样,那也就是锁重⼊了,对应 state +1
- 如果没能获取锁,那就返回 false,对应 acquire ⽅法中取⾮,让程序进⾏到 addWaiter ⽅法 和 acquireQueued ⽅法
addWaiter
该⽅法主要是将当前线程作为节点添加到队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 1.
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 2.
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // ⼊队列
return node;
}
- 将需要执⾏的内容主体和当前线程相关联形成新的 node
- 判断队列的是否为空的,如果是,直接⼊队列;如果不是,那当前的 node 就是作为队列新的 tail,并返回最新节点 node
我们接着看⼊队列是如何实现的
private Node enq(final Node node) {
for (;;) { //1.
Node t = tail;
if (t == null) { // Must initialize 2.
if (compareAndSetHead(new Node()))
tail = head;
} else { //3.
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- ⾃旋进⾏⼊队列操作
- 如果队列还没初始化,进⾏初始化,头部尾部都是指向⼀个节点
- 如果已经初始化了,则进⾏ CAS 将新节点 Node 设置为队列新的 tail
acquireQueued
阻塞获取锁资源
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //1.
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //2.
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //3.
parkAndCheckInterrupt()) //4.
interrupted = true;
}
} finally {
if (failed)
0 (node);
}
}
- ⾃旋获取锁资源
- 当前节点的前⼀个节点是否为 head,且通过 tryAcquire ⽅法获取到锁,则剔除前节点,将当前节点设置为头部节点,然后退出⾃旋,执⾏线程主体的业务。
- 如果前⾯的拿锁失败了,判断是否应该进⼊ park 状态
- 如果第三步确定可以进⼊ park 状态,就执⾏执⾏ parkAndCheckInterrupt 将当前线程进⼊到 park 状态,阻塞线程并释放 cpu 资源
我们看看是如何判断当前线程是可以进⼊到 park 状态的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node no
de) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
当前线程第⼀次进⼊ acquireQueued() ⾃旋,前⼀节点的 waitStatus 都是等于 0 的。所以第⼀次作⽤就是 compareAndSetWaitStatus 修改 waitStatus 的值,返回 false 后,再⾃旋从上⾯的第⼀步开始
执⾏,然后再次进⼊到 shouldParkAfterFailedAcquire ⽅法中,这个时候的前节点的 waitStatus 值为
Node.SIGNAL 了,就会返回 true,然后执⾏下⼀步,调⽤parkAndCheckInterrupt ⽅法
阻塞当前线程,进⼊到 park 状态,释放 CPU 资源
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
- 调⽤系统指令进⼊ park 状态,这个时候就会阻塞在这⼀步了,不再进⾏下去。
- 然后等到锁的资源释放,其中锁资源释放有个 unpark 操作(释放资源的时候再详细说明),这个时候,解除阻塞,然后⼜回到阻塞获取锁资源的第⼀步中.
释放锁资源
我们来看看 ReentrantLock 的释放锁资源是怎样的
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) //2.
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //1.
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //3.
}
- 调⽤ tryRelease 将当前锁资源拥有者设置为 null,进⾏步骤2
- 发现阻塞队列中存在着元素,(注意和前⾯的获取锁失败,修改 waitStatus 进⼊到 park 状态进⾏对⽐就知道这⾥为什么这⾥这么判断了),调⽤ unpark
- 对 head 的的下⼀个节点进⾏ unpark 操作,所以下⼀个节点的就得以继续执⾏下去,也就是之前上⾯标识的步骤6,然后再次进⼊到 for ⾃旋获取锁
⾮公平锁体现在哪⾥?
我们在回头看看之前的⾮公平 nonfairTryAcquire ⽅法
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
结合上⾯的 unparkSuccessor ⽅法,如果这个时候已经执⾏完了
compareAndSetWaitStatus(node,
ws, 0) ⽅法,这个时候 ws 是等于 0 的,恰好就有新的线程过来竞争锁,那么这个线程就可以直接获取到锁了
调试代码
public class ReLockDemo {
public static void main(String[] args) throws InterruptedException {
test1();
}
/**
* 场景⼀ 线程1拿到锁,线程2尝试拿锁。
*/
public static void test1() throws InterruptedException {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
lock.lock();
System.out.println("线程" + Thread.currentThread().getId() + "拿到锁");
try {
Thread.sleep(6000L);
} catch (InterruptedException e) {
System.out.println("线程" + Thread.currentThread().getId()
+ "释放锁");
}
lock.unlock();
});
t1.start();
new Thread(() -> {
lock.lock();
System.out.println("线程" + Thread.currentThread().getId() + "拿到锁");
lock.unlock();
}).start();
t1.interrupt();
}
public static void test2() throws InterruptedException {
Lock lock = new ReentrantLock();
new Thread(()->{
lock.lock();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}).start();
for(int i=0;i<100;i++){
int t = i;
new Thread(()->{
lock.lock();
System.out.println(t);
lock.unlock();
}).start();
Thread.sleep(10);
}
}
}
总结
- park 状态是由系统帮我们控制的阻塞状态,需要切换到内核态执⾏。所以 ReentrantLock 的想法是能少切换到内核态就尽量避免
- 整个 ReentrantLock 主体想法还是通过 for ⾃旋 + CAS + Queue 实现锁的功能
- 选择是创建⾮公平的锁,⾮公平体现在执⾏完 unlock 后,如果存在新线程,新线程会和队列中 head 的 next 节点进⾏竞争锁