C++:设计一个线程安全的队列

news2025/1/20 19:17:02

文章目录

    • 1. 目的
    • 2. 实现?验证!
      • makefile
      • Queue 类的 public 成员
      • 单元测试
    • 3. 实现 Queue 类的方案

在这里插入图片描述

1. 目的

串行的程序只用到单个 CPU 核心, 希望加速整个程序, 考虑使用多线程加速。典型情况下可以找到生产者、消费者,两个角色之间通过队列进行数据交互:

  • 生产者负责把数据放入队列Q
  • 消费者负责从队列取出数据Q

要求队列是线程安全的,即:不能有读写冲突等。

2. 实现?验证!

这里并不给出具体实现, 主要原因是网络上有太多的“实现”,也许很强大,但是否正确则有待验证,反倒是怎样验证正确性,总是被忽略:

  • 新手小白,或者“算法工程师”们,往往没怎么写过合格的单元测试
  • 验证也许只是粗略跑一下,Thread Sanitizer 这样的有力武器没有被用上

makefile

我是在 Linux 下验证的, 用的 makefile 如下, 重点是 tsan 的设定, 以及 gtest 的配置:

SANITIZER_OPTION=-fsanitize=thread -fno-omit-frame-pointer
#SANITIZER_OPTION=

all:
	clang++ test_queue.cpp -I. -g `pkg-config --cflags --libs gtest gtest_main` ${SANITIZER_OPTION}

Queue 类的 public 成员

template<typename T>
class Queue
{
public:
    Queue(unsigned int max_size = 0);
    ~Queue();
    void push(const T& elem);
    T pop();
    bool empty();
    size_t size();

其中:

  • Queue是模板类,这样可以支持任意数据类型作为队列元素(但队列中所有元素类型需要相同)
  • 所有成员函数都不能是 const 的, 尤其是 empty 和 size 函数, 原因是当前线程调用它们时,其他线程可能立即改变队列成员,需要 mutex 锁住, 对于 mutex 的操作导致函数不再是 const 的
  • 支持设定队列最大元素数量,如果没指定, 看似用0,实际表示“无限”

单元测试

如下是基于 GoogleTest 和 Queue 的 ADT 给出的单元测试代码。
如果你基于上述 Queue 类的定义, 能通过如下单元测试, 那么程序的正确性应该说比较高了。这部分代码的价值比 Queue 本身的价值要更高, 但往往被人们忽略:

#include <gtest/gtest.h>
#include <digimon/queue.hpp>
#include <shadow/queue.hpp>
#include <unistd.h>

using namespace digimon;
//using namespace Shadow;

TEST(Queue, SingleThread)
{
    Queue<int> q;
    EXPECT_EQ(q.empty(), true);

    q.push(1);
    q.push(2);
    EXPECT_EQ(q.empty(), false);
    
    int x = q.pop();
    EXPECT_EQ(x, 1);

    x = q.pop();
    EXPECT_EQ(x, 2);
}

class ThreadData
{
public:
    ThreadData() {}
    ThreadData(Queue<int>* _q, int _start, int _end) :
        q(_q), start(_start), end(_end)
    {}

public:
    Queue<int>* q;
    int start;
    int end;
};

class ConsumerThreadData
{
public:
    ConsumerThreadData(Queue<int>* _q, int _start, int _end) :
        q(_q), start(_start), end(_end), sum(0)
    {
        pthread_mutex_init(&mutex, NULL);
    }
    ~ConsumerThreadData()
    {
        pthread_mutex_destroy(&mutex);
    }

public:
    Queue<int>* q;
    int start;
    int end;
    int sum;
    pthread_mutex_t mutex;
};

static void* producer(void* _thread_data)
{
    ThreadData* thread_data = (ThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        thread_data->q->push(i);
    }

    return NULL;
}

TEST(Queue, MultiThread_MultiProducer)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ThreadData thread_data2(&q, 0, 10);
    pthread_create(&t2, NULL, producer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), false);
    EXPECT_EQ(q.size(), 20);

    int sum = 0;
    while (!q.empty())
    {
        int x = q.pop();
        sum += x;
    }
    int expected_sum = 90;
    EXPECT_EQ(expected_sum, sum);
}

static void* consumer(void* _thread_data)
{
    ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        int x = thread_data->q->pop();
        thread_data->sum += x;
        std::cout << x << std::endl;
    }

    return NULL;
}

TEST(Queue, MultiThread_SingleProducer_SingleConsumer)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 10);
    pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), true);
    EXPECT_EQ(q.size(), 0);
}

static void* producer_slow(void* _thread_data)
{
    ThreadData* thread_data = (ThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        sleep(1);
        thread_data->q->push(i);
    }

    return NULL;
}

