使用C++11开发一个半同步半异步线程池

news2024/11/17 22:31:19

半同步半异步线程池介绍

        在处理大量并发任务的时候,如果按照传统的方式,一个请求一个线程来处理请求任务,大量的线程创建和销毁将消耗过多的系统资源,还增加了线程上下文切换的开销,而通过线程池技术就可以很好的解决这些问题。线程池技术通过在系统预先创建一定数量的线程,当任务请求到来时从线程池中分配一个预先创建的线程去处理任务,线程在处理完任务之后还可以重用,不会销毁,而再等待下次任务的到来。这样,通过线程池能避免大量的线程创建和销毁动作,从而节省系统资源,这样做的一个好处是,对于多核处理器,由于线程会被分配到多个CPU,会提高并行处理的效率。另一个好处是每个线程独立阻塞,可以防止主线程被阻塞而使主流程被阻塞,导致其他的请求得不到响应的问题。

        线程池分为半同步半异步线程池和领导者追随线程池,本章将主要介绍半同步半异步线程池,这种线程池在实现上更简单,使用的也比较多,也比较方便。半同步半异步线程池分成三层。

        第一层是同步服务层,它处理来自上层的任务请求,上层的请求可能是并发的,这些请求不是马上就会被处理,而是将这些任务放到一个同步排队层中,等待处理。第二层是同步排队层,来自上层的任务请求都会加到排队层中等待处理。第三层是异步服务层,这一层中会有很多个线程同时处理排队层中的任务,异步服务层从同步排队层中取出任务并行的处理。

        这种三层的结构可以最大程度处理上层的并发请求。对于上层来说只要将任务丢到同步队列中就行了,至于谁去处理,什么时候处理都不用关心,主线程也不会阻塞,还能继续发起新的请求。至于任务具体怎么处理,这些细节都是靠异步服务层的多线程并行来完成的,这些线程是一开始就创建的,不会因为大量的任务到来而创建新的线程,避免了频繁创建和销毁线程导致的系统开销,而且通过多核能处理大幅提高处理效率。

线程池实现的关键技术分析

        线程池是由三层组成的:同步服务层,排队层和异步服务层,其中排队层居于核心地位,因为上层会将任务加到排队层中,异步服务层同时也会取出任务,这里有一个同步的过程。在实现时,排队层就是一个同步队列,允许多个线程同时去添加或取出任务,并且要保证操作过程是安全的。线程池有两个活动记录,一个是往同步队列中添加任务,另一个是从同步队列中取任务的过程。

半同步半异步线程活动图

          从活动图中可以看到线程池的活动过程,一开始线程池会启动一定数量的线程,这些线程属于异步层,主要用来并行处理排队层中的任务 ,如果排队层中的任务数为空,则这些线程等待任务的到来。如果发现排队层中有任务了,线程池则会从等待的这些线程中唤醒一个来处理新任务。同步服务层则会不断的将新的任务添加到同步排队层中,这里有一个问题值得注意,有可能上层的任务非常多,而任务又非常耗时,这是,异步层中的线程处理不过来,则同步排队层中的任务会不断添加,如果同步排队层不加上限控制,则可能会导致排队层中的任务过多,内存报错的问题。因此,排队层需要加上上限控制,当排队层中的任务数量达到上限,就不让上层的任务添加进来,起到限制和保护的作用。

同步队列

        同步队列即为线程中三层结构中的中间那一层,它的主要作用是保证队列中共享数据线程安全,还为上一层服务层提供添加新任务的接口,以及为下一层异步服务层提供取任务的接口。同时,还要限制任务数的上限,避免任务过多导致内存暴涨的问题。同步队列的锁是用来线程同步,条件变量是用来实现线程通信,即线程池空了就要等待,不为空就通知一个线程去处理;线程池满了就等待,直到没有满的时候才通知上层添加新的任务。同步队列的具体实现如下所示:

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
using namespace std;

template<typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize = 1024) : m_maxSize(maxSize), m_needStop(false){}

    void Put(const T& x)
    {
        Add(x);
    }

    void Put(T&& x)
    {
        Add(std::forward<T>(x));
    }

    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this]{ return m_needStop || NotEmpty(); });

        if (m_needStop)
        {
            return;
        }

        list = std::move(m_queue);

        m_notFull.notify_one();
    }

    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this]{ return m_needStop || NotEmpty(); });

        if (m_needStop)
        {
            return;
        }

        t = m_queue.front();
        m_queue.pop_front();

        m_notFull.notify_one();
    }


    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }

        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    int Count()
    {
        return m_queue.size();
    }

