AQS
之前介绍synchronized关键字时提到过管程的概念,synchronized就是JVM内置管程,其使用的是管程的MESA模型。但是synchronized有一些缺点:
- 非公平锁,可能会使得一些线程长久抢占不到锁,导致其处于饥饿状态;
- 不可被中断;
- 在早期(JDK1.5以前)的JDK版本中,synchronized性能较差。
为了解决以上问题,JVM在Java代码层面实现了自己的加锁机制,并且此加锁机制也是参照MESA模型来设计的,这个加锁机制就是AQS(AbstractQueueSynchronized)。AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。java.util.concurrent包中的大多数同步器都是围绕着AQS定义的一些共同的基础行为来实现的,例如等待队列、条件队列、独占获取、共享获取等。
JDK提供的大多数的同步器都是基于AQS框架来实现的,例如Lock、CountDownLatch和CyclicBarrier等。其实现方式基本都是定义一个内部类Sync继承AQS,随后在内部类中实现当前同步器需要的所有的基础行为。
AQS结构介绍
在AQS的设计中,使用一个volatile修饰的state变量来表示资源的状态,但对于AQS的不同实现,这个state表示的意义也是不同的。例如对于ReentrantLock来说state是某个线程重入锁的次数,对于Semaphore来说state是共享资源的数量等等。
AQS具有MESA模型所定义的同步队列和条件队列。
- 同步等待队列:主要用于维护获取锁失败时入队的线程;
- 条件等待队列:调用await()时会释放锁,线程加入到条件队列,调用signal()方法时会唤醒条件队列中的线程,将其移动到同步等待队列中,等待再次获得锁。
AQS在其内部自定义了表示同步队列节点或条件等待队列节点的Node类,每个Node都封装了一个线程Thread对象、该线程在等待队列的状态以及当前线程获取的锁的性质(独占锁or共享锁)。AQS定义了5个队列中节点状态:
- 初始化状态,值为0,表示当前节点在sync队列中,等待获取锁;
- CANCELLED,值为1,表示当前线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程可以调用unpark()方法唤醒;
- CONDITION,值为-2,表示当前节点在等待condition,即在condition队列中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShare能够执行。
不同的自定义同步器竞争共享资源的方式一般不同,当我们自定义同步器时只需要实现共享资源state的获取和释放方式即可,对于具体线程等待队列的维护,AQS已经在顶层实现好了。自定义同步器主要实现以下几个方法:
- isHeldExclusively():该线程是否正在独占资源,当用到Condition时需要实现;
- tryAcquire(int):以独占的方式尝试获取资源,成功则返回true,失败则返回false;
- tryRelease(int):以独占的方式尝试释放资源,成功返回true,失败返回false;
- tryAcquireShared(int):以共享的方式尝试获取资源,返回负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功且有剩余可用资源;
- tryReleaseShared(int):以共享的方式尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
AQS的实现可以用于重入锁的特性(例如ReentrantLock),这个特性是由AQS实现AbstractOwnableSynchronizer接口赋予的,此接口只包含一个独占锁的锁拥有者属性exclusiveOwnerThread以及访问该属性的方法。
同步等待队列
AQS中的同步等待队列也叫CLH队列,是一种基于双向链表数据结构的FIFO先进先出的线程等待队列。AQS依赖此同步队列来完成同步状态的管理,关于此队列的操作如下:
- 当前线程如果获取同步状态失败,AQS会将当前线程的等待状态等信息构造成一个节点,将其加入到CLH同步队列,同时会阻塞当前线程;
- 当同步状态释放时,AQS会将CLH队列的首节点唤醒(公平锁),使其再次尝试获取同步状态;
- 通过调用singal()或singalAll()方法可以将条件队列中的节点转移到同步队列。
条件队列
AQS中的条件队列是使用单向链表保存的,且与synchronized不同的是,AQS的MESA模型包含多个条件队列。关于条件队列的操作如下:
- 调用await()方法阻塞线程,将该线程加入条件队列;
- 当前线程存在于同步队列的头节点,调用await()方法进行阻塞,将其放入条件队列。
Condition接口
AQS对于线程进入条件队列和从条件队列中被唤醒的操作是由Condition接口实现的。Condition接口主要提供了以下两个方法:
- await():调用await()方法的线程会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,因此调用该方法时当前线程必须要持有锁;
- singal():调用singal()方法会将会唤醒因调用当前Condition对象的await()方法而阻塞的线程,将条件队列的首节点移动到CLH队列尾部,随后这个线程就可以竞争锁了。
ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,其功能类似于synchronized,是一种互斥锁,可以保证线程安全。但ReentrantLock相比synchronized,又具有以下特点:
- 支持中断;
- 可以设置超时时间;
- 具有公平锁的实现;
- 支持多个条件变量和条件队列。
ReentrantLock和synchronized的区别如下:
- synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
- synchronized的锁状态是无法在代码中直接判断的,ReentrantLock可以通过isLocked()方法判断;
- synchronized是非公平锁,ReentrantLock可以是公平锁也可以是非公平锁;
- synchronized是不能被中断的,ReentrantLock提供了lockInterruptibly()来支持中断;
- 在发生异常时synchronized可以自动释放锁,ReentrantLock需要我们在finally块中手动释放;
- ReentrantLock获取锁的形式有多种,更加灵活;
- synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁,ReentrantLock对于已经等待的线程是先来先得。
API
ReentrantLock是AQS的独占锁的实现,在其内部通过创建一个内部类Sync来继承AbstractQueuedSynchronized来实现相应的功能。ReentrantLock分别实现了公平锁和非公平锁,因此分别又有一个公平Sync和非公平Sync通过继承Sync来使用AQS提供的基本功能的同时外加自己的特定逻辑。
ReentrantLock默认是非公平锁的实现,其构造函数有两个,如下:
//默认构造函数,非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//传参是否公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock常用方法如下:
- lock():加锁;
- lockInterruptibly():可响应中断地加锁;
- unlock():解锁;
- newCondition():创建条件变量对象;
- isLocked():是否加锁;
- isFair():是否公平锁;
使用示例
使用ReentrantLock加锁的方式如下面的代码:
public class ReentrantLockDemo {
private static int sum = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(()->{
//加锁
lock.lock();
try {
// 临界区代码
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
// 解锁
lock.unlock();
}
});
thread.start();
}
Thread.sleep(2000);
System.out.println(sum);
}
}
以上程序在不使用任何锁的情况下是有线程安全问题的,在使用ReentrantLock加锁后最后运行结果为3000,可以验证ReentrantLock可以保证线程安全。
ReentrantLock还支持在获取锁时设置超时时间,超时则直接获取锁失败。例如下面的程序中main线程首先获取锁,随后线程t1获取锁并设置超时时间1S,由于main线程睡眠2S,线程t1是肯定会超时的。
public class ReentrantLockDemo4 {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("等待 1s 后获取锁失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
try {
log.debug("t1获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("main线程获得了锁");
t1.start();
//先让线程t1执行
Thread.sleep(2000);
} finally {
lock.unlock();
}
}
}
控制台打印结果如下:
ReentrantLock支持公平锁和非公平锁,默认是非公平锁。在下面的程序中,首先让500个线程执行,当执行1S后,再启动500个线程执行,由后面500个线程是否存在有线程插入到前面500个线程执行来判断ReentrantLock默认是否是非公平锁。
public class ReentrantLockDemo5 {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去争抢锁
Thread.sleep(1000);
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入" + i).start();
}
}
}
控制台打印结果如下,可以看到后面的500个线程确实有线程插入到前面500个线程中执行,可以证明ReentrantLock默认是非公平锁。
ReentrantLock源码
lock()加锁
- ReentrantLock的lock()方法由其内部抽象类Sync继承AbstractQueuedSynchronized。
public void lock() {
sync.lock();
}
- ReentrantLock实现了公平锁和非公平锁,因此Sync分别有公平锁和非公平锁的两个实现。公平锁的实现类是FairSync,其lock()方法实现如下:
final void lock() {
acquire(1);
}
acquire()是AQS中定义的模版方法,所有的独占锁获取锁都会调用到该方法,其实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()方法在AQS中未实现,是模版方法中提供给子类实现的方法,ReentrantLock的内部类FairSync的实现如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取state的值,该值在ReentrantLock中表示重入锁的次数
int c = getState();
//state为0表示未加锁,可以尝试加锁
if (c == 0) {
//判断同步等待队列中当前节点前面没有正在阻塞等待的节点且使用CAS修改state成功,则认为加锁成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//设置当前锁的占有线程为当前线程
setExclusiveOwnerThread(current);
//true表示加锁成功
return true;
}
}
//state不为0则表示已加锁,判断加锁线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
//如果是当前线程,则增加重入锁次数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//其他情况则加锁失败
return false;
}
如果tryAcquire()方法返回true加锁成功,则acquire()直接返回;如果返回false,则还需执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),addWaiter()方法是将当前线程封装成一个同步等待队列中的Node节点对象,acquireQueued()方法实现如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//此处循环的目的是当前线程在阻塞被唤醒后需要一直判断是否是同步等待队列首节点
for (;;) {
final Node p = node.predecessor();
//判断当前节点的前一个节点如果是同步等待队列的首节点,则再次尝试获取锁,获取成功则设置当前节点为首节点并返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 非公平锁的lock()方法实现如下,与公平锁的区别是:公平锁在获取锁时只有当同步队列中当前线程前面没有线程在阻塞等待的情况下才会去尝试获取锁,而非公平锁在调用lock()方法时首先就会去尝试获取锁,这也就是为啥它是非公平的。
final void lock() {
//尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//获取锁失败则入队同步等待队列
acquire(1);
}
- 非公平锁NoFairSync的tryAcquire()方法实现也与公平锁有些差别:公平锁在tryAcquire()时始终会去看同步等待队列中当前线程前面没有正在阻塞等待的线程才会去获取锁,而非公平锁直接去尝试获取锁。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//获取state的值,该值在ReentrantLock中表示重入锁的次数
final Thread current = Thread.currentThread();
int c = getState();
//state为0表示未加锁,可以尝试加锁
if (c == 0) {
//此处与公平锁实现有区别,直接尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//state不为0则表示已加锁,判断加锁线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
unlock()解锁
- unlock()实现如下:
public void unlock() {
sync.release(1);
}
- release()方法也是AQS的模版方法,其中tryRelease()方法是提供给子类实现的方法,实现如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒同步等待队列首节点线程
unparkSuccessor(h);
return true;
}
return false;
}
- ReentrantLock的公平锁和非公平锁主要是针对加锁的场景,因此tryRelease()方法在ReentrantLock中只有一种实现,该实现定义在Sync中,如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//判断占有锁的线程是否是当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果state为0则表示当前锁是未加锁状态,设置锁占有线程为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
流程图
ReentrantLock调用lock方法加锁的逻辑如下图,其中紫色标记的过程都是tryAcquire()方法的逻辑。
从上面的过程我们可以总结出以下信息:
- 对于ReentrantLock来说,state的值为0时表示当前锁可以被获取,为1时表示当前锁已被某个线程获取,大于1时表示当前锁被某个线程重入的次数;
- 某个节点封装的线程可以被唤醒的条件是其前一个节点的waitStatus必须是-1,即SIGNAL状态;
- 只有同步等待队列的head节点的下一个节点才可能被唤醒。
ReentrantLock解锁逻辑如下:
head头节点的下一个节点被唤醒后,仍处在加锁逻辑的最后几步的for循环中(即判断当前节点的前一个节点P是否是头节点),当其被唤醒后再次调用tryAcquire()方法获取锁后,首先将同步队列的head节点出队,随后将当前Node节点的thread置为空(因为当前线程已经被唤醒获得锁了,不需要在同步队列中),最后跳出for循环,这时候当前线程才真正结束了lock方法的执行。