JUC中的锁API
在juc中有一个Lock接口他的作用和synchronized相似都是为了保证线程安全性提供的解决方案 Lock中定义了一系列释放锁和抢占锁相关的API
lock()
抢占锁资源 如果当前线程没有抢占到锁 则阻塞
tryLock()
尝试抢占锁资源 如果抢占成功则返回true 否则返回false
unlock()
释放锁
…
Lock的实现类
Lock是一个接口他只提供抽象方法 所有的实现由不同的子类去实现
ReentrantLock:
重入锁 属于排它锁类型 和synchronized相似
ReentrantReadWriteLock:
可重入读写锁 它一共提供了两把锁 一把是读锁(ReadLock)和一把写锁WriteLock
StampedLock:
java8新引入的锁机制 是ReentrantReadWriteLock的改进版本
ReentrantLock的基本应用
ReentrantLock是一把可以支持重入的排它锁 同一时刻只允许一个线程获得锁资源 而重入就是如果某个线程已经获得了锁资源那么该线程后续再去抢占锁资源时 不需要再加锁只需要记录重入次数
package com.alipay.alibabademo.thread;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
private Lock lock = new ReentrantLock();
private int count = 0;
public void incr() {
lock.lock();
try {
count++;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
Thread[] threads = new Thread[2];
for (int i = 0; i <2 ; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j <100000 ; j++) {
reentrantLockDemo.incr();
}
});
threads[i].start();
}
threads[0].join();
threads[1].join();
System.out.println(reentrantLockDemo.count);
}
}
上述案例代码中通过ReentrantLock的lock保证count++这个非原子操作加锁保证count++在多线程访问情况下的线程安全性
ReentrantReadWriteLock应用
package com.alipay.alibabademo.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantReadWriteLockDemo {
private Lock lock = new ReentrantLock();
private List<String> dataList = new ArrayList<>();
public void add(String value) {
try {
lock.lock();
dataList.add(value);
}finally {
lock.unlock();
}
}
public String get(int index) {
lock.lock();
try {
return dataList.get(index);
}finally {
lock.unlock();
}
}
}
上述案例中提供的add 和get方法 由于ArrayList是线程不安全的类 所以分别在add和get上加了ReentrantLock的lock方法保证原子性(当然也可以通过别的方式比如CopyOnWriteArrayList)但是我们发现当一个线程访问get方法查询数据时 如果其他线程抢占了锁 则会使得该线程阻塞在get方法上 然而读取数据又不会对数据造成任何的影响所以这一操作时多余的 ,我们需要达到的目的是允许多个线程同时调用 get方法 但是只有一个任何一个线程在写 其他线程如果想写必须阻塞 这样大大的提升了读写的性能所以引入了ReentrantReadWriteLock(读写锁)
读写锁的特性
1.读/读不互斥 ,如果多个线程访问读方法 那么这些线程不会阻塞
2.读/写互斥 如果一个线程在访问读方法 另外一个线程访问写方法 那么为了保证数据的一致性调用写方法的线程要阻塞
3.写/写互斥 ,如果多个线程同时访问写方法 则必须要按照互斥规则进行同步
package com.alipay.alibabademo.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantReadWriteLockDemo {
private ReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private Lock readLock = reentrantReadWriteLock.readLock();
private Lock writeLock = reentrantReadWriteLock.writeLock();
private List<String> dataList = new ArrayList<>();
public void add(String value) {
try {
writeLock.lock();
dataList.add(value);
}finally {
writeLock.unlock();
}
}
public String get(int index) {
readLock.lock();
try {
return dataList.get(index);
}finally {
readLock.unlock();
}
}
}
ReentrantReadWriteLock通过读写两把锁的思想,从而减少了读操作带来的锁竞争提升了性能
StampedLock 应用
ReentrantReadWriteLock存在一个问题 如果当前有线程调用get方法 那么所有调用add方法的线程必须等待get方法的线程释放锁之后才能抢占锁进行写 也就相当于在读的过程中不允许 那么如果调用get方法的线程非常多 就会导致写线程一直被阻塞
为了解决ReentrantReadWriteLock中的问题 在java8引入了StampedLock 机制 提供了一种乐观锁策略 当线程调用get方法读取数据时 不会阻塞准备执行写操作的线程
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY currentY);
}
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
}
else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}}
writeLock:获取写锁
readLock:获取读锁
tryOptimisticRead:获取读锁 当有线程获得读锁时 他不会阻塞其他线程的写操作,通过返回的stamp字段作为一个版本号 用来表示当前线程在读操作期间数据是否被修改过 。StampedLock提供了一个validate方法来验证stamp如果线程在读取过程中没有其他线程对数据进行修改 那么stamp的值不会发生变化 validate方法返回true 否则就验证失败但会false 在验证失败之后为了保证数据的一致性在通过readLock方法来获取阻塞机制的读锁
ReentrantLock实现原理
从上图可以看出 ReentrantLock定义了一个Sync同步类该类又有两个实现FairSync(公平同步)和NonfairSync(非公平同步)这两个类分别代表了ReentrantLock中的公平和非公平的特性而Sync又继承了AbstractQueuedSynchronizer 所以排它锁的一些逻辑应该是在AbstractQueuedSynchronizer 中完成的
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer 又被称为AQS是ReentrantLock实现同步锁的核心类
AQS中提供了两种锁的实现
1.独占锁 :同一时刻只能有一个线程获得锁
2.共享锁:同一时刻允许多个线程同时获得锁
AQS实现排它锁原理流程图
state字段表示互斥变量 当线程来抢占锁资源时 会基于该变量判断当前锁资源是否空闲
双向链表用于存储没抢占到锁资源的线程 每个队列中的线程都会有一个自旋的操作抢占锁
线程的阻塞和唤醒是通过LockSupport.park和unpark来实现
加锁流程
ReentrantLock源码解析
ReentrantLock.lock
public void lock() {
sync.lock();
}
sync是一个抽象的静态内部类通过继承AQS来实现重入锁的逻辑
sync有两个具体的实现
NonfairSync:非公平锁,允许在不排队的情况下直接尝试抢占锁默认使用非公平锁
FairSync:公平锁 必须按照FIFO的规则来访问锁资源
FairSync.lock
final void lock() {
acquire(1);
}
NonfairSync.lock
先通过CAS抢占资源 如果成功就表示获得了锁 如果失败就调用acquire执行锁竞争
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
通过CAS乐观锁的方式 如果当前内存中seate的值和预期值expect相等则更新为update如果更新成功则返回true 否则返回false
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
当state =0时表示无锁状态
当state >0时表示已经有线程获得了锁 因为ReentrantLock可以重入所以当同一个线程多次获得同步锁的时候 state会底层 比如重入了三次 那么state = 3 而释放的时候也需要释放3次 直到state = 0其他线程才有资格获得锁
在非公平锁中如果CAS操作未成功 则说明已经有线程持有锁 此时会调用tryAcquire(1)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1.通过tryAcquire方法尝试获取独占锁 如果成功则返回true 否则返回false
2.如果tryAcquire方法返回false 这说明当前锁被占用 只能通过addWaiter方法将当前线程封装成Node并添加到AQS的同步队列中
3.acquireQueued方法将Node作为参数 通过自旋去尝试获取锁
tryAcquire(int arg)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//获取当前获得锁的线程
final Thread current = Thread.currentThread();
//获取State状态
int c = getState();
//0表示无锁状态
if (c == 0) {
//通过cas替换State的值如果成功表示抢占到锁
if (compareAndSetState(0, acquires)) {
//保存当前获得锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
//current == getExclusiveOwnerThread()表示获得锁的线程是同一个表示线程可以重入
else if (current == getExclusiveOwnerThread()) {
//如果是同一个线程来获得锁 则直接增加重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node mode)
当尝试获取锁失败之后会调用addWaiter把当前线程封装成一个Node加入同步队列中
private Node addWaiter(Node mode) {
//把当前线程封装成Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//tail是AQS中的尾部 默认为null
Node pred = tail;
//在tail 不为空的情况下 队列中会存在节点
if (pred != null) {
//把当前线程的Node的prev指向tail
node.prev = pred;
//通过CAS把node加入AQS队列
if (compareAndSetTail(pred, node)) {
//把原tail节点的next指向当前node
pred.next = node;
return node;
}
}
//当tail = null时 把node添加到同步队列
enq(node);
return node;
}
enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//如果尾节点 = = null 用cas 构建一个节点
if (compareAndSetHead(new Node()))
//把头节点赋值给尾节点
tail = head;
} else {
//如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
//把上一个节点的next 指针指向刚进来的节点
t.next = node;
return t;
}
}
}
}
acquireQueued
addWaiter方法把线程组装链表后把当前线程的Node节点作为参数传递给acquireQueued加入阻塞队列
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
//如果当前节点的前一个结点是头节点 则说明有资格去争夺锁
if (p == head && tryAcquire(arg)) {
//把当前节点设置成头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
检查当前节点的前置节点状态 如果是SIGNAL则表示可以放心的阻塞 否则需要通过compareAndSetWaitStatus修改前直接点的状态为SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前置节点的 waitStatus
int ws = pred.waitStatus;
//等待其他前直接点的线程被释放
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//说明可以挂起
return true;
// 大于0 说明prev节点取消了排队 直接移除这个节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
设置prev 节点状态为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
Node的五种状态
CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
//当前节点的前一个节点
Node pred = node.prev;
//前一个节点的 waitStatus> 0 (结束状态)
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//将 node.waitStatus 设置成 CANCELLED 状态
node.waitStatus = Node.CANCELLED;
//4. 如果node是tail,更新tail为pred,并使pred.next指向null
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
//5. 如果node既不是tail,又不是head的后继节点
//则将node的前继节点的waitStatus置为SIGNAL
//并使node的前继节点指向node的后继节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//6. 如果node是head的后继节点,则直接唤醒node的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
释放锁
当获得锁的线程需要释放锁的时候 调用ReentrantLock的unlock方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//释放锁成功
if (tryRelease(arg)) {
Node h = head;
//如果头节点不为空 并且状态不为0
if (h != null && h.waitStatus != 0)
//唤醒
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//state -1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果c =0 表示当前是无锁状态 把线程iq清空
free = true;
setExclusiveOwnerThread(null);
}
//重新设置 state
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//设置head节点的状态为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//拿到head节点的下一个节点
Node s = node.next;
//如果下一个节点为null 或者 status>0则表示是 CANCELLED 状态
//听过尾部节点开始扫描 找到距离 head最近的一个 waitStatus<=0的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果next 节点不等于空直接唤醒这个线程
if (s != null)
LockSupport.unpark(s.thread);
}
判断当前节点的状态 如果节点状态已失效 则从tail节点开始扫描找到离head节点最近且状态为SIGNAL的节点
通过LockSupport.unpark方法唤醒该节点
被唤醒的线程会再次去抢占锁资源