无锁队列--知识分享

news2025/4/18 16:53:17

目录

无锁队列

        无锁队列是什么

         为什么需要无锁队列

        队列的类型 

         无锁队列的分类        

ringbuffer(SPSC)

ret_ring(MPMC)


无锁队列

        无锁队列是什么
  •         无锁队列通过原子操作来实现线程安全的队列,属于非阻塞队列
  •         有锁队列通过互斥锁或其他同步机制保证线程安全的队列,属于阻塞队列 
         为什么需要无锁队列

        锁的局限:

  •         线程阻塞带来的切换
  •         死锁风险
  •         性能瓶颈,高并发下锁竞争激烈,吞吐量下降

        队列的类型 

        无锁队列

  •         无锁(lock-free):保证了至少有一个线程在正常运行,其他可能重试,依赖CAS 等原子操作。
  •         无等待(wait-free):所有线程必成功,无重试,依赖exchange等原子操作。

         有锁队列

  •          阻塞队列(blocking-queue):当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新的元素加入;当队列已满时,向队列中添加元素的操作会被阻塞,直到队列中有元素被移除,依赖锁实现。
         无锁队列的分类        

        SPSC(单生产者,单消费者)

        含义:单生产者单消费者队列,即队列只有一个生产者线程和一个消费者线程。

        特点:不存在多个生产者或消费者之间的竞争,所以可以使用比较简单高效的算法来实现。

        

        MPSC(多生产者,单消费者)

        含义:多生产者单消费者队列,多个线程可以同时向队列中添加元素,但只有一个线程可以从队列中取出元素。

        特点:实现时需要重点处理多个生产者之间的并发问题,确保多个生产者能够安全地向队列中添加元素。而对于消费者线程,由于是唯一的,处理相对简单。

        SPMC(单生产者,多消费者)

        含义:单生产者多消费者队列,只有一个线程可以向队列中添加元素,但是有多个线程可以同时从队列中取出元素。

        特点:需要处理多个消费者之间的并发问题,保证多个消费者能够安全地从队列中取出元素。同时,要确保生产者在添加元素时不会受到消费者的干扰。

        

        MPMC(多生产者,多消费者)

        含义:多生产者多消费者队列,意味着有多个线程可以同时向队列中添加元素(生产者),也有多个线程可以同时从队列中取出元素(消费者)。

        特点:由于多个生产者和消费者可能会同时访问队列,因此需要更复杂的并发控制机制来保证线程安全。通常会使用 CAS 等原子操作来处理多个线程对队列的并发访问,避免数据竞争和不一致的问题。

ringbuffer(SPSC)

#pragma once

// SPSC
#include <atomic>
#include <cstddef>
#include <type_traits>

template<typename T, std::size_t Capacity>
class RingBuffer
{
public:
    static_assert(Capacity && !(Capacity & (Capacity - 1)), "Capacity must be power of 2");
    RingBuffer() : read_(0), write_(0) {}

    ~RingBuffer() {
        std::size_t r = read_.load(std::memory_order_relaxed);
        std::size_t w = write_.load(std::memory_order_relaxed);
        while (r != w) {
            reinterpret_cast<T *>(&buffer_[r])->~T();
            r = (r + 1) & (Capacity - 1);
        }
    }

    // 这里使用万能引用和完美转发,支持左值和右值
    template<typename U>
    bool Push(U && value) {
        const std::size_t w = write_.load(std::memory_order_relaxed);
        const std::size_t next_w = (w + 1) & (Capacity - 1);
        // 检查缓冲区是否满
        if (next_w == read_.load(std::memory_order_acquire)) {
            return false;
        }
        new (&buffer_[w]) T(std::forward<U>(value));
        write_.store(next_w, std::memory_order_release);
        return true;
    }

