【c++并发编程】线程池实现

news2025/1/20 14:51:40

参考https://shanhai.huawei.com/#/page-forum/post-details?postId=43796

完整代码

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <atomic>

// 任务优先级结构体
struct PriorityTask {
    int priority;
    std::function<void()> func;

    // 优先级比较,优先级数值越小,优先级越高
    bool operator<(const PriorityTask& other) const {
        return priority > other.priority;
    }
};

// 线程池类
class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    PriorityTask task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.top());
                        this->tasks.pop();
                    }

                    task.func();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

    template<class F, class... Args>
    auto enqueue(int priority, F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);

            // Don't allow enqueueing after stopping the pool
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks.emplace(PriorityTask{priority, [task]() { (*task)(); }});
        }
        condition.notify_one();
        return res;
    }

private:
    // Need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // The task queue
    std::priority_queue< PriorityTask > tasks;

    // Synchronization
    std::mutex queueMutex;
    std::condition_variable condition;
    std::atomic<bool> stop;
};

// 使用示例
int main() {
    ThreadPool pool(4);

    auto result1 = pool.enqueue(1, []() -> int {
        std::cout << "Executing task 1" << std::endl;
        return 1;
    });

    auto result2 = pool.enqueue(0, []() -> int {
        std::cout << "Executing task 2" << std::endl;
        return 2;
    });

    std::cout << "Task 1 result: " << result1.get() << std::endl;
    std::cout << "Task 2 result: " << result2.get() << std::endl;

    return 0;
}

任务定义

// 任务优先级结构体
struct PriorityTask {
    int priority;
    std::function<void()> func;

    // 优先级比较,优先级数值越小,优先级越高
    bool operator<(const PriorityTask& other) const {
        return priority > other.priority;
    }
};

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
回调函数是一种编程模式,允许一个函数在其他函数完成某些操作后被调用。回调函数通常用于异步编程,即在等待某些操作完成时,可以执行其他任务,然后在操作完成时调用回调函数。
例如,在网络编程中,当发送一个请求后,可以注册一个回调函数,当接收到响应时,这个回调函数会被调用。这样,程序可以在等待响应时执行其他任务,而不是阻塞等待。
在C++中,可以使用std::function对象来存储和调用回调函数。例如:

在这里插入图片描述
在这里插入图片描述

在这个例子中,sendRequest函数接受一个URL和一个回调函数作为参数。在发送请求并接收响应后,它会调用回调函数onResponse,并传递状态码和响应。
需要注意的是,回调函数的使用需要谨慎,因为如果回调函数在被调用前就被销毁,可能会导致未定义行为。因此,通常需要确保回调函数的生命周期足够长,或者使用std::shared_ptr等机制来管理回调函数的生命周期。
总的来说,回调函数是一种强大的编程模式,可以用于实现异步编程、事件驱动编程等复杂功能,通过使用std::function对象,可以方便地存储和调用回调函数。

线程池初始化

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    PriorityTask task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.top());
                        this->tasks.pop();
                    }

                    task.func();
                }
            });
        }
    }

在这里插入图片描述
task = std::move(this->tasks.top())
在这里插入图片描述

std:thread 初始化

#include <thread>
#include <iostream>

void printHello() {
    std::cout << "Hello, world!" << std::endl;
}

int main() {
    std::thread t(printHello);
    t.join();
    return 0;
}

在C++中,std::thread是一个用于表示并发执行线程的类,它是C++标准库中的一部分。std::thread对象可以通过构造函数来初始化,并且可以通过调用其成员函数来控制线程的执行。
以下是一个简单的例子,展示了如何初始化和使用std::thread对象:

#include <thread>
#include <iostream>

void printHello() {
    std::cout << "Hello, world!" << std::endl;
}

int main() {
    std::thread t(printHello);
    t.join();
    return 0;
}

