基础篇:多线程所需知识:RAII接口模式对生产者和消费者封装以及多batch实现

news2025/1/23 5:00:35

我们先来弄一个最基础的infer类:


class Infer{
public: 
    bool load_model(const string &file){
        context_ = file;
        return true;
    } 
    void forward(){
        if(context_.empty()){
            printf("加载模型异常\n");
            return;
        }
        printf("使用%s进行推理\n " , context_.c_str());
    }
    void destroy(){
        context_.clear();
    }
private:
    string context_;

};

正常的模型是很复杂的,但在这里我们就用string代替了。

那这个代码重点就在于异常逻辑,在正常工作代码中,异常逻辑如果没有写好,就会造成封装的不安全性,甚至会导致程序崩溃。

比如之前的load model:如果没有load过还好,但是如果load过了,那不久导致了两次load_model么?

bool load_model(const string &file){
        if (!context_.empty())
        {
            /* code */destroy();
        }
        context_ = file;
        return true;
    } 

所以在load前要先看一眼,如果load过了就destroy

这个时候就能引出我们的RAII + 接口模式

RAII

RAII -------> 资源获取即初始化

接口i模式 ------> 设计模式,是一种封装模式,实现类与接口分离的模式

那就是将原本的

Infer infer_;
infer.load_model("aa");

合为一步,即获取就可以初始化

shared_ptr <Infer> create_infer(const string& file){
    shared_ptr <Infer> instance(new Infer);
    if (!instance->load_model(file))
    {
        instance.reset();
    }
    return instance;
     
}

所以这里是表示如果模型加载失败,则代表资源获取失败,返回空指针。


int main(){
    auto infer_ = create_infer("aa");
    if (infer_ == nullptr)
    {
        /* code */printf("failed\n");
        return -1;
    }
    
    infer_->forward();
    return 0;
}

所以现在的结果非常简单,只要为空指针则表示失败。不为空则成功。

这样就避免了外部执行load_model(RAII制作到一部分没有完全做到)。

避免了load_model不会执行超过一次

获取的模型一定初始化成功,因此forward不用判断模型初始化成功。

load model中可以删除对于重复load的判断,forward可以删掉是否家在成功的判断。

接口模式封装:

那么既然我们已经能用create了,可基于Infer的public属性,还是可以调用load_model的,所以就要进入到接口模式封装了。

接口模式解决问题:

1、解决load_model还能被外部看到的问题。

2、解决成员变量对外可见的问题(比如成员函数是特殊类型的,比如cudaStream_t ,那么头文件必须包含cuda_runtime.h,就会造成命名空间污染,头文件污染的问题 , 不干净的结果就会造成程序错误等异常)

接口类是一个纯虚类

原则是只暴露调用者需要的函数,其他一概隐藏起来

比如说load_model,咱们通过RAII做了定义,因此load_model属于不需要的范畴

内部如果有启动线程等等,start , stop ,也不需要暴露 , 而是初始化的时候自动启动,都是RAII的定义

class InferInterface
{
private:
    /* data */
public:
    virtual void forward() = 0 ;
    
};

forward 要做成纯虚函数。

class InferImpl : public InferInterface{
public: 
    bool load_model(const string &file){
       
        context_ = file;
        return true;
    } 
    virtual void  forward()override{
     
        printf("使用%s进行推理\n " , context_.c_str());
    }
    void destroy(){
        context_.clear();
    }
private:
    string context_;

};

然后要让原来的类继承于接口类

shared_ptr <InferInterface> create_infer(const string& file){
    shared_ptr <InferImpl> instance(new InferImpl);
    if (!instance->load_model(file))
    {
        instance.reset();
    }
    return instance;
     
}

在返回类型的时候返回的是Interface , 在new的时候选的是InferImpl。

 这样作为一个使用者除了forward真的还没有其他的可选项。

之后可以再进一步设置Infer.hpp作为头文件

#ifndef INFER_HPP
#define INFER_HPP
#include<memory>
#include<string>
class InferInterface
{
private:
    /* data */
public:
    virtual void forward() = 0 ;
    
};
std::shared_ptr <InferInterface> create_infer(const std::string& file);
#endif  /. INFER_HPP

实现放到Infer.cpp中:

