从零开始实现 C++ TinyWebServer 阻塞队列 BlockQueue类详解

news2025/3/25 11:03:29

文章目录

  • 阻塞队列是什么?
  • 为什么需要阻塞队列?
  • BlockQueue 成员变量
  • 实现 push() 函数
  • 实现 pop() 函数
  • 实现 close() 函数
  • BlockQueue 代码
  • BlockQueue 测试

从零开始实现 C++ TinyWebServer 项目总览

项目源码

阻塞队列是什么?

阻塞队列是一种线程安全的数据结构,支持多线程环境中的生产者-消费者模型。其核心特点在于,当队列为空时,消费者线程会进入阻塞状态,直到有新的数据可供消费;而当队列已满时,生产者线程会被阻塞,直至队列中有空闲空间可供使用。

  • 线程安全:借助同步机制,有效避免了多个线程同时操作队列时可能出现的数据竞争问题,确保数据的一致性和完整性。

  • 容量限制:可以根据实际需求灵活设置队列的容量上限,当队列达到最大容量时,生产者线程会被阻塞,避免数据溢出。

  • 阻塞操作:当队列为空时,消费者线程会自动等待;当队列满时,生产者线程也会进入等待状态,实现了线程间的协调与同步。

为什么需要阻塞队列?

  • 解耦生产消费:将日志信息的产生和存储过程进行分离,应用程序只需将日志快速放入队列,无需等待写入操作完成,从而提高了系统的可维护性。

  • 平衡速度差异:有效应对日志生产速度不稳定和消费速度受存储设备性能限制的问题,队列能够缓存多余的日志信息,实现动态平衡,确保系统的稳定运行。

  • 提升并发性能:支持多线程协作,生产者和消费者线程可以同时工作,充分利用多核处理器的性能优势,同时队列的同步机制避免了线程竞争,提高了系统的并发处理能力。

  • 保证日志顺序:阻塞队列的先进先出特性确保日志按产生顺序进行存储,便于后续的问题排查和分析。

BlockQueue 成员变量

bool is_close;          // 是否关闭
size_t capacity_;       // 容量
std::deque<T> deque_;   // 双向队列 

std::mutex mtx_;      // 锁
std::condition_variable condition_producer; // 生产者条件变量
std::condition_variable condition_consumer; // 消费者条件变量

实现 push() 函数

获取互斥锁mtx_保证线程安全;检查队列是否已满,满了就等待;向队列添加元素;通知一个等待的消费者线程。

void push_back(const T& item);

向阻塞队列的尾部添加一个元素。

template<class T>
void BlockQueue<T>::push_back(const T& item) {
    std::unique_lock<std::mutex> locker(mtx_);
    // 队列满了,暂停生产
    while (deque_.size() >= capacity_)
        condition_producer.wait(locker); // 防止虚假唤醒
    deque_.push_back(item);
    condition_consumer.notify_one();
}

向阻塞队列的头部添加一个元素。

template<class T>
void BlockQueue<T>::push_front(const T& item) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deque_.size() >= capacity_)
        condition_producer.wait(locker);
    deque_.push_front(item);
    condition_consumer.notify_one();
}

实现 pop() 函数

获取互斥锁mtx_;检查队列是否为空,如果为空且队列未关闭,就等待;如果队列关闭,返回false;取出队列头部元素,存储到item中,并移除队列头部元素;通知一个等待的生产者线程,队列中有空间了。

bool pop(T& item);

从阻塞队列的头部取出一个元素。

template<class T>
bool BlockQueue<T>::pop(T& item) {
    std::unique_lock<std::mutex> locker(mtx_);
    // 队列空了,暂停消费
    while (deque_.empty()) {
        if (is_close)
            return false;
        condition_consumer.wait(locker);
    }
    item = deque_.front();
    deque_.pop_front();
    condition_producer.notify_one();
    return true;
}
bool pop(T& item, int timeout);

从阻塞队列的头部取出一个元素,但是有超时机制。

