仿RabbiteMq简易消息队列基础篇(future操作实现异步线程池)

news2024/9/22 13:25:13
@TOC

介绍        

        std::future 是C++11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。

应用场景

  • 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future可以用来表示这些异步任务的结果,通过将任务与主线程分离,我们可以实现任务的并行操作,从而提高程序的执行效率
  • 并发操作:在多线程编程中,我们可能需要等待某些任务完成后才能执行其他操作,通过使用std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续结果
  • 结果获取:std::future提供了一种安全的方式来获取异步任务的结果。我们可以使用std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用get()函数时,我们可以确保已经获取到了所需的结果

使用方法

std::async

        std::async是一种将任务与std::future关联的简单方法,它创建并运行一个异步任务,并返回一个与该任务结果关联的std::future对象。默认情况下,std::async是否启动了一个新的线程,或者在等待future时,任务是否同步运行取决于你给的参数,这个参数为std::launch类型:

  1. std::launch::deferred 表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务
  2. std::launch::async表明函数会在自己创建的线程上运行
  3. std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

using namespace std;

int async_task()
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 2;
}

int main()
{
    // 关联异步任务async_task 和 future
    std::future<int> result = std::async(std::launch::deferred | std::launch::async, async_task);
    // 此时可以执行其他操作
    cout << "干其他事情" << endl;
    // 获取异步任务结果
    int ret = result.get();
    cout << ret << endl;
    return 0;
}

std::packaged_task

        std::packaged_task就是将任务和std::future绑定在一起的模板,是一种对任务的封装,我们可以通过std::packaged_task对象获取任务相关联的std::future对象,通过调用get_future()方法获取。std::packaged_task的模板参数是函数签名。

所谓函数签名就是一个函数头去掉函数名

下面介绍一下std::packaged_task,首先这个类型的对象是不可复制的

可以看到拷贝构造函数被delete了。

        std::packaged_task是用来包装异步任务的工具,它的本质是将一个可调用对象封装起来,和std::future结合起来,这个不能被直接调用,因为这样的实质是同步调用任务,而不是异步调用,并且std::packaged_task对象是没有返回值的,因为是不可拷贝的, 所以std::packaged_task对象在使用的时候,需要创建一个线程,然后使用智能指针或者move函数来进行传递。注意因为创建了一个新的线程,并且需要获取到这个新的线程执行任务的结果,所以我们就需要进行等待或者分离,即join和detach()。

使用move进行移动

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>

int Add(int num1, int num2)
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return num1 + num2;
}

int main()
{
    std::packaged_task<int(int, int)> task(Add);
    std::future<int> fu = task.get_future();
    std::thread th(std::move(task), 11, 22);

    int ret = fu.get();
    std::cout << ret << std::endl;
    
    th.join();
    return 0;
}

使用智能指针

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>

int Add(int num1, int num2)
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return num1 + num2;
}

int main()
{
    auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);
    std::future<int> fu = ptask->get_future();
    std::thread th([ptask]()
    {
        (*ptask)(11, 22);
    });

    int ret = fu.get();
    std::cout << ret << std::endl;
    
    th.join();
    return 0;
}

std::promise

        std::promise提供了一种设置值的方式,它可以在设置之后通过相关联的std::future对象进行读取,换种说法来说就是之前说过的std::future可以读取一个异步函数的返回值了,但是要等待就绪,而std::promise就提供了一个方式手动让std::future就绪

#include <iostream>
#include <future>
#include <chrono>

void task(std::promise<int> result_promise)
{
    int result = 2;
    result_promise.set_value(result);
    std::cout << result << std::endl;
}


int main()
{
    std::promise<int> result;
    std::future<int> reuslt_future = result.get_future();

    std::thread th(task, std::move(result));

    int ret = reuslt_future.get();
    std::cout << ret << std::endl;
    th.join();
    return 0;
}

线程池设计

#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>

class threadPool
{
public:
    using ptr = std::shared_ptr<threadPool>;
    using Functor = std::function<void(void)>;

    threadPool(int thr_count = 1)
        : _stop(false)
    {
        for (int i = 0; i < thr_count; i++)
        {
            _threads.emplace_back(&threadPool::entry, this);
        }
    }
    ~threadPool()
    {
        stop();
    }
    // push传入的是首先有一个函数--用户要执行的函数,接下来是不定参,标识要处理的数据就是要传入到函数中的参数
    // push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),
    // 使用 lambda 生成一个可调用对象(内部执行异步程序)抛入到任务池中,由工作线程取出进行执行