private:
    bool NotFull() const
    {
        bool full = m_queue.size() > m_maxSize;
        if (full)
        {
            cout << "缓冲区满了,需要等待..." << endl;
        }

        return !full;
    }

    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
        {
            cout << "缓冲区空了,需要等待...异步层的线程ID: " << this_thread::get_id() << endl;
        }

        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this]{ return m_needStop || NotFull(); });

        if (m_needStop)
        {
            return;
        }

        m_queue.push_back(std::forward<F>(x));

        m_notEmpty.notify_one();
    }

private:
    int                     m_maxSize;  ///同步队列最大的size
    bool                    m_needStop; ///停止的标志
    std::list<T>            m_queue;    ///缓冲区
    std::mutex              m_mutex;    ///互斥量和条件变量结合使用
    std::condition_variable m_notEmpty; ///不为空的条件变量
    std::condition_variable m_notFull;  ///没有满的条件变量

};


        Take(std::list<T>& list)函数做到了一次加锁就能将队列中所有数据都取出来,从而大大减少加锁的次数。在获取互斥锁之后,我们不再只获取一条数据,而是通过std::move来将队列中所有数据move到外面去,这样既大大减小了获取数据加锁的次数,又直接通过移动避免了数据的复制,提高了性能。

        下面具体介绍同步队列的3个函数Take,Add和Stop的实现。

Take函数

        先创建一个unique_lock获取mutex,然后再通过条件变量m_notEmpty来等待判断式,判断式由两个条件组成,一个是停止的标志,另一个是不为空的条件,当不满足任何一个条件时,条件变量会释放mutex并将线程置于waiting状态,等待其他线程调用notify_one/notify_all将其唤醒;当满足任何一个条件时,则继续往下执行后面的逻辑,即将队列中的任务取出,并唤醒一个正处于等待状态的添加任务的线程去添加任务。当处于waiting状态的线程被notify_one或notify_all唤醒时,条件变量会先重新获取mutex,然后再检查条件是否满足,如果满足,则往下执行,如果不满足,则释放mutex继续等待。

    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this]{ return m_needStop || NotEmpty(); });

        if (m_needStop)
        {
            return;
        }

        list = std::move(m_queue);

        m_notFull.notify_one();
    }

Add函数   

        Add的过程和Take的过程类似,也是先获取mutex,然后检查条件是否满足,不满足条件,释放mutex继续等待,如果条件满足,则将新任务插入到队列中,并唤醒取任务的线程去取数据。

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this]{ return m_needStop || NotFull(); });

        if (m_needStop)
        {
            return;
        }

        m_queue.push_back(std::forward<F>(x));

        m_notEmpty.notify_one();
    }

Stop函数 

         Stop函数先获取mutex,然后将停止标志置为true。注意,为了保证线程安全,这里需要先获取mutex,在将其标志置为true,再唤醒所有等待的线程,因为等待的条件是m_needStop,并且满足条件,所以线程会继续往下执行。由于线程在m_needStop为true时会退出,所以所有的等待的线程也相继退出。另外一个值得注意的地方,我们把m_notFull.notify_all()放到lock_guard保护范围之外,这里也可以将把m_notFull.notify_all()放到lock_guard保护范围之内,放到外面是为了做一点优化。因为notify_one或notify_all会唤醒一个在等待的线程,线程被唤醒后先获取mutex再检查条件是否满足,如果这时被lock_guard保护,被唤醒的线程则需要lock_guard析构释放mutex才能获取。如果在lock_guard之外notfiy_one或notify_all,被唤醒的线程获取锁的时候不需要等待lock_guard释放锁,性能会好一点,所以在执行notify_one或notify_all时不需要加锁保护。

    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }

        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

             

