aqs的今生

news2025/2/28 19:50:43

《AQS的前世,从1990年的论文说起》中我们已经对AQS做了简单的介绍,并学习了先于AQS出现的3种基于排队思想的自旋锁。今天我们深入到AQS的设计中,探究Doug Lea是如何构建JUC框架基础组件的。不过在正式开始前,我们先来回顾上一篇中提到的面试题:

  • 原理相关:AQS是什么?它是怎样实现的?
  • 设计相关:如何使用AQS实现Mutex?

希望今天可以帮你解答上面的问题。

Tips

  • 本文基于Java 11完成,与Java 8存在部分差异,注意区分;
  • Doug Lea The java.util.concurrent Synchronizer Framework 2004;
  • 非常幸运,2017年就有大佬完成了论文的翻译:《The java.util.concurrent Synchronizer Framework》 JUC同步器框架(AQS框架)原文翻译。

初衷与目的

《The java.util.concurrent Synchronizer Framework》中清晰的阐述了Doug Lea设计AQS的目的:

This framework provides common mechanics for atomically managing synchronization state, blocking and unblocking threads, and queuing.

(AQS)框架为同步状态的原子性管理,线程的阻塞和唤醒以及排队提供了一种通用的机制。也就是说,可以通过AQS去构建不同的同步器,如:基于AQS而诞生的ReentrantLock

基于构建通用同步机制的目的,Doug Lea分析了各种同步器,总结出它们共同的特性:

  • acquire操作:阻塞调用线程,直到同步状态允许其继续执行;
  • release操作:改变同步状态,唤醒被阻塞的线程。

除此之外,论文中也提到了对AQS的性能要求,Doug Lea认为大家在分析synchronized时提到的2个问题:

  • 如何最小化空间开销(因为任意Java对象都可以作为锁)
  • 如何最小化单核处理器的单线程环境下的时间开销

都不是AQS要考虑的,他认为AQS需要考虑的是scalability(可伸缩性),即大部分场景中,即便存在竞争,也能提供稳定的效率。原文中是这样描述的:

Among the main goals is to minimize the total amount of time during which some thread is permitted to pass a synchronization point but has not done so.

(AQS)主要目标之一是使某一线程被允许通过同步点但还没有通过的情况下耗费的总时间最少,即从一个线程释放锁开始,到另一个线程获取锁,这个过程锁消耗的时间。

设计与实现

Doug Lea先是完成了acquire操作和release操作的伪代码设计:

// acquire操作
while (synchronization state does not allow acquire) {
	enqueue current thread if not already queued;
	possibly block current thread;
}
dequeue current thread if it was queued;

// release操作
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;

为了实现上述的操作,需要以下组件的协同工作:原子管理的同步状态,线程的阻塞与唤醒,以及队列

同步状态

AQS使用volatile修饰的int类型变量state保存同步状态,并提供getState,setState和compareAndSetState方法。

AQS中,state不仅用作表示同步状态,也是某些同步器实现的计数器,如:Semaphore中允许通过的线程数量,ReentrantLock中可重入特性的实现,都依赖于state作为计数器的特性。

早期,Java对long类型变量的原子操作需要借助内置锁来完成,性能较差,并且除了CyclicBarrier外,其余同步器使用32位的int类型已经能够满足需求,因此在AQS诞生初期,state可以使用int类型。

Tips

  • CyclicBarrier通过锁来实现;
  • Java 1.6中提供了使用long类型的AbstractQueuedLongSynchronizer;
  • 注意要区别同步状态与线程在队列中的状态。

阻塞与唤醒

早期,线程的阻塞与唤醒只能通过Thread#suspendThread#resume实现,但存在竞态问题,即一个线程先调用了Thread#resume后调用Thread#suspend,那么Thread#resume不会产生任何作用。

AQS使用LockSupport#parkLockSupport#unpark实现阻塞与唤醒,特点是如果LockSupport#unpark发生在LockSupport#park前,则此次的LockSupport#park无效。

Tips:无论提前调用多少次LockSupport#unpark,都只会使后一次LockSupport#park无效。

CLH队列

队列的设计是构建AQS的关键,Doug Lea在论文中使用“The heart of”来形容:

The heart of the framework is maintenance of queues of blocked threads, which are restricted here to FIFO queues.

Doug Lea参考了CLH的设计, 保留了基本的设计,由前驱节点做阻塞与唤醒的控制,但是在队列的选择上做出了改变,AQS选择双向链表来实现队列,节点中添加了prev和next指针。

添加prev指针主要是为了实现取消功能,而next指针的加入可以方便的实现唤醒后继节点。