#include "infer.hpp"
using namespace std;
class InferImpl : public InferInterface{
public: 
    bool load_model(const string &file){
       
        context_ = file;
        return !context_.empty();
    } 
    virtual void  forward()override{
        
        printf("使用%s进行推理\n " , context_.c_str());
    }
    void destroy(){
        context_.clear();
    }
private:
    string context_;

};
shared_ptr <InferInterface> create_infer(const string& file){
    shared_ptr <InferImpl> instance(new InferImpl);
    if (!instance->load_model(file))
    {
        instance.reset();
    }
    return instance;
     
}

这样在main函数里就只需要


#include"infer.hpp"
using namespace std;



int main(){
    auto infer_ = create_infer("aa");
    if (infer_ == nullptr)
    {
        /* code */printf("failed\n");
        return -1;
    }
    
    infer_->forward();
    return 0;
}

这样就做到了

  • 头文件只依赖最简单的
  • 成员变量看不到
  • 只能访问forward

RAII接口总结原则:

1、头文件应尽量简洁

2、不需要的东西都放到cpp当中,不要让接口类看到

比如:

#include <cudaruntime.h>
class InferInterface
{
private:
    cudaStream_t stream;
public:
    virtual void forward() = 0 ;
    
};

这样就不符合尽量简洁的原则,因为这里用不到stream,应该尽可能放到cpp里。

3、尽量不要在头文件写using  namespace,因为写了这个就代表命名空间被打开了,这样就使得所有包含了这个头文件的人的所有命名空间都被打开。但CPP里可以写

多batch实现:

在这里我们主要是进行一个推理的实现。

老规矩:弄一个线程worker_thread_,为了尽量使得资源尽量那里分配那里释放,这样可以保证程序足够简单。在worker内实现模型的家在,使用,与释放。

那样的话

  
bool load_model(const string &file){
        //使得资源尽量那里分配那里释放,这样可以保证程序足够简单

        worker_thread_ = thread(&InferImpl::worker , this , file);
           return context_.empty();
    } 

这肯定是不可以的,毕竟这个线程还没来的及进行worker,就要return了

这时候就要使用future

bool load_model(const string &file){
        //使得资源尽量那里分配那里释放,这样可以保证程序足够简单
        promise<bool> pro;
        worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));
        context_ = file;
        return pro.get_future().get();
    } 

对应的worker:

void worker(string file  , promise<bool>&pro){
       string context_ = file;
        if (context_.empty())
        {
            /* code */pro.set_value(false);
        }else
        {
            pro.set_value(true);
        }
        while (true)
        {
            /* code */
        }
        
        
        
    }

这样我们就完成了在局部内加载使用释放

仿照上一节完成一下消费者模式:


struct Job{
    shared_ptr<promise<string>>pro;
    string input; 
};

class InferImpl : public InferInterface{
public: 
    bool load_model(const string &file){
        //使得资源尽量那里分配那里释放,这样可以保证程序足够简单
        promise<bool> pro;
        worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));

        return pro.get_future().get();
    } 
    virtual void  forward(string pic) override{
      
        // printf("使用%s进行推理\n " , context_.c_str());
        Job job;
        job.pro.reset(new promise<string>());
        job.input = pic;
        lock_guard<mutex> l(job_lock);
        qjobs_.push(job);
    }
  
    //实际的推理阶段
    void worker(string file  , promise<bool>&pro){
       string context_ = file;
        if (context_.empty())
        {
            /* code */pro.set_value(false);
        }else
        {
            pro.set_value(true);
        }
        while (true)
        {
           if (!qjobs_.empty())
           {
            lock_guard<mutex> l(job_lock);
            auto job = qjobs_.front();
            qjobs_.pop();
            //inference:
            printf();
           }
           this_thread::yield();
        }  
    }
private:
    thread worker_thread_;
    queue<Job> qjobs_;
    mutex  job_lock;
    condition_variable cv_;
};

但这里每次都是一个一个地拿出来pop,我们要的是多batch,最好是一次能拿batchsize个出来。

这是就可以用vector:

  int maxbatchsize  =  5;
        vector<Job> jobs;
        
        while (true)
        {
           if (!qjobs_.empty())
           {
            lock_guard<mutex> l(job_lock);
            while (jobs.size < maxbatchsize && !qjobs_.empty())
            {
                jobs.emplace_back( qjobs_.front());
                qjobs_.pop();
            }
            
            //inference:
            printf();
           }
           this_thread::yield();
        }  

通知模式

那我们这个消费者不能一直while(true)啊,于是就要对其进行一个通知,也就是有东西进来了,再进行消费。所以就要使用到condition_variable的wait()和notify_one()了。

    virtual void  forward(string pic) override{
      
        // printf("使用%s进行推理\n " , context_.c_str());
        Job job;
        job.pro.reset(new promise<string>());
        job.input = pic;
        lock_guard<mutex> l(job_lock);
        qjobs_.push(job);
        cv_.notify_one();
    }

在传入的时候通知一下我。

    void worker(string file  , promise<bool>&pro){
       string context_ = file;
        if (context_.empty())
        {
            /* code */pro.set_value(false);
        }else
        {
            pro.set_value(true);
        }
        int maxbatchsize  =  5;
        vector<Job> jobs;

        
        //i消费者:
        while (true)
        {
            unique_lock<mutex> l(job_lock);
            cv_.wait(l , [&]{
                return !qjobs_.empty();
            });
          
            while (jobs.size() < maxbatchsize && !qjobs_.empty())
            {
                jobs.emplace_back( qjobs_.front());
                qjobs_.pop();
            }

            //inference:
            printf();
           }

    }

在这里要进行一个等待。之后还可以去除qjobs_.empty()的判断。

这种通知的模式也要比我们之前主动检查模式要更好。

之后就要进行推理了:

for (int  i = 0; i < jobs.size(); i++)
            {
                auto& job = jobs[i];
                char result [100];
                sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());
                job.pro->set_value(result);
            }
            batch_id++;
            jobs.clear();
            this_thread::sleep_for(chrono::milliseconds(1000));

 而这个时候set_value过后我们就要考虑一下返回的操作了。

返回future格式

如果forward是采用了

    virtual string  forward(string pic) override{
      
        // printf("使用%s进行推理\n " , context_.c_str());
        Job job;
        job.pro.reset(new promise<string>());
        job.input = pic;
        lock_guard<mutex> l(job_lock);
        qjobs_.push(job);
        cv_.notify_one();
        return job.pro->get_future().get();
    }

这种形式进行返回的话,get() will wait until the whole inference , this not the meaning of batchs

    virtual shared_future<string>  forward(string pic) override{
      
        // printf("使用%s进行推理\n " , context_.c_str());
        Job job;
        job.pro.reset(new promise<string>());
        job.input = pic;
        lock_guard<mutex> l(job_lock);
        qjobs_.push(job);
        cv_.notify_one();
        return job.pro->get_future();
    }

这样的话就可以直接返回future,让执行人有更多的选择。

int main(){
    auto infer_a = create_infer("aa");

   auto result_a =  infer_a->forward("aa");
    auto result_b =infer_a->forward("bb");
    auto result_c =infer_a->forward("cc");
    printf("%s   \n" , result_a.get().c_str());
    printf("%s   \n" , result_b.get().c_str());
    printf("%s   \n" , result_c.get().c_str());
    return 0;
}

执行过后:

 可见是一个batch去实现的。

但是如果要是先get的话,

 那就是分批次了,因为get就代表必须完成整个步骤之后返回。

退出模型

    atomic<bool> running_{false};

设立私有变量,表示程序是否还在运行。

在loadmodel时设立running_为true

bool load_model(const string &file){
        //使得资源尽量那里分配那里释放,这样可以保证程序足够简单
        running_ = true;
        promise<bool> pro;
        worker_thread_ = thread(&InferImpl::worker , this , file , ref(pro));

        return pro.get_future().get();
    } 
 void worker(string file  , promise<bool>&pro){
       string context_ = file;
        if (context_.empty())
        {
            /* code */pro.set_value(false);
        }else
        {
            pro.set_value(true);
        }
        int maxbatchsize  =  5;
        vector<Job> jobs;
        int batch_id = 0;

        //i消费者:
        while (running_)
        {
            unique_lock<mutex> l(job_lock);
            cv_.wait(l , [&]{
                return !qjobs_.empty() ||  !running_;
            });
          if(! running_){
            break;
          }
            while (jobs.size() < maxbatchsize && !qjobs_.empty())
            {
                jobs.emplace_back( qjobs_.front());
                qjobs_.pop();
            }

            for (int  i = 0; i < jobs.size(); i++)
            {
                auto& job = jobs[i];
                char result [100];
                sprintf(result ,"%s : batch-> %d [%d]" , job.input.c_str() , batch_id , jobs.size());
                job.pro->set_value(result);
            }
            batch_id++;
            jobs.clear();
            this_thread::sleep_for(chrono::milliseconds(1000));
           }

  
           printf("释放->%s \n", context_.c_str());
            context_.clear();
           printf("Worker Done\n");
    }

同时worker也要改变,while的判断机制也需要改变,cv_.wait也不能是只有qjobs为空,还要加上是否还在运行。如果不运行了要break出while循环。