线程池

        一个完整的线程池包括三层:同步服务层,排队层和异步服务层,其实这也是一种生产者-消费者模型,同步层是生产者,不断将新任务丢到排队层中,因此,线程池需要提供一个添加新任务的接口供生产者使用;消费者是异步层,具体是由线程池中预先创建的线程去处理排队层中的任务。排队层是一个同步队列,它内部保证了上下两层对共享数据的安全访问,同时还要保证队列不会被无限制地添加任务导致内存暴涨。另外,线程池还要提供一个停止的接口,让用户能够在需要的时候停止线程池的运行,下面是线程池的实现方式:

#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
#include "sync.h"

const int MaxTaskCount = 100;
class ThreadPool
{
public:
    using Task = std::function<void()>;
    ThreadPool(int numThreads = std::thread::hardware_concurrency()) :m_queue(MaxTaskCount)
    {
        printf("create thread num:%d\n", numThreads);
        Start(numThreads);
    }

    ~ThreadPool()
    {
        Stop();
    }

    void Stop()
    {
        ///保证多线程情况下只调用一次StopThreadGroup
        std::call_once(m_flag, [this]{StopThreadGroup(); });
    }

    void AddTask(Task&& task)
    {
        m_queue.Put(std::forward<Task>(task));
    }

    void AddTask(const Task& task)
    {
        m_queue.Put(task);
    }

private:
    void Start(int numThreads)
    {
        m_running = true;

        for(int i = 0; i < numThreads; i++)
        {
            ///创建线程组
            m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
        }
    }
    void RunInThread()
    {
        while(m_running)
        {
            ///取任务分别执行
            std::list<Task> list;

            m_queue.Take(list);

            for(auto &task : list)
            {
                if (!m_running)
                    return;

                task();
            }
        }
    }

    void StopThreadGroup()
    {
        m_queue.Stop();

        m_running = false;

        for(auto thread : m_threadgroup)
        {
            if (thread)
            {
                thread->join();
            }
        }

        m_threadgroup.clear();
    }

private:
    std::list<std::shared_ptr<std::thread>>     m_threadgroup;    ///处理任务的线程组
    SyncQueue<Task>                             m_queue;          ///同步队列
    atomic_bool                                 m_running;        ///是否停止的标志
    std::once_flag                              m_flag;
};


ThreadPool pool;

void func1()
{
    for(int i = 0; i < 10; i++)
    {
        auto thdId = this_thread::get_id();
        pool.AddTask([thdId]{cout << "同步层线程1的线程ID:" << thdId << endl;} );
    }
}

void func2()
{
    for(int i = 0; i < 10; i++)
    {
        auto thdId = this_thread::get_id();
        pool.AddTask([thdId]{cout << "同步层线程2的线程ID:" << thdId << endl;} );
    }
}

int main()
{
    thread t1(func1);
    thread t2(func2);

    getchar();

    pool.Stop();
    t1.join();
    t2.join();

    return 0;
}


        在上面的例子中,ThreadPool有3个成员,一个是线程组,这个线程组中的线程是预先创建的,应该创建多少个线程由外部传入,一般建议创建CPU核数的线程以达到最优的效率,线程组循环从同步队列中取出任务并执行,如果线程池为空,线程组将处于等待状态,等待任务的到来。另外一个成员变量是同步队列,它不仅用来做线程同步,还用来限制同步队列的上限,这个上限也是由使用者设置的。第三个成员变量是用来停止线程池的,为了保证线程的安全。

        在这个例子中,外部线程将不断的像线程池中添加新任务,线程池内部的线程将会并行处理同步队列中的任务。

缓冲区空了,需要等待...异步层的线程ID: 140667074565888
缓冲区空了,需要等待...异步层的线程ID: 140666806126336
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:缓冲区空了,需要等待...异步层的线程ID: 140666537686784
同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232
140666806126336同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232
同步层线程2的线程ID:140666269247232

同步层线程2的线程ID:140666269247232
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
同步层线程1的线程ID:140666537686784
缓冲区空了,需要等待...异步层的线程ID: 140667074565888

        测试结果如上所示。

        由测试结果可以看到,线程池初始化创建了两个内部的线程,线程ID分别是140667074565888和140666806126336,由于初始化时,线程池中的同步队列是空的,所以这两个线程将进入等待状态,直到队列中有数据时才开始处理数据。线程池的上层有两个线程,线程ID分别是140666537686784和140666269247232,这两个线程不断往线程池中添加数据,这些数据会被添加到排队层中,供异步服务层的线程处理。最终的结果是,异步层的线程交替处理来自上层的任务,交替打印出上层的线程ID,缓冲区空了就会等待,满了之后也会等待,不允许无限制添加任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1313836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