template<class T>
bool BlockQueue<T>::pop(T& item, int timeout) {
    std::unique_lock<std::mutex> locker(mtx_);
    const std::cv_status TIMEOUT_STATUS = std::cv_status::timeout;
    while (deque_.empty()) {
        if (is_close)
            return false;
        if (condition_consumer.wait_for(locker, 
            std::chrono::seconds(timeout)) == TIMEOUT_STATUS)
            return false;
    }
    item = deque_.front();
    deque_.pop_front();
    condition_producer.notify_one();
    return true;
}

实现 close() 函数

获取互斥锁mtx_,清空队列,设置关闭标志is_closetrue;通知所有等待的生产者和消费者线程。

void close();

关闭阻塞队列,释放所有等待的生产者和消费者线程。

template<class T>
void BlockQueue<T>::close() {
    {
        std::lock_guard<std::mutex> locker(mtx_);
        deque_.clear();
        is_close = true;
    }
    condition_producer.notify_all();
    condition_consumer.notify_all();
}

BlockQueue 代码

模板的定义和实现要放在同一个头文件中,因为模板的代码需要在编译时实例化。

block

#ifndef BLOCKQUEUE_H
#define BLOCKQUEUE_H

#include <iostream>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <assert.h>

template<class T>
class BlockQueue {
public:
    BlockQueue(size_t max_size = 1000);
    ~BlockQueue();

    bool empty();
    bool full();
    void clear();
    size_t size();
    size_t capacity();

    void push_front(const T& item);
    void push_back(const T& item);
    bool pop(T& item);
    bool pop(T& item, int timeout);

    T front();
    T back();
    void flush();
    void close();

private:
    bool is_close;          // 是否关闭
    size_t capacity_;       // 容量
    std::deque<T> deque_;   // 双向队列 

    std::mutex mtx_;      // 锁
    std::condition_variable condition_producer; // 生产者条件变量
    std::condition_variable condition_consumer; // 消费者条件变量
};

// 模板的定义和实现要放在同一个头文件中
// 因为模板的代码需要在编译时实例化

template<class T>
BlockQueue<T>::BlockQueue(size_t max_size) : capacity_(max_size) {
    assert(max_size > 0);
    is_close = false;
}

template<class T>
BlockQueue<T>::~BlockQueue() {
    close();
}   

template<class T>
void BlockQueue<T>::push_back(const T& item) {
    std::unique_lock<std::mutex> locker(mtx_);
    // 队列满了,暂停生产
    while (deque_.size() >= capacity_)
        condition_producer.wait(locker); // 防止虚假唤醒
    deque_.push_back(item);
    condition_consumer.notify_one();
}

template<class T>
void BlockQueue<T>::push_front(const T& item) {
    std::unique_lock<std::mutex> locker(mtx_);
    while (deque_.size() >= capacity_)
        condition_producer.wait(locker);
    deque_.push_front(item);
    condition_consumer.notify_one();
}

template<class T>
bool BlockQueue<T>::pop(T& item) {
    std::unique_lock<std::mutex> locker(mtx_);
    // 队列空了,暂停消费
    while (deque_.empty()) {
        if (is_close)
            return false;
        condition_consumer.wait(locker);
    }
    item = deque_.front();
    deque_.pop_front();
    condition_producer.notify_one();
    return true;
}

template<class T>
bool BlockQueue<T>::pop(T& item, int timeout) {
    std::unique_lock<std::mutex> locker(mtx_);
    const std::cv_status TIMEOUT_STATUS = std::cv_status::timeout;
    while (deque_.empty()) {
        if (is_close)
            return false;
        if (condition_consumer.wait_for(locker, 
            std::chrono::seconds(timeout)) == TIMEOUT_STATUS)
            return false;
    }
    item = deque_.front();
    deque_.pop_front();
    condition_producer.notify_one();
    return true;
}

