可重入锁
又叫递归锁,同一个线程在外层方法获得锁的时候,再进入该线程内层方法会自动获取锁,(前提锁对象是同一个对象)。不会因为之前已经获取过还没释放而阻塞。
Synchronized和ReentrantLock都是可重入锁,优点是可一定程度避免死锁
synchronized重入实现机理
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。
LockSupport
用于创建锁和其他同步类的基本线程阻塞原语。
该类与使用它的每个线程关联一个许可证,如果许可证可用,立即返回park,并在此过程中消费,如果尚未获得许可,则致电unpark获得许可。
对线程等待唤醒机制的改良
synchronized里面是wait和notify,Object中的方法。
限制:但是必须先wait后notify,必须在同步块或方法里且成对使用。
lock里面,通过lock.newCondition方法获取到Condition,这里方法有await和signal。
限制:必须和lock和unlock组队才能,必须在同步块或方法里且成对使用。
所以上面其实一样的限制,传统的实现等待唤醒都有约束。
LockSupport类中的方法park()和unpark()阻塞当前线程和唤醒指定被阻塞的线程,没有了上面两种的限制。没有顺序,先后都行。
所以有三种等待唤醒的方法
那LockSupport原理是什么?
LockSupport是一个线程阻塞工具类,所有方法都是静态方法,可以让线程在任意位置阻塞,阻塞后也有对应的唤醒方法。归根结底,LockSupport调用的Unsafe类中的native方法。
LockSupport和每个使用他的线程都有一个许可permit关联,permit相当于1,0的开关,默认0
调用一次unpark就加1变成1
调用一次park会消费permit,将1变成0,同时park立即返回
如再次调用park会变成阻塞(因为permit为0会阻塞在这里,一直到permit为1),这是调用unpark会把permit置为1
每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark也不会积累凭证。
阻塞是0,唤醒后变为1.unpark发放通行证0变成1,阻塞park消耗通行证将1变成0
面试题:
为什么可以先唤醒线程后阻塞线程?
因为unpark或得一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,所以不会阻塞。
为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?
因为凭证数量最多为1,连续调用两次和调用一次unpark效果一样,只会增加一个凭证,而调用两次park却需要消耗两个凭证,证不够(两个车只有一个通行证),不能放行。
学习上面为了AQS
AQS—抽象队列同步器,抽象类
底层LockSupport
是用来构建锁(ReentrantLock,CountDownLunch,Semphore,CyclicBy)或者其他同步器组件的重量级基础框架及整个JUC体系的基石。通过内置的FIFO先进先出队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。
源码jar包有三个,最重要的AbstractQueueSynchronizer,简称AQS
AQS就是变量state➕队列
如果共享资源state被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点Node,通过CAS,自旋以及LockSupport的park方法的方式,维护state变量的状态,使并发达到同步的控制效果。
和AQS有关的
ReentrantLock,CountDownLatch,ReentrantReadWriteLock,Semaphore,CyclicBarrier等都和AQS有关。。。所以AQS重要。。
AQS干嘛?加锁导致阻塞,需要排队,所以AQS
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。
源码
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;//头部
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;//尾部
/**
* The synchronization state.
*/
private volatile int state;//state表示状态,volatile修饰,默认0
//Node节点
static final class Node {
//共享
static final Node SHARED = new Node();
//独占
static final Node EXCLUSIVE = null;
//线程被取消了
static final int CANCELLED = 1;
//后续线程需要唤醒
static final int SIGNAL = -1;
//等待condition唤醒
static final int CONDITION = -2;
//共享式同步状态获取将会无条件地传播下去
static final int PROPAGATE = -3;
//队列中每一个线程等待状态
volatile int waitStatus;
volatile Node prev;//前指针
volatile Node next;//后指针
volatile Thread thread;//Node节点里面包的是线程
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}
然后再看这张图,通过源码看图
1.同步状态的state成员变量,volatile修饰,0没有人,1,有人占用,等着。自旋等待
2.CLH队列,Node节点实现的一个虚拟双向队列。尾部入队,头部出队。
通过ReentrantLock源码解读AQS(重点)
公平锁和非公平的代码区别
少了一个hasQueuePredecessors判断是否需要排队
公平锁,先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程进入等待队列。
非公平锁,不管是否有等待队列,如果可以获取锁,立刻占有,队列第一个线程在unpark,之后还是需要竞争。
开始
- 首先A线程lock方法,cas修改aqs中state为1,然后设置为当前A线程
- B线程及其他线程进来cas失败,走acquire方法
- acquire方法中,tryAcquire,acquireQueued,addWaiter方法重点
- 正常BtryAcquire方法返回false,然后调用addWaiter
final void lock() {
//第一个线程进入cas设置aqs中的state值0到1,如果成功,占到了
if (compareAndSetState(0, 1))
//设置为当前第一个线程
setExclusiveOwnerThread(Thread.currentThread());
else
//第二个线程没有cas成功,走这里,1表示一个人
acquire(1);
}
arg=1
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法实现
final boolean nonfairTryAcquire(int acquires) {
//这个current就是线程B
final Thread current = Thread.currentThread();
//获取state的值
int c = getState();
//如果是0,说明释放了,直接cas
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断B线程和当前正在执行业务线程是否同一个,不是就返回false
//还有情况是A占用了两次,这里有可能state是2,可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter方法
线程B没有抢到锁,
addWaiter方法加入队列,如果此时C线程进入,也是进入队列。慢慢就形成了队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//c线程进入
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//B线程开始进入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//开始初始化,第一个节点是空节点new Node,作为头节点,也叫哨兵节点,作用是占位
if (compareAndSetHead(new Node()))
//头尾节点
tail = head;
} else {
//B节点的前指针是哨兵节点
node.prev = t;
//比较并交换,设置尾节点
if (compareAndSetTail(t, node)) {
//尾节点
t.next = node;
return t;
}
}
}
}
//公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
acquireQueue方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//是否被打断
boolean interrupted = false;
//自旋
for (;;) {
//B线程进入,返回p是哨兵节点
final Node p = node.predecessor();
//tryAcquire再抢一次,失败,不进来
//如果抢占成功,
if (p == head && tryAcquire(arg)) {
//哨兵节点出队,现在开始指向B节点,B节点此时前指针是null,B变成了哨兵节点
setHead(node);
p.next = null; // help GC,原来的哨兵节点没用了,被GC回收
failed = false;
return interrupted;
}
//在抢占失败后park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第一次进入,哨兵节点waitStatus=0,变成-1
第二次进入等于-1,返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//哨兵节点的waitStatus=0
int ws = pred.waitStatus;
//SINGAL=-1
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//ws的值从0变成-1,将哨兵节点waitStatus变成-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这里就是LockSupport,这里park被阻塞,真正的B线程进入队列,阻塞了,其他线程也是在这里开始park阻塞。线程挂起。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
A线程占用半天了,需要unlock了。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//哨兵节点此时waitstatus=-1
if (h != null && h.waitStatus != 0)
//将waitstatus设置为0
unparkSuccessor(h);
return true;
}
return false;
}
AQS解锁
protected final boolean tryRelease(int releases) {
//1-1=0
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//此时=0
if (c == 0) {
free = true;
//设置当前占用线程为null
setExclusiveOwnerThread(null);
}
//设置state=0
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//哨兵节点从-1设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取到的s是B节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//B节点不是null
if (s != null)
//B节点唤醒,将前面park阻塞的唤醒,出队列了
LockSupport.unpark(s.thread);
}
然后Thread.interrupted()开始执行,返回false,然后又进入循环开始抢占B线程占用
占用线程后,原来哨兵节点不用,被GC回收,B线程节点变成新的哨兵节点
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}