    bool Pop(T & value) {
        const std::size_t r = read_.load(std::memory_order_relaxed);
        // 检查缓冲区是否空
        if (r == write_.load(std::memory_order_acquire)) {
            return false;
        }

        // 取出元素并析构
        value = std::move(*reinterpret_cast<T *>(&buffer_[r]));
        reinterpret_cast<T *>(&buffer_[r])->~T();
        read_.store((r + 1) & (Capacity - 1), std::memory_order_release);
        return true;
    }

    std::size_t Size() const {
        const std::size_t r = read_.load(std::memory_order_acquire);
        const std::size_t w = write_.load(std::memory_order_acquire);
        return (w >= r) ? (w - r) : (Capacity - r + w);
    }


private:
//cache line  64B
    alignas(64) std::atomic<std::size_t> read_;
    alignas(64) std::atomic<std::size_t> write_;
    alignas(64) std::aligned_storage_t<sizeof(T), alignof(T)> buffer_[Capacity];  // 支持 pod 和 非 pod 类型
};
ret_ring(MPMC)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include "rte_ring.h"

#define RING_SIZE 16<<20

typedef struct cc_queue_node {
    int data;
} cc_queue_node_t;

static struct rte_ring *r;

typedef unsigned long long ticks;

static __inline__ ticks getticks(void)
{
    u_int32_t a, d;

    asm volatile("rdtsc" : "=a" (a), "=d" (d));
    return (((ticks)a) | (((ticks)d) << 32));
}


void *enqueue_fun(void *data)
{
    int n = (int)data;
    int i = 0;
    int ret;
    cc_queue_node_t *p;

    for (; i < n; i++) {
        p = (cc_queue_node_t *)malloc(sizeof(cc_queue_node_t));
        p->data = i;
        ret = rte_ring_mp_enqueue(r, p);
        if (ret != 0) {
            printf("enqueue failed: %d\n", i);
        }
    }

    return NULL;
}

void *dequeue_func(void *data)
{
    int ret;
    int i = 0;
    int sum = 0;
    int n = (int)data;
    cc_queue_node_t *p;
    ticks t1, t2, diff;
    //return;

    t1 = getticks();
    while (1) {
        p = NULL;
        ret = rte_ring_sc_dequeue(r, (void **)&p);
        if (ret != 0) {
            //do something
        }
        if (p != NULL) {
            i++;
            sum += p->data;
            free(p);
            if (i == n) {
                break;
            }
        }
    }

    t2 = getticks();
    diff = t2 - t1;
    printf("time diff: %llu\n", diff);
    printf("dequeue total: %d, sum: %d\n", i, sum);

    return NULL;
}