命令执行 [BUUCTF 2018]Online Tool1

打开题目 我们代码审计一下 if (isset($_SERVER[HTTP_X_FORWARDED_FOR])) { $_SERVER[REMOTE_ADDR] $_SERVER[HTTP_X_FORWARDED_FOR]; } 如果存在xxf头且不为空&#xff0c;则将xxf头内容&#xff08;真实的客户端ip&#xff09;赋给ROMOTE_ADDR&#xff08;代理服务器传过…

[pluginviteimport-analysis] vite 提示jsx语法报错

参考文章 https://segmentfault.com/q/1010000043499356https://blog.csdn.net/kkkkkkgg/article/details/131168224 报错内容 内容类似如下&#xff1a; 03:16:26 [vite] Internal server error: Failed to parse source for import analysis because the content contains…

上海亚商投顾:沪指收复3000点,房地产板块集体走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指昨日窄幅震荡&#xff0c;创业板指走势较弱&#xff0c;科创50指数跌近1%。房地产板块集体走强&#xff0…

Linux服务器开发太麻烦? 试试IntelliJ IDEA公网远程访问开发极大提升开发效率

文章目录 1. 检查Linux SSH服务2. 本地连接测试3. Linux 安装Cpolar4. 创建远程连接公网地址5. 公网远程连接测试6. 固定连接公网地址7. 固定地址连接测试 本文主要介绍如何在IDEA中设置远程连接服务器开发环境&#xff0c;并结合Cpolar内网穿透工具实现无公网远程连接&#xf…

C语言刷题数组------数组交换

输入一个长度为 10的整数数组 X[10]&#xff0c;将里面的非正整数全部替换为 1&#xff0c;输出替换完成后的数组。 输入格式 输入包含 10个整数&#xff0c;每个整数占一行。输出格式 输出新数组中的所有元素&#xff0c;每个元素占一行。输出格式为 X[i] x&#xff0c;其中…

spring面试:二、bean的生命周期和循环引入问题(三级缓存、@Lazy)

bean的生命周期 Spring容器在进行实例化时&#xff0c;会将xml配置的的信息封装成一个BeanDefinition对象&#xff0c;Spring根据BeanDefinition来创建Bean对象&#xff0c;里面有很多的属性用来描述Bean。 其中比较重要的是&#xff1a; beanClassName&#xff1a;bean 的类…

焊盘:十字连接VS全覆盖 铺铜

在铺铜规则中&#xff0c;焊盘连接方式有两种&#xff1a; 十字连接 优点&#xff1a;较好焊接&#xff1a;因铺铜面积减少&#xff0c;温度下降速度降低&#xff0c;较好焊接&#xff0c;不易虚焊。 缺点&#xff1a;载流能力较弱&#xff1a;铺铜面积↓ → 载流能力↓全连接…

python如何发送企业微信群消息

一、创建机器人&#xff0c;并获取webhook 1.1 进入企业微信中&#xff0c;添加群机器人&#xff0c;添加完成后可以获取到一个webhook的地址 1.2 群机器人企业微信接口的调用可以参考这个文件 https://developer.work.weixin.qq.com/document/path/99110#%E5%A6%82%E4%BD%…

问答区故意在结题前回答混赏金的狗

此贴专记录CSDN问答社区里面&#xff0c;一些回答者在临近结题时胡乱回答&#xff0c;只为分取结题赏金的人。 所有图片均为事实&#xff0c;绝无半点虚假。各位看官可以自行搜索问题题目或者通过查看此人回答求证 所有图片均为事实&#xff0c;绝无半点虚假。各位看官可以自行…

服务器数据恢复-EqualLogic PS存储硬盘坏道导致存储不可用的数据恢复案例

服务器数据恢复环境&#xff1a; 一台DELL EqualLogic PS系列存储&#xff0c;存储中有一组由16块SAS硬盘组成的RAID5。上层是VMFS文件系统&#xff0c;存放虚拟机文件。存储上层分了4个卷。 服务器故障&检测&#xff1a; 存储上有2个硬盘指示灯显示黄色&#xff0c;磁盘出…

