仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)

news2025/1/10 1:35:57
@TOC

介绍        

        std::future 是C++11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。

应用场景

  • 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future可以用来表示这些异步任务的结果,通过将任务与主线程分离,我们可以实现任务的并行操作,从而提高程序的执行效率
  • 并发操作:在多线程编程中,我们可能需要等待某些任务完成后才能执行其他操作,通过使用std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续结果
  • 结果获取:std::future提供了一种安全的方式来获取异步任务的结果。我们可以使用std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用get()函数时,我们可以确保已经获取到了所需的结果

使用方法

std::async

        std::async是一种将任务与std::future关联的简单方法,它创建并运行一个异步任务,并返回一个与该任务结果关联的std::future对象。默认情况下,std::async是否启动了一个新的线程,或者在等待future时,任务是否同步运行取决于你给的参数,这个参数为std::launch类型:

  1. std::launch::deferred 表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务
  2. std::launch::async表明函数会在自己创建的线程上运行
  3. std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

using namespace std;

int async_task()
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 2;
}

int main()
{
    // 关联异步任务async_task 和 future
    std::future<int> result = std::async(std::launch::deferred | std::launch::async, async_task);
    // 此时可以执行其他操作
    cout << "干其他事情" << endl;
    // 获取异步任务结果
    int ret = result.get();
    cout << ret << endl;
    return 0;
}

std::packaged_task

        std::packaged_task就是将任务和std::future绑定在一起的模板,是一种对任务的封装,我们可以通过std::packaged_task对象获取任务相关联的std::future对象,通过调用get_future()方法获取。std::packaged_task的模板参数是函数签名。

所谓函数签名就是一个函数头去掉函数名

下面介绍一下std::packaged_task,首先这个类型的对象是不可复制的

可以看到拷贝构造函数被delete了。

        std::packaged_task是用来包装异步任务的工具,它的本质是将一个可调用对象封装起来,和std::future结合起来,这个不能被直接调用,因为这样的实质是同步调用任务,而不是异步调用,并且std::packaged_task对象是没有返回值的,因为是不可拷贝的, 所以std::packaged_task对象在使用的时候,需要创建一个线程,然后使用智能指针或者move函数来进行传递。注意因为创建了一个新的线程,并且需要获取到这个新的线程执行任务的结果,所以我们就需要进行等待或者分离,即join和detach()。

使用move进行移动

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>

int Add(int num1, int num2)
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return num1 + num2;
}

int main()
{
    std::packaged_task<int(int, int)> task(Add);
    std::future<int> fu = task.get_future();
    std::thread th(std::move(task), 11, 22);

    int ret = fu.get();
    std::cout << ret << std::endl;
    
    th.join();
    return 0;
}

使用智能指针

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>

int Add(int num1, int num2)
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return num1 + num2;
}

int main()
{
    auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);
    std::future<int> fu = ptask->get_future();
    std::thread th([ptask]()
    {
        (*ptask)(11, 22);
    });

    int ret = fu.get();
    std::cout << ret << std::endl;
    
    th.join();
    return 0;
}

std::promise

        std::promise提供了一种设置值的方式,它可以在设置之后通过相关联的std::future对象进行读取,换种说法来说就是之前说过的std::future可以读取一个异步函数的返回值了,但是要等待就绪,而std::promise就提供了一个方式手动让std::future就绪

#include <iostream>
#include <future>
#include <chrono>

void task(std::promise<int> result_promise)
{
    int result = 2;
    result_promise.set_value(result);
    std::cout << result << std::endl;
}


int main()
{
    std::promise<int> result;
    std::future<int> reuslt_future = result.get_future();

    std::thread th(task, std::move(result));

    int ret = reuslt_future.get();
    std::cout << ret << std::endl;
    th.join();
    return 0;
}

线程池设计

#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>

class threadPool
{
public:
    using ptr = std::shared_ptr<threadPool>;
    using Functor = std::function<void(void)>;

    threadPool(int thr_count = 1)
        : _stop(false)
    {
        for (int i = 0; i < thr_count; i++)
        {
            _threads.emplace_back(&threadPool::entry, this);
        }
    }
    ~threadPool()
    {
        stop();
    }
    // push传入的是首先有一个函数--用户要执行的函数,接下来是不定参,标识要处理的数据就是要传入到函数中的参数
    // push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),
    // 使用 lambda 生成一个可调用对象(内部执行异步程序)抛入到任务池中,由工作线程取出进行执行

    template <typename F, typename ...Args>
    auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
    {
        // 1. 将传入的函数封装成一个packaged_task任务
        using return_type = decltype(func(args...));
        auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
        //auto task = std::make_shared<std::packaged_task<return_type(std::forward<Args>(args)...)>>(std::forward(func));
        auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
        std::future<return_type> fu = task->get_future();
        // 2. 构造一个lambda 匿名函数(捕获任务对象),函数内执行任务对象
        {
            std::unique_lock<std::mutex> lock(_mutex);
            // 3. 将构造出来的匿名函数,抛入到任务池中

            _taskpool.push_back([task]()
                                { (*task)(); });
            _cv.notify_one();
        }
        return fu;
    }