在结束之后要释放模型。

析构函数

    virtual ~InferImpl(){
        running_ =  false;
        cv_.notify_one();
        if(worker_thread_.joinable()){
            worker_thread_.join();
        }
    } 

重点在于析沟函数,在该类退出后设置线程同步,并且在running_设置为false的时候通知一次wait。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/826104.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图解:订单系统的设计

目录 订单系统简介 1. 订单系统在企业中的角色 2. 订单系统与各业务系统的关系 3. 订单系统上下游关系 4. 订单系统的业务架构 订单系统核心功能 1. 订单中所包含的内容信息 2. 流程引擎 订单系统的发展 最后 本文主要讲述了在传统电商企业中&#xff0c;订单系统应承…

SpringCloud入门Day01-服务注册与发现、服务通信、负载均衡与算法

SpringCloudNetflix入门 一、应用架构的演变 伴随互联网的发展&#xff0c;使用互联网的人群越来越多&#xff0c;软件应用的体量越来越大和复杂。而传统单体应用 可能不足以支撑大数据量以及发哦并发场景应用的框架也随之进行演变从最开始的单体应用架构到分布式&#xff08…

RB-Heparin罗丹明B-肝素偶联物的合成【星戈瑞】

罗丹明B标记肝素是一种荧光标记的肝素探针。在生物医学和生物化学研究中&#xff0c;研究人员常常需要将特定的生物分子或化合物进行标记&#xff0c;以便能够在实验中可视化或追踪它们的位置和相互作用。 合成Rhodamine B-Heparin罗丹明B-肝素偶联物需要将罗丹明B染料与肝素进…

20款奔驰S350升级原厂HUD抬头显示系统,提升您的行车安全

HUD是平视显示器的简称&#xff0c;它原先是运用在航空器上的飞行辅助仪器。指飞行员不需要低头&#xff0c;就能够看到他需要的重要资讯。由于HUD的方便性以及能够提高飞行安全&#xff0c;这项技术后来也发展到汽车行业。汽车搭载的HUD抬头数字显示功能&#xff0c;是利用光学…

使用Linux部署Jpress博客系统

环境要求 linux系统&#xff1a;我使用的操作系统是CentOS7 数据库&#xff1a;mysql&#xff0c;也可以使用mariadb jdk&#xff1a;与你的Linux操作系统能兼容的版本 tomcat&#xff1a;我使用的是tomcat8版本 如果没有数据库&#xff0c;请先自行下载 如果没有安装jdk…

MySQL内置函数使用说明

MySQL函数使用说明 MySQL 是一个流行的关系型数据库管理系统&#xff0c;它提供了许多内置函数来处理和操作数据。这些函数可以简化数据库查询和操作的过程&#xff0c;提高代码的可读性和效率。以下是一些常见的 MySQL 内置函数及其使用说明和示例。 数值函数 ABS() 函数原…

带 SPI 接口的独立 CAN 控制器 SIT2515

 芯片功能与 MCP2515 完全一致  适用 CAN2.0B 1.0Mb/s 的速度 0-8 字节长度数据场 支持标准帧扩展帧和远程帧  接收缓存、掩码与过滤码 两个带有存储优先级的接收缓存器 6 个 29 位过滤码 2 个 29 位掩码  采用前两个字节的数据进行报文过滤  三…

android 如何分析应用的内存(十五)——Visual Studio Code 调试Android应用

android 如何分析应用的内存&#xff08;十五&#xff09;——Visual Studio Code 调试Android 应用 在上一篇文章介绍了jdb调试java应用 接下来介绍用UI界面调试java应用&#xff0c;达到同jdb一样的效果。 同样的UI界面有很多选择&#xff0c;如Eclipse&#xff0c;Android …

Qt 中引入ffmpeg 动态库

1、前期准备 在qt引入ffmpeg动态库的时候&#xff0c;需要准备ffmpeg的动态库和头文件。 2、打开qt项目 在qt项目的.pro文件中添加以下几行代码 INCLUDEPATH $$PWD/thirtLib/ffmpeg4.2/include win32: LIBS -L$$PWD/thirtLib/ffmpeg4.2/lib/ -lavcodec -lavdevice -lavf…

亚马逊云科技与真格基金发起「AI超新星计划」,助力早期创业者快速启动项目

