文章目录
- 概要
- AQS概述
- 公平锁与非公平锁原理
- 可重入
概要
假设现在需要写一个SDK层面的锁,应该如何实现呢?
初步的思路如下:
- 搞一个状态标记,用来表示持有或未持有锁,但得是 volatile 类型的保证线程可见性。
- 编写一个
lock
,unlock
函数用于抢锁和释放锁,就是对状态标记的修改操作 unlock
函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现
初步实现如下:
public class MyLock {
// 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
private volatile int status;
private static final long valueOffset;
private static final Unsafe unsafe = reflectGetUnsafe();
static {
try {
valueOffset = unsafe.objectFieldOffset
(MyLock.class.getDeclaredField("status"));
} catch (Exception ex) {
throw new Error(ex);
}
}
private static Unsafe reflectGetUnsafe() {
Field field = null;
try {
field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 阻塞式获取锁
*
* @return
*/
public boolean lock() {
while (!compareAndSet(0, 1)) {
}
return true;
}
// cas 设置 status
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset,
expect, update);
}
/**
* 释放锁
*/
public void unlock() {
status = 0;
}
}
初步实现的代码,存在获取不到锁自旋时,是空转,浪费CPU的问题
改进1:
/**
* 阻塞式获取锁
*
* @return
*/
public boolean lock() {
while (!compareAndSet(0, 1)) {
Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
}
return true;
}
或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。
2、采用等待唤醒机制,但是这里由于没有使用 synchronized 关键字,所以也无法使用 wait/notify ,但是我们可以使用 park/unpark ,获取不到锁的线程 park 并且去队列排队,释放锁时从队列拿出一个线程 unpark
private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();
/**
* 阻塞式获取锁
*
* @return
*/
public boolean lock() {
while (!compareAndSet(0, 1)) {
//Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
QUEUE.offer(Thread.currentThread());
LockSupport.park();//线程休眠
}
return true;
}
/**
* 释放锁
*/
public void unlock() {
status = 0;
LockSupport.unpark(QUEUE.poll());
}
上面基本能实现一个简单的锁的功能,如果在AQS框架的基础上实现,又应该如何实现呢
AQS概述
AQS(AbstractQueuedSynchronizer
):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch…等都是基于它。
- AQS用一个
volatile int state
; 属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法:getState()
,setState(int newState)
,compareAndSetState(int expect, int update)
,其中CAS方法是核心。 - 框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列
- 框架也内部也实现了条件变量
Condition
,用它来实现等待唤醒机制,并且支持多个条件变量 - AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如
ReentrantLock
;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch
- 使用者只需继承
AbstractQueuedSynchronizer
并重写指定的方法,在方法内完成对共享资源 state 的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些 final 的模板方法里 - AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可
API | 说明 |
---|---|
final void acquire(int arg) | 独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire |
boolean tryAcquire(int arg) | 独占模式尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true |
final boolean release(int arg) | 释放独占锁,AQS顶层已实现,内部调用了tryRelease |
boolean tryRelease(int arg) | 尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true |
final void acquireShared(int arg) | 共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared |
int tryAcquireShared(int arg) | 尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源; 正数表示成功,且有剩余资源,AQS中未实现,由子类实现 |
final boolean releaseShared(int arg) | 释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared |
boolean tryReleaseShared(int arg) | 尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false, AQS中未实现,需要由子类实现 |
boolean isHeldExclusively() | 共享资源是否被独占 |
AQS基本使用
现在有个场景,基于AQS来实现一个锁
/**
* 基于 aqs实现锁
*/
public class MyLock2 implements Lock {
//同步器
private Syn syn = new Syn();
@Override
public void lock() {
// 模板方法
syn.acquire(1);
}
@Override
public void unlock() {
// 模板方法
syn.release(0);
}
class Syn extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, arg)) {
return true;
}
return false;
}
@Override
protected boolean tryReleaseShared(int arg) {
setState(arg);
return true;
}
}
// 其他接口方法暂时先不实现 省略
}
公平锁与非公平锁原理
自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁。这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?这时候就要看看获取锁的模板方法中是如何实现的
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
// tryAcquire由子类来实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
-
线程一来首先调用 tryAcquire ,在 tryAcquire 中直接CAS获取锁,如果获取不成功通过 addWaiter 加入等待队列,然后走 acquireQueued 让队列中的某个等待线程去获取锁。
-
不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?
如何实现公平性呢?
查看 AbstractQueuedSynchronizer
的类定义,主要定义如下
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
/**
* Wait queue node class.
*
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
public class ConditionObject implements Condition, java.io.Serializable { }
}
内部类 Node 以及其类型的变量head 和 tail 就表示 AQS 内部的一个等待队列,而剩下的 state 变量就用来表示锁的状态。
等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node 的属性我们知道, Node 存放了当前线程的指针 thread ,也即可以表示当前线程并对其进行某些操作, prev 和 next 说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。
AQS加锁最核心代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其获取锁标识的过程,图解如下
那如何让自定义的锁是公平的呢?
其实导致不公平的原因就是线程每次调用 acquire
时,都会先去tryAcquire
,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。所以现在的解决办法就是,在 tryAcquire
时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。
那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个 hasQueuedPredecessors
函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next )
或队列为空,则为false。
@Override
protected boolean tryAcquire(int arg) {
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, arg)) {
return true;
}
return false;
}
现在已经有公平锁了,稍微改造,就可以既能支持公平锁,也支持非公平锁
public class MyLock2 implements Lock {
//同步器
private Sync syn;
MyLock2() {
syn = new NoFairSync();
}
MyLock2(boolean fair) {
syn = fair ? new FairSync() : new NoFairSync();
}
@Override
public void lock() {
//调用模板方法
syn.acquire(1);
}
@Override
public void unlock() {
//调用模板方法
syn.release(0);
}
// 实现一个独占同步器
class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryRelease(int arg) {
setState(arg);
return true;
}
}
class FairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, arg)) {
return true;
}
return false;
}
}
class NoFairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
//直接去获取锁
if (compareAndSetState(0, arg)) {
return true;
}
return false;
}
}
}
可重入
那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。
查看 AbstractQueuedSynchronizer
的定义我们发现,它还继承自另一个类: AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer
implements java.io.Serializable {...}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread
thread) {...}
protected final Thread getExclusiveOwnerThread(){...}
}
AQS 中有个变量是可以保存当前持有独占锁的线程的。当我们获取锁时,如果发现锁被持有不要着急放弃,先看看
持有锁的线程是否时当前线程,如果是还能继续获取锁。
另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样
lock
lock
lock
lock
....
unlock
unlock
unlock
unlock
对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。
故此时状态变量 state 不在只有两个取值 0,1 ,某线程获取到锁state=1 ,如果当前线程重入获取只需增加状态值 state=2 ,依次同理,锁释放时释放一次状态值 -1 ,当 state=0 时才真正释放,其他线程才能继续获取锁
修改后代码如下:
public class MyLock2 implements Lock {
//同步器
private Sync syn;
MyLock2() {
syn = new NoFairSync();
}
MyLock2(boolean fair) {
syn = fair ? new FairSync() : new NoFairSync();
}
@Override
public void lock() {
//调用模板方法
syn.acquire(1);
}
@Override
public void unlock() {
//调用模板方法
syn.release(0);
}
// 实现一个独占同步器
class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() !=
getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean realRelease = false;
int nextState = getState() - arg;
if (nextState == 0) {
realRelease = true;
setExclusiveOwnerThread(null);
}
setState(nextState);
return realRelease;
}
}
class FairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
final Thread currentThread = Thread.currentThread();
int currentState = getState();
if (currentState == 0) { // 可以获取锁
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, arg)) {
setExclusiveOwnerThread(currentThread);
return true;
}
} else if (currentThread == getExclusiveOwnerThread()) {
//重入逻辑 增加 state值
int nextState = currentState + arg;
if (nextState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextState);
return true;
}
return false;
}
}
class NoFairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
Thread currentThread = Thread.currentThread();
int currentState = getState();
if (currentState == 0) {
//直接去获取锁
if (compareAndSetState(0, arg)) {
setExclusiveOwnerThread(currentThread);
return true;
}
} else if (currentThread == getExclusiveOwnerThread()) {
//重入逻辑 增加 state值
int nextState = currentState + arg;
if (nextState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextState);
return true;
}
return false;
}
}
}