目录
1、AQS初步
2、AQS源码
2.1、ReentrantLock类解析
2.2、AQS源码
JUC-->AQS-->AbstractQueuedSynchronizer:字面意思:抽象的队列同步器
AQS是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态
CLH:Craig、Landin and Hagersten队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO。
@author Doug Lea
public class ReentrantLock implements Lock, java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
...
abstract void lock();
...
}
@author Doug Lea
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
@author Doug Lea
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
@author Doug Lea
public class Semaphore implements java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
并发大神 Doug Lea,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
加锁会导致阻塞:有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理
解释说明:抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍然继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过CAS,自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果。
1、AQS初步
/**
* Provides a framework for implementing blocking locks and related
//为实现阻塞锁和相关的同步器提供一个框架,它是依赖于先进先出的一个等待
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
//依靠单个原子int值来表示状态,通过占用和释放方法,改变状态值
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
......
* @since 1.5
* @author Doug Lea
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
有阻塞就需要排队,实现排队必然需要队列
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。
AQS=state变量+CLH变种的双端队列
* @since 1.5
* @author Doug Lea
*/
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
...
//队列中每个排队的个体就是一个Node,Node<Thread>, Node=waitStatus+前后指针指向
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();//共享模式:表示线程以共享模式等待锁
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;//独占模式:表示线程正在以独占的方式等待锁
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;//线程被取消了
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;//后继线程需要唤醒
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;//等待condition唤醒
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;//共享式同步状态获取将会无条件地传播下去
volatile int waitStatus;//当前节点在队列中的状态,初始为0,状态是上面的几种
volatile Node prev;//前置节点
volatile Node next;//后继节点
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;//表示处于该节点的线程
...
}
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
....
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS同步队列的基本结构
2、AQS源码
2.1、ReentrantLock类解析
从ReentrantLock开始解读AQS,Lock接口的实现类,基本都是通过[聚合]了一个[队列同步器]的子类完成线程访问控制的。
对比公平锁和非公平锁的tryAcquire()方法的实现代码,其实差别在于非公平锁获取锁时比公平锁少一个判断!hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
!hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)
public class ReentrantLock implements Lock, java.io.Serializable {
....
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
2.2、AQS源码
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class AQSDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
// 引入银行办理业务场景
// 3个线程模拟3个来银行办理业务的客户,暂时只开一个受理窗口
// A 顾客是第一个顾客,受理窗口没有人,A可以直接去办理
new Thread(() -> {
lock.lock();
try {
System.out.println("--A thread come in");
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}, "A").start();
// B 顾客,第二个来的只能等待
new Thread(() -> {
lock.lock();
try {
System.out.println("--B thread come in");
} finally {
lock.unlock();
}
}, "B").start();
// C 顾客,第三个来的只能等待
new Thread(() -> {
lock.lock();
try {
System.out.println("--C thread come in");
} finally {
lock.unlock();
}
}, "C").start();
}
}
下面不贴图了
//AQS主要源码,往下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//1.tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//2.进入队列等待
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//2.1进入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//头节点为new Node()空节点,占位使用,可称傀儡节点/哨兵节点
tail = head;
} else {
node.prev = t;//把当前节点的前指针指向尾指针
if (compareAndSetTail(t, node)) {//比较并交换把当前要入队的节点和尾节点进行交换
t.next = node;//上一个节点的下一个节点是当前节点
return t;//
}
}
}
}
//3.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
干我们这行,啥时候懈怠,就意味着长进的停止,长进的停止就意味着被淘汰,只能往前冲,直到凤凰涅槃的一天!