本篇是关于 JUC 并发包中独占锁 ReentrantLock 底层源码的解析,在阅读之前需要对 AQS 抽象队列有基本的了解。
文章目录
- 1.1 类图结构
- 1.2 获取锁
- 1)void lock() 方法
- 2)void lockInterruptibly() 方法
- 3)boolean tryLock() 方法
- 4)boolean tryLock(long timeout, TimeUnit unit)
- 1.3 释放锁
- 1.4 使用 ReentrantLock 的好处
ReentrantLock
是一个可重入的独占锁,同时只要一个线程获取到锁,其他线程获取锁的时候,会被阻塞并且放入锁的 AQS 阻塞队列;ReentrantLock 中实现了公平锁和非公平锁,使用默认构造方法得到的是非公平锁,类中提供了一个传入布尔值的构造方法,ReentrantLock(boolean fair)
使用该构造方法传入true
得到的就是公平锁。
1.1 类图结构
先来看一下 ReentrantLock 的类图结构
ReentrantLock
最终还是通过 AQS 队列来实现的:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock
中提供了以上的两种构造方法,其作用和引言中说的相同,即构造公平锁或者非公平锁,ReentrantLock
中的内部类 Sync
这个类直接继承了 AQS
,它的两个子类 FairSync
和 NonfairLock
分别实现了公平锁和非公平锁, ReentrantLock
类中的核心方法都是调用这两个子中类的方法来实现的。
在这里 AQS 队列中的 state 代表锁的可重入次数,默认的情况下,state 的值为 0,表示锁没有被任何线程持有,当第一个线程获取锁的时候会尝试使用 CAS 操作将 state 的值设置为 1,如果这个操作成功了就将锁的持有者设置为这个线程。在第二次获取锁的时候这个值会被设置为 1,当线程释放锁的时候会使 state 减一,直到减为 0 的时候,线程将锁释放。
虽然是可重入锁,但是重入的次数并不是无限次,
/**
* The synchronization state.
*/
private volatile int state;
AQS 中的 state 是用 int
声明的,重入的最大次数就是 int
的最大值,也就是
如果超过这个数字,就会因为越界使其变为负数,此时会抛出异常:
throw new Error("Maximum lock count exceeded");
比如我们写这样一段代码:
public class ReentrantLockSourceCode {
static ReentrantLock lock = new ReentrantLock(true);
public static void main(String[] args) {
Thread thread = new Thread(() -> {
// 不断获取锁
while (true) lock.lock();
});
thread.start();
}
}
构造一个线程,在循环中不断的去请求锁,最终我们就会喜提这个异常:
1.2 获取锁
1)void lock() 方法
在上面的案例中我们已经使用过了这个方法,当线程希望获取锁的时候就去调用这个方法:
public void lock() {
sync.lock();
}
这个方法实际上是委托给了 sync 执行的,sync 会在初始化的时候根据传入的值设置为公平锁或非公平锁,这里先来看使用的最多的非公平锁中的实现:
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
先去使用 CAS 操作将值设置为 1,如果成功了就将当前线程设置为锁的持有者;反之则会调用 AQS 抽象类中的 acquire(1); 方法。
这里调用了 AQS 类中的 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法中首先调用了 tryAcquire(int acquires)
方法去尝试获取锁,如果获取失败了,就将线程放到阻塞队列中并挂起,AQS 作为抽象类,其中没有提供 tryAcquire 的具体实现,根据不同的需求在子类中实现相应的逻辑:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 锁没有被占有
if (c == 0) {
// 采用 CAS 操作修改 state
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 锁被当前线程持有
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 增加 state
if (nextc < 0) // 越界的情况
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 以上两种情况都不符合,返回 false
return false;
}
这样就完成了上锁的操作,来梳理一下这个方法的流程
线程尝试去获取锁,使用 CAS 操作尝试将 state 值从 0 修改为 1
- 如果修改成功将锁的持有者设置为该线程
- 如果失败了就有两种可能:当前线程持有锁和其他线程持有锁,调用 AQS 中的 acquire 方法
- 调用子类中实现的 tryAcquire 方法
- 尝试再次使用 CAS 操作,如果修改失败了则判断当前锁是否被当前线程持有
- 持有则递增 state
- 反之直接返回 false
- 尝试再次使用 CAS 操作,如果修改失败了则判断当前锁是否被当前线程持有
- tryAcquire 成功则直接返回,反之则将线程加入队列中并且阻塞挂起。
- 调用子类中实现的 tryAcquire 方法
再来补充一下公平锁的 tryAcquire()
方法的实现:
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// (1)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
不同之处在于在上面的(1)位置加上了 hasQueuedPredecessors()
方法去判断队列中是否有前置的线程,如果有则不会去尝试获取锁,这样就保证了公平性。
2)void lockInterruptibly() 方法
这个方法和 lock 方法类似,它的不同之处在于,使用这个方法获取锁的线程对于 interrupt() 方法会做出立即反应,即抛出 InterruptedException 异常然后返回,而 lock 方法则是在阻塞结束后再去判断线程在挂起的过程中是否出现阻塞。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
实际调用了抽象类中的 acquireInterruptibly
方法:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果线程被中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 调用 AQS 的可终端方法
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly(arg);
方法当线程返回并发现阻塞的时候,会直接抛出异常,其他方法中则会返回一个布尔值来表示线程在阻塞过程中是否被中断,让开发者自己去设置处理的方式。
3)boolean tryLock() 方法
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
尝试获取锁,如果锁没有被其他线程持有,当前线程获取到锁并且返回 true,反之则返回 false;当获取失败了时候,这个方法并不会将线程放到阻塞队列。
4)boolean tryLock(long timeout, TimeUnit unit)
尝试获取锁,与上面方法不同的是,方法设置了超时时间,如果这个时间内还没有获取到锁,则会直接返回 false。
lock.tryLock(1000, TimeUnit.SECONDS);
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
// 调用 AQS 中的 tryAcquireNanos 方法
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
1.3 释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用了 AQS 队列中 release 方法,方法中调用了 tryRelease 方法来尝试释放锁,同样,这个方法没有在抽象类中实现,而是在子类中去实现,这个方法最终会返回锁是否空闲,如果空闲的话执行 unparkSuccessor 方法去解除下一个可被唤醒线程的挂起状态,让它们去争夺锁。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果锁不被当前线程持有,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果 state 变为了 0,释放锁
if (c == 0) {
free = true;
// 清空锁的持有者
setExclusiveOwnerThread(null);
}
setState(c);
return free; // 锁是否被线程持有
}
/**
* 如果存在的话,唤醒 Successer 线程
*/
private void unparkSuccessor(Node node) {
/*
* 在进行唤醒操作之前,尝试清零等待状态以便为唤醒操作做准备,
* 即使这种操作可能失败或者状态在操作过程中被修改,也是可以接受的。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 要接触挂起状态的线程一般是下一个线程,但如果下一个线程被取消
* 或者为空,则从尾部开始遍历查找可以解除的线程
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
1.4 使用 ReentrantLock 的好处
- 可定时的锁等待:
ReentrantLock
提供了tryLock(long timeout, TimeUnit unit)
方法,可以在指定的时间内等待获取锁。这在需要避免线程长时间等待的情况下很有用。 - 可中断的锁等待:
ReentrantLock
提供了lockInterruptibly()
方法,允许在等待锁的过程中响应中断,这样可以避免线程长时间阻塞。 - 公平锁和非公平锁:
ReentrantLock
提供了公平锁和非公平锁两种模式,可以通过构造函数来选择。公平锁会按照线程请求锁的顺序来获取锁,而非公平锁则不保证顺序。而synchronized
关键字锁默认是非公平的。 - 可重入性:
ReentrantLock
是可重入锁,同一个线程可以多次获取同一个锁而不会死锁。而synchronized
关键字锁也是可重入的,但是必须在同一个方法或者代码块中。 - 条件变量:
ReentrantLock
提供了Condition
接口(AQS 提供的),可以设置多个 Condition 对象来更好的控制线程的状态。