非公平锁加锁过程
一般我们在使用ReentrantLock的时候,代码如下:
@Test
public void test(){
ReentrantLock lock = new ReentrantLock();
lock.lock();
try{
//编写业务逻辑
}catch (Exception e){
lock.unlock();
}
}
当我们在用ReentrantLock独占锁的时候,如果不指定公平非公平,那默认是非公平的,如下:
public ReentrantLock() {
sync = new NonfairSync();
}
当我们在外部调用lock()方法的时候会进入ReentrantLock内部的加锁lock方法,其中sync是ReentrantLock的内部类,sync直接继承了AbstractQueuedSynchronizer(AQS)。
public void lock() {
sync.lock();
}
因为Sync内部的lock()方法是一个抽象方法,如下:
这里以非公平锁为例,代码调用lock会来到ReentrantLock 内部类NonfairSync的lock()方法,如下:
lock内部,一上来直接CAS操作AQS内部的一个state变量,从0到1如果修改成功,则获取锁。
否则else调用AQS内部的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法内部会调用tryAcquire方法,点击去会来到AQS内部的tryAcquire,这个方法没有在内部实现,是为了让子类去实现,模板方法模式的体现。
这里又会回到ReentrantLock 的内部类NonfairSync的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
接着调用nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法大致逻辑:
1. 获取AQS内部的state变量,如果等于0说明没有线程获取锁,继续CAS尝试获取锁,如果成功返回true。(如果是公平锁,这里需要判断队列里是否有阻塞的线程等着,入队前)
2. 如果不等于0,则判断当前线程current是否等于已经获取锁的线程,如果等于则+1,返回true,这里就是重入锁的逻辑。
3. 如果CAS即没有获取锁,同时也不是重入锁,则返回false。
到这里,返回false,会回到最初调用tryAcquire方法的地方:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果acquireQueued返回true,说明阻塞线程在等待的过程中被中断,因为线程被唤醒没有获取到锁需要继续阻塞,导致parkAndCheckInterrupt内部调用了Thread.interrupted()方法给中断标志清除了,所以这里需要把中断标志重新恢复过来
selfInterrupt();
}
这里是竞争共享资源失败需要入队等待唤醒的逻辑,首先调用addWaiter(Node.EXCLUSIVE)方法将当前线程封装成一个Node结点。
addWaiter方法的代码如下:(详细解释见注释)
将当前线程封装成Node结点同时入队(尾插法--->注意入队的3个步骤)
//添加阻塞线程
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//将pred指针指向队列尾结点
if (pred != null) {
//如果不等于null,说明队列有等待线程
node.prev = pred;
//修正当前node结点的前驱指针,指向队列的尾结点
//CAS尝试将当前结点置为尾结点
if (compareAndSetTail(pred, node)) {
//修正队列尾结点的后继指针,指向当前结点
pred.next = node;
//当前结点入队成功则返回
return node;
}
}
//代码走到这里说明,队列里没有阻塞的线程
enq(node);
return node;
}
//将结点入队
private Node enq(final Node node) {
//这里是CAS+自旋机制,保证当前结点一定要入队成功,因为存在并发的情况
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//这里是给队列进行初始化,构造头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//这里是重点,线程入队步骤,如果这几步调换是会存在安全问题的,这3步顺序不能改变
node.prev = t;
//1.将当前node结点的前驱指针,指向队列的尾结点
if (compareAndSetTail(t, node)) {
//2. CAS尝试将当前结点置为尾结点
//3. 修正队列尾结点的后继指针,指向当前结点
t.next = node;
return t;
}
}
}
}
//尾插法进行入队
//为什么释放锁的时候是从尾部往前遍历,不能从前往后,关键就在于尾插法的三个步骤
// <------ 这里是自己操作自己的指针没有并发冲突
// cas 只有一个能设置成功
// ------>
//顺序不能改变,否则会有线程安全问题(null指针)
//注意:解锁的时候是从尾部往前遍历的
addWaiter方法结束回到acquireQueued方法处,详细解释看注释
这里大致逻辑是阻塞线程入队后获取前驱结点
如果前驱结点是头结点 出于性能考虑,会再次尝试获取锁
bb. 抢到锁了,将当前结点设置成头结点
判断shouldParkAfterFailedAcquire是不是需要park,并且梳理一下指针的指向关系
需要park则调用parkAndCheckInterrupt方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前结点的前驱结点
final Node p = node.predecessor();
//如果前驱结点是头结点 出于性能考虑,会再次尝试获取锁
if (p == head && tryAcquire(arg)) {
//抢到锁了,将当前结点设置成头结点
setHead(node);
//释放原来的头结点,断点指针,让gc来进行回收
p.next = null; // help GC
failed = false;
return interrupted;
}
//到这里说明 当前结点的前驱结点不是头结点 或者 前驱结点是头结点但是没有拿到锁
//说明前驱结点是SIGNAL状态,那就调用park阻塞
//判断是不是需要park,并且梳理一下指针的指向关系
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//记录阻塞线程在等待获取锁的过程中有没有被中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//设置头结点
private void setHead(Node node) {
//这里因为只有一个结点会进来,所以不需要CAS
head = node;
node.thread = null;
node.prev = null;
}
//这里需要先了解下AQS内部的结点状态
/** waitStatus value to indicate thread has cancelled */
//取消线程结点
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
//表示后面的线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
//该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//表示下一次共享式同步状态获取将会被无条件地传播下去
static final int PROPAGATE = -3;
//判断前驱结点状态,找到一个前驱结点状态是SIGNAL的,如果不是SIGNAL状态的,跳过,修改指针指向
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//前驱结点状态是 CANCELLED 状态需要被取消
//意思就是没有必要在 CANCELLED 结点后面等着了,前驱结点不会进行通知当前结点,那就往前面找,直到不大于0,也就是不是取消 CANCELLED 状态的
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
来到parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
//将当前线程进行阻塞,去睡觉,让出CPU执行权
LockSupport.park(this);
//返回中断标志位,并重置为false
//如果有其他线程调用interrupt方法将其唤醒,当前线程如果获取不到锁,仍然需要阻塞,所以这里需要通过interrupted方法清除中断标志,如果不清除线程会一直在这里自旋,耗费CPU资源
return Thread.interrupted();
}
//延伸:这里为啥要用park,而不是wait、sleep
//wait
//1.调用wait需要释放锁,而释放锁的前提是当前线程得持有锁,没拿到锁就不能释放 wait需要和notify配合使用
//2.而且notify不能指定唤醒的线程
//sleep
//1.sleep不能确定要睡多久
// LockSupport.park(), 可通过两种方式被唤醒。
// 1.LockSupport.unpark()
// 2.interrupt()
public void interrupt() {} // 给线程打一个中断标志
public boolean isInterrupted() {} // 检测下线程是否被中断,不清除中断标志
public static boolean interrupted() {} //也是检测下线程是否被中断,但清除中断标志
这里如果不清楚线程中断的含义和使用,可以参考:Java线程中断
来张图吧:
非公平锁解锁过程
当我们在外部调用lock.unlock()方法的时候,代码如下:
public void unlock() {
sync.release(1);
}
可以看到方法里会通过sync内部类调用AQS内部的release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
而release方法内部又会调用子类的tryRelease方法(ReentrantLock):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final void setState(int newState) {
state = newState;
}
tryRelease方法大致的执行过程:
获取AQS内部的state变量,并将其-1
为了代码的健壮性判断当前解锁线程是不是获取锁的线程(防御性编程)
如果等于0,将exclusiveOwnerThread属性置为null
修改state变量值(注意这里没有CAS,因为独占锁只有一个线程能获取到锁)
执行完tryRelease方法,会判断如果队列的头结点不等于null并且waitStatus不等于0,就到了unparkSuccessor方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
该方法大致过程如下:
恢复头结点head的waitStatus 状态 compareAndSetWaitStatus(node, ws, 0)
获取头结点的下一个结点Node s = node.next 如果等于null 或者 waitStatus大于0
从tail结点开始往前遍历,找到最前边的waitStatus 小于等于0的结点
找到了对应的node结点,调用LockSupport类的unpark方法唤醒线程的阻塞。
来张图吧: