[Netty] Mpsc Queue (十七)

news2025/1/13 9:46:29

JCTools 是适用于 JVM 并发开发的工具,主要提供了一些 JDK 确实的并发数据结构,例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞队列可以分为四种类型,可以根据不同的场景选择使用。

  • Spsc 单生产者单消费者
  • Mpsc 多生产者单消费者
  • Spmc 单生产者多消费者
  • Mpmc 多生产者多消费者

Netty 中直接引入了 JCTools 的 Mpsc Queue

文章目录

      • 1.Mpsc Queue介绍
      • 2.Mpsc Queue 源码分析
        • 2.1 使用实例
        • 2.2 入队 offer()
        • 2.3 出队 poll()
      • 3.总结

1.Mpsc Queue介绍

Mpsc 的全称是 Multi Producer Single Consumer, 多生产者单消费者。

Mpsc Queue 可以保证多个生产者同时访问队列是线程安全的, 而且同一时刻只允许一个消费者从队列中读取数据, Netty Reactor 线程中任务队列 taskQueue 必须满足多个生产者可以同时提交任务, 所以 JCTools 提供的 Mpsc Queue 非常适合 Netty Reactor 线程模型。

Mpsc Queue 有多种的实现类, MpscArrayQueue, MpscUnboundedArrayQueue, MpscChunkedArrayQueue

在这里插入图片描述

MpscArrayQueue 继承了MpscXxxPad 和 MpscXxxField。每个有包含属性的类后面都会被 MpscXxxPad 类隔开。

// ConcurrentCircularArrayQueueL0Pad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// ConcurrentCircularArrayQueue
protected final long mask;
protected final E[] buffer;

// MpmcArrayQueueL1Pad
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpmcArrayQueueProducerIndexField
private volatile long producerIndex;

// MpscArrayQueueMidPad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// MpscArrayQueueProducerLimitField
private volatile long producerLimit;

// MpscArrayQueueL2Pad
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpscArrayQueueConsumerIndexField
protected long consumerIndex;

// MpscArrayQueueL3Pad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

MpscXxxPad 类中使用了大量 long 类型的变量, 是为了解决伪共享(false sharing)问题。

Mpsc Queue 采取了空间换时间的策略, 让不同线程共享的对象加载到不同的缓存行。

public class FalseSharingPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
    protected volatile long value = 0L;
    protected long p9, p10, p11, p12, p13, p14, p15;
}

变量 value 前后都填充了 7 个 long 类型的变量, 可以保证在多线程访问 value 变量时, value 与其他不相关的变量处于不同的 Cache Line。

在这里插入图片描述

2.Mpsc Queue 源码分析

MpscArrayQueue 属性

// ConcurrentCircularArrayQueue
protected final long mask; // 计算数组下标的掩码
protected final E[] buffer; // 存放队列数据的数组

// MpmcArrayQueueProducerIndexField
private volatile long producerIndex; // 生产者的索引

// MpscArrayQueueProducerLimitField
private volatile long producerLimit; // 生产者索引的最大值

// MpscArrayQueueConsumerIndexField
protected long consumerIndex; // 消费者索引

mask 变量表明队列中数组的容量大小肯定是 2 的次幂, Mpsc 是多生产者单消费者队列, 所有producerIndex 和 producerLimit 都是volatile修饰, 其中一个生产者线程的修改需要对其他生产者线程可见

2.1 使用实例

