无锁队列实现及使用场景

news2024/12/24 8:13:30

写在前面

在看无锁队列之前,我们先来看看看队列的操作。队列是一种非常重要的数据结构,其特性是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信间经常采用队列做缓存,缓解数据处理压力。根据队列操作的场景可分为以下4种:
在这里插入图片描述
无锁队列应用场景

如果你的业务,数据量不大,一秒只需要处理几百上千的数据,就没必要用无锁队列了。当需要处理的数据非常多,比如,每秒需要处理几十万条数据时,可以考虑用无锁队列。

一、有锁和无锁

实际上有锁和无锁,就是我们平时所说的乐观锁和悲观锁:

  1. 悲观锁:一种悲观的加锁策略,它认为每次访问共享资源的时候,总会发生冲突,所以宁愿牺牲性能(时间)来保证数据安全;
  2. 乐观锁:是一种乐观的策略,它假设线程访问共享资源不会发生冲突,所以不需要加锁,因此线程将不断执行,不需要停止。一旦碰到冲突,就重试当前操作直到没有冲突为止。

无锁通过(CAS Compare And Swap)技术来实现,CAS是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作,从⽽避免多线程同时改写某⼀数据时由于执⾏顺序不确定性以及中断的不可预知性产⽣的数据不⼀致问题。

bool CAS( int * pAddr, int nExpected, int nNew )
{
	 if ( *pAddr == nExpected ) {
		 *pAddr = nNew ;
		 return true ;
	 }
	 else{
	 	 return false ;
	 }
}

工作原理:将pAddr地址中的元素与nExpected比较,如果相等,则更新pAddr的值为nNew,并返回true;否则返回false。

二、无锁队列的优势

上面我们提到,在数据量小的时候直接采用加锁的方式,实现资源的排他性访问即可。但是加锁的缺点也很明显:

  1. CPU会将大量的时间用在锁的维护上,而不是数据处理;
  2. 在线程之间切换的时候,导致Cache中的数据失效。CPU访问Cache的速度是远大于内存的,所以需要尽量减少线程频繁切换 。

三、无锁队列实现

3.1 zmq实现

zmq实现无锁队列,只要在ypipe.hpp 和 yqueue.hpp 两个文件,适用于一读一写的场景。

3.1.1 yqueue.hpp

template <typename T, int N>
class yqueue_t
{
	......
	
private:
    //  Individual memory chunk to hold N elements.
    // 链表结点称之为chunk_t
    struct chunk_t
    {
        T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
        chunk_t *prev;
        chunk_t *next;
    };

    chunk_t *begin_chunk; // 链表头结点
    int begin_pos;        // 起始点
    chunk_t *back_chunk;  // 队列中最后一个元素所在的链表结点
    int back_pos;         // 尾部
    chunk_t *end_chunk;   // 拿来扩容的,总是指向链表的最后一个结点
    int end_pos;
    
    atomic_ptr_t<chunk_t> spare_chunk; //空闲块(把所有元素都已经出队的chunk,称为空闲块),读写线程的共享变量
};

yqueue_t 结构,内部由⼀个⼀个chunk组成,每个chunk保存N个元素。chunk之间通过指针连接。

在这里插入图片描述

当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。
在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。

yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:

  1. begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个元素在当前chunk中的位置。
  2. back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_po⽤于指向队列最后⼀个元素在当前chunk的位置。
  3. end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位置

这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:

  • back_chunk/back_pos:对应的是元素存储位置;
  • end_chunk/end_pos:决定是否要分配chunk或者回收chunk。

yqueue_t 构造函数

    inline yqueue_t()
    {
        begin_chunk = (chunk_t *)malloc(sizeof(chunk_t));
        alloc_assert(begin_chunk);
        begin_pos = 0;
        back_chunk = NULL; //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空
        back_pos = 0;
        end_chunk = begin_chunk; //end_chunk总是指向链表的最后一个chunk
        end_pos = 0;
    }

说明:
采用chunk的机制,减少内存分配次数,采用spark_chunk的机制回收读完的chunk,充分利用局部性原理,提升性能。
end_chunk 总是指向最后分配的chunk,刚分配出来的chunk,end_pos也是0
back_chunk 在插入元素时才会指向对应的chunk,初始化的是是指向NULL的。
在这里插入图片描述

ront、back函数


inline T &front() // 返回的是引⽤,是个左值,调⽤者可以通过其修改容器的值
{
	return begin_chunk->values[begin_pos]; // 队列⾸个chunk对应的的begin_pos
}