// 关闭阻塞队列,唤醒所有生产者和消费者
template<class T>
void BlockQueue<T>::close() {
    {
        std::lock_guard<std::mutex> locker(mtx_);
        deque_.clear();
        is_close = true;
    }
    condition_producer.notify_all();
    condition_consumer.notify_all();
}

// 唤醒消费者
template<class T>
void BlockQueue<T>::flush() {
    condition_consumer.notify_one();
}

template<class T>
T BlockQueue<T>::front() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deque_.front();
}

template<class T>
T BlockQueue<T>::back() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deque_.back();
}

template<class T>
bool BlockQueue<T>::empty() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deque_.empty();
}

template<class T>
bool BlockQueue<T>::full() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deque_.size() >= capacity_;
}

template<class T>
void BlockQueue<T>::clear() {
    std::lock_guard<std::mutex> locker(mtx_);
    deque_.clear();
}

template<class T>
size_t BlockQueue<T>::size() {
    std::lock_guard<std::mutex> locker(mtx_);
    return deque_.size();
}

template<class T>
size_t BlockQueue<T>::capacity() {
    std::lock_guard<std::mutex> locker(mtx_);
    return capacity_;
}

#endif // BLOCKQUEUE_H

BlockQueue 测试

利用Google TestBlockQueue类进行单元测试,测试对Buffer的基本功能,延时出队,多线程下的生产者-消费者模型,关闭操作。

#include "../code/log/blockqueue.h"
#include <thread>
#include <chrono>
#include <gtest/gtest.h>

// 测试 BlockQueue 的基本功能
TEST(BlockQueueTest, TestBasicFunctionality) {
    BlockQueue<int> queue(5);
    EXPECT_TRUE(queue.empty());
    EXPECT_EQ(queue.capacity(), 5);
    
    for (int i = 0; i < 5; ++i)
        queue.push_back(i);
    EXPECT_TRUE(queue.full());
    EXPECT_EQ(queue.size(), 5);

    int item;
    EXPECT_TRUE(queue.pop(item));
    EXPECT_EQ(item, 0);
    EXPECT_EQ(queue.front(), 1);
    EXPECT_EQ(queue.back(), 4);

    queue.push_front(5);
    EXPECT_EQ(queue.front(), 5);

    queue.clear();
    EXPECT_TRUE(queue.empty());
}

// 测试带有超时的 pop 操作
TEST(BlockQueueTest, TestPopWithTimeout) {
    BlockQueue<int> queue(5);
    int item;
    // 等待失败,花费1s
    EXPECT_FALSE(queue.pop(item, 1));

    queue.push_back(2);
    // 等待成功,不耗时
    EXPECT_TRUE(queue.pop(item, 1));
    EXPECT_EQ(item, 2);
}