    template <typename F, typename ...Args>
    auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
    {
        // 1. 将传入的函数封装成一个packaged_task任务
        using return_type = decltype(func(args...));
        auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
        //auto task = std::make_shared<std::packaged_task<return_type(std::forward<Args>(args)...)>>(std::forward(func));
        auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
        std::future<return_type> fu = task->get_future();
        // 2. 构造一个lambda 匿名函数(捕获任务对象),函数内执行任务对象
        {
            std::unique_lock<std::mutex> lock(_mutex);
            // 3. 将构造出来的匿名函数,抛入到任务池中

            _taskpool.push_back([task]()
                                { (*task)(); });
            _cv.notify_one();
        }
        return fu;
    }

    void stop()
    {
        if(_stop == true) return ;
        _stop = true;
        _cv.notify_all();
        for (auto &thread : _threads)
        {
            thread.join();
        }
    }

private:
    // 线程入口函数---内部不断的从任务池中取出任务进行执行
    void entry()
    {
        while (!_stop)
        {
            std::vector<Functor> tmp_taskpool;
            {
                // 加锁
                std::unique_lock<std::mutex> lock(_mutex);
                // 等待任务池不为空,或者_stop 被置位返回true
                _cv.wait(lock, [this]()
                         { return _stop || !_taskpool.empty(); });
                // 取出任务进行执行
                tmp_taskpool.swap(_taskpool);   // 这里是将全局任务池里面的任务全部给一个线程里面的任务池    
            }
            for (auto &task : tmp_taskpool)
            {
                task();
            }
        }
    }

private:
    std::atomic<bool> _stop;
    std::vector<Functor> _taskpool;   // 任务池
    std::mutex _mutex;              // 互斥锁
    std::condition_variable _cv;    // 条件变量
    std::vector<std::thread> _threads;
};


main函数

#include "threadpool.hpp"


int Add(int num1, int num2)
{
    return num1 + num2;
}