TEST(Queue, MultiThread_Consumer_Meaningless_Grab_Mutex)
{
    Queue<int> q;

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 3);
    pthread_create(&t1, NULL, producer_slow, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 3);
    pthread_create(&t2, NULL, consumer, (void*)&thread_data2);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), true);
    EXPECT_EQ(q.size(), 0);
    EXPECT_EQ(thread_data2.sum, 3);
}

static void* consumer_slow(void* _thread_data)
{
    ConsumerThreadData* thread_data = (ConsumerThreadData*)_thread_data;

    for (int i = thread_data->start; i < thread_data->end; i++)
    {
        EXPECT_EQ(thread_data->q->size(), 5);
        int x = thread_data->q->pop();
        thread_data->sum += x;
        sleep(1);
        std::cout << x << std::endl;
    }

    return NULL;
}

TEST(Queue, LimitedQueueSize)
{
    Queue<int> q(5);

    pthread_t t1;
    ThreadData thread_data1(&q, 0, 10);
    pthread_create(&t1, NULL, producer, (void*)&thread_data1);

    pthread_t t2;
    ConsumerThreadData thread_data2(&q, 0, 5);
    pthread_create(&t2, NULL, consumer_slow, (void*)&thread_data2);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    EXPECT_EQ(q.empty(), false);
    EXPECT_EQ(q.size(), 5);
}

3. 实现 Queue 类的方案

可以基于 C++ 11 实现, 不过据说 C++11 的 thread 在华为手机上有问题,传闻中 pthread 能消除问题;
于是乎还有另一个选择: C++03 + pthread 实现 Queue 类。

Windows 平台上可以使用 windows-pthreads 库, 它是基于 Windows threads 模拟实现了 PThread 和 Semaphore 接口。(完)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/522845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于ESP或ESP8266 单通道Lorawan网关硬件制作

软件代码设计资料下载链接》》 基于 Comresult PCB 的单通道网关 介绍 这是 ESP8266 LoRa 网关的最新版本。基于 ESP8266 mcu 的 LoRa 网关在过去几年里有了很大的发展。您想构建一个小型网关并保持尽可能多的 GPIO 引脚可用&#xff0c;Hallard 板是无与伦比的。另一种解决…

C++多态练习题

文章目录 1.虚函数的调用过程2.虚函数例题例题一例题二例题三例题四例题四 1.虚函数的调用过程 从汇编上面来看&#xff1a; []代表指针解引用操作 1.op指向test对象的首地址&#xff08;存放vptr&#xff09;&#xff0c;并存放在eax里面&#xff1b; 2.将eax所指之物(虚表…

使用不同的梯度下降法优化算法

本篇将使用以下几个梯度下降算法进行对比&#xff1a; 不使用任何的优化算法&#xff08;即整批数据集作为一次迭代更新梯度&#xff0c;也叫batch梯度下降&#xff0c;BGD&#xff09; mini-batch梯度下降法&#xff08;Mini-batchGD&#xff09; 使用具有动量的梯度下降算法&…

无标签背景图(负样本)的拼图代码

训练目标检测模型有个很令人头疼的问题&#xff0c;就是有些特征与要训练的特征较为相似的背景区域也被误检出来&#xff08;作为本应不该检测出来的负样本却被误检出为正样本的FP&#xff09;。 根据这一问题的解决办法&#xff0c;除了可以对正样本特征较为模糊或者有歧义的样…

Intel SGX学习笔记(2):用数组向Enclave传递5个数实现自增操作

写在前面 1、实现一个简单的Intel SGX的应用&#xff1a;非安全区定义初始化一个数组&#xff0c;数组里面存储5个数&#xff0c;然后向安全区&#xff08;enclave&#xff09;传入&#xff0c;在安全区中进行加减乘除&#xff0c;然后返回。 2、Intel SGX开发初学整体思路&a…

代码随想录算法训练营day39 | 62.不同路径,63. 不同路径 II

代码随想录算法训练营day39 | 62.不同路径&#xff0c;63. 不同路径 II 62.不同路径解法一&#xff1a;动态规划解法二&#xff1a;深度搜索&#xff08;明天补充&#xff09;解法三&#xff1a;数论&#xff08;明天补充&#xff09; 63. 不同路径 II解法一&#xff1a;动态规…

RuoYi-Vue下载与运行

一、源码下载 若依官网&#xff1a;RuoYi 若依官方网站 鼠标放到"源码地址"上&#xff0c;点击"RuoYi-Vue 前端分离版"。 跳转至Gitee页面&#xff0c;点击"克隆/下载"&#xff0c;复制HTTPS链接即可。 源码地址为&#xff1a;https://gitee.…

左值引用、右值引用,std::move() 的汇编解释