AQS源码分析

再次强调,本文基于Java 11完成,与Java 8的源码存在差异,如,操作同步状态state时,Java 8借助了UnSafe,而Java 11中使用了VarHandle。另外,本文只讨论AQS的独占(EXCLUSIVE)模式因此会跳过共享(SHARED)模式

队列的结构

有了《AQS的前世,从1990年的论文说起》的铺垫,再结合Doug Lea论文中的描述,我们可以很容易想象到AQS中队列节点的结构:线程状态,前驱节点指针,后继节点指针以及用于保存线程的变量。事实也和我们的猜想十分接近:

static final class Node {
	volatile int waitStatus;
	volatile Node prev;
	volatile Node next;
	volatile Thread thread;
	Node nextWaiter;
}

注意,Node的waitStatus表示线程在队列中的状态,AQS的state表示同步器的状态。Node中定义了waitStatus的5种状态:

  • CANCELLED:1,线程获取锁的请求已经取消;
  • SIGNAL :-1,节点释放后,需要唤醒后继节点
  • CONDITION:-2,节点处于条件队列中;
  • PROPAGATE:-3,共享(SHARED)模式下使用;
  • 0,初始化Node时的默认值。

AQS的实现中,并不是后继节点“监听”前驱节点的状态,来决定自身是否持有锁,而是通过前驱节点释放锁,并主动唤醒后继节点来实现排队的

AQS的结构

AQS的结构就更加简单了:

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

总共4个成员变量,除了我们意料之中的,队列的头尾节点和AQS的同步状态外,还有SPIN_FOR_TIMEOUT_THRESHOLD。看名字会有些误解,以为是自旋的阈值,实际上并不是,AQS提供了带有超时时间的方法,例如doAcquireNanos方法:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
	final long deadline = System.nanoTime() + nanosTimeout;
	final Node node = addWaiter(Node.EXCLUSIVE);
	for (;;) {
		final Node p = node.predecessor();
		nanosTimeout = deadline - System.nanoTime();
		if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) {
			LockSupport.parkNanos(this, nanosTimeout);
		}
	}
}

可以看到只有在剩余的nanosTimeout大于SPIN_FOR_TIMEOUT_THRESHOLD时,才会调用LockSupport#parkNanos(this, nanosTimeout)

Tips

  • Java 11中,无论是AQS还是Node中都使用了VarHandle,定义了大量的成员变量,我们跳过这部分;
  • 删除了doAcquireNanos方法中大部分内容,重点展示nanosTimeout和SPIN_FOR_TIMEOUT_THRESHOLD的关系。

获取锁

如果是你,你会如何设计AQS的加锁过程?我可能会“按部就班”的构建队列,并将等待线程逐个的加入的队列中。那Doug Lea是如何设计AQS加锁过程的呢?

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
	selfInterrupt();
}

acquire方法中,Doug Lea设计了4步操作,如果仅从名字来看,首先tryAcquire尝试获取锁,如果获取失败,则通过addWaiter加入等待,然后调用acquireQueued方法进入排队状态,最后是通过调用selfInterrupt方法使当前线程中断。先来看AQS中的tryAcquire方法。

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

AQS中并未给出任何实现,它要求子类必须重写tryAcquire方法,否则抛出异常。

addWaiter方法

接着是addWaiter方法:

private Node addWaiter(Node mode) {
// 注释1:创建节点,通过acquire进入时mode = Node.EXCLUSIVE
Node node = new Node(mode);
for (;;) {
	// 注释2:获取尾节点
	Node oldTail = tail;
	if (oldTail != null) {
		// 注释5:添加新的尾节点
		node.setPrevRelaxed(oldTail);
		if (compareAndSetTail(oldTail, node)) {
			oldTail.next = node;
			return node;
		}
	} else {
		// 注释3:尾节点为空则初始化队列
		initializeSyncQueue();
	}
}
}

static final class Node {
	Node(Node nextWaiter) {
		this.nextWaiter = nextWaiter;
		// 可以看做是:this.thread = Thread.currentThread()
		THREAD.set(this, Thread.currentThread());
	}
}

private final void initializeSyncQueue() {
	Node h;
	// 注释4:创建空节点,作为尾节点
	if (HEAD.compareAndSet(this, null, (h = new Node())))
		tail = h;
}

addWaiter的逻辑并不复杂:

  • 注释1:创建节点node;
  • 注释2:获取AQS的尾节点oldTail,并判断是否存在尾节点;
  • 注释3:初始化队列;
  • 注释4:创建空节点h,作为AQS的头尾节点;
  • 注释5:更新AQS的尾节点为node,并与oldTail关联。