int main()
{
    threadPool pool;
    for(int i = 0; i < 10; i++)
    {
        std::future<int> fu = pool.push(Add, 11, i);
        std::cout << fu.get() << std::endl;
    }
    pool.stop();
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2037086.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

lvm知识终结

、什么是 LVM LVM 是 Linux 下对磁盘分区进行管理的一种工具&#xff0c;适合管理大存储设备&#xff0c;并允许用户动态调整文件系统的大小 lvm 常用的命令 功能 PV 管理命令 VG 管理命令 LV 管理命令 scan 扫描 pvscan vgscan lvscan create 创建 pvcreate v…

水仙花语:花中情诗,心灵低语

一、水仙花语的丰富内涵 水仙花的花语丰富多样&#xff0c;其中“纯洁”是其最为显著的象征之一。水仙花洁白无瑕的花瓣&#xff0c;宛如纯洁无暇的心灵&#xff0c;给人以清新、高雅之感。这种纯洁不仅体现在花朵的外观上&#xff0c;更蕴含着一种纯净、美好的精神内涵&#x…

【JS逆向学习】AES加密文本如何在python中进行调用js解密

对于上边的图片我们用F12找到我们就发现一个无限循环debugger , 我的解决方案是关闭调试 在一个currentitems中发现全是加密的数据 我通过ctrl + shift +f 搜索AES P 这就是解密的js函数,存到js中,进行处理 const crypto = require(crypto) var CryptoJS = require(crypt…

2.7单窗口中显示多幅图像

目录 1.实验原理 2.实验代码 3.运行结果 1.实验原理 在Opencv中&#xff0c;我们可以综合利用坐标变换与Rect区域提取来实现单窗口显示多幅图像。首先根据输入图像个数与尺寸确定输入源图像小窗口的构成形态&#xff0c;然后设定每个图像小窗口的具体构成&#xff0c;包括边…

超级外链工具,可发9600条优质外链

超级外链工具&#xff0c;是一款在线全自动化发外链的推广工具。使用本工具可免费为网站在线批量增加外链&#xff0c;大大提高外链发布工作效率&#xff0c;是广大草根站长们必备的站长工具。 外链工具只是网站推广的辅助工具&#xff0c;一般适用于短时间内无法建设大量外链…

stm32入门学习16-闪存

&#xff08;一&#xff09;闪存 在之前的学习中&#xff0c;已经学习过了W25Q64这个外挂闪存&#xff0c;在stm32内部也有一块闪存&#xff0c;其主要用于存放我们编译的代码&#xff0c;如果我们需要一些掉电不丢失的数据&#xff0c;但是又懒得外挂一块闪存&#xff0c;就可…

树莓派4b换源+安装neo4j知识图谱

烧录树莓派系统&#xff0c; ssh 1.在SD中的boot区中&#xff0c;新建两个文件ssh(没有任何后缀)和wpa_supplicant.conf。 2.往wpa_supplicant.conf中写入 countryCN ctrl_interfaceDIR/var/run/wpa_supplicant GROUPnetdev update_config1 network{ ssid“wifi账号” psk“…

VS Code 配置docker 管理员权限终端

问题描述 在容器中需要使用sudo或者su root时候&#xff0c;权限不够&#xff0c;被灵魂提问。 然而&#xff0c;镜像是官方发布的&#xff0c;翻遍了githubissues也没有找到password. 解决 Attach shell 在docker插件中&#xff0c;attach shell 可以直接获得shell。 所…

AI顾投高级策略之六:马丁格尔策略

作者&#xff1a;老余捞鱼 原创不易&#xff0c;转载请标明出处及原作者。 写在前面的话&#xff1a; 在投资世界中&#xff0c;隐藏着众多精妙的策略&#xff0c;其中一些历经时间的考验&#xff0c;被基金经理们广泛使用。今天&#xff0c;我们要探讨的是马丁格尔策略…

不依靠for循环,Python如何对列表进行去重并保留排列顺序

在python中&#xff0c;我们想要从列表中删除重复元素&#xff0c;并且保留去重之前的先后排列顺序。在这里&#xff0c;我们本文不谈论for循环&#xff0c;我们来谈论其他的更优方法——OrderedDict和set。 要知道&#xff0c;OrderedDict可以通过保留插入顺序来实现元素去重…

【面试题】接雨水

接雨水 仅学习 一、问题描述 二、解调思路 这个问题是一个典型的双指针问题&#xff0c;也称为"接雨水问题"。我们可以通过遍历数组两次来解决这个问题&#xff1a;一次从左到右&#xff0c;一次从右到左&#xff0c;分别记录每个位置左边和右边的最大高度。然后&…

springboot学校防疫物资管理平台的设计与实现boot--论文源码调试讲解

第2章 开发环境与技术 本章节对开发学校防疫物资管理平台管理系统需要搭建的开发环境&#xff0c;还有学校防疫物资管理平台管理系统开发中使用的编程技术等进行阐述。 2.1 Java语言 Java语言是当今为止依然在编程语言行业具有生命力的常青树之一。Java语言最原始的诞生&…

C语言—扫雷项目

一、扫雷游戏分析和设计 &#xff08;1.1&#xff09;扫雷游戏功能说明 • 使⽤控制台实现经典的扫雷游戏 • 游戏可以通过菜单实现继续玩或者退出游戏 • 扫雷的棋盘是9*9的格⼦ • 默认随机布置10个雷 • 可以排查雷 ◦ 如果位置不是雷&#xff0c;就显⽰周围有⼏个雷 ◦ 如…

C# 元组类型详解与示例

文章目录 1. 元组的基本概念1.1 元组的定义1.2、元组的类型 2. 元组的特性2.1 元组的命名元素2.2 元组的类型推断2.3 元组的结构2.4 元组的比较和哈希 3. 元组的实际应用3.1 方法返回多个值3.2 作为数据容器3.3 元组与数据结构 4. 总结 元组&#xff08;Tuple&#xff09;是 C#…

【AI】机器学习基本概念详解1

友情链接&#xff1a;numpy使用、SciPy、Matplotlib 定期更新&#xff0c;建议关注、点赞、收藏。 目录 监督学习 or 非监督学习clustering & non-clustering分类 or 回归线性回归逻辑回归 目标函数or 损失函数规范化归一化标准化正则化&#xff08;惩罚penalty&#xff09…

如何制作优秀的年终总结PPT?

制作优秀的年终总结PPT&#xff0c;是每位职场人士在年底时的一项重要任务。 一个优秀的年终总结PPT不仅能够清晰地展示你过去一年的工作成果&#xff0c;还能让领导对你的工作能力和态度留下深刻印象。 下面&#xff0c;我将从几个方面详细讲解如何制作这样的PPT&#xff0c…

杰理AC7916与MK米客方德SD NAND:高效适配,卓越存储

杰理科技的AC7916是一款高性能、多功能的系统级芯片&#xff08;SoC&#xff09;&#xff0c;广泛应用于智能终端产品&#xff0c;如AI智能音箱、蓝牙音箱、蓝牙耳机等。这款芯片以其卓越的性能和丰富的接口选项&#xff0c;满足了多样化的市场需求。 AC7916接口及性能特点 AC…

如何避免常见的R语言学习陷阱?

学习R语言不仅能够增强数据分析能力&#xff0c;还能开拓解决复杂问题的新方法。然而&#xff0c;在学习R语言的过程中&#xff0c;许多初学者会遇到各种陷阱&#xff0c;这些陷阱不仅会延缓学习进度&#xff0c;还可能导致学习动机的丧失。 陷阱一&#xff1a;忽视基础知识 基…

Java面向对象与封装

目录 封装 封装引入 private修饰符 this关键字 构造函数 JavaBean 标准JavaBean JavaBean中的成员与数据库的关系 static关键字 static基本使用 static关键字访问特点 可变参数 对象数组与传值/址调用 对象数组 传值调用与传址调用 命令行参数 封装 封装引入 …