无锁队列 SPSC

news2025/1/11 6:14:10

无锁队列 SPSC Queueicon-default.png?t=N7T8https://www.cnblogs.com/sinkinben/p/17949761/spsc-queue

在多线程编程中,一个著名的问题是生产者-消费者问题 (Producer Consumer Problem, PC Problem)。

对于这类问题,通过信号量加锁 (https://www.cnblogs.com/sinkinben/p/14087750.html) 来设计 RingBuffer 是十分容易实现的,但欠缺性能。

考虑一个特殊的场景,生产者和消费者均只有一个 (Single Producer Single Consumer, SPSC),在这种情况下,我们可以设计一个无锁队列来解决 PC 问题。

0. Background

考虑以下场景:在一个计算密集型 (Computing Intensive) 和延迟敏感的 for 循环当中,每次循环结束,需要打印当前的迭代次数以及计算结果。

void matrix_compute()
{
    for (i = 0 to n)
    {
        // code of computing
        ...
        // print i and result of computing
        std::cout << ...
    }
}

在这种情况下,如果使用简单的 std::cout 输出,由于 I/O 的性质,将会造成严重的延迟 (Latency)。

一个直观的解决办法是:将 Log 封装为一个字符串,传递给其他线程,让其他线程打印该字符串,实现异步的 Logging 。

1. Lock-free SPSC Queue

此处使用一个 RingBuffer 来实现队列。

由于是 SPSC 型的队列,队列头部 head 只会被 Consumer 写入,队列尾部 tail 只会被 Producer 写入,所以 SPSC Queue 可以是无锁的,但需要保证写入的原子性。

template <class T> class spsc_queue
{
  private:
    std::vector<T> m_buffer;
    std::atomic<size_t> m_head;
    std::atomic<size_t> m_tail;
  public:
    spsc_queue(size_t capacity) : m_buffer(capacity + 1), m_head(0), m_tail(0) {}
    inline bool enqueue(const T &item);
    inline bool dequeue(T &item);
};

对于一个 RingBuffer 而言,判空与判满的方法如下:

  • Empty 的条件:head == tail
  • Full 的条件:(tail + 1) % N == head

因此,enqueue 和 dequeue 可以是以下的实现:

inline bool enqueue(const T &item)
{
    const size_t tail = m_tail.load(std::memory_order_relaxed);
    const size_t next = (tail + 1) % m_buffer.size();

    if (next == m_head.load(std::memory_order_acquire))
        return false;

    m_buffer[tail] = item;
    m_tail.store(next, std::memory_order_release);
    return true;
}

inline bool dequeue(T &item)
{
    const size_t head = m_head.load(std::memory_order_relaxed);

    if (head == m_tail.load(std::memory_order_acquire))
        return false;

    item = m_buffer[head];
    const size_t next = (head + 1) % m_buffer.size();
    m_head.store(next, std::memory_order_release);
    return true;
}

std::memory_order 的使用说明:std::memory_order - cppreference.com

Benchmark 计算 SPSC Queue 的吞吐量:

Mean:   29,158,897.200000 elements/s 
Median: 29,178,822.000000 elements/s 
Max:    29,315,199 elements/s 
Min:    28,995,515 elements/s 

Benchmark 的计算方法为:

  • Producer 和 Consumer 分别执行 1e8 次 enqueue 和 dequeue ,计算队列为空所耗费的总时间 t, 1e8 / t 即为吞吐量。
  • 上述过程执行 10 次,最终计算 mean, median, min, max 的值。

2. Remove cache false sharing

什么是 Cache False Sharing? 参考 Architecture of Modern CPU 的 Exercise 一节。

int *a = new int[1024]; 
void worker(int idx)
{
    for (int j = 0; j < 1e9; j++)
        a[idx] = a[idx] + 1;
}

考虑以下程序:

  • P1: 开启 2 线程,执行 worker(0), worker(1)
  • P2: 开启 2 线程,执行 worker(0), worker(16)

P2 的执行速度会比 P1 快,现代 CPU 的 Cache Line 大小一般为 64 字节,由于 a[0], a[1] 位于同一个 CPU Core 的同一个 Cache Line,每次写入都会带来数据竞争 (Data Race) ,触发缓存和内存的同步(参考 MESI 协议),而 a[0], a[16] 之间相差了 64 字节,不在同一个 Cache Line,所以避免了这个问题。

所以,对于上述的 SPSC Queue,可以进行以下改进:

template <class T>
class spsc_queue
{
private:
    std::vector<T> m_buffer;
    alignas(64) std::atomic<size_t> m_head;
    alignas(64) std::atomic<size_t> m_tail;
};

这里的 alignas(64) 实际上改为 std::hardware_constructive_interference_size 更加合理,因为 Cache Line 的大小取决于具体 CPU 硬件的实现,并不总是为 64 字节。

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
constexpr std::size_t hardware_constructive_interference_size = 64;
constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

Benchmark 结果:

Mean:   38,993,940.400000 elements/s 
Median: 39,027,123.000000 elements/s 
Max:    39,253,946 elements/s 
Min:    38,624,197 elements/s 

3. Remove useless memory access

在使用 spsc_queue 的时候,通常会有以下形式的代码:

spsc_queue sq(1024);
// Producer keep spinning
int x = 233;
while (!sq.enqueue(x)) {}

而在 dequeue/enqueue 中,存在判空/判满的代码:

inline bool enqueue(const T &item)
{
    const size_t tail = m_tail.load(std::memory_order_relaxed);
    const size_t next = (tail + 1) % m_buffer.size();
    if (next == m_head.load(std::memory_order_acquire))
        return false;
    // ...
}

每次执行 m_head.load,Producer 线程的 CPU 都会访问一次 m_head 所在的内存,但实际上触发该条件的概率较小(因为在实际的场景下, Producer/Consumer 都是计算密集型,否则根本不需要无锁的数据结构)。在判空/判满的时候,可以去 “离 CPU 更近” 的 Cache 去获取 m_head 的值。

template <class T>
class spsc_queue
{
private:
    std::vector<T> m_buffer;
    alignas(hardware_constructive_interference_size) std::atomic<size_t> m_head;
    alignas(hardware_constructive_interference_size) std::atomic<size_t> m_tail;

    alignas(hardware_constructive_interference_size) size_t cached_head;
    alignas(hardware_constructive_interference_size) size_t cached_tail;
};

inline bool enqueue(const T &item)
{
    const size_t tail = m_tail.load(std::memory_order_relaxed);
    const size_t next = (tail + 1) % m_buffer.size();

    if (next == cached_head)
    {
        cached_head = m_head.load(std::memory_order_acquire);
        if (next == cached_head)
            return false;
    }
    // ...
}

Benchmark 结果:

Mean:   79,740,671.300000 elements/s 
Median: 79,838,314.000000 elements/s 
Max:    80,044,793 elements/s 
Min:    79,241,180 elements/s 

4. Summary

Github: GitHub - sinkinben/lock-free-queue: Lock free spsc-queue (single producer and single consumer).

3 个版本的 spsc_queue 的吞吐量比较(均值,中位数,最大值,最小值)。在优化 Cache False Sharing 和优先从 Cache 读取 head, tail 之后,可得到 x2 的提升。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1363838.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

06、Kafka ------ 各个功能的作用解释(ISR 同步副本、非同步副本、自动创建主题、修改主题、删除主题)

目录 CMAK 各个功能的作用解释★ ISR副本 (同步副本&#xff09;★ 非同步副本★ 自动创建主题★ 修改主题★ 删除主题 CMAK 各个功能的作用解释 ★ ISR副本 (同步副本&#xff09; 简单来说 &#xff0c;ISR 副本 就是 Kafka 认为与 领导者副本 同步的副本。 ISR&#xff0…

基于黄金正弦算法优化的Elman神经网络数据预测 - 附代码

基于黄金正弦算法优化的Elman神经网络数据预测 - 附代码 文章目录 基于黄金正弦算法优化的Elman神经网络数据预测 - 附代码1.Elman 神经网络结构2.Elman 神经用络学习过程3.电力负荷预测概述3.1 模型建立 4.基于黄金正弦优化的Elman网络5.测试结果6.参考文献7.Matlab代码 摘要&…

vue+nodejs微信小程序基于uniapp的学生宿舍打卡失物招领管理系统

基于微信的宿舍管理系统的设计基于现有的手机&#xff0c;可以实现等功能。方便用户对宿舍管理系统查看个人中心、失物招领管理、失物认领管理、晚归打卡管理、宿舍信息管理、宿舍更新管理、交流论坛、系统管理等功能模块的管理及详细的设计与统计分析。根据系统功能需求建立的…

JS手写apply,call,bind函数

本篇文章咱们来手写简易版的apply&#xff0c;call&#xff0c;bind函数。 实现思路 首先咱们需要思考下这三个函数放到哪里比较合适&#xff0c;因为这三个函数是被函数对象调用的&#xff0c;并且每个函数都可以调用&#xff0c;所以不难想到有一个位置非常合适&#xff0c;…

【每日论文阅读】生成模型篇

联邦多视图合成用于元宇宙 标题: Federated Multi-View Synthesizing for Metaverse 作者: Yiyu Guo; Zhijin Qin; Xiaoming Tao; Geoffrey Ye Li 摘要: 元宇宙有望提供沉浸式娱乐、教育和商务应用。然而&#xff0c;虚拟现实&#xff08;VR&#xff09;在无线网络上的传输是…

HNU-数据库系统-作业

数据库系统-作业 计科210X 甘晴void 202108010XXX 第一章作业 10.09 1.(名词解释)试述数据、数据库、数据库管理系统、数据库系统的概念。 数据&#xff0c;是描述事物的符号记录。 数据库&#xff08;DB&#xff09;&#xff0c;是长期存储在计算机内、有组织、可共享的大量…

windows安装nvm以及nvm常用命令

目录 1.什么是nvm以及为啥要用nvm 1.什么是nvm 2.为什么要用nvm 2.安装nvm 1. 下载 2. 安装 1.双击解压后的文件,nvm-setup.exe 2.同意 3.安装路径 4.下一步&#xff0c;这里有建议改成自己的文件夹&#xff0c;这个是用来存储通过nvm切换node后版本的存储路径 5.安装…

基础面试题整理2

1.抽象类与接口区别 语法&#xff1a; 抽象类用abstract定义&#xff1b;接口用interface定义抽象类被子类继承extends&#xff08;不可用final修饰&#xff09;&#xff1b;接口被类实现implements抽象类的属性访问无限制,方法不可用private修饰&#xff1b;接口中的方法只能…

DQL命令查询数据(三)

本课目标 掌握MySQL的多表查询 SQL语句的综合应用 多表连接查询 通过各个表之间共同列的关联性&#xff08;例如&#xff1a;外键&#xff09;来查询的 分类&#xff1a; 内连接(INNER JOIN) &#xff0c;可简写为 JOIN&#xff1b;左外连接(LEFT OUTER JOIN)&#xff0c;…

2023全球软件研发技术大会(SDCon2023)-核心PPT资料下载

一、峰会简介 本次峰会包含12大会议主题&#xff1a;云原生设施与平台、微服务架构实践、软件质量与效能、大数据实践与前沿、架构设计与演进、高可用与高性能架构、Web与大前端开发、编程语言与平台、AIGC与大模型、推荐系统实践、AI智能应用与研究、机器学习架构实践。 软件…

SpringCloud系列篇:核心组件之声明式HTTP客户端组件【远程消费】

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于SpringCloud的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一. 远程消费组件是什么 二. 远程消…

一个简单的KNN实现方法

对于许多离散问题&#xff0c;经过神经网络解决再通过softmax之后每一个值在[0,1]之间的连续变量&#xff0c;想要将其离散化&#xff0c;即离散化到每个元素都是 binary-variable&#xff0c;即 0-1 &#xff0c;这时可以用KNN方法&#xff0c;其实就是找到与这个向量的方差最…

JavaWeb——新闻管理系统(Jsp+Servlet)之jsp新闻修改

java-ee项目结构设计 1.dao:对数据库的访问&#xff0c;实现了增删改查 2.entity:定义了新闻、评论、用户三个实体&#xff0c;并设置对应实体的属性 3.filter&#xff1a;过滤器&#xff0c;设置字符编码都为utf8&#xff0c;防止乱码出现 4.service:业务逻辑处理 5.servlet:处…

力扣刷题记录(29)LeetCode:695、1020、130

695. 岛屿的最大面积 这道题和计算岛屿周长类似&#xff0c;在这里dfs的功能就是由一块陆地出发&#xff0c;找出这块陆地所在的岛屿并返回岛屿面积。 class Solution { public:int dfs(vector<vector<int>>& grid,int i,int j){if(i<0||i>grid.size())…

kannegiesser触摸屏维修CTT-11 4PP420.1043-K37

贝加莱触摸屏维修4PP420.1043-K37 kannegiesser工控机触摸屏维修CTT-11 工控机触摸屏维修常见故障现象 1、工控机开机有显示&#xff0c;但是屏幕很暗&#xff0c;用调亮度功能键调试无任何变化&#xff1b; 2、工控机开机触摸屏白屏或花屏&#xff0c;但是外接显示器正常&a…

2024——剑之所至,所向披靡

目录 *年度总结导航 一.开篇——写在篇头 二.工作篇——心之所向 1.CSDN记录篇 1,1博客主页 ​编辑1.2 第一篇博文 1.3.产品测试 1.4C站获奖博文 1.5团队创建 2.腾讯云记录篇 2.1博主的主页 2.2 博主好文推荐 2.3腾讯云产品体验 三.励志篇——未来可期 2023年…

编程语言的未来:贴近人类、灵活高效与探索无限

编程语言的未来&#xff1f; 在当今的科技潮流中&#xff0c;编程语言是至关重要的一环&#xff0c;更是赋能科技行业的基础工具。我深信&#xff0c;未来的编程语言可能将朝着更贴近人类、灵活高效和面向无限可能的方向发展。 人性化是我预期的第一个趋势。未来的编程语言将…

java CAS

CAS 在高并发场景&#xff0c;可以使用加锁 或者CAS来保证原子性&#xff0c;但是加锁是很重量级的操作&#xff0c;CAS类似于乐观锁CAS &#xff08; Compare and swap &#xff09;比较并交换&#xff0c;是实现并发算法时常用到的技术&#xff0c;包含三个操作数&#xff1…

MySql海量数据存储与优化

一、Mysql架构原理和存储机制 1.体系结构 2.查询缓存 3.存储引擎 存储引擎的分类 innodb&#xff1a;支持事务&#xff0c;具有支持回滚&#xff0c;提交&#xff0c;崩溃恢复等功能&#xff0c;事务安全myisam:不支持事务和外键&#xff0c;查询速度高Memory&#xff1a;利…

网络安全学习资源

好久没写博客了&#xff0c;记录一些宝藏学习资源&#xff0c;不定时更新 Regex Learn - Step by step, from zero to advanced. 这是一个我认为最好的正则表达式学习网站&#xff0c;很多正则表达式学习资料都只提供了一个概念&#xff0c;但是正则表达式需要大量的练习&#…