我们知道,只有tryAcquire失败后,才会调用addWaiter方法,也就是说,如果实现了tryAcquire获取锁的逻辑,那么在没有竞争的场景下,AQS就不会构建等待队列

回过头来看addWaiter做了什么?它的核心功能是初始化的等待队列,并返回当前队列的尾节点

acquireQueued方法

当addWaiter创建了等待队列并返回尾节点后,就进入了acquireQueued方法中:

final boolean acquireQueued(final Node node, int arg) {
	// 是否中断的标记
	boolean interrupted = false;
	try {
		for (;;) {
			// 注释1:获取node的前驱节点,node更名为currentNode更方便我理解
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null;
				return interrupted;
			}

			// 注释2:判断是否需要park当前线程
			if (shouldParkAfterFailedAcquire(p, node))
				interrupted |= parkAndCheckInterrupt();
		}
	} catch (Throwable t) {
		cancelAcquire(node);
		if (interrupted)
			selfInterrupt();
		throw t;
	}
}

注释1的部分,获取到node的前驱节点p,如果p为头节点,则当前线程直接通过tryAcquire尝试获取锁。如果p不是头节点的话可以直接调用tryAcquire吗?

答案是不可以,如果p不是头节点,也就证明当前线程不在获取锁的第二顺位上,前面可能还有若干节点在等待锁,如果任意节点都直接调用tryAcquire,那就失去了acquireQueued方法的意义。

注释2的部分,p不是头节点的情况,也就是当前线程非第二顺位获取锁。那么node就要根据前驱节点的状态来判断是否中断执行了:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 获取前驱节点的状态,waitStatus初始化的状态为0
	int ws = pred.waitStatus;
	if (ws == Node.SIGNAL)
		// 注释2:前驱节点处于Node.SIGNAL状态
		return true;
	if (ws > 0) {
		do {
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// 注释1:更新前驱节点的状态为Node.SIGNAL
		pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
	}
	return false;
}

private final boolean parkAndCheckInterrupt() {
	// 暂停线程
	LockSupport.park(this);
	return Thread.interrupted();
}

addWaiter的流程中可以看到,处理node的过程中并没有处理node.waitStatus,此时waitStatus == 0,那么对于node的前驱节点pred也是一样的,因此第一次执行shouldParkAfterFailedAcquire方法时,会进入注释1的部分,并返回false。

再次进入acquireQueued的循环后,shouldParkAfterFailedAcquire返回true,执行parkAndCheckInterrupt方法,需要注意LockSupport#park(this)会让线程暂停在此处,也就是说如果没有线程唤醒,线程会一直停留在此处

至此,AQS的加锁过程已经结束了,我们画张图来回顾下这个过程:

释放锁

接着来看解锁的过程:

public final boolean release(int arg) {
if (tryRelease(arg)) {
	Node h = head;
	if (h != null && h.waitStatus != 0)
		unparkSuccessor(h);
	return true;
}
return false;
}

按照AQS的风格tryRelease必然是要交给子类实现的:

protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

果不其然。假设tryRelease执行成功,接下来会发生什么?

  • 获取头节点h;
  • 判断头节点的状态h.waitStatus != 0
  • 执行unparkSuccessor。

来看unparkSuccessor的代码:

private void unparkSuccessor(Node node) {
// node是当前线程的前驱节点,也是head节点
int ws = node.waitStatus;
if (ws < 0)
	// 处理node的waitStatus
	node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
// 注释2:从后向前遍历待唤醒的节点
if (s == null || s.waitStatus > 0) {
	s = null;
	for (Node p = tail; p != node && p != null; p = p.prev)
	if (p.waitStatus <= 0)
		s = p;
}

// 注释1:唤醒后继节点
if (s != null)
	LockSupport.unpark(s.thread);
}

如果一切顺利,那么unparkSuccessor时会跳过注释2的部分,直接执行注释1的LockSupport.unpark。

不过别忘了,待唤醒的线程此时还在acquireQueued方法中阻塞着,唤醒的线程会继续执行acquireQueued中的内容,调用tryAcquire获取锁,并更新AQS的头节点

我们设想一个场景:

当addWaiter执行到compareAndSetTail(oldTail, node)时调用了unparkSuccessor,可能会出现一种情况:

即T1已经与HEAD建立了联系,但HEAD却没有与T1建立联系。因此注释2中,判断HEAD节点没有后继节点时,会通过TAIL节点,从后向前遍历等待队列,查找待唤醒的线程。

好了,AQS的核心源码分析到这里就结束了,至于条件队列,共享模式等就留给大家自行探索了。