inline T &back() // 返回的是引⽤,是个左值,调⽤者可以通过其修改容器的值
{
	return back_chunk->values[back_pos];
}

说明:
我们可以通过 begin_chunk->values[begin_pos] 获取到队列的头部,以及back_chunk->values[back_pos]获取到队列的尾部

Push函数

每次调用push,会更新 back_chunk 和 back_pos ,以及根据end_pos的值,决定是否需要重新分配chunk。

inline void push()
    {
        back_chunk = end_chunk;
        back_pos = end_pos; //

        if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满
            return;

        chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
        if (sc)                               // 如果有spare chunk则继续复用它
        {
            end_chunk->next = sc;
            sc->prev = end_chunk;
        }
        else // 没有则重新分配
        {
            // static int s_cout = 0;
            // printf("s_cout:%d\n", ++s_cout);
            end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk
            alloc_assert(end_chunk->next);
            end_chunk->next->prev = end_chunk;  
        }
        end_chunk = end_chunk->next;
        end_pos = 0;
    }

重新分配规则如下:

  1. 如果 ++end_pos != N 说明当前chunk还有空间,直接返回
  2. 如果++end_pos == N 说明,当前chunk只有N-1的位置可用,需要再按分配一个chunk。 这个chunk 会先尝试从spare_chunk获取,如果spare_chunk为NULL,则需要重新分配。

pop函数

pop的时候begin_pos就会++,也就是会更新front的位置。当chunk 中的所有元素都被取出才会触发chunk回收机制。spare_chunk的操作要求是原子操作,因为读写线程都会访问spare_chunk。

inline void pop()
    {
        if (++begin_pos == N) // 删除满一个chunk才回收chunk
        {
            chunk_t *o = begin_chunk;
            begin_chunk = begin_chunk->next;
            begin_chunk->prev = NULL;
            begin_pos = 0;

            //  'o' has been more recently used than spare_chunk,
            //  so for cache reasons we'll get rid of the spare and
            //  use 'o' as the spare.
            chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
            free(cs);
        }
    }

3.2 ypipe_t

    inline ypipe_t()
    {
        //  Insert terminator element into the queue.
        queue.push(); //yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置

        //  Let all the pointers to point to the terminator.
        //  (unless pipe is dead, in which case c is set to NULL).
        r = w = f = &queue.back(); //就是让r、w、f、c四个指针都指向这个end迭代器
        c.set(&queue.back());
    }

在ypipe对象创建的时候,更新yqueue的back_chunk的值,以及初始化 r w f c四个指针。
在这里插入图片描述

write函数

write的incomplete_标志位决定要不要更新f的位置

    inline void write(const T &value_, bool incomplete_)
    {
        //  Place the value to the queue, add new terminator element.
        queue.back() = value_;
        queue.push();

        //  Move the "flush up to here" poiter.
        if (!incomplete_)
        {
            f = &queue.back(); // 记录要刷新的位置
            // printf("1 f:%p, w:%p\n", f, w);
        }
        else
        {
            //  printf("0 f:%p, w:%p\n", f, w);
        }
    }

flush函数

flush的核心是更新c指针和w指针的位置

inline bool flush()
    {
        //  If there are no un-flushed items, do nothing.
        if (w == f) // 不需要刷新,即是还没有新元素加入
            return true;

        //  Try to set 'c' to 'f'.
        // read时如果没有数据可以读取则c的值会被置为NULL,如果c==null 说明read线程在 休眠,可以安全的设置c的值
        if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
        {

            //  Compare-and-swap was unseccessful because 'c' is NULL.
            //  This means that the reader is asleep. Therefore we don't
            //  care about thread-safeness and update c in non-atomic
            //  manner. We'll return false to let the caller know
            //  that reader is sleeping.
            c.set(f); // 更新w的位置
            w = f;
            return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理
        }
        else  // 读端还有数据可读取
        {
            //  Reader is alive. Nothing special to do now. Just move
            //  the 'first un-flushed item' pointer to 'f'.
            w = f;             // 只需要更新w的位置
            return true;
        }
    }

read函数