在这个例子中,std::thread t(printHello);创建了一个std::thread对象t,并将其初始化为执行printHello函数。t.join();等待线程t完成执行,然后继续执行后面的代码。
需要注意的是,std::thread对象在创建后会立即启动线程,并执行指定的任务。如果在线程完成执行之前销毁std::thread对象,可能会导致未定义行为。因此,通常需要确保std::thread对象的生命周期足够长,或者使用std::shared_ptr等机制来管理线程的生命周期。
总的来说,std::thread对象可以通过构造函数来初始化,并且可以通过调用其成员函数来控制线程的执行。通过使用std::thread,可以方便地实现多线程编程,并行执行任务。
在这里插入图片描述

线程池入队

template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);

        // Don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace(PriorityTask{priority, [task]() { (*task)(); }});
    }
    condition.notify_one();
    return res;
}

这段代码是一个线程池类的成员函数,用于将一个新的任务添加到任务队列中。
首先,它使用模板参数F和Args来接受任务的函数和参数。然后,它使用std::bind来创建一个可调用对象,该对象在被调用时会执行函数f并传递参数args。
接着,它使用std::make_shared来创建一个std::packaged_task对象,该对象可以在另一个线程中执行任务。std::packaged_task对象包装了一个可调用对象,并提供了一个get_future方法,该方法返回一个std::future对象,该对象可以用来获取任务的结果。
然后,它使用std::unique_lock来获取queueMutex互斥锁,以确保在修改任务队列时不会有其他线程干扰。
在获取锁之后,它检查stop标志,如果线程池已经停止,那么就抛出一个运行时错误。然后,它将新的任务添加到任务队列中,并使用condition.notify_one()来唤醒一个等待condition条件变量的线程。
最后,它返回std::future对象,该对象可以用来获取任务的结果。
总的来说,这个函数的主要作用是将一个新的任务添加到任务队列中,并返回一个std::future对象,该对象可以用来获取任务的结果。

task { (*task)(); }是一个lambda表达式,它是一个没有参数的函数对象。这个lambda表达式在被调用时会执行task中的任务。
在这个例子中,task是一个std::shared_ptr,它指向一个std::packaged_task对象。std::packaged_task对象包装了一个可调用对象,该对象在被调用时会执行函数f并传递参数args。
当task { (*task)(); }被调用时,它会解引用task智能指针,然后调用std::packaged_task对象的operator(),这会执行包装的任务。
这种用法的主要目的是将一个任务封装成一个可调用对象,这样就可以将任务作为参数传递给其他函数,或者存储在数据结构中,以便在未来的某个时间点执行。
需要注意的是,lambda表达式是一种匿名函对象,它可以捕获外部变量并在其自身的函数体中使用它们。在这个例子中,lambda表达式捕获了task智能指针,并在其函数体中解引用并调用它。这样就可以在新的线程中执行任务。

在lambda表达式中,&task { (*task)(); }会捕获task智能指针的引用,而不是值。这意味着,如果task智能指针在lambda表达式之外被销毁,那么在lambda表达式中使用task时就会出现悬挂引用,这是一种未定义行为。
因此,正确的做法是使用task { (*task)(); }来捕获task智能指针的值,这样就可以确保task智能指针在lambda表达式执行期间一直有效。
需要注意的是,捕获值会导致lambda表达式拷贝task智能指针
在这里插入图片描述

线程池销毁

~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) {
        worker.join();
    }
}

这段代码是一个线程池的析构函数,它的主要目的是在销毁线程池时正确地关闭所有的工作线程。
首先,它使用std::unique_lock来获取queueMutex互斥锁,以确保在修改stop标志时不会有其他线程干扰。然后,它将stop标志设置为true,表示线程池需要停止运行。
接着,它调用condition.notify_all()来唤醒所有正在等待condition条件变量的线程。这些线程通常是工作线程,它们在等待新的任务出现在任务队列中。
最后,它遍历所有的工作线程,并调用join()方法来等待每个线程的结束。join()方法会阻塞当前线程,直到指定的线程结束执行。这样可以确保在销毁线程池时,所有的工作线程都已经正确地结束了。
总的来说,这段代码的目的是在销毁线程池时,正确地关闭所有的工作线程,并确保没有任何线程在运行。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2218174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

牛企查:性价比很高的企业综合查询小程序