构建互斥锁

学习完AQS的核心原理后,我们来实践一下,借助AQS来构建构建互斥锁:

public class MutexLock {
	public void lock() {
		sync.acquire(1);
	}

	public void unlock() {
		sync.release(0);
	}

	private final Sync sync = new Sync();

	static class Sync extends AbstractQueuedSynchronizer {

		@Override
		protected boolean tryAcquire(int arg) {
			Thread currentThread = Thread.currentThread();
			if(compareAndSetState(0, arg)) {
				setExclusiveOwnerThread(currentThread);
				return true;
			}else {
				return false;
			}
		}

		@Override
		protected boolean tryRelease(int arg) {
			if(getState() != 1) {
				return false;
			}
			setState(arg);
			return true;
		}
	}
}

通过AQS实现只有基础功能的互斥锁还是非常简单的,甚至在重写tryAcquire方法时可以不设置独占线程(虽然在现在也没起到作用),只是简单的使用CAS替换掉AQS的state即可:

@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(0, arg);
}

当然了,这只是一把“玩具锁”,还存在许多问题,比如,非上锁线程依旧可以解锁。其次除了阻塞还排队外,也不支持诸如可重入等高级特性。

结语

好了,关于AQS的部分就到这里了。如果你有看过《AQS的前世,从1990年的论文说起》中基于排队思想自旋锁的演进过程,并理解了MCS锁和CLH锁的实现,那么理解AQS对你来说是非常容易的,虽然它们看起来是不同的东西,但核心原理是相同的,只是在技术实现上有些许差别。

最后,希望通过AQS的前世和今生,能够帮助你重新认识AQS,理解Doug Lea设计这样一个同步器基础组件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/924571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Microsoft Excel整合Python:数据分析的新纪元

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

单片机UART一对多:同时读取多个传感器基于modbus协议

文章目录 背景MODBUS协议介绍UART接口改一对多参考链接 背景 很多传感器现在都做成了串口模块&#xff0c;如激光测距传感器TOF050&#xff0c;在开发时使用串口功能模块不仅大大加快了我们的开发进度&#xff0c;还能降低功能模块直接的耦合度&#xff0c;专业是功能交给专业…

postman访问ruoyi后台接口

打开若依页面&#xff0c;登录进去&#xff0c;F12打开控制台&#xff0c;选一个后台服务&#xff0c;把下图两个节点&#xff0c;补到postman请求header里面即可

gradio使用transformer模块demo介绍2:Images Computer Vision

文章目录 图像分类 Image Classification图像分割 Image Segmentation图像风格变换 Image Transformation with AnimeGAN3D模型 3D models 图像分类 Image Classification import gradio as gr import torch import requests from torchvision import transformsmodel torch.…

openCV实战-系列教程7:轮廓检测2与模板匹配(轮廓检测/轮廓特征/轮廓近似/轮廓边界矩阵/轮廓边界圆/模版匹配)、原理解析、源码解读

打印一个图片可以做出一个函数&#xff1a; def cv_show(img,name):cv2.imshow(name,img)cv2.waitKey()cv2.destroyAllWindows() 1、轮廓特征与近似 1.1 轮廓特征 前面我们计算了这个图片的轮廓&#xff1a; 它的轮廓信息保存在了contours中&#xff0c;取出第一个轮廓&…

java八股文面试[Spring]——如何实现一个IOC容器

什么是IOC容器 IOC不是一种技术&#xff0c;只是一种思想&#xff0c;一个重要的面向对象编程的法则&#xff0c;它能指导我们如何设计出松耦合&#xff0c;更优良的程序。传统应用程序都是由我们在类内部主动创建依赖对象&#xff0c;从而导致类与类之间高耦合&#xff0c;难于…

时序分解 | MATLAB实现基于SGMD辛几何模态分解的信号分解分量可视化

时序分解 | MATLAB实现基于SGMD辛几何模态分解的信号分解分量可视化 目录 时序分解 | MATLAB实现基于SGMD辛几何模态分解的信号分解分量可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 SGMD分解算法&#xff08;辛几何模态分解&#xff09;&#xff0c;分解结果可视…

SQL 大小敏感问题

在SQL中&#xff0c;关键字和函数名 是不区分 大小写的 比如&#xff08;select、where、order by 、group by update 等关键字&#xff09;&#xff0c;以及函数(ABS、MOD、round、min等) window系统默认是大小写不敏感 &#xff08;ZEN文件和zen 文件 不能同时存在&#xff…

maven下载不了仓库地址为https的依赖jar,配置参数忽略ssl安全检查