学习Linux(3)-Linux软件安装之yum

什么是yum yum&#xff08; Yellow dog Updater, Modified&#xff09;是一个在 Fedora 和 RedHat 以及 SUSE 中的 Shell 前端软件包管理器。 假设&#xff0c;在一台window系统的电脑上要用qq&#xff0c;那么我们回去下载qq的安装包&#xff0c;然后执行qq.exe文件在本机上进…

csrf和ssrf的区别,攻击如何防护

CSRF&#xff08;跨站请求伪造&#xff09;和SSRF&#xff08;服务器端请求伪造&#xff09;都是网络安全中的常见攻击类型&#xff0c;但它们的目标和攻击方式有所不同。理解这两种攻击的区别对于有效地防御它们至关重要。 CSRF和SSRF的主要区别在于攻击的发起者和目标。CSRF…

Linux-----3、物理机安装Linux

# 物理机安装Linux # 系统镜像获取 http://isoredirect.centos.org/centos/7/isos/ 例如&#xff1a; CentOS7.9.2009 arch (opens new window) 阿里云镜像 CentOS7.9.2009 x86 (opens new window) # 华为Atlas 500pro 表 2-1 系统版本及适配信息 名称内容操作系统型号CentO…

Ubuntu18.04.6下安装opencv库及OpenCV安装libjasper-dev依赖包错误

目录 01 解压安装包 02 安装cmake和依赖库 03 配置编译环境 01 解压安装包 创建一个名为Opencv的文件夹 mkdir opencv 将源码的压缩包复制到opencv目录下 将压缩包解压到opencv文件夹&#xff08;指定一个文件夹&#xff09; unzip opencv-3.4.11.zip -d opencv02 安装cm…

解决nuxt3子路由router-view中出现的document not defined错误

之前讲过几种解决document not defined错误的方法&#xff0c;但是今天碰到一种新情况&#xff1a; 就是访问根路由/ , 然后再跳转到子路由没有问题: 但是如果直接访问子路由时router-view会报这个错误。 我怀疑原因是&#xff1a; 直接访问子路由时&#xff0c;有可能dom树还…

Tableau快速入门-下载安装加载数据与仪表盘构建

官网介绍 官网连接如下&#xff1a; https://www.tableau.com/zh-cn tableau的产品包括如下&#xff1a; 参考:https://zhuanlan.zhihu.com/p/341882097 Tableau是功能强大、灵活且安全些很高的端到端的数据分析平台&#xff0c;它提供了从数据准备、连接、分析、协作到查阅…

【Matlab】三角函数的周期性图像可视化(附完整MATLAB代码)

三角函数的周期性图像可视化 前言三角函数:MATLAB对三角函数的理解和帮助: 正文思考步骤 代码实现结果 前言 三角函数: 三角函数是数学中一类描述角度和周期性变化的特殊函数。常见的三角函数包括正弦函数 ( sin ⁡ ) (\sin ) (sin) &#xff0c;余弦函数 ( cos ⁡ ) (\cos…

力扣22. 括号生成(java 回溯法)

Problem: 22. 括号生成 文章目录 题目描述思路解题方法复杂度Code 题目描述 思路 我们首先要知道&#xff0c;若想生成正确的括号我们需要让右括号去适配左括号&#xff0c;在此基础上我们利用回溯去解决此题目 1.题目给定n个括号&#xff0c;即当回溯决策路径长度等于 2 n 2n…

【自动化测试】web3py 连接 goerli

web3py 连接 goerli 直接使用库里方法 if __name__ __main__:from web3.auto.infura.goerli import w3w3.eth.get_balance(get_address_by_private_key(os.getenv("AAA_KEY")))error info: websockets.exceptions.InvalidStatusCode: server rejected WebSocket …

计算机网络:物理层(三种数据交换方式)

今天又学到一个知识&#xff0c;加油&#xff01; 目录 前言 一、电路交换 二、报文交换 三、分组交换 1、数据报方式 2、虚电路方式 3、比较 总结 前言 为什么要进行数据交换&#xff1f; 一、电路交换 电路交换原理&#xff1a;在数据传输期间&#xff0c;源结点与…