大模型创业热度仍旧在持续增加&#xff0c;“百模大战”中AI创业者们的机会更多是在应用层。为了尽可能降低AI创业者的启动门槛&#xff0c;亚马逊云科技携手头部早期投资机构真格基金共同发起了「AI超新星计划」&#xff0c;为心怀梦想的AI应用创业者们提供了从云资源、模型选…

[腾讯云Cloud Studio实战训练营]基于Cloud Studio完成图书管理系统

[腾讯云Cloud Studio实战训练营]基于Cloud Studio完成图书管理系统 ⭐前言&#x1f31c;Cloud Studio产品介绍1.登录2.创建工作空间3.工作空间界面简介4.环境的使用 ⭐实验实操&#x1f31c;Cloud Studio实现图书管理系统1.实验目的 2. 实验过程2.实验环境3.源码讲解3.1添加数据…

如何解决大量小文件传输慢的问题

在信息化社会的今天&#xff0c;大量小文件传输已经成为日常工作中不可或缺的需求。无论是云存储、大数据处理还是软件更新等场景&#xff0c;都需要快速高效地传输大量小文件。然而&#xff0c;传统的传输方式往往受到网络连接速度、传输协议和存储介质等方面的限制&#xff0…

【从零开始学习JAVA | 第三十六篇】IO流下的高级流

目录 前言&#xff1a; 1.缓冲流&#xff1a; 2.转换流&#xff1a; 3.序列化流&#xff1a; 4.打印流&#xff1a; 5.压缩流&#xff1a; 总结&#xff1a; 前言&#xff1a; 在前面我们从IO流体系出发&#xff0c;分别介绍了字节流和字符流&#xff0c;并且详细讲解了其…

推荐两款github敏感信息搜集工具(gsil、gshark)

推荐两款github敏感信息搜集工具&#xff08;gsil、gshark&#xff09; - 云社区 - 腾讯云 (tencent.com) github敏感信息泄露是很多企业时常忽视的一个问题&#xff0c;国外有一份研究报告显示&#xff0c;在超过24,000份的GitHub公开数据中&#xff0c;发现有数千个文件中可能…

python开发实战——ip池

前言 代理IP池是一组可用的代理IP地址&#xff0c;用于访问网站或执行其他网络请求。它可以帮助我们在网络请求时隐藏我们的真实IP地址&#xff0c;从而提高网络安全性、匿名性和稳定性。同时&#xff0c;代理IP池还可以通过定时更新和测试代理IP&#xff0c;保证代理IP的有效…

C#生成dll给c++调用 方法二COM方式 vs2022 NO Make Assembly COM-Visible选错了 不需要clr

有些C项目中也用了C语言.c,用方法一就无法使用【不能使用 /clr 选项编译 C 文件】。就用方法2。 方法二:COM方式 参考&#xff1a; https://www.5axxw.com/questions/content/2ozion 1.C# 生成dll using System; using System.Collections.Generic; using System.Linq; usin…

【Linux多线程】基于生产消费模型写的一个实例(附源代码+讲解)

生产消费模型 生产消费模型为何要使用生产者消费者模型生产者消费者模型优点 基于BlockingQueue的生产者消费者模型BlockQueue.cc代码解释 BlockQueue.hpp代码解释 Makefile代码解释 Task.hpp代码解释 生产消费模型 为何要使用生产者消费者模型 生产者消费者模式就是通过一个…

tdengine入门详解

TDengine是什么&#xff1f; TDengine 是一款开源、高性能、云原生的时序数据库&#xff08;Time Series Database, TSDB&#xff09;, 它专为物联网、车联网、工业互联网、金融、IT 运维等场景优化设计&#xff0c;基于C语言开发。 什么是时序数据库&#xff1f;时序数据产生…

三元运算符引发的自动拆装箱问题

文章目录 问题背景问题排查排查过程问题扩展总结 问题背景 生产环境上出现空指针异常&#xff0c;追踪报错位置得知以下代码报错 if (isNull(aiGroup)) {return null;}aiGroup.setNum(isNull(param.getNum()) ? aiGroup.getNum() : param.getNum().doubleValue());问题排查 …

1500-2000元预算性价比吉他推荐,雅马哈FG800和VEAZEN费森VZ90怎么选?评测对比哪一款更适合初学者入门选购!

在2000元价位入门进阶吉他圈里&#xff0c;可谓是群雄角逐&#xff0c;Yamaha 雅马哈入门级FG800系列和VEAZEN 费森VZ90系列是一直都很热销的面单吉他型号&#xff0c;初学者想要在其中挑选出一把合适自己的吉他还是有点难度的。 那么&#xff0c;今天就以它们为本期的评测主角…