public class MpscArrayQueueTest {
    public static final MpscArrayQueue<String> MPSC_ARRAY_QUEUE = new MpscArrayQueue<>(2);
    public static void main(String[] args) {
        for (int i = 1; i <= 2; i++) {
            int index = i;
            new Thread(() -> MPSC_ARRAY_QUEUE.offer("data" + index), "thread" + index).start();
        }
        try {
            Thread.sleep(1000L);
            MPSC_ARRAY_QUEUE.add("data3"); // 入队操作,队列满则抛出异常
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("队列大小:" + MPSC_ARRAY_QUEUE.size() + ", 队列容量:" + MPSC_ARRAY_QUEUE.capacity());
        System.out.println("出队:" + MPSC_ARRAY_QUEUE.remove()); // 出队操作,队列为空则抛出异常
        System.out.println("出队:" + MPSC_ARRAY_QUEUE.poll()); // 出队操作,队列为空则返回 NULL
    }
}
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at MpscArrayQueueTest.main(MpscArrayQueueTest.java:17)
队列大小:2, 队列容量:2
出队:data1
出队:data2
Disconnected from the target VM, address: '127.0.0.1:58005', transport: 'socket'

入队 offer()和出队 poll()

2.2 入队 offer()

public boolean offer(E e) {
    if (null == e) {
        throw new NullPointerException();
    } else {
        long mask = this.mask;
        long producerLimit = this.lvProducerLimit(); // 获取生产者索引最大限制
        long pIndex;
        long offset;
        do {
            pIndex = this.lvProducerIndex(); // 获取生产者索引
            if (pIndex >= producerLimit) {
                offset = this.lvConsumerIndex(); // 获取消费者索引
                producerLimit = offset + mask + 1L;
                if (pIndex >= producerLimit) {
                    return false; // 队列已满
                }
                this.soProducerLimit(producerLimit); // 更新 producerLimit
            }
        } while(!this.casProducerIndex(pIndex, pIndex + 1L)); // CAS 更新生产者索引,更新成功则退出,说明当前生产者已经占领索引值
        offset = calcElementOffset(pIndex, mask); // 计算生产者索引在数组中下标
        UnsafeRefArrayAccess.soElement(this.buffer, offset, e); // 向数组中放入数据
        return true;
    }
}

producerIndex、producerLimit 以及 consumerIndex 之间的关系

public MpscArrayQueueProducerLimitField(int capacity) {
    super(capacity);
    this.producerLimit = capacity;
}

protected final long lvProducerLimit() {
    return producerLimit;
}

初始化状态, producerLimit 队列的容量是相等的, producerIndex = consumerIndex = 0。接下来 Thread1 和 Thread2 并发向 MpscArrayQueue 中存放数据。

在这里插入图片描述

两个线程此时拿到的 producerIndex 都是 0, 小于 producerLimit, 此时两个线程都会尝试使用 CAS 操作更新 producerIndex, 一个成功, 一个失败。

假设 Thread1 执行 CAS 操作成功, Thread2失败会重新更新producerIndex。

Thread1 更新后 producerIndex 的值为 1, 由于 producerIndex 是 volatile 修饰的, 对于Thread2 可见, 当 Thread1 和 Thread2 都通过 CAS 抢占成功后, 拿到的 pIndex 分别是 0 和 1, 根据 pIndex 进行位运算计算得到数组对应的下标, 然后通过 UNSAFE.putOrderedObject() 方法将数据写入到数组中。

在这里插入图片描述

    public static <E> void soElement(E[] buffer, long offset, E e) {
        UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e);
    }

putOrderedObject() 不会立刻将数据更新到内存中, 并把其他 Cache Line 置为失效, 使用的是LazySet 延迟更新机制。性能比putObject() 高。

Java 中有四种类型的内存屏障, LoadLoad、StoreStore、LoadStore 和 StoreLoad, putOrderedObject() 使用了 StoreStore, 对于 Store1,StoreStore,Store2 这样的操作序列, 在 Store2 进行写入之前, 会保证 Store1 的写操作对其他处理器可见。

LazySet 机制是有代价的, 是写操作结果有纳秒级的延迟, 不会立刻被其他线程以及自身线程可见。在Mpsc Queue的使用场景中, 多个生产者只负责写入数据, 并没有写入之后立刻读取的需求, 所以使用 LazySet 机制是没有问题的, 只需StoreStore Barrier 保证多线程写入的顺序即可。