// 测试多线程下的 生产者-消费者模式
TEST(BlockQueueTest, TestProducerConsumer) {
    BlockQueue<int> queue(5);

    std::thread producer([&queue]() {
        for (int i = 0; i < 10; ++i) {
            queue.push_back(i);
            // 模拟生产过程
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    });

    std::thread consumer([&queue]() {
        int item;
        for (int i = 0; i < 10; ++i) {
            if (queue.pop(item))
                EXPECT_EQ(item, i);
            else
                FAIL() << "Failed to consume an item.";
            // 模拟消费过程
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    producer.join();
    consumer.join();
}

// 测试关闭操作
TEST(BlockQueueTest, TestClose) {
    BlockQueue<int> queue(5);

    std::thread producer([&queue]() {
        for (int i = 0; i < 10; ++i) {
            queue.push_back(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        queue.close();
    });

    std::thread consumer([&queue]() {
        int item;
        while (queue.pop(item))
            continue;
        EXPECT_TRUE(queue.empty());
    });

    producer.join();
    consumer.join();
}

int main(int argc, char* argv[]) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS(); 
}

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(tests)

# 设置 C++ 标准和编译器选项
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra")

# 查找 Google Test 包
find_package(GTest REQUIRED)
# 包含 Google Test 头文件目录
include_directories(${GTEST_INCLUDE_DIRS})
# 添加可执行文件
add_executable(blockqueue_unit_test blockqueue_unit_test.cc)
# 链接 Google Test 库
target_link_libraries(blockqueue_unit_test ${GTEST_LIBRARIES} pthread)
# 启用测试
enable_testing()
# 添加测试
add_test(NAME blockqueue_unit_test COMMAND blockqueue_unit_test)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2319066.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux驱动开发基础(can)

目录 1.can的介绍 2.can的硬件连接 2.1 CPU自带can控制器 2.2 CPU没有can控制器 3.电气属性 4.can的特点 5.can协议 5.1 can的种类 5.2 数据帧 5.2.1 标准数据帧格式 5.3.1 扩展数据帧格式 5.3 遥控帧 5.4 错误帧 5.5 过载帧 5.6 帧间隔 5.7 位填充 5.8 位时…

leetcode热题100道——字母异位词分组

给你一个字符串数组&#xff0c;请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 示例 1: 输入: strs ["eat", "tea", "tan", "ate", "nat", &…

MCU-芯片时钟与总线和定时器关系,举例QSPI

时钟源&#xff1a; 时钟源为系统时钟提供原始频率信号&#xff0c;系统时钟则通过&#xff08;分频、倍频、选择器&#xff09;成为整个芯片的“主时钟”&#xff0c;驱动 CPU 内核、总线&#xff08;AHB、APB&#xff09;及外设的运行。 内部时钟源&#xff1a; HSI&#x…

技术分享 | MySQL内存使用率高问题排查

本文为墨天轮数据库管理服务团队第51期技术分享&#xff0c;内容原创&#xff0c;如需转载请联系小墨&#xff08;VX&#xff1a;modb666&#xff09;并注明来源。 一、问题现象 问题实例mysql进程实际内存使用率过高 二、问题排查 2.1 参数检查 mysql版本 &#xff1a;8.0.…

分享一个精灵图生成和拆分的实现

概述 精灵图&#xff08;Sprite&#xff09;是一种将多个小图像合并到单个图像文件中的技术&#xff0c;广泛应用于网页开发、游戏开发和UI设计中。在MapboxGL中&#xff0c;跟之配套的还有一个json文件用来记录图标的大小和位置。本文分享基于Node和sharp库实现精灵图的合并与…

函数:形参和实参

在函数的使用过程中分为实参和形参&#xff0c;实参是主函数实际调用的值而形参则是给实参调用的值&#xff0c;如果函数没被调用则函式不会向内存申请空间&#xff0c;先用一段代码演示 形参&#xff1a; int test(int x ,int y ) {int z 0;z x y;return z; } 为何会叫做…

【C#知识点详解】ExcelDataReader介绍

今天来给大家介绍一下ExcelDataReader&#xff0c;ExcelDataReader是一个轻量级的可快速读取Excel文件中数据的工具。话不多说直接开始。 ExcelDataReader简介 ExcelDataReader支持.xlsx、.xlsb、.xls、.csv格式文件的读取&#xff0c;版本基本在2007及以上版本&#xff0c;支…

《视觉SLAM十四讲》ch13 设计SLAM系统 相机轨迹实现

前言 相信大家在slam学习中&#xff0c;一定会遇到slam系统的性能评估问题。虽然有EVO这样的开源评估工具&#xff0c;我们也需要自己了解系统生成的trajectory.txt的含义&#xff0c;方便我们更好的理解相机的运行跟踪过程。 项目配置如下&#xff1a; 数据解读&#xff1a; …

在类Unix终端中如何实现快速进入新建目录

&#x1f6aa; 前言 相信喜欢使用终端工作的小伙伴或多或少会被一个小地方给膈应&#xff0c;那就是每次想要新建一个文件夹并且进入之&#xff0c;那么就需要两条指令&#xff1a;mkdir DIR和cd DIR&#xff0c;有些人可能要杠了&#xff0c;我一条指令也能&#xff0c;mkdir…

TG电报群管理机器人定制开发的重要性

在Telegram&#xff08;电报&#xff09;用户突破20亿、中文社群规模持续扩张的背景下&#xff0c;定制化群管理机器人的开发已成为社群运营的战略刚需。这种技术工具不仅解决了海量用户管理的效率难题&#xff0c;更通过智能化功能重构了数字社群的治理范式。本文从管理效能、…

VNA操作使用学习-01 界面说明

以我手里面的liteVNA为例。也可以参考其他的nanoVNA的操作说明。我先了解一下具体的菜单意思。 今天我想做一个天调&#xff0c;居然发现我连一颗基本的50欧姆插件电阻和50欧姆的smt电阻的幅频特性都没有去测试过&#xff0c;那买来这个nva有什么用途呢&#xff0c;束之高阁求…

耘想Docker版Linux NAS的安装说明

耘想LinNAS&#xff08;Linux NAS&#xff09;可以通过Docker部署&#xff0c;支持x86和arm64两种硬件架构。下面讲解LinNAS的部署过程。 1. 安装Docker CentOS系统&#xff1a;yum install docker –y Ubuntu系统&#xff1a;apt install docker.io –y 2. 下载LinNas镜像…

OpenCV图像拼接(4)图像拼接模块的一个匹配器类cv::detail::BestOf2NearestRangeMatcher

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::detail::BestOf2NearestRangeMatcher 是 OpenCV 库中用于图像拼接模块的一个匹配器类&#xff0c;专门用于寻找两幅图像之间的最佳特征点匹配…

不用 Tomcat?SpringBoot 项目用啥代替?

在SpringBoot框架中&#xff0c;我们使用最多的是Tomcat&#xff0c;这是SpringBoot默认的容器技术&#xff0c;而且是内嵌式的Tomcat。 同时&#xff0c;SpringBoot也支持Undertow容器&#xff0c;我们可以很方便的用Undertow替换Tomcat&#xff0c;而Undertow的性能和内存使…

Zabbix安装(保姆级教程)

Zabbix 是一款开源的企业级监控解决方案&#xff0c;能够监控网络的多个参数以及服务器、虚拟机、应用程序、服务、数据库、网站和云的健康状况和完整性。它提供了灵活的通知机制&#xff0c;允许用户为几乎任何事件配置基于电子邮件的告警&#xff0c;从而能够快速响应服务器问…

鸿蒙开发真机调试:无线调试和USB调试

前言 在鸿蒙开发的旅程中&#xff0c;真机调试堪称至关重要的环节&#xff0c;其意义不容小觑。虽说模拟器能够为我们提供初步的测试环境&#xff0c;方便我们在开发过程中快速预览应用的基本效果&#xff0c;但它与真机环境相比&#xff0c;仍存在诸多差异。就好比在模拟器中…

工厂函数详解:概念、目的与作用

一、什么是工厂函数&#xff1f; 工厂函数&#xff08;Factory Function&#xff09;是一种设计模式&#xff0c;其核心是通过一个函数来 创建并返回对象&#xff0c;而不是直接使用 new 或构造函数实例化对象。它封装了对象的创建过程&#xff0c;使代码更灵活、可维护。 二、…

Python简单爬虫实践案例

学习目标 能够知道Web开发流程 能够掌握FastAPI实现访问多个指定网页 知道通过requests模块爬取图片 知道通过requests模块爬取GDP数据 能够用pyecharts实现饼图 能够知道logging日志的使用 一、基于FastAPI之Web站点开发 1、基于FastAPI搭建Web服务器 # 导入FastAPI模…

基于springboot的房产销售系统(016)

摘 要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于房产销售系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了房产销售系统&#xff0c;它彻底改变了过去传统的…

云盘搭建笔记

报错问题&#xff1a; No input file specified. 伪静态 location / {if (!-e $request_filename) { rewrite ^(.*)$ /index.php/$1 last;break;} } location / { if (!-e $request_filename) { rewrite ^(.*)$ /index.php/$1 last; break; } } 设…