1. 前言
今天我们来探讨下另一个核心锁
ReentrantLock
. 从具体的实现到JVM层面是如何实现的。 我们都会一一进行讨论的,好了,废话不多说了,我们就开始吧
2. ReentrantLock
以及synchronized
- 核心区别:
ReentrantLock
是一个抽象的基类synchronized
是一个关键字。- 从JVM层面来看的话,都是互斥锁的一种实现
- 效率区别:
- 如果竞争比较激烈,推荐ReentrantLock去实现,不存在锁升级概念。
- 而synchronized是存在锁升级概念的,如果升级到重量级锁,是不存在锁降级的
- 底层实现区别:
ReentrantLock
基于AQS实现的synchronized
是基于ObjectMonitor
- 功能区别:
ReentrantLock
的功能比synchronized更全面ReentrantLock
支持公平锁和非公平锁ReentrantLock
可以指定等待锁资源的时间
3. 简单实例展示
public class T21_Thread_Lock11 {
public static int count = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(T21_Thread_Lock11::add);
Thread t2 = new Thread(T21_Thread_Lock11::add);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}
public static void add() {
try {
lock.lock();
for (int i = 0; i < 100000; i++) {
count ++;
}
} finally {
lock.unlock();
}
}
}
ReentrantLock
是需要手动开启以及关闭锁的。所以为了防止程序中途出现异常,将锁关闭的部分放到finally
中,说明一定会执行的。
4. 核心分析
4.1 AQS概述
- AQS就是AbstractQueuedSynchronizer抽象类,AQS其实就是JUC包下的一个基类,JUC下的很多内容都是基于AQS实现了部分功能,比如ReentrantLock,ThreadPoolExecutor,阻塞队列,CountDownLatch,Semaphore,CyclicBarrier等等都是基于AQS实现
- 首先AQS中提供了一个由volatile修饰,并且采用CAS方式修改的int类型的state变量
- 其次AQS中维护了一个双向链表,有head,有tail,并且每个节点都是Node对象
4.2 关键字分析
4.2.1 抽象基类
通过上述的截图中我们可以看到抽象类
AbstractQueuedSynchronizer
是由Doug Lea
在JDK1.5之后实现的
4.2.2 AbstractQueuedSynchronizer
关键字
public abstract class AbstractQueuedSynchronizer {
// 表示抢锁状态。 默认是0, 每次抢锁就会+1 锁释放就会-1
private volatile int state;
// 双向链表的头节点
private transient volatile Node head;
// 双向链表的尾节点
private transient volatile Node tail;
}
static final class Node {
// 表示共享锁的状态
static final Node SHARED = new Node();
// 表示互斥锁的状态
static final Node EXCLUSIVE = null;
// 表示线程取消
static final int CANCELLED = 1;
// 表示线程挂起
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 节点状态。一般都是表示后继节点的状态。例如:如果是-1 表示挂起
volatile int waitStatus;
// 表示前继节点
volatile Node prev;
// 表示后继节点
volatile Node next;
// 表示抢锁的线程
volatile Thread thread;
}
state
表示线程抢锁的一个状态head
双向链表的头节点tail
双向链表的尾节点SHARED
共享锁的标志EXCLUSIVE
互斥锁的标志waitStatus
其实下一个锁的状态。比如说是否需要唤醒等thread
表示当前执行/ 待执行 的线程prev
表示上一个Node节点next
表示下一个Node节点
4.2.3 加锁流程的概述
非公平锁的加锁方式
4.2.4 lock
实现部分
非公平锁
,1. 首先先进行抢锁,如果抢锁成功了将状态0 修改为 状态1,然后设置持有锁的线程。2.acquire
内部就是再次尝试抢锁,然后添加到队列中
- 非公平锁
final void lock() {
// 尝试修改state状态,进行抢锁
if (compareAndSetState(0, 1))
// 如果抢锁成功的话 直接设置线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 内部再次尝试抢锁,反之就是添加到队列中
acquire(1);
}
- 公平锁
final void lock() {
// 内部再次尝试抢锁,反之就是添加到队列中
acquire(1);
}
4.2.5 acquire
实现部分
方法
tryAcquire
再次尝试抢锁,如果抢锁抢不到的话,直接将节点标记为互斥锁后封装为node,添加到双向链表中
public final void acquire(int arg) {
// 再次抢锁。如果没有抢到的话 返回false
if (!tryAcquire(arg) &&
// 将线程封装为node 添加到双向链表 尾部
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
4.2.6 tryAcquire
实现部分
抢锁大致分为两种形式:1. 如果状态
state
为0的话,表示没有线程抢锁,尝试抢锁,抢锁成功后直接返回。 2. 状态state
不是0 && 被抢到的锁的线程 就是 当前的线程,此时表示是重入锁。 如果两者都不是的话,直接返回false。
final boolean nonfairTryAcquire(int acquires) {
// 表示获取当前线程
final Thread current = Thread.currentThread();
// 表示线程抢锁的state状态
int c = getState();
// 如果是0的话 表示还未抢到锁
if (c == 0) {
// 再次尝试抢锁。如果抢锁成功直接返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程 跟 已抢锁线程保持一致的话 说明是锁重入的状态
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 上锁两种情况都不是的话 直接返回false
return false;
}
4.2.7 addWaiter
实现部分
将线程封装为Node元素,添加到链表的尾部
private Node addWaiter(Node mode) {
// 将当前节点 封装为node
Node node = new Node(Thread.currentThread(), mode);
// 表示尾节点
Node pred = tail;
// 如果尾节点不为空。 如果尾节点为null的话 说明链表中没有节点
if (pred != null) {
// 维持双向链表的关系
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 死循环
for (;;) {
Node t = tail;
// 如果尾节点为空的 说明链表中没有节点
if (t == null) { // Must initialize
// 利用CAS加锁 来创建伪节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 维持节点关系
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4.2.8 acquireQueued
实现部分
// 当前没有拿到锁资源后,并且到AQS排队之后触发的方法
final boolean acquireQueued(final Node node, int arg) {
// 判断锁是否获取成功
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 表示当前节点的前继节点
final Node p = node.predecessor();
// 如果前继节点是head 再次尝试获取锁 如果前继节点是head的话,那么node就是第二个元素
if (p == head && tryAcquire(arg)) {
// 设置head
setHead(node);
// 将原来的head 设置为空。 方便GC回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// 没拿到锁资源...
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 修改head节点的方法
private void setHead(Node node) {
// 将当前节点设置为head 节点
head = node;
// 当前节点是伪节点 无需要设置thread 线程
node.thread = null;
// 无前节点
node.prev = null;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 表示前继节点的 后节点状态
int ws = pred.waitStatus;
// 如果后节点状态是-1的话 说明是后节点是挂起状态 直接返回true
if (ws == Node.SIGNAL)
return true;
// 如果ws是1的话 满足ws > 0的条件。 但是此时的节点是无效节点
if (ws > 0) {
// 通过循环之前往前找 直到是有效节点,将节点挂载到后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置状态 表示需要将当前节点挂起
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
4.2.9 tryLock
无参实现部分
// 此方法也是尝试加锁。
// 字段acquires 此时的值为1
final boolean nonfairTryAcquire(int acquires) {
// 获取执行的线程
final Thread current = Thread.currentThread();
// 表示抢锁 状态。
int c = getState();
// 此时表示 还没有线程抢到锁
if (c == 0) {
// 利用CAS来尝试修改state 状态
if (compareAndSetState(0, acquires)) {
// 设置线程
setExclusiveOwnerThread(current);
return true;
}
}
// 此步骤为重入锁的判断
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
4.2.10 tryLock
有参实现部分
大致的原理就是使用死循环来 递归判断时间来抢锁。如果时间到了 && 还没抢到锁 就直接跳出循环。
// 此方法也是尝试加锁
// 修改的state状态的值 以及时间
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果此线程被 标记为中断了 直接包异常
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire 尝试加锁。
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
// 尝试指定时间内抢锁
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果延迟的时间为0的话 直接返回
if (nanosTimeout <= 0L)
return false;
// 表示结束时间
final long deadline = System.nanoTime() + nanosTimeout;
// 将节点添加到链表中
final Node node = addWaiter(Node.EXCLUSIVE);
// 默认抢锁是失败的
boolean failed = true;
try {
// 使用一个死循环进行抢锁
for (;;) {
// 表示前继节点
final Node p = node.predecessor();
// 如果前继节点是头节点。 再次尝试抢锁
if (p == head && tryAcquire(arg)) {
// 抢锁成功后 将当前节点设置为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算剩余时间
nanosTimeout = deadline - System.nanoTime();
// 如果没有剩余时间了 直接返回
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 节点状态判断的方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果此时状态是-1的话 表示node节点是一个挂起的状态
if (ws == Node.SIGNAL)
return true;
// 如果是ws是>0的话 表示是无效状态。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果此时是等于0的状态的话, 将状态0修改为-1. 表示后继节点挂起
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
4.2.11 AQS
常见的问题
4.2.11.1 AQS中为什么要有一个虚拟的head节点
因为AQS提供了ReentrantLock的基本实现,而在ReentrantLock释放锁资源时,需要去考虑是否需要执行unparkSuccessor方法,去唤醒后继节点。
因为Node中存在waitStatus的状态,默认情况下状态为0,如果当前节点的后继节点线程挂起了,那么就将当前节点的状态设置为-1。这个-1状态的出现是为了避免重复唤醒或者释放资源的问题。
因为AQS中排队的Node中的线程如果挂起了,是无法自动唤醒的。需要释放锁或者释放资源后,再被释放的线程去唤醒挂起的线程。 因为唤醒节点需要从整个AQS双向链表中找到离head最近的有效节点去唤醒。而这个找离head最近的Node可能需要遍历整个双向链表。如果AQS中,没有挂起的线程,代表不需要去遍历AQS双向链表去找离head最近的有效节点。
为了避免出现不必要的循环链表操作,提供了一个-1的状态。如果只有一个Node进入到AQS中排队,所以发现如果是第一个Node进来,他必须先初始化一个虚拟的head节点作为头,来监控后继节点中是否有挂起的线程。
4.2.11.2 AQS中为什么选择使用双向链表,而不是单向链表
首先AQS中一般是存放没有获取到资源的Node,而在竞争锁资源时,ReentrantLock提供了一个方法,lockInterruptibly方法,也就是线程在竞争锁资源的排队途中,允许中断。中断后会执行cancelAcquire方法,从而将当前节点状态置位1,并且从AQS队列中移除掉。如果采用单向链表,当前节点只能按到后继或者前继节点,这样是无法将前继节点指向后继节点的,需要遍历整个AQS从头或者从尾去找。单向链表在移除AQS中排队的Node时,成本很高。
当前在唤醒后继节点时,如果是单向链表也会出问题,因为节点插入方式的问题,导致只能单向的去找有效节点去唤醒,从而造成很多次无效的遍历操作,如果是双向链表就可以解决这个问题。
5. 总结:
- 其实
ReentrantLock
的实现逻辑还是很简单,有几个比较重要的点,这里可以描述下:- 字段
state
,被volatile
修饰。保证了可见性以及有序性。为抢锁是否成功提供了凭据- 用双向链表来存放排队挂起的线程。无非是使用特殊的手段来维护链表
- 每个链表节点中存在属性
waitStatus
来表示下个节点是否挂起,默认是0,如果挂起表示-1
. 同时存在伪head节点来 表示第一个真实节点是否被挂起
废话不多说了,分享就到这里,如果什么新的想法评论区记得及时留言哦。