对于 do-while 循环内的逻辑, 为什么需要两次 if(pIndex >= producerLimit) 判断呢, 说明当生产者索引大于 producerLimit 阈值时, 可能存在 1> producerLimit 缓存值过期了或者队列已经满了, 需要读取最新的消费者索引 consumerIndex, 重新做一次 producerLimit 计算, 2> 生产者索引还是大于 producerLimit 阈值, 说明队列的真的满了。

因为生产者有多个线程, 所以 MpscArrayQueue 采用了 UNSAFE.getLongVolatile() 方法保证获取消费者索引 consumerIndex 的准确性。
getLongVolatile() 使用了 StoreLoad Barrier, 在 Load2 以及后续的读取操作之前, 会保证 Store1 的写入操作对其他处理器可见。

StoreLoad 是四种内存屏障开销最大的, 引入producerLimit 的好处在于, 假设我们的消费速度和生产速度比较均衡的情况下, 差不多走完一圈数组才需要获取一次消费者索引 consumerIndex, 从而减少了getLongVolatile() 方法的使用次数。

2.3 出队 poll()

移除队列的首个元素并返回, 如果队列为空, 返回Null

public E poll() {
    long cIndex = this.lpConsumerIndex(); // 直接返回消费者索引 consumerIndex
    long offset = this.calcElementOffset(cIndex); // 计算数组对应的偏移量
    E[] buffer = this.buffer;
    E e = UnsafeRefArrayAccess.lvElement(buffer, offset); // 取出数组中 offset 对应的元素
    if (null == e) {
        if (cIndex == this.lvProducerIndex()) { // 队列为空
            return null;
        }
        do {
            e = UnsafeRefArrayAccess.lvElement(buffer, offset); 
        } while(e == null); // 等待生产者填充元素
    }
    UnsafeRefArrayAccess.spElement(buffer, offset, (Object)null); // 消费成功后将当前位置置为 NULL
    this.soConsumerIndex(cIndex + 1L); // 更新 consumerIndex 到下一个位置
    return e;
}

只有一个消费者线程, 所以么有CAS操作, 核心思路是获取消费者索引 consumerIndex, 然后根据 consumerIndex 计算得出数组对应的偏移量, 将数组对应位置的元素取出并返回, 最后将 consumerIndex 移动到环形数组下一个位置。

在这里插入图片描述

public static <E> E lvElement(E[] buffer, long offset) {
    return (E) UNSAFE.getObjectVolatile(buffer, offset);
}

getObjectVolatile() 方法则使用的是 LoadLoad Barrier, 对于 Load1, LoadLoad, Load2 来说, 在 Load2 以及后续读取操作之前, Load1读取操作执行完毕。所以 getObjectVolatile() 方法可以保证每次读取数据都可以从内存中拿到最新值。

当调用 lvElement() 方法获取到的元素为 NULL 时,

  1. 队列为空或者生产者填充的元素还没有对消费者可见。
  2. 如果消费者索引 consumerIndex 等于生产者 producerIndex, 说明队列为空。
  3. 只要两者不相等, 消费者需要等待生产者填充数据完毕。

当成功消费数组中的元素之后, 把当前消费者索引 consumerIndex 的位置置为 NULL, 把 consumerIndex 移动到数组下一个位置。

public static <E> void spElement(E[] buffer, long offset, E e) {
    UNSAFE.putObject(buffer, offset, e);
}

protected void soConsumerIndex(long newValue) {
    UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}

putObject() 不会使用任何内存屏障, 会直接更新对象对应偏移量的值。而 putOrderedLong 与 putOrderedObject() 是一样的, 都使用了 StoreStore Barrier。

3.总结

  1. 通过大量填充 long 类型变量解决伪共享问题
  2. 环形数组的容量设置为 2 的次幂,可以通过位运算快速定位到数组对应下标
  3. 入队 offer() 操作中 producerLimit 的巧妙设计,大幅度减少了主动获取消费者索引 consumerIndex 的次数,性能提升显著
  4. 入队和出队操作中都大量使用了 UNSAFE 系列方法,针对生产者和消费者的场景不同,使用的 UNSAFE 方法也是不一样的。Jctools 在底层操作的运用上也是有的放矢,把性能发挥到极致

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/457119.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型(LLM)训练微调综述学习