read会先采取预读的机制,判断有无数据可读,可读就填充到value_。核心就是 check_read函数。主要通过比较c和队头的位置判断是否有数据可读,如果可读返回的就是flush后的f的位置。

    inline bool check_read()
    {
        //  Was the value prefetched already? If so, return.
        if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
            return true;

        //  There's no prefetched value, so let us prefetch more values.
        //  Prefetching is to simply retrieve the
        //  pointer from c in atomic fashion. If there are no
        //  items to prefetch, set c to NULL (using compare-and-swap).
        // 两种情况
        // 1. 如果c值和queue.front()相等,返回旧c值,将c值置为NULL,说明此时没有数据可读
        //    当c==&queue.front()时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程检查读线程是否睡眠的标志。
        // 2. 如果c值和queue.front()不相等, 直接返回c值,此时可能有数据度的去
        r = c.cas(&queue.front(), NULL); //尝试预取数据

        //  If there are no elements prefetched, exit.
        //  During pipe's lifetime r should never be NULL, however,
        //  it can happen during pipe shutdown when items are being deallocated.
        if (&queue.front() == r || !r) //判断是否成功预取数据
            return false;

        //  There was at least one value prefetched.
        return true;
    }

    inline bool read(T *value_)
    {
        //  Try to prefetch a value.
        if (!check_read())
            return false;

        //  There was at least one value prefetched.
        //  Return it to the caller.
        *value_ = queue.front();
        queue.pop();
        return true;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/514836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

device_node转换成platform_device

device_node转换成platform_device 文章目录 device_node转换成platform_device转换规则主要核心函数of_default_bus_match_tablearmarm64of_platform_register_reconfig_notifier Linux内核是如何将device_node转换成platform_deviceof_platform_populate函数处理根节点下的子…

在ubuntu连接Xlight FTP Server

一 在windows上搭建服务器 http://www.xlightftpd.com/download.htm 使用英文版&#xff0c;使防止在ubuntu中登录中文版时&#xff0c;显示乱码 新建用户和用户对应的服务器目录 如下所示&#xff0c;默认只有读权限 全都勾选 勾选完毕后的效果 在目录中放一个文件&#…

10款常用的原型设计工具,包含一键生成原型工具

原型图是产品设计师日常工作的“常客”&#xff0c;原型图软件也扮演着产品设计师的“武器”角色。 许多新产品设计师不知道如何选择原型图软件。本文盘点了10个优秀的原型图软件&#xff0c;让我们来看看。 1.即时设计 即时设计是一款免费的在线 UI 设计工具&#xff0c;无…

【Java数据结构】排序

排序 插入排序希尔排序选择排序堆排序冒泡排序快速排序序列的分割Hoare法挖坑法快慢指针法 优化1 - 三数取中优化2- 数据规模小时的插入 归并排序 插入排序 直接插入排序是一种简单的插入排序法&#xff0c;其基本思想是&#xff1a; 把待排序的记录按其关键码值的大小逐个插…

Jetson Orin环境安装Opencv+Cuda以及vscode环境配置

文章目录 一&#xff1a;Opencv Cuda源码的下载、编译1.卸载jetson上自带的无cuda加速Opencv2.安装Opencv依赖库3.下载 OpenCV 和 opencv_contrib 源码4.编译安装 OpenCV、opencv_contrib 二&#xff1a;Opencv 的环境配置三&#xff1a;Vscode 中的Opencv环境配置四&#xff…

系统分析师---系统建模相关高频考试知识点

系统规划---成本效益分析 评价信息系统经济效益常用的方法主要有成本效益分析法,投入产出分析法和价值工程方法。盈亏平衡法常用于销售定价; 可行性分析 系统规划是信息系统生命周期的第一个阶段,其任务是对企业的环境、目标以及现有系统的状况进行初步调查,根据企业目标…

张正友相机标定原理

相机标定 记录1.1 张正友相机标定相关 参考 记录 最小二乘法&#xff1a;A^T A x 0 奇异值分解的办法求解最小二乘法 因为可以假设标定板平面在世界坐标系Z0的平面上&#xff0c; 1.1 张正友相机标定相关 单目相机标定实现–张正友标定法(包含具体的实现以及C代码&#xff0…

《花雕学AI》ChatGPT Shortcut Chrome 扩展:让生产力和创造力加倍的 ChatGPT 快捷指令库

你是否想要与一个智能的对话伙伴聊天&#xff0c;或者让它帮你完成各种任务&#xff0c;如写作、编程、摘要、翻译等&#xff1f;如果是的话&#xff0c;你可能会对 ChatGPT 感兴趣。ChatGPT 是一个基于 GPT-3.5 的对话式人工智能&#xff0c;可以与用户进行自然、流畅、有趣的…

文件看不见了,内存还占着容量的找回教程

U盘文件突然不见了但还占用内存空间的解决方法 如果文件看不见了但内存占用仍然存在&#xff0c;可能是因为以下原因&#xff1a; 文件被隐藏。某些操作系统允许隐藏文件&#xff0c;这些文件只能在文件浏览器中被找到。 文件被损坏。如果文件损坏&#xff0c;它可能不会显示在…

Python图形化编程开源项目拼码狮PinMaShi

开源仓库 #项目地址 https://github.com/supercoderlee/pinmashi https://gitee.com/supercoderlee/pinmashiPinMaShi采用electron开发&#xff0c;图形化拖拽式编程有效降低编程难度&#xff0c;对Python编程的初学者非常友好&#xff1b;积木式编程加快Python程序的开发&…

黑马Redis笔记-高级篇

黑马Redis笔记-高级篇 1、Redis持久化&#xff08;解决数据丢失&#xff09;1.1 RDB持久化1.1.1 定义1.1.2 异步持久化bgsave原理 1.2 AOF持久化1.3 RDB和AOF比较 2、Redis主从&#xff08;解决并发问题&#xff09;2.1 搭建主从架构2.2 主从数据同步原理2.2.1 全量同步2.2.2 增…

基于哈里斯鹰算法优化的核极限学习机(KELM)分类算法 -附代码

基于哈里斯鹰算法优化的核极限学习机(KELM)分类算法 文章目录 基于哈里斯鹰算法优化的核极限学习机(KELM)分类算法1.KELM理论基础2.分类问题3.基于哈里斯鹰算法优化的KELM4.测试结果5.Matlab代码 摘要&#xff1a;本文利用哈里斯鹰算法对核极限学习机(KELM)进行优化&#xff0c…

【小梦C嘎嘎——启航篇】基本语法格式:namespace ?

基本语法格式&#xff1a;namespace &#xff1f;&#x1f60e; 前言&#x1f64c;namespace 是什么&#xff1f;namespace 的意义何在&#xff1f; 总结撒花&#x1f49e; &#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右铭&#xff1a;全神贯注的上吧&…

springboot + vue 部署 阿里云云服务器 ECS

安装所需文件 安装mysql5.7 下载MySQL的yum源配置 wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm安装MySQL的yum源 yum -y install mysql57-community-release-el7-11.noarch.rpm使用yum方式安装MySQL5.7&#xff08;下载需要点时间&#xf…

【 断电延时继电器 电源监视 导轨安装 JOSEF约瑟 HJZS-E202 AC220V】

品牌&#xff1a;JOSEF约瑟型号&#xff1a;HJZS-E202名称&#xff1a;断电延时继电器额定电压&#xff1a;110、220VDC/AC&#xff1b;100VAC触点容量&#xff1a;250V/5A功率消耗&#xff1a;≤4.2W返回系数&#xff1a;10%额定电压 系列型号&#xff1a; HJZS-E202断电延时…

4.4 栈实现及其应用

目录 栈 顺序栈 创建栈: 清空栈: 判断栈是否空 &#xff1a; 进栈 : 出栈 : 取栈顶元素: 栈 栈是限制在一端进行插入操作和删除操作的线性表&#xff08;俗称堆栈&#xff09; 允许进行操作的一端称为“栈顶” 另一固定端称为“栈底” 当栈中没有元素时称为“空栈”…

Robbin负载均衡详解及实践---SpringCloud组件(三)

Robbin负载均衡详解及实践 一 为什么使用Robbin&#xff1f;二 Robbin概念三 负载均衡实践1.启动eureka客户端2.启动多个provider服务&#xff0c;注册到eureka3.在consumer端配置负载均衡参数 四 Robbin源码剖析 一 为什么使用Robbin&#xff1f; 在Eureka详解及实践—Spring…

SAS初识

1、SAS常用工作窗口 “结果”&#xff08;Result&#xff09;窗口——管理SAS程序的输出结果&#xff1b; “日志”&#xff08;Log&#xff09;窗口——记录程序的运行情况&#xff1b; “SAS资源管理器”&#xff08;Explore&#xff09;窗口&#xff1b; “输出”&#xff0…

详解vue中的Object.defineProperty

如果想要age遍历的话 就设置属性 打印出来 发现有可以枚举的属性age 参考课程&#xff1a; 011_尚硅谷Vue技术_Object.defineProperty_哔哩哔哩_bilibili // 1.Vue中的数据代理&#xff1a; // 通过Vm对象来代理data对象中属性的操作&#xff08;读/写&#xff09; // 2…

STL容器 —— list 了解、接口使用,以及模拟实现list(部分常用接口)

注意 &#xff1a; 以下所有文档都来源此网站 &#xff1a; http://cplusplus.com/ 一、vector的介绍及使用 list文档的介绍&#xff1a;https://cplusplus.com/reference/list/list/ 1. vector 的介绍 1. list是可以在常数范围内在任意位置进行插入和删除的序列式容器&…