问题原因 私服使用的https地址&#xff0c;然后安全证书过期的或没有&#xff0c;使用maven命令时&#xff0c;可以添加以下参数&#xff0c;忽略安全检查 mvn -Dmaven.wagon.http.ssl.insecuretrue -Dmaven.wagon.http.ssl.allowalltrue -Dmaven.wagon.http.ssl.ignore.vali…

如何DIY制作干洗店洗护小程序

洗护行业正逐渐迎来线上化的浪潮&#xff0c;传统的干洗店也开始尝试将业务线上化&#xff0c;以提供更便捷的服务给消费者。而制作一款洗护小程序&#xff0c;成为了干洗店实现线上化的重要一环。今天&#xff0c;我们就来分享一下如何使用第三方制作平台制作洗护小程序的教程…

ChatGPT 随机动态可视化图表分析

动态可视化图表分析实例如下图&#xff1a; 这样的动态可视化图表可以使用ChatGPT OpenAI 来实现。 给ChatGPT发送指令&#xff1a; 你现在是一个数据分析师&#xff0c;请使用HTML&#xff0c;JS&#xff0c;Echarts&#xff0c;来完成一个动态条形图&#xff0c;条形图方向横…

java gradle 项目 在idea上 搭建一个简单的thrift实例

前言 Thrift是RPC通信的一种方式&#xff0c;可以通过跨语言进行通信&#xff0c;最近项目需要进行跨语言的通信&#xff0c;因此首先尝试搭建了一个简单的thrift框架&#xff0c;因为网上的实例大都参差不全&#xff0c;通过gpt查询得到的结果对我帮助更大一点&#xff0c;但…

四信5G智慧交通方案

5G是第五代移动通信技术的简称&#xff0c;是具有高速率低时延和大连接特点的新一代宽带移动通信技术&#xff0c;是实现智慧交通中的“车、路、人、环境”等交通要素互联互通的网络基础设施。相比以往的移动通信网络&#xff0c;5G网络以一种灵活部署的架构提供10Gbps以上的带…

实现遮挡层

css实现遮挡层 position: fixed;left: 0;top: 0;bottom: 0;right: 0;opacity: 0.5;background-color: gray;<div class"mask"></div>vue3 teleport 实现 <template><div class"box"><p>这里是box</p><div class&…

车载毫米波雷达的上车安装与标定问题

说明 雷达的上车安装和标定问题在产品开发流程中应该算比较后期的工作了(我的理解应该至少是C样乃至SOP阶段)。当然&#xff0c;如果该雷达在研发的初期就有车型对接并有需求方给的很具体的SOR&#xff0c;那这部分的工作在结构设计和算法开发阶段就可以开展。 上车安装和标定是…

07-Numpy基础-伪随机数生成

numpy.random模块对Python内置的random进行了补充&#xff0c;增加了一些用于高效生成多种概率分布的样本值的函数。 例如&#xff0c;你可以用normal来得到一个标准正态分布的44样本数组&#xff1a; 而Python内置的random模块则只能一次生成一个样本值。从下面的测试结果中可…

Docker consul的容器服务注册与发现

前言一、服务注册与发现二、consul 介绍三、consul 部署3.1 consul服务器3.1.1 建立 Consul 服务3.1.2 查看集群信息3.1.3 通过 http api 获取集群信息 3.2 registrator服务器3.2.1 安装 Gliderlabs/Registrator3.2.2 测试服务发现功能是否正常3.2.3 验证 http 和 nginx 服务是…

【视频】Python用LSTM长短期记忆神经网络对不稳定降雨量时间序列进行预测分析|数据分享...

全文下载链接&#xff1a;http://tecdat.cn/?p23544 在本文中&#xff0c;长短期记忆网络——通常称为“LSTM”——是一种特殊的RNN递归神经网络&#xff0c;能够学习长期依赖关系&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 本文使用降雨量数据&#xf…

C++,运算符重载——关系运算符练习

一、关系运算符重载 > > < < ! #include <iostream> using namespace std;class Relates { private:int a;int b; public:Relates() {}Relates(int a,int b):a(a),b(b) {}bool operator>(const Relates &R) const{if((a>R.a&&b>R.b) …

Docker搭建elasticsearch+kibana测试

最近需要做大数据画像&#xff0c;所以先简单搭建一个eskibana学习使用&#xff0c;记录一下搭建过程和遇到的问题以及解决办法 1.拉取es和kibana镜像 在拉取镜像之前先搜索一下 elasticsearch发现是存在elasticsearch镜像的&#xff0c;我一般习惯性拉取最新镜像&#xff0c…