总览 介绍大模型训练的微调方法&#xff0c;包括prompt tuning、prefix tuning、LoRA、p-tuning和AdaLoRA等。介绍使用deepspeed和LoRA进行大模型训练的相关代码。给出petals的介绍&#xff0c;它可以将模型划分为多个块&#xff0c;每个用户的机器负责其中一块&#xff0c;分…

PS学习记录-PPI与DPI

先说两者的重点&#xff1a; dpi是印刷图像时候用的&#xff0c;ppi是设计图像时候用的dpi&#xff1a;【点/英寸】 是印刷计量单位&#xff0c;是每英寸上所印刷的【墨点数】&#xff0c;打印图片一般300dpi左右&#xff0c;代表打印的解析度。ppi&#xff1a;【像素/英寸】是…

快速精通Git

一、 版本控制工具 1.1. 什么是版本控制系统&#xff1f; 版本控制系统&#xff08;Version Control System&#xff09;:是一种记录一个或若干文件内容变化&#xff0c;以便将来查阅特定版本修订情况的系统。版本控制系统不仅可以应用于软件源代码的文本文件&#xff0c;而且…

温度调制式差示扫描量热法(MTDSC)中的正弦波温度控制技术

摘要&#xff1a;在调制温度式差式扫描量热仪&#xff08;MTDSC&#xff09;中&#xff0c;关键技术之一是正弦波加热温度的实现&#xff0c;此技术是制约目前国内无法生产MTDSC量热仪的重要障碍&#xff0c;这主要是因为现有的PID温控技术根本无法实现不同幅值和频率正弦波这样…

uie-base使用记录(paddlenlp)

参考文章&#xff1a;https://aistudio.baidu.com/aistudio/modelsdetail?modelId22 参考文章&#xff1a;https://paddlenlp.readthedocs.io/zh/latest/FAQ.html 参考文章&#xff1a;https://developer.aliyun.com/article/1066857 参考文章&#xff1a;https://github.com/…

【计算机基本原理-数据结构】八大数据结构分类

【计算机基本原理-数据结构】八大数据结构分类 1&#xff09;数组2&#xff09;链表3&#xff09;队列4&#xff09;栈5&#xff09;树6&#xff09;图7&#xff09;堆8&#xff09;散列表(哈希表) 数据结构是计算机存储、组织数据的方式。一种好的数据结构可以带来更高的运行或…

Java基础(十三)系统相关类

1. 系统相关类 1.1 java.lang.System类 System类代表系统&#xff0c;系统级的很多属性和控制方法都放置在该类的内部。该类位于java.lang包。 由于该类的构造器是private的&#xff0c;所以无法创建该类的对象。其内部的成员变量和成员方法都是static的&#xff0c;所以也可…

Netty(一)深入Hotspot源码与Linux内核理解NIO与Epoll

深入Hotspot源码与Linux内核理解NIO与Epoll 前言介绍Netty 的介绍Netty 的应用场景理解阻塞和同步关键字初始BIO、NIO、AIOBIO(Blocking IO)缺点&#xff1a;应用场景&#xff1a; NIO(Non Blocking IO)应用场景&#xff1a;NIO非阻塞代码示例存在的问题 NIO 有三大核心组件&am…

Mybatis学习基础篇(一)——使用Maven快速搭建一个mybatis项目,并实现简单的增删改查

题外话&#xff1a; 在了解mybatis框架之前&#xff0c;我先说明一句&#xff0c;目前主流的框架技术层出不穷&#xff0c;每个人都有自己喜欢的技术框架&#xff0c;自己喜欢用就行。技术并没有高低之分&#xff0c;喜欢用就用&#xff0c;虽然目前大部分人都喜欢向新技术看齐…

flex布局下两端对齐,不满左对齐

