文章目录
- 悲观锁 和 乐观锁
- 1.基于CAS实现乐观锁
- 2.自旋锁
- 2.1.不可重入自旋锁
- 2.2.可重入自旋锁
- 2.3.CLH自旋锁
悲观锁 和 乐观锁
Java中的synchronized就是悲观锁的一个实现,悲观锁可以确保无论哪个线程持有锁,都能独占式的访问临界区代码,虽然悲观锁的实现比较简单,但是还是会存在不少问题。
悲观锁总是假设会发生最坏的情况,每次线程去读取数据的时候,也会上锁。这样其他线程在读取数据的时候也会被阻塞,直到它拿到锁,传统的关系型数据库就用到了很多的悲观锁,如行锁
,表锁
,读锁
,写锁
。
悲观锁会存在以下几个问题:
- 在多线程环境下, 加锁和释放锁都会导致线程
上下文的切换
以及调度延时
,会引发一系列性能问题。 - 一个线程持有锁的时候,会导致其他抢锁线程都被
临时挂起
。 - 如果一个
线程优先级高的线程
等待一个优先级低的线程
释放锁,就会导致线程优先级倒置
,从而引发性能风险。
解决以上悲观锁的方式,就是使用乐观锁去替代 悲观锁。乐观锁其实时一种思想,在使用乐观锁的时候,每次线程
都去读取
的数据的时候都认为其他线程不会进行修改
,所以不会上锁
,仅仅在更新
的时候判断一下其他线程有没有去更新这个数据
。
数据库操作中的带版本号的数据更新,JUC原子类中都使用了乐观锁的方式来提高性能。
这里针对于悲观锁,我们就就不在举例说明了,感兴趣的话可以去学习一下看一下之前的synchronized章节。
1.基于CAS实现乐观锁
乐观锁的实现步骤主要就两个
(1)冲突监测
(2)数据更新
乐观锁时一种比较典型的CAS原子操作,JUC强大的高并发性能就是建立在CAS原子操作上的,CAS操作中包含
三个操作数
(1)需要操作的内存位置(V)
(2)进行比较的预期原值(A)
(3)拟写入的新值(B)
如果
内存 V的位置的值
和预期原值 A
比较一致
,那么CPU会自动将该位置的值替换为新的值 B
,否则
CPU不做任何操作
下面我们通过一个案例来了解一下CAS中的乐观锁,在Java中,乐观锁的一种常见实现方式是借助于java.util.concurrent.atomic
包下的原子类,比如AtomicInteger
。下面我们通过一个简单的银行账户转账的例子来展示如何使用CAS(Compare-And-Swap)操作实现乐观锁。
/**
* CAS乐观锁的一个实现
*/
public class OptimismLockDemo {
private final Logger logger = LoggerFactory.getLogger(OptimismLockDemo.class);
@Test
@DisplayName("测试JUC的CAS操作")
public void testOptimismLock() {
// 初始余额设置100
Bank bank = new Bank(100);
// 线程1存钱 100
Thread t1 = new Thread(() -> {
if (bank.updateCount(100)) {
logger.error("存钱100成功!");
} else {
logger.error("取钱成功!");
}
}, "t1");
// 线程2 取钱 100
Thread t2 = new Thread(() -> {
if (bank.updateCount(-100)) {
logger.error("取钱100成功!");
} else {
logger.error("取钱失败!");
}
}, "t2");
// 启动t1 和 t2 线程
t1.start();
t2.start();
// 等待全部线程执行完毕
try {
t2.join();
t1.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
static class Bank {
// 定义余额
private AtomicInteger balance;
Bank(int initBalance) {
this.balance = new AtomicInteger(initBalance);
}
public boolean updateCount(int money) {
// 获取当前余额
int currentMoney = balance.get();
// 计算出预期的结果
int newMoney = currentMoney + money;
// 如果是转出且余额不足,直接返回false,否则尝试更新余额
if (money < 0 && newMoney < 0) {
return false;
}
// 尝试更新余额,这里就是CAS操作的核心
// compareAndSet比较并交换,如果当前余额仍为currentMoney 则更新为newMoney,否则就告知更新失败
return balance.compareAndSet(currentMoney, newMoney);
}
}
}
在这个例子中,我们创建了一个初始余额为100的账户,然后启动了两个线程分别执行转出100元和转入100元的操作。由于转账方法基于CAS实现,因此在并发环境下能够确保转账的原子性和正确性,避免了传统锁机制可能导致的性能瓶颈。
2.自旋锁
在实际情况中,一个成功的数据更新操作可能需要多次执行CAS(比较并交换)操作,这就是所谓的CAS自旋。通过反复尝试,直至更新成功,这样的机制无需锁定资源,实现了多线程环境下变量状态的高效协同,我们称之为“无锁同步”或“非阻塞同步”。这种方式,正是乐观锁的核心思想之一,它体现了在并发编程领域追求高性能与低冲突的“乐观”策略。
2.1.不可重入自旋锁
自旋锁(SpinLock)的基本含义就是:当一个线程在获取锁 的时候,如果锁已经被其他线程获取,那么该调用线程就一致那里循环监测锁是否已经被释放,一直到获取那个锁之后才会退出循环。
package com.hrfan.thread.lock.type.spin;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自旋锁
* 不可重入锁
*/
public class SpinLockDemo {
private static final Logger log = LoggerFactory.getLogger(SpinLockDemo.class);
@Test
@DisplayName("测试不可重入自旋锁")
public void testSpinLock() {
SpinLock spinLock = new SpinLock();
// 这里为了模拟多线程的一个并发性能 我们使用线程池来进行测试
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
spinLock.lock();
try {
// 模拟执行操作
log.error("抢锁成功!执行操作");
} finally {
spinLock.unlock();
}
});
}
executorService.shutdown();
}
static class SpinLock implements Lock {
/**
* 当前锁的拥有者
*/
private AtomicReference<Thread> owner = new AtomicReference<>();
@Override
public void lock() {
// 抢占锁
// 获取当前线程
Thread thread = Thread.currentThread();
// 开始抢占锁 (不断cas 直到owner的值为null 才更新为当前线程)
while (!owner.compareAndSet(null, thread)) {
log.error("抢锁失败!让出剩余CPU时间片!");
// 如果抢锁失败(即当前锁已被其他线程持有),则让出CPU时间片给其他线程,稍后再试
Thread.yield();
}
}
@Override
public void unlock() {
// 通过代码 我们可以发现,SpinLock是不支持重入的,在一个线程获取锁没有释放之前,它不可能再次获得锁。
// 释放锁
Thread thread = Thread.currentThread();
// 只有拥有者才能够释放锁
if (thread == owner.get()) {
log.error("释放锁成功!");
// 这里设置为拥有者为空,不需要在使用compareAndSet() 因为上面已经判断
owner.set(null);
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
}
2.2.可重入自旋锁
为了实现可重入自旋锁,这里引入一个计数器,用来记录一个线程获取锁的次数。
package com.hrfan.thread.lock.type.spin;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 可重入自旋锁
*/
public class ReentrantSpinLockDemo {
ReentrantSpinLock lock = new ReentrantSpinLock();
@Test
@DisplayName("测试可重入锁")
public void test() throws InterruptedException {
new Thread(() -> {
lock.lock();
try {
log.error("开始执行任务! 重入次数{}", lock.getCount());
reentrantSpinLock();
} finally {
lock.unlock();
}
}, "t1").start();
Thread.sleep(1000);
}
public void reentrantSpinLock() {
lock.lock();
try {
log.error("开始执行重入方法! 重入次数{}", lock.getCount());
} finally {
lock.unlock();
}
}
private static final Logger log = LoggerFactory.getLogger(ReentrantSpinLockDemo.class);
static class ReentrantSpinLock implements Lock {
/**
* 当前锁的拥有者
* 使用拥有者的Thread作为同步状态,而不是使用一个简单的整数作为同步状态
*/
private AtomicReference<Thread> owner = new AtomicReference<>();
/**
* 记录一个线程同步获取锁的状态
*/
private int count = 0;
@Override
public void lock() {
// 抢占锁
Thread thread = Thread.currentThread();
// 如果是重入 增加重入次数返回
if (thread == owner.get()) {
count++;
return;
}
// 如果不是重入 那么进行自旋操作
while (!owner.compareAndSet(null, thread)) {
log.error("抢锁失败!让出剩余CPU时间片!");
// 如果抢锁失败(即当前锁已被其他线程持有),则让出CPU时间片给其他线程,稍后再试
Thread.yield();
}
}
@Override
public void unlock() {
// 只有拥有者才能释放锁
Thread thread = Thread.currentThread();
if (thread == owner.get()) {
// 如果发现count的次数不是 0减少重入次数 并返回
if (count > 0) {
count--;
} else {
// 直接将拥有者设置为空
owner.set(null);
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
public int getCount() {
return count;
}
}
}
自旋锁的特点:线程在获取锁的时候,如果锁被其他线程持有,当前线程循环等待,直到获取锁。线程抢锁期间线程的状态不会改变
,一直时运行状态,在操作系统层面线程处于用户态。
自旋锁的问题:在争用激烈的场景下,如果某个线程持有的锁的时间太长,就会导致其他自旋线程的CPU资源耗尽
,另外,如果大量的线程进行空自旋,还可能导致硬件层面的总线风暴
2.3.CLH自旋锁
前面提到了CAS自旋
可能引发的一些性能问题
,尤其是它可能导致CPU层面的总线争用(总线风暴)
。面对这一挑战,Java并发包(JUC)中的轻量级锁如何有效规避这一问题呢?其解决方案在于利用队列机制对试图获取锁的线程进行有序排列
,从而大幅度减少了CAS操作的频次,从根本上减轻了对CPU和总线的压力。
具体而言,CLH锁
作为一种典型的基于队列
(常采用单向链表形式)实现的自旋锁机制
,为这一问题提供了高效的解答。在CLH锁的模型中,任何一个请求加锁的线程首先会尝试通过CAS操作将自己的信息节点添加到队列的末端
。一旦成功入队
,该线程随后仅需在其直接前驱节点
上执行相对简单的自旋
等待操作,直至前驱释放锁资源。
这一设计精妙之处在于,CLH锁仅在节点初次尝试加入队列时涉及一次CAS操作
。一旦入队完成
,后续的自旋过程无需进一步的CAS介入
,仅需执行标准的自旋逻辑即可。因此,在高度竞争的并发场景下,CLH锁能够显著削减CAS操作的总量,有效避开了可能引发的总线风暴现象。
在Java并发工具包(JUC)中,显式锁的实现底层依赖于AbstractQueuedSynchronizer(AQS),这一框架实质上是对CLH锁原理的一种扩展与变体应用,进一步强化了锁的管理和线程调度能力,展现了高级的并发控制策略。
上面提到的简单自旋 其实就是
普通自旋是指线程在未能立即获取锁时,不进行复杂的操作如CAS(比较并交换)而是执行简单的循环检查某个条件是否满足(比如前驱节点是否已经释放了锁)。
下面我们通过一个案例来学习一下CLH自旋锁
package com.hrfan.thread.lock.type.spin;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sound.sampled.FloatControl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* CLH版本自旋锁
*/
public class CLHSpinLockDemo {
private static final Logger log = LoggerFactory.getLogger(CLHSpinLockDemo.class);
public static int count = 0;
@Test
@DisplayName("测试CLH自旋锁")
public void test() {
long startTime = System.currentTimeMillis();
// 创建CLH自旋锁
CLHSpinLock lock = new CLHSpinLock();
// ReentrantLock lock = new ReentrantLock();
// 线程数量
int threads = 10;
// 每次执行的次数
int turns = 10000;
// 通过线程池来创建线程
ExecutorService executorService = Executors.newFixedThreadPool(threads);
// 创建计时器
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
executorService.submit(() -> {
for (int j = 0; j < turns; j++) {
// 创建锁
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
// 更新计时器
latch.countDown();
});
}
// 等待全部线程执行完毕
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
log.error("-------------线程执行结束,最终结果为:{}", count);
log.error("耗时:{}", endTime - startTime);
}
static class Node {
// 当前线程正在抢占锁,或者已经占有锁 true
// 当前线程已经释放锁 下一个线程可以占有锁了 false
volatile boolean locked;
// 前驱节点 需要监听其lock字段
Node preNode;
public Node(boolean locked, Node preNode) {
this.locked = locked;
this.preNode = preNode;
}
// 空节点
public static final Node EMPTY = new Node(false, null);
public boolean isLocked() {
return locked;
}
public void setLocked(boolean locked) {
this.locked = locked;
}
public Node getPreNode() {
return preNode;
}
public void setPreNode(Node preNode) {
this.preNode = preNode;
}
}
static class CLHSpinLock implements Lock {
// 创建当前节点的本地变量
private static ThreadLocal<Node> currentNodeLocal = new ThreadLocal<Node>();
// CLH队列的尾指针,使用AtomicReference,方法CAS操作
AtomicReference<Node> tail = new AtomicReference<>(null);
public CLHSpinLock() {
// 设置尾部节点
tail.getAndSet(Node.EMPTY);
}
@Override
public void lock() {
// 加锁操作
// 将节点添加到等待队列的尾部
Node curNode = new Node(true, null);
Node preNode = tail.get();
// 通过CAS自旋 将当前节点插入到队列的尾部
while (!tail.compareAndSet(preNode, curNode)) {
preNode = tail.get();
}
// 设置前驱节点
curNode.setPreNode(preNode);
// 自旋监听前驱节点的locked变量,直到其值为false,如果前驱节点为true说明上一个线程还没释放锁
while (curNode.getPreNode().isLocked()) {
// 抢锁失败 让出cpu时间片
Thread.yield();
}
// 到这里说明已经抢到了锁
// log.error("已经抢锁成功!");
// 将当前节点缓存到线程本地变量中 释放锁的时候需要使用
currentNodeLocal.set(curNode);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
// 释放锁
// 获取当前线程的threadLocal
Node node = currentNodeLocal.get();
// 将locked标志改为false
node.setLocked(false);
// 将前驱节点设置为null 断开引用方便垃圾回收
node.setPreNode(null);
// 释放当前缓存中的线程信息
currentNodeLocal.set(null);
}
@Override
public Condition newCondition() {
return null;
}
}
}