1&#xff1a;左值引用 引用其实还是指针&#xff0c;但回避了指针这个名字。由编译器完成从地址中取值。以vs2019反汇编&#xff1a; 如图&#xff0c;指针和引用的汇编代码完全一样。但引用在高级语言层面更友好&#xff0c;对人脑。比如可以少写一个 * 号和 -> 。 &…

F280049C实现Simulink调制,以及多个PWM实例之间的同步

文章目录 前言基本概念调制发波载波同步问题 前言 最近作实验碰到了载波不同步的问题&#xff0c;以前也有碰到过这个问题&#xff0c;现在终于解决了&#xff0c;做个记录。 为了以示区分&#xff0c;实例指ePWMx&#xff0c;x1,2,3,4,5,6,7,8&#xff1b;通道指ePWMxA/B&am…

如何使用jmeter进行压测

目录 1.概述 2.测试计划、线程组、取样器 3.调试运行 4.请求默认值 5.流量录制 6.模拟时间间隔 7.压力测试 8.报表 1.概述 一款工具&#xff0c;功能往往是很多的&#xff0c;细枝末节的地方也很多&#xff0c;实际的测试工作中&#xff0c;绝大多数场景会用到的也就是…

看大老如何用Postman+Jmeter实现接口实例

一、接口基础 为什么要单独测试接口&#xff1f; 1. 程序是分开开发的&#xff0c;前端还没有开发&#xff0c;后端已经开发完了&#xff0c;可以提前进入测试 2. 接口直接返回的数据------越底层发现bug&#xff0c;修复成本是越低的 3. 接口测试能模拟功能测试不能测到的异常…

数十位高级测工联合讲解Selenium自动化测试框架工作原理

一、Selenium是什么&#xff1f;   用官网的一句话来讲&#xff1a;Selenium automates browsers. Thats it&#xff01;简单来讲&#xff0c;Selenium是一个用于Web应用程序自动化测试工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作浏览器一样。支持…

uvm寄存器模型

一、基础知识 前门访问与后门访问是两种寄存器的访问方式。 所谓前门访问, 指的是通过模拟cpu在总线上发出读指令, 进行读写操作。 在这个过程中, 仿真时间( $time函数得到的时间) 是一直往前走的。而后门访问是与前门访问相对的概念。 它并不通过总线进行读写操作, 而是…

2023/5/14周报

目录 摘要 论文阅读 1、标题和现存问题 2、准备知识 3、模型结构 4、实验准备 5、实验结果 深度学习 1、大气数据和水质数据 2、数据清洗 3、项目框架设定 总结 摘要 本周在论文阅读上&#xff0c;阅读了一篇时空图卷积网络:交通预测的深度学习框架的论文。文章的时…

oracle使用with as创建临时表

一、业务需求 在oracle项目的开发过程中&#xff0c;使用sql编写好对应的分析报表内容后&#xff0c;由于sql分析报表涉及到的一些线别丢失&#xff0c;导致呈现的报表分类统计时固定用醒目颜色标识的统计行数据显示错位&#xff1b;因此需要修复分析报表填充完整的线别。 二、…

LeetCode 62 不同路径

题目&#xff1a; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff09;。问总共有多少条不同的路径&#…

云上高校导航 小程序 开发教程

Gitee仓库&#xff1a;云上高校导航 GitHub仓库&#xff1a;云上高校导航 “云上高校导航”是一套基于小程序云开发的校园导航类系统开发方案&#xff0c;该开发方案可供开发者进行二次开发&#xff0c;用于解决师生和访客的校园出行需求。 项目优势及创新&#xff1a; 使…

Flink 常用API(2)——转换算子+聚合算子

转换算子&#xff08;Transformation&#xff09; 映射&#xff08;map&#xff09; 用于将数据流中的数据进行转换&#xff0c;形成新的数据流 “一一映射”&#xff0c;消费一个元素就产出一个元素 参数&#xff1a;接口 MapFunction 的实现 方法&#xff1a;map 返回值…

C#串口通信从入门到精通(13)——多个串口发送数据

文章目录 前言1、多串口数据的发送2、源码前言 我们在开发串口通信程序时,有时候会需要连接不止一个串口,这时候该怎么写程序呢?本文就来介绍多个串口数据的发送 1、多串口数据的发送 我们在之前的专栏中介绍了串口数据的发送,当时有提到过,我们是通过创建一个SerialPo…

支付系统设计三:渠道网关设计06-业务处理

文章目录 前言一、业务服务工厂二、业务处理服务1. 业务处理服务2. 业务处理抽象服务3. 流量控制4. 报文提交4.1 获取交易的服务端通讯列表4.2 循环请求支付渠道4.2.1 报文组装4.2.2 报文发送4.2.2.1 协议处理器获取4.2.2.2 构建通讯客户端4.2.2.3 发送请求4.2.2.4 响应报文读取…