int main(int argc, char *argv[])
{
    int ret = 0;
    pthread_t pid1, pid2, pid3, pid4, pid5, pid6;
    pthread_attr_t pthread_attr;

    r = rte_ring_create("test", RING_SIZE, 0);

    if (r == NULL) {
        return -1;
    }

    printf("start enqueue, 5 producer threads, echo thread enqueue 1000 numbers.\n");

    pthread_attr_init(&pthread_attr);
    if ((ret = pthread_create(&pid1, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
        pthread_detach(pid1);
    }

    if ((ret = pthread_create(&pid2, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
        pthread_detach(pid2);
    }

    if ((ret = pthread_create(&pid3, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
        pthread_detach(pid3);
    }
    
    if ((ret = pthread_create(&pid4, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
        pthread_detach(pid4);
    }

    if ((ret = pthread_create(&pid5, &pthread_attr, enqueue_fun, (void *)1000)) == 0) {
        pthread_detach(pid5);
    }

    printf("start dequeue, 1 consumer thread.\n");

    if ((ret = pthread_create(&pid6, &pthread_attr, dequeue_func, (void *)5000)) == 0) {
        //pthread_detach(pid6);
    }
    
    pthread_join(pid6, NULL);

    rte_ring_free(r);

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2336571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2025低代码平台选型策略:ROI导向下的功能与成本权衡

在当今快速变化的商业环境中&#xff0c;企业面临着前所未有的挑战与机遇。数字化转型已成为企业提升竞争力的关键&#xff0c;而软件开发的高成本和长周期无疑是实现这一转型的绊脚石。 低代码平台的兴起&#xff0c;为企业提供了一种高效、灵活的解决方案&#xff0c;使得非…

Redis的IO多路复用

1 传统的socket编码模型 传统 Socket 模型通常采用 多线程/多进程 或 阻塞 I/O 的方式处理网络请求。以下是典型实现步骤&#xff1a; 创建套接字&#xff08;Socket&#xff09; 步骤&#xff1a;调用 socket() 创建一个 TCP/UDP 套接字。通常把这个套接字称为【主动套接字】…

基于YOLOv9的课堂行为检测系统

基于YOLOv9的课堂行为检测系统 项目概述 本项目是一个基于YOLOv9深度学习模型的课堂行为检测系统&#xff0c;旨在通过计算机视觉技术自动识别和监测课堂中学生的各种行为状态&#xff0c;帮助教师更好地了解课堂教学效果。 项目结构 课堂行为检测/ ├── data/ │ ├──…

端、管、云一体化原生安全架构 告别外挂式防护!

面对数字化转型浪潮&#xff0c;企业网络安全风险日益凸显。数据泄露、黑客勒索等事件频发&#xff0c;合规要求加速推进。尽管企业纷纷部署了防病毒、身份认证、文件加密、入侵防护、流量监控等多种安全系统&#xff0c;但分散且孤立的架构非但没有有效抵御风险&#xff0c;反…

BI面向模型开发和面向报表开发,有什么区别?

在数字化时代&#xff0c;商业智能&#xff08;BI&#xff09;已成为企业决策不可或缺的工具。BI项目实施时&#xff0c;通常有两种开发模式&#xff1a;面向模型开发和面向报表开发。虽然两者都旨在通过数据驱动决策&#xff0c;但在开发逻辑、目标价值和技术路径上存在显著差…

进程控制(上)【Linux操作系统】

进程控制 写时拷贝 本质是一种减少深拷贝的方法 Linux中有很多拷贝的场景都用得上写时拷贝&#xff0c;下面以创建子进程时的写时拷贝为例&#xff1a; 子进程被创建的时候&#xff1a; 会继承父进程的mm_struct和页表 所以子进程刚刚继承时&#xff0c;父子进程的代码和数据…

5G网络下客户端数据业务掉线频繁

上层应用的日志和界面在待机状态下&#xff08;即没有做通话等业务操作&#xff09;&#xff0c;会频繁提示“离线”。 主要先看有没有丢网&#xff0c;UL BLER有没有问题。确认没有问题。看到业务信道释放后也可以成功重新建链。所以以为这个只是终端业务进入dormant态的提示…

【Docker项目实战】使用Docker部署Gitblit服务器

【Docker项目实战】使用Docker部署Gitblit服务器 一、Gitblit介绍1.1 Gitblit 介绍1.2 主要特点 二、本次实践规划2.1 本地环境规划2.2 本次实践介绍 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本 四、下载Gitblit镜像五、部署Gitbli…

Vitis: 使用自定义IP时 Makefile错误 导致编译报错

参考文章: 【小梅哥FPGA】 Vitis开发中自定义IP的Makefile路径问题解决方案 Vitis IDE自定义IP Makefile错误&#xff08;arm-xilinx-eabi-gcc.exe: error: *.c: Invalid argument&#xff09;解决方法 Vitis 使用自定义IP时: Makefile 文件里的语句是需要修改的&#xff0c;…

helm的go模板语法学习

1、helm chart 1.0、什么是helm&#xff1f; 介绍&#xff1a;就是个包管理器。理解为java的maven、linux的yum就好。 安装方法也可参见官网&#xff1a; https://helm.sh/docs/intro/install 通过前面的演示我们知道&#xff0c;有了helm之后应用的安装、升级、查看、停止都…

AI 语音公司 ElevenLabs 进军亚太市场设立东京子公司;EverTutor Live :语音交互 AI 教育平台丨日报

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real-Time Engagement&#xff09; 领域内「有话题的 技术 」、「有亮点的 产品 」、「有思考的 文章 」、「有态度的 观…

STM32启动流程详解

STM32启动流程详解 本文档详细介绍STM32微控制器从上电到main函数执行的完整启动流程。 1. 上电与复位过程 当STM32芯片上电或复位时&#xff0c;硬件会执行以下步骤&#xff1a; 上电复位(POR)/低电平复位(PDR): 芯片接通电源或NRST引脚置低时触发初始PC值设置: 程序计数器…

Langchain + Gemini API调用基本操作

本文参考Langchain中ChatGoogleGenerativeAI的官方文档&#xff0c;在本地的jupyter notebook中运行。 关于API的细节在官方文档最开头给出&#xff1a; 我们在使用时&#xff0c;可以选择model"gemini-2.0-flash-001"或者生成图片的ChatGoogleGenerativeAI(model“…

【数据结构】4.单链表实现通讯录

在上一篇文章我们学会了用单链表来实现各种方法&#xff0c;在这一篇文章我们将在单链表的基础上实现通讯录。 0、准备工作 实现通讯录之前&#xff0c;我们还需要在单链表的基础上添加2个文件&#xff0c;头文件Contact.h和源文件Contact.c。Contact.c来实现通讯录方法的声明…

接口自动化测试(一)

一、HTTP请求的核心概念及原理详解 HTML:超文本标记语言-----通过<标记符>内容</标记符>格式-------页面 URL:统一资源定位符 返回数据有很多&#xff1a;页面、图片、视频&#xff0c;都可以进行返回---统称为&#xff1a;资源HTTP:超文本传输协议(请求-响应的协…

【JavaEE】Spring AOP的注解实现

目录 一、AOP 与 Spring AOP二、Spring AOP简单实现三、详解Spring AOP3.1 Spring AOP 核心概念3.1.1 切点&#xff08;Pointcut&#xff09;3.1.2 连接点&#xff08;Join Point&#xff09;3.1.3 通知&#xff08;Advice&#xff09;3.1.4 切面&#xff08;Aspect&#xff09…

揭秘大数据 | 22、软件定义存储

揭秘大数据 | 19、软件定义的世界-CSDN博客 揭秘大数据 | 20、软件定义数据中心-CSDN博客 揭秘大数据 | 21、软件定义计算-CSDN博客 老规矩&#xff0c;先把这个小系列的前三篇奉上。今天书接上文&#xff0c;接着叙软件定义存储的那些事儿。 软件定义存储源于VMware公司于…

OpenCV 图形API(37)图像滤波-----分离过滤器函数sepFilter()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 应用一个可分离的线性滤波器到一个矩阵&#xff08;图像&#xff09;。 该函数对矩阵应用一个可分离的线性滤波器。也就是说&#xff0c;首先&a…

flutter下载SDK环境配置步骤详解

目录 1.Flutter官网地址、SDK下载地址? 1.1 选择你电脑的系统​ 2.配置环境 3.解决环境报错 zsh:command not found:flutter 1.Flutter官网地址、SDK下载地址? flutter官网地址: URL 1.1 选择你电脑的系统 下载解压动目录就OK了 2.配置环境 1、打开命令行&#xf…

数据结构与算法入门 Day 0:程序世界的基石与密码

&#x1f31f;数据结构与算法入门 Day 0&#xff1a;程序世界的基石与密码&#x1f511; ps&#xff1a;接受到了不少的私信反馈&#xff0c;说应该先把前置的知识内容做一个梳理&#xff0c;所以把昨天的文章删除了&#xff0c;重新开启今天的博文写作 Hey 小伙伴们&#xff…