文章目录 解决方案一 (利用父级的 :after 占位)解决方案二(利用:last-child和:nth-child()占位)解决方案三(补位添加节点法&#xff0c;这种方案适用于多种排列方式) 问题情境&#xff1a; 在flex布局下&#xff0c;多行排列&#xff0c;如何让flex布局最后一行没有排满时&…

如何构建适合自己的DevOps软件测试改进方案

根据2022年的DevOps全球调查报告显示&#xff0c;主流软件企业采用或部分采用DevOps且已获得良好成效的占比已达70%&#xff0c;DevOps俨然成为当下软件开发研究的重要方向。 测试作为软件开发的必要过程&#xff0c;是提升软件可靠性、保证软件质量的关键环节。然而&#xff…

上海亚商投顾:沪指失守3300点 传媒、游戏板块逆市大涨

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 市场情绪 三大指数今日继续调整&#xff0c;沪指午后跌超1%&#xff0c;失守3300点整数关口&#xff0c;创业板指一度跌逾2%…

基于PWM技术的三相光伏逆变器研究(Simulink)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【手把手做ROS2机器人系统开发二】熟悉ROS2基本命令

【手把手做ROS2机器人系统开发二】熟悉ROS2基本命令 一、上讲回顾 在上一讲开发环境搭建中&#xff0c;我们讲解了如何搭建Ubuntu系统环境和ROS2开发运行环境。 1.Ubuntu系统安装 2.ROS2系统环境安装 二、ROS2核心命令讲解 1、daemon-各种守护进程相关的子命令 查看帮助&am…

[网络原理] 详解Cookie与Session

做好准备,迎接所有的成功吧 文章目录 1. Cookie的概念2. Session的概念3. Cookie与Session的关联与区别3.1 关联3.2 区别 4. Cookie与Session中的核心方法 1. Cookie的概念 Cookie是用户首次登陆网站成功之后,对应页面的服务器会返回给用户一个身份标识,被保存在用户主机的硬盘…

无人机视频与GIS融合三维实景怎么实现?

无人机视频与GIS融合三维实景怎么实现?无人机三维GIS作为一项新兴的测绘重要手段&#xff0c;具有续航时间长、成本低、机动灵活等优点&#xff0c;为城市的规划建设带来极大便利。 那么此项技术有什么样的特点呢?下面智汇云舟就带大家一起来了解一下。 三维是将采集以及经运…

leetcode 1416. Restore The Array(恢复数组)

一台打印机没有把空格打印出来&#xff0c;以至于不知道打印出的 s 中到底有哪些数字。 现在知道数字的取值范围在1 ~ k, 数字开头不能是0. 返回可能的数字个数。取模1097. 思路&#xff1a; DP 假设dp[ i ]为 i ~ n位的s 所能组成的数字组合数。 从右到左遍历&#xff0c;…

【云原生进阶之容器】第六章容器网络6.7.1--阿里云Terway网络模式综述

《云原生进阶之容器》专题索引: 第一章Docker核心技术1.1节——Docker综述第一章Docker核心技术1.2节——Linux容器LXC第一章Docker核心技术1.3节——命名空间Namespace第一章Docker核心技术1.4节——chroot技术第一章Docker核心技术1.5.1节——cgroup综述

如何在在一个账户内管理多个WhatsApp号

许多企业拥有多个WhatsApp Business账户。这可能是因为他们在多个地点都有商店&#xff0c;或者可能在全球范围内都有客户&#xff0c;并希望用当地语言迎合他们每个人。 无论出于何种原因&#xff0c;管理多个WhatsApp企业帐户既耗时又困难。但是&#xff0c;如果我们说有一种…

LoadRunner参数化最佳实践:让你的性能测试更加出色!

距离上次使用loadrunnr 已经有一年多的时间了。初做测试时在项目中用过&#xff0c;后面项目中用不到&#xff0c;自己把重点放在了工具之外的东西上&#xff0c;认为性能测试不仅仅是会用工具&#xff0c;最近又想有一把好的利器毕竟可以帮助自己更好的完成性能测试工作。这算…