    void stop()
    {
        if(_stop == true) return ;
        _stop = true;
        _cv.notify_all();
        for (auto &thread : _threads)
        {
            thread.join();
        }
    }

private:
    // 线程入口函数---内部不断的从任务池中取出任务进行执行
    void entry()
    {
        while (!_stop)
        {
            std::vector<Functor> tmp_taskpool;
            {
                // 加锁
                std::unique_lock<std::mutex> lock(_mutex);
                // 等待任务池不为空,或者_stop 被置位返回true
                _cv.wait(lock, [this]()
                         { return _stop || !_taskpool.empty(); });
                // 取出任务进行执行
                tmp_taskpool.swap(_taskpool);   // 这里是将全局任务池里面的任务全部给一个线程里面的任务池    
            }
            for (auto &task : tmp_taskpool)
            {
                task();
            }
        }
    }

private:
    std::atomic<bool> _stop;
    std::vector<Functor> _taskpool;   // 任务池
    std::mutex _mutex;              // 互斥锁
    std::condition_variable _cv;    // 条件变量
    std::vector<std::thread> _threads;
};


main函数

#include "threadpool.hpp"


int Add(int num1, int num2)
{
    return num1 + num2;
}

int main()
{
    threadPool pool;
    for(int i = 0; i < 10; i++)
    {
        std::future<int> fu = pool.push(Add, 11, i);
        std::cout << fu.get() << std::endl;
    }
    pool.stop();
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047319.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java学习】Stream流详解

所属专栏&#xff1a;Java学习 Stream流是JDK 8引入的一个概念&#xff0c;它提供了一种高效且表达力强的方式来处理数据集合&#xff08;如List、Set等&#xff09;或数组。Stream API可以以声明性方式&#xff08;指定做什么&#xff09;来处理数据序列。流操作可以被分为两大…

GD32 ADC配置跳坑

GD32 ADC配置跳坑 &#xff1a;时钟使能配置需在ADC前面 放在后面读取ADC值失败。 DMA配置放在ADC配置后面可以正常读取ADC的值 不同的模式选择可能会导致ADC存在读取失败的问题&#xff0c;红色部分是常用的模式&#xff0c;一般可以读取到相应的ADC的值 adc_software_trigge…

优雅谈大模型:Python编程篇

Python在机器学习领域的地位十分关键&#xff0c;虽然后面有Julia&#xff0c;Mojo等其他对手的挑战&#xff0c;然而Python拥有庞大的机器学习库和框架&#xff0c;尤其是生态系统比以往任何时候又强大了不少。从另外维度它和Java&#xff0c;Scala&#xff0c;Go&#xff0c;…

游戏安全入门-扫雷分析远程线程注入

前言 无论学习什么&#xff0c;首先&#xff0c;我们应该有个目标&#xff0c;那么入门windows游戏安全&#xff0c;脑海中浮现出来的一个游戏 – 扫雷&#xff0c;一款家喻户晓的游戏&#xff0c;虽然已经被大家分析的不能再透了&#xff0c;但是我觉得自己去分析一下还是极好…

适配器模式, 修饰器模式 与 代理模式

这三种模式, 感觉非常类似, 都是把核心类包一层, 在外部做一些额外的事情, 我还没发现他们之间具体的区别, 有想法的同学, 可以评论或者私聊我 适配器模式 简介: 就是在目标类外面包一层, 用以适配其他的模块,兼容整个程序框架 举个例子: 比如运动员, 中国运动员参加法国奥运…

市域社会治理平台规划建设方案

1. 建设背景与市域治理定义 市域社会治理作为国家治理体系的重要组成部分&#xff0c;具有承上启下的枢纽作用。2019年&#xff0c;全国市域社会治理现代化工作会议提出了推进市域社会治理现代化的总体思路&#xff0c;强调以城带乡、以点带面&#xff0c;明确了市域治理的方向…

[项目]文海泛舟测试报告

目录 一、项目背景 二、项目功能 三、功能测试 1. 测试用例&#xff1a; 2. 实际测试的部分&#xff08;含截图&#xff09; 1. 正常登录 2. 文章列表页显示/登录用户信息显示 3. 文章详情页内容显示/文章作者信息显示 4. 编辑功能 1. 点击“更新博客”按钮前 2. 点击…

前端开发攻略---Vue实现图像裁剪功能,支持用户通过图形界面进行裁剪区域的调整,最终生成裁剪后的图像。

目录 1、演示 2、实现原理 3、实现功能 4、代码 1、演示 2、实现原理 这里有详细介绍&#xff1a; 前端开发攻略---图片裁剪上传的原理-CSDN博客 3、实现功能 上传图像&#xff1a; 用户选择文件后&#xff0c;changeFile 方法读取文件内容并将其转换为 Data URL&#xff0c…

Amesim中动力电池建模方法与原则简介

引言 新能源动力电池一维仿真与三维仿真的主要区别在与&#xff0c;一维仿真中无法在仿真中精准的得到各个点的温度变化&#xff0c;其仅为质量块的平均温度。而在新能源动力电池一维仿真中&#xff0c;旨在对动力电池的策略、充放电时间等进行验证。而无论是策略还是充放电时…

jmreport测试数据库出现 权限不足,此功能需要分配角色 解决方法

目录 前言1. 问题所示2. 原理分析3. 解决方法前言 关于jmreport的补充可看官网:jmreport上线安全配置 1. 问题所示 jmreport测试数据库出现,出现如下所示的问题:权限不足,此功能需要分配角色! 截图如下所示: 2. 原理分析 对于原理分析的Bug,代表当前用户没有足够的…

HDFS的编程

一、HDFS原理 HDFS(Hadoop Distributed File System)是hadoop生态系统的一个重要组成部分,是hadoop中的的存储组件,在整个Hadoop中的地位非同一般,是最基础的一部分,因为它涉及到数据存储,MapReduce等计算模型都要依赖于存储在HDFS中的数据。HDFS是一个分布式文件系统,…

20款必试AI工具:轻松搞定设计到协作

随着人工智能技术的发展&#xff0c;各种AI工具如雨后春笋般涌现&#xff0c;给我们的工作和生活带来了极大便利。 在AI工具的海洋中&#xff0c;哪一款才是你的真命天子&#xff1f; 众所周知&#xff0c;AI工具如雨后春笋般涌现&#xff0c;让人目不暇接。面对琳琅满目的选…

Oracle 字符串转多行(REGEXP_SUBSTR)

方案一&#xff1a; SQL 1.一个数据表(TABLE1_ZK)中存在一个字段(STRS)&#xff08;存储格式是以【,】隔开的字符串&#xff09; 2.现需要将其查分为多行数据&#xff08;每行为其中一个字符串&#xff09; 3.sql SELECT t.id,REGEXP_SUBSTR(t.STRS, [^,], 1, LEVEL) AS ma…

招聘|头部云厂商招 PG 核心骨干 DBA【上海】

我们的招聘专区又回来了&#xff01;&#x1f3c3; Bytebase 作为先进的数据库 DevOps 团队协同工具 &#x1f527;&#xff0c;用户群里汇聚了 &#x1f497; 业界优秀的 DBA&#xff0c;SRE&#xff0c;运维的同学们 &#x1f31f;。 上周用户群里有小伙伴发招聘信息 &…

【观察者模式】设计模式系列: 实现与最佳实践案例分析

文章目录 观察者模式深入解析&#xff1a;在Java中的实现与应用1. 引言1.1 观察者模式简介1.2 模式的重要性及其在现实世界的应用示例1.3 本文的目标和读者定位 2. 观察者模式的基本概念2.1 定义与原理2.2 UML类图和时序图2.3 核心原则2.4 使用场景 3. 观察者模式与其他模式的关…

【数据结构】Java实现链表

目录 链表的概念 链表的实现 链表的功能 框架搭建 功能实现 打印链表 获取数据数量 查询数据 插入数据 头插法 尾插法 指定位置插入 删除数据 删除一个数据 删除多个相同数据 删除链表 完整代码 链表的概念 链表是一种物理存储结构上非连续存储结构&#xff0…

nosql----redis三主三从集群部署

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…

【uniapp/uview1.x】解决在 u-popup 弹出层中使用 u-calendar 日历组件弹出方向出 bug 的问题

这个方法适用 uview 1.x 版本&#xff1b; 如果这个方法不适用可能是 uview 版本不一样&#xff0c;可以参考&#xff1a;https://github.com/dcloudio/uni-ui/issues/915 试试看 bug 的效果如图所示&#xff1a; 因为我为 popup 设置的方向为 top&#xff1a; <u-popup …

人工智能算法,图像识别技术;基于大语言模型的跨境商品识别与问答系统;图像识别

目录 一 .研究背景 二,大语言模型介绍 三,数据采集与预处理 商品识别算法 四. 跨境商品问答系统设计 五.需要源码联系 一 .研究背景 在当今全球化的背景下&#xff0c;跨境电商行业迅速发展&#xff0c;为消费者提供了更广泛的购物选择和更便利的购物方式。然而&#xf…

OLED屏幕制造工艺流程

OLED屏幕制造工艺流程是一个复杂且精细的过程&#xff0c;涉及多个关键步骤以确保最终的显示效果和性能。以下是OLED屏幕制造工艺流程的主要步骤&#xff1a; 1. 衬底制作与准备 材料选择&#xff1a;OLED器件需要一个透明的导电衬底&#xff0c;通常使用玻璃或塑料材料。 清…