一、AQS原理剖析
什么是AQS
- java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如
等待队列
、条件队列
、独占获取
、共享获取
等 - 而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的
同步器
。 - JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的
- 一般是通过一个内部类Sync继承 AQS,然后将同步器所有调用都映射到Sync对应的方法
AQS具备的特性: 阻塞等待队列、共享/独占、公平/非公平、可重入、允许中断
AQS内部维护属性 “volatile int state”
表示资源的可用状态
State三种访问方式: getState() 、setState() 、compareAndSetState()
AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
-
同步等待队列
: 主要用于维护获取锁失败时入队的线程 -
条件等待队列
: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁
AQS 定义了5个队列中节点状态:
-
值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
-
CANCELLED,值为1,表示当前的线程被取消;
-
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
-
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
-
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
不同的自定义同步器竞争共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
同步等待队列
AQS当中的同步等待队列也称 CLH队列
,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是 FIFO
先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
AQS 依赖CLH同步队列来完成同步状态的管理:
-
当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
-
当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
-
通过
signal
或signalAll
将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
条件等待队列
AQS中条件队列是使用 单向列表
保存的,用nextWaiter来连接:
调用await方法阻塞线程,当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)
Condition接口详解
-
调用Condition#await方法会
释放当前持有的锁
,然后阻塞当前线程,同时向Condition队列尾部
添加一个节点,所以调用Condition#await方法的时候必须持有锁。 -
调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部
-
然后唤醒因调condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
等待唤醒机制之await/signal测试
@Slf4j
public class ConditionTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 开始处理任务");
condition.await();
log.debug(Thread.currentThread().getName() + " 结束处理任务");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 开始处理任务");
Thread.sleep(2000);
condition.signal();
log.debug(Thread.currentThread().getName() + " 结束处理任务");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
二、ReentrantLock介绍
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
相对于 synchronized, ReentrantLock具备如下特点:
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
三、ReentrantLock使用
3.1 线程安全测试
public class ReentrantLockDemo {
private static int sum = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(()->{
//加锁
lock.lock();
try {
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
// 解锁
lock.unlock();
}
});
thread.start();
}
Thread.sleep(2000);
System.out.println(sum);
}
3.2 可重入
@Slf4j
public class ReentrantLockDemo2 {
public static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
3.3 可中断
@Slf4j
public class ReentrantLockDemo3 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
try {
lock.lockInterruptibly();
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("t1等锁的过程中被中断");
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
log.debug("线程t1执行中断");
} finally {
lock.unlock();
}
}
3.4 锁超时
立即失败
@Slf4j
public class ReentrantLockDemo4 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
// 注意: 即使是设置的公平锁,此方法也会立即返回获取锁成功或失败,公平策略不生效
if (!lock.tryLock()) {
log.debug("t1获取锁失败,立即返回false");
return;
}
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
超时失败
@Slf4j
public class ReentrantLockDemo4 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
//超时
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("等待 1s 后获取锁失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
3.5 公平锁
ReentrantLock 默认是不公平的
@Slf4j
public class ReentrantLockDemo5 {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock(true); //公平锁
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去争抢锁
Thread.sleep(1000);
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入" + i).start();
}
}
}
输出结果:
t0 running...
t1 running...
t2 running...
......
强行插入0 running...
强行插入1 running...
强行插入2 running...
......
3.6 非公平锁
四、ReentrantLock源码剖析
4.1 lock方法
公平&非公平的方法源码
// 公平锁的sync的lock方法
final void lock() {
acquire(1);
}
// 非公平锁的sync的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
4.2 acquire方法
acquire是一个业务方法,里面并没有实际的业务处理,都是在调用其他方法
// 核心acquire arg = 1
public final void acquire(int arg) {
/**
* 1. 调用tryAcquire方法:尝试获取锁资源(非公平、公平),拿到锁资源,返回true,直接结束方法,否则执行&&后面的方法
*
* 2. 当没有获取锁资源后,会先调用addWaiter:会将没有获取到锁资源的线程封装为Node对象,并且插入到AQS的队列的末尾,并且作为tail
*
* 3. 继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面(head的next),尝试获取锁资源,否则尝试将线程挂起,阻塞起来!
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
4.3 tryAcquire方法
tryAcquire分为公平和非公平两种
tryAcquire主要做了两件事:
- 如果state为0,尝试获取锁资源
- 如果state不为0,看一下是不是锁重入操作,如果不是
非公平:
// 非公平锁实现!
final boolean nonfairTryAcquire(int acquires) {
// 拿到当前线程!
final Thread current = Thread.currentThread();
// 拿到AQS的state
int c = getState();
// 如果state == 0,说明没有线程占用着当前的锁资源
if (c == 0) {
// 没人占用锁资源,我直接抢一波(不管有没有线程在排队)
if (compareAndSetState(0, acquires)) {
// 将当前占用这个互斥锁的线程属性设置为当前线程
setExclusiveOwnerThread(current);
// 返回true,拿锁成功
return true;
}
}
// 当前state != 0,说明有线程占用着锁资源
// 判断拿着锁的线程是不是当前线程(锁重入)
else if (current == getExclusiveOwnerThread()) {
// 将state再次+1
int nextc = c + acquires;
// 锁重入是否超过最大限制
// 0(符号位) 1111111 11111111 11111111 11111111 + 1
// 1(符号位)0000000 00000000 00000000 00000000
// 抛出error
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 将值设置给state
setState(nextc);
// 返回true,拿锁成功
return true;
}
return false;
}
公平锁:
// 公平锁实现
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程!
final Thread current = Thread.currentThread();
// 拿到AQS的state
int c = getState();
// c == 0 没有其他线程获取锁
if (c == 0) {
// 判断是否有线程在排队,如果有线程排队,直接不执行返回最外层的false
// 如果没有线程排队,执行compareAndSetState()方法,CAS尝试修改status属性为1(获取锁资源)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); // 设置当前独占锁的线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
4.4 addWaiter方法
在获取锁资源 失败
后,需要将当前线程封装为Node对象,并且插入到AQS队列的末尾
// 将当前线程封装为Node对象,并且插入到AQS队列的末尾
private Node addWaiter(Node mode) {
// 将当前线程封装为Node对象,mode为null,代表互斥锁
Node node = new Node(Thread.currentThread(), mode);
// pred是tail节点
Node pred = tail;
// 如果pred不为null,有线程正在排队
if (pred != null) {
// 将当前节点的prev,指定tail尾节点
node.prev = pred;
// 以CAS的方式,将当前节点变为tail节点
if (compareAndSetTail(pred, node)) {
// 之前的tail的next指向当前节点
pred.next = node;
return node;
}
}
// 添加的流程为, 自己prev指向、tail指向自己、前节点next指向我
// 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
enq(node);
return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
for (;;) {
// 拿到tail
Node t = tail;
// 如果tail为null,说明当前没有Node在队列中
if (t == null) {
// 创建一个新的Node作为head,并且将tail和head指向一个Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 和上述代码一致!
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4.5 acquireQueued
- acquireQueued方法会查看当前排队的Node是否是head的next
- 如果是,尝试获取锁资源,如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())
- 在挂起线程前,需要确认当前节点的上一个节点的状态必须是小于等于0,
-
如果为1,代表是取消的节点,不能挂起
-
如果为-1,代表挂起当前线程
-
如果为-2,-3,需要将状态改为-1之后,才能挂起当前线程
// acquireQueued方法
// 查看当前排队的Node是否是head的next,
// 如果是,尝试获取锁资源,
// 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())
final boolean acquireQueued(final Node node, int arg) {
// 标识。
boolean failed = true;
try {
// 循环走起
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
if (p == head && // 说明当前节点是head的next
tryAcquire(arg)) { // 竞争锁资源,成功:true,失败:false
// 进来说明拿到锁资源成功
// 将当前节点置位head,thread和prev属性置位null
setHead(node);
// 帮助快速GC
p.next = null;
// 设置获取锁资源成功
failed = false;
// 不管线程中断。
return interrupted;
}
// 如果不是或者获取锁资源失败,尝试将线程挂起
// 第一个事情,当前节点的上一个节点的状态正常!
// 第二个事情,挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 通过LockSupport将当前线程挂起
parkAndCheckInterrupt())
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 确保上一个节点状态是正确的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到上一个节点的状态
int ws = pred.waitStatus;
// 如果上一个节点为 -1
if (ws == Node.SIGNAL)
// 返回true,挂起线程
return true;
// 如果上一个节点是取消状态
if (ws > 0) {
// 循环往前找,找到一个状态小于等于0的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将小于等于0的节点状态该为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
4.6 unlock方法
释放锁资源:
- 将state-1。
- 如果state减为0了,唤醒在队列中排队的Node。(一定唤醒离head最近的)
释放锁不分公平和非公平,就一个方法。
// 真正释放锁资源的方法
public final boolean release(int arg) {
// 核心的释放锁资源方法
if (tryRelease(arg)) {
// 释放锁资源释放干净了。 (state == 0)
Node h = head;
// 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
if (h != null && h.waitStatus != 0)、
// 唤醒线程
unparkSuccessor(h);
return true;
}
// 释放锁成功,但是state != 0
return false;
}
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
// 获取state - 1
int c = getState() - releases;
// 如果释放锁的线程不是占用锁的线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否成功的将锁资源释放利索 (state == 0)
boolean free = false;
if (c == 0) {
// 锁资源释放干净。
free = true;
// 将占用锁资源的属性设置为null
setExclusiveOwnerThread(null);
}
// 将state赋值
setState(c);
// 返回true,代表释放干净了
return free;
}
// 唤醒节点
private void unparkSuccessor(Node node) {
// 拿到头节点状态
int ws = node.waitStatus;
// 如果头节点状态小于0,换为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到当前节点的next
Node s = node.next;
// 如果s == null ,或者s的状态为1
if (s == null || s.waitStatus > 0) {
// next节点不需要唤醒,需要唤醒next的next
s = null;
// 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 经过循环的获取,如果拿到状态正常的节点,并且不为null
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
4.7 为什么唤醒线程时,为啥从尾部往前找,而不是从前往后找?
- 因为在addWaiter操作时,是先将当前Node的prev指针指向前面的节点,然后是通过CAS将tail赋值给当前Node
- 最后才是能上一个节点的next指针,指向当前Node。
- 如果从前往后,通过next去找,可能CAS成功把tail指向新插入的node时,还未把前驱I节点的next指向当前节点,可能会
丢失
某个节点,导致这个节点不会被唤醒~ - 如果从后往前找,肯定可以找到全部的节点。