很多人都会有查询企业信息的需求&#xff1a; 入职公司前查询企业的基本信息&#xff1b; 只是需要简单便捷查询到企业的信用代码注册地址等基础信息&#xff1b; 做企业调查&#xff0c;分析时需要用到企业的一些数据&#xff1b; 研究一些单项数据的时候&#xff0c;需要…

拟声 0.37.0 | 拟物风格,超级优美,功能丰富

拟声是一款功能丰富的音视频播放器&#xff0c;支持多种音频来源&#xff0c;并具备独特的歌词弹幕、音源转换、跨设备共享与控制等功能。其创新的LRC歌词编解码器和新拟物风格的UI设计为用户提供了一个全新的视听体验。 大小&#xff1a;36M 百度网盘&#xff1a;https://pan…

代码审计-Python Flask

1.Jinjia2模版注入 Flask是一个使用 Python 编写的轻量级 Web 应用框架。其 WSGI 工具箱采用 Werkzeug &#xff0c;模板引擎则使用 Jinja2。jinja2是Flask作者开发的一个模板系统&#xff0c;起初是仿django模板的一个模板引擎&#xff0c;为Flask提供模板支持&#xff0c;由于…

ubuntu下安装mysql遇到的问题

ubuntu下安装mysql sudo apt install -y mysql-server 出现问题 ……by process 3455 解决 安装 启动 systemctl status mysql.service sudo mysql -u root -p 如何修改密码 与datagrip的连接 查看IP ifconfig 若没安装 参考 Windows10的DataGrip2024.1.4连接ubuntu22.04中的M…

27.第二阶段x86游戏实战2-遍历周围NPC跳出递归循环

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01;0 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 本人写的内容纯属胡编乱造&#xff0c;全都是合成造假&#xff0c;仅仅只是为了娱乐&#xff0c;请不要…

公开课 | 2024清华大模型公开课 第6课 大模型前沿架构 Part 2(长文本、Scaling Law)

本文由readlecture.cn转录总结专注于音、视频转录与总结&#xff0c;2小时视频&#xff0c;5分钟阅读&#xff0c;加速内容学习与传播。 大纲 引言 介绍长文本和Scaling Law的研究方向 强调大型语言模型在日常生活中的应用 长文本处理 长文本的定义和例子 《哈利波特》系列书…

KubeSphere安装mysql8.4.0

背景 KubeSphere 是在 Kubernetes 之上构建的以应用为中心的多租户容器平台,完全开源,提供全栈的 IT 自动化运维的能力,简化企业的 DevOps 工作流。KubeSphere 提供了运维友好的向导式操作界面&#xff0c;帮助企业快速构建一个强大和功能丰富的容器云平台。 安装组件前提&am…

Metasploit渗透测试之攻击终端设备和绕过安全软件

概述 在之前&#xff0c;重点讨论了针对服务器端的利用。但在当下&#xff0c;最成功的攻击都是针对终端的&#xff1b;原因是&#xff0c;随着大部分安全预算和关注都转向面向互联网的服务器和服务&#xff0c;越来越难找到可利用的服务&#xff0c;或者至少是那些还没有被破…

阿里商品发布框架如何覆盖海量规则

1688商品发布系统升级发品框架GPF&#xff0c;面对商品模型复杂度极高&#xff0c;发布的海量场景、多重业务逻辑如何覆盖&#xff1f; 本文从手工测试到自动化测试&#xff0c;以及完善的质量保障方案一一解答。 1、项目背景 1688商品发布系统运行多年&#xff0c;架构逐步…

OFDM学习-IP核学习-FIFO IP核和FFT IP核在vavido中的配置以及使用

FIFO IP核和FFT IP核在vavido中的配置以及使用 前言一、FFT IP核配置过程二、FIFO IP核配置过程总结 前言 记录一下OFDM学习中遇到的ip核使用方法&#xff0c;个人之前主要用Quatus&#xff0c;之前用ip核也比较少&#xff0c;记录一下配置过程吧以及一些参数的含义&#xff0…

【linux】Microsoft Edge 的 Bookmarks 文件存储位置

在 Linux 系统中&#xff0c;Microsoft Edge 的书签&#xff08;Bookmarks&#xff09;文件存储在用户的配置目录下。具体路径通常如下&#xff1a; ~/.config/microsoft-edge/Default/Bookmarks说明&#xff1a; 路径解释&#xff1a; ~ 表示当前用户的主目录。.config 是一个…

代替AD作为身份认证组件,深信服零信任aTrust与宁盾身份目录实现互操作

9月25日&#xff0c;经深信服科技股份有限公司和上海宁盾信息科技有限公司共同严格测试&#xff1a;宁盾身份目录服务软件能够与深信服零信任访问控制系统 aTrust 兼容对接运行&#xff0c;双方相互兼容&#xff0c;共同为企事业单位提供身份和零信任管理需求。 本次测试包含了…

笔记整理—linux网络部分(2)Linux网络框架

前文说过&#xff0c;在OSI中将网络分为7层&#xff0c;这是理论上将其分为7层&#xff0c;但实际上可以将其分为4层。如TCP协议就是将其分为4层。理论只是提出一种指导意见&#xff0c;但不是行业范本。 驱动层只关系有没有接到包&#xff0c;不关心包经过多少次转发&#xff…

Java 实战虚拟机 进阶 (一 万字)

实战 Java 虚拟机-高级篇 什么是 GraalVM GraalVM 是 Oracle 官方推出的一款 **高性能JDK&#xff0c;**使用它享受比 OpenJDK 或者 OracleJDK 更好的性能。 GraalVM 的官方网址&#xff1a;https://www.graalvm.org/官方标语&#xff1a;Build faster, smaller, leaner appli…

搭子小程序:全新在线找搭子,满足社交

搭子作为一种新的社交方式&#xff0c;为大众带来的各种陪伴型的社交模式&#xff0c;不管是饭搭子、健身、遛狗、学习等&#xff0c;都可以找到适合自己的搭子。搭子主打各个领域的陪伴&#xff0c;双方都能够在社交相处中保持着边界感&#xff0c;不涉及情感纠葛等&#xff0…

群晖前面加了雷池社区版,安装失败,然后无法识别出用户真实访问IP

有nas的相信对公网都不模式&#xff0c;在现在基础上传带宽能有100兆的时代&#xff0c;有公网代表着家里有一个小服务器&#xff0c;像百度网盘&#xff0c;优酷这种在线服务都能部署为私有化服务。但现在运营商几乎不可能提供公网ip&#xff0c;要么自己买个云服务器做内网穿…

【实战篇】用SkyWalking排查线上[xxl-job xxl-rpc remoting error]问题

一、组件简介和问题描述 SkyWalking 简介 Apache SkyWalking 是一个开源的 APM&#xff08;应用性能管理&#xff09;工具&#xff0c;专注于微服务、云原生和容器化环境。它提供了分布式追踪、性能监控和依赖分析等功能&#xff0c;帮助开发者快速定位和解决性能瓶颈和故障。…

矢量线段摆正-二维旋转

用途&#xff1a;通过一些算法&#xff0c;生成了一些矢量线段&#xff0c;但是没有保持绝对的水平、垂直&#xff0c;需要校正。 如下图所示&#xff0c;白色线为初始的矢量线段&#xff0c;只是能达到大致水平&#xff0c;红色线段为校正后&#xff0c;此时&#xff0c;红色…

Hive优化:Hive的执行计划、分桶、MapJoin、数据倾斜

文章目录 1. hive的执行计划1.1 为什么使用EXPLAIN1.2 使用EXPLAIN的步骤1.3 EXPLAIN在什么场合使用 2. 分桶2.1 为什么要使用分桶 3. Map Join3.1 Map Join3.1.1 大小表关联3.1.2 不等连接 3.2 Bucket-MapJoin3.2.1 作用3.2.2 条件 3.3 SMB Join3.3.1 作用 4. 数据倾斜4.1 表连…

nginx过滤模块怎么生效的

在nginx中&#xff0c;如果你要开发一个过滤模块&#xff0c;config中必须要加 HTTP_FILTER_MODULES$HTTP_FILTER_MODULES xxx 否则&#xff0c;即使在postconfiguration回调中加了ngx_http_top_header_filtermy_xxxx_filter_handle&#xff0c;最终my_xxxx_filter_handle也不…