异构线程池的c++实现方案

news2024/11/29 2:50:55

概要

通常线程池是同质的,每个线程都可以执行任意的task(每个线程中的task顺序执行),如下图所示:

 但本文所介绍的线程和task之间有绑定关系,如A task只能跑在A thread上(因此称为异构线程池,每个线程的功能是有所区别的),如下图所示:

接口设计

TThreadPool接口设计

// 线程池
class TThreadPool
{
public:
    TThreadPool() {}
	TThreadPool(const TThreadPool&) = delete;
	TThreadPool& operator=(const TThreadPool& other) = delete;
    ~TThreadPool();
    bool add_thread(std::string name); // add one thread into pool
    bool delete_thread(std::string name); // remove one thread from pool
    TThread* get_thread_by_name(std::string threadname); // get the thread by name, don't delete return object by yourself
	bool append_task_by_thread(const std::string threadname, const std::function<void()>& task); // add task to pointed thread

private:
	std::mutex m_mutex;
	std::map<std::string, TThread*> m_threads;
};

 TThreadPool类的主要功能是管理创建的线程(TThread,它是线程的具体实现),它提供了增加/删除线程的接口,同时给每个线程打上了标签(name)。

TThread接口设计

// 对std::thread的封装类
class TThread
{
public:
    TThread(std::string name);
    TThread(const TThread& other) = delete;
	TThread& operator=(const TThread& other) = delete;
    ~TThread();

    bool push_task(const std::function<void()>& task); // one add task
    std::thread::id get_thread_id(); // for log purpose
    void set_max_task_size(int s); // avoid thread too busy
    int get_task_size(); // get current task number

private:
	void work(); // real work thread
    void notify(); // notify work thread to quit
private:
    std::string s_name;
    std::atomic_bool b_running;
    std::thread* p_thread;
    std::mutex m_mutex;
	std::queue<std::function<void()> > m_tasks;
	std::condition_variable m_cond;
    std::atomic<int> i_maxTaskSize;
};

TThread类的主要功能是分配任务(push_task函数)和处理任务(work函数)。

代码实现

TThreadPool类

TThreadPool::~TThreadPool() {
    std::unique_lock<std::mutex> lk(m_mutex);
    for(auto iter=m_threads.begin(); iter!=m_threads.end(); iter++) {
        if(iter->second != nullptr) {
            delete iter->second;
        }
    }
    m_threads.clear();
}

bool TThreadPool::add_thread(std::string name) {
    std::unique_lock<std::mutex> lk(m_mutex);
    if(m_threads.count(name)) {
        return false;
    }
    auto tt = new TThread(name);
    if(tt == nullptr) {
        return false;
    }
    m_threads[name] = tt;
    return true;
}

bool TThreadPool::delete_thread(std::string name) {
    std::unique_lock<std::mutex> lk(m_mutex);
    if(m_threads.count(name) == 0) {
        return false;
    }
    delete m_threads[name];
    m_threads.erase(name);
    return true;
}

TThread* TThreadPool::get_thread_by_name(std::string threadname) {
    std::unique_lock<std::mutex> lk(m_mutex);
    if(m_threads.count(threadname) == 0) {
        return nullptr;
    }
    return m_threads[threadname];
}

bool TThreadPool::append_task_by_thread(const std::string threadname, const std::function<void()>& task)
{
    std::unique_lock<std::mutex> lk(m_mutex);
    if(m_threads.count(threadname) == 0) {
        return false;
    }
    auto tt = m_threads[threadname];
    return tt->push_task(task);
}

TThread类

const int MAX_TASK_SIZE = 100; // max task size in one thread

TThread::TThread(std::string name) : s_name(name)
                              , b_running(true)
                              , i_maxTaskSize(MAX_TASK_SIZE) {
    p_thread = new std::thread(&TThread::work, this);
}

TThread::~TThread() {
    notify(); // notify work thread quit
    if(p_thread->joinable()) {
        p_thread->join();
    }
    delete p_thread;
}

bool TThread::push_task(const std::function<void()>& task) {
	std::unique_lock<std::mutex> lk(m_mutex);
    if(!b_running) {
        return false;
    }
    if(m_tasks.size() > i_maxTaskSize.load()) {
        return false;
    }
    m_tasks.push(task);
    m_cond.notify_one();
    return true;
}

std::thread::id TThread::get_thread_id() {
    return p_thread->get_id();
}

void TThread::set_max_task_size(int s) {
    if(s <= 0) {return;}
    i_maxTaskSize.store(s);
}

void TThread::work() {
    std::cout << std::this_thread::get_id() << " begin work thread" << std::endl;

	while (b_running) { // quit even tasks remaining
		std::function<void()> task;
		{
			std::unique_lock<std::mutex> lk(m_mutex);
			if (!m_tasks.empty()) {
				task = m_tasks.front();
				m_tasks.pop();
			} else if (b_running && m_tasks.empty()) {
				m_cond.wait(lk);
            }
		}

		if (task)
			task(); // do the task
	}
    std::cout << std::this_thread::get_id() << " end work thread"  << std::endl;
}

void TThread::notify() {
    std::unique_lock<std::mutex> lk(m_mutex);
    b_running = false;
    m_cond.notify_one(); 
    // mutex will be released here, therefore another thread would lock it afterward
}

int TThread::get_task_size() {
    std::unique_lock<std::mutex> lk(m_mutex);
    return m_tasks.size();
}

使用方式

有两种方式可以调用对应的线程

公共代码

void func1(int i) {
    std::cout << "into func1: " << i << std::endl;
    sleep(2); // simulate real work
}
    TThreadPool thread_pool;
    thread_pool.add_thread("vdr"); // 启动vdr线程
    thread_pool.add_thread("xgb"); // 启动xgb线程

方式一、(先获取线程对象,然后对该线程对象添加任务)

auto tt = thread_pool.getThreadByName("vdr");
tt->push_task(std::bind(func1, 2));
tt->push_task(std::bind(func1, 5));

方式二、(直接通过线程池给对应线程添加任务)

thread_pool.append_task_by_thread("vdr", std::bind(func1, 2));
thread_pool.append_task_by_thread("vdr", std::bind(func1, 5));

注:

task是std::function<void()>类型,上面的demo是普通函数实现的,真实场景应该是类函数,实现如下:

class A {
public:
    void func(std::string str) {
        std::cout << "into A func: " << str << std::endl;
    }
};
    A a;
    thread_pool.append_task_by_thread("vdr", std::bind(&A::func, &a, "2"));
    thread_pool.append_task_by_thread("vdr", std::bind(&A::func, &a, "5"));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/793109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《脱离“一支笔、一双手、一道力扣”困境的秘诀》:突破LeetCode难题的五个关键步骤

导言&#xff1a; 在解决LeetCode等编程题时&#xff0c;不少人会陷入“一支笔、一双手、一道力扣&#xff08;LeetCode&#xff09;做一宿”的困境。尽管已经掌握了相关知识和算法&#xff0c;但在实际挑战中却无从下手。本文将分享如何摆脱这一困境的秘诀&#xff0c;让你在面…

Java带符号右移(>>)、不带符号右移(>>>)

Java的右移涉及带符号右移&#xff08;>>&#xff09;、不带符号右移&#xff08;>>>&#xff09;。 对于正数&#xff0c;因为符号位是0&#xff0c;带符号右移和不带符号右移左侧都用0填充&#xff0c;所以结果相同。 对于负数&#xff0c;因为符号位是1&…

python + requests实现的电商API接口自动化框架详细教程

1、首先&#xff0c;我们先来理一下思路。 正常的电商接口测试流程是什么&#xff1f; 脑海里的反应是不是这样的&#xff1a; 确定测试接口的工具 —> 配置需要的接口参数 —> 进行测试 —> 检查测试结果&#xff08;有的需要数据库辅助&#xff09; —> 生成测…

第一节:我用Python论证移动平均线(MA)真的靠谱吗?

视频地址&#xff1a;点我查看文章配套视频 什么是MA 所谓“移动平均线”是指一段时间内股票价格的算术平均线&#xff0c;通常以收盘价作为计算值。它是一种趋向类技术指标。 在证券投资中&#xff0c;要赚钱无非是降低成本、提高收入&#xff0c;以期获得较高利润&#xff0…

内存泄漏是什么?有什么危害

内存泄漏是什么&#xff1f;有什么危害 1. 前言1.内存泄漏是什么&#xff1f;2. 为什么会发生内存泄漏3. 内存泄漏的危害4. 总结 1. 前言 在各种项目开发中&#xff0c;内存泄漏是一个很严重的问题。对资源管理、性能优越、系统稳定性&#xff0c;以及是否安全产生极大印象。本…

Seaborn库绘制单变量分布和双变量分布

Matplotlib虽然已经是比较优秀的绘图库了&#xff0c;但是它有个今人头疼的问题&#xff0c;那就是API使用过于复杂&#xff0c;它里面有上千个函数和参数&#xff0c;属于典型的那种可以用它做任何事&#xff0c;却无从下手。 Seaborn基于 Matplotlib核心库进行了更高级的API…

1-Linux的目录结构

Linux的目录结构是规定好的&#xff0c;不可以随意进行更改&#xff01; Linux的文件系统是采用级层式的树状目录结构&#xff0c;最上层是根目录–/&#xff0c;然后再在根目录下创建其它的目录。 各个目录中主要负责的功能和作用如下&#xff1a;&#xff08;主体的结构一定…

TypeScript入门学习汇总

1.快速入门 1.1 简介 TypeScript 是 JavaScript 的一个超集&#xff0c;支持 ECMAScript 6 标准。 TypeScript 由微软开发的自由和开源的编程语言。 TypeScript 设计目标是开发大型应用&#xff0c;它可以编译成纯 JavaScript&#xff0c;编译出来的 JavaScript 可以运行在…

65英寸OLED透明屏的显示效果出色吗?

65英寸OLED透明屏是一种新型的显示技术&#xff0c;它采用有机发光二极管&#xff08;OLED&#xff09;作为显示元件&#xff0c;具有高亮度、高对比度、快速响应和广视角等优点。 与传统的液晶显示屏相比&#xff0c;OLED透明屏具有更高的透明度和更好的显示效果。 OLED透明屏…

VMPWN的入门级别题目详解(一)

实验一 VMPWN1 题目简介 这是一道基础的VM相关题目&#xff0c;VMPWN的入门级别题目。前面提到VMPWN一般都是接收字节码然后对字节码进行解析&#xff0c;但是这道题目不接受字节码&#xff0c;它接收字节码的更高一级语言&#xff1a;汇编。程序直接接收类似”mov”、”add”…

Python安装pip命令教程及更改镜像源

1、官方地址&#xff1a;地址 2、解压完成后&#xff0c;我们进入pip-23.2.1文件目录&#xff0c;找到setup.py&#xff0c;打开cmd进入此目录&#xff0c;或者在文件地址中输入cmd打开&#xff0c;输入以下命令&#xff1a; python setup.py install进l进行安装&#xff0c;安…

全网最全,Jmeter+Ant 接口自动化测试,从0到1精通实战...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 注&#xff1a;涉…

用友全版本任意文件上传漏洞复现

声明 本文仅用于技术交流&#xff0c;请勿用于非法用途 由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任。 文章作者拥有对此文章的修改和解释权。如欲转载或传播此文章&#xff0c…

大数据技术之Hive1

目录标题 1、Hive基本概念1.1 定义1.2 优缺点1.3 Hive架构原理1.4 hive和数据库比较 2、Hive安装2.1 Hive 安装地址 1、Hive基本概念 1.1 定义 hive是基于hadoop的一个数据仓库工具&#xff0c;可以将结构化数据文件映射成一张表&#xff0c;并提供类SQL查询功能。 本质&…

(202307)wonderful-sql:复杂一点的查询(task3)

教程链接&#xff1a;Datawhale - 一个热爱学习的社区 知识学习 1 视图 视图是一张虚拟的表。《sql基础教程第2版》用一句话非常凝练的概括了视图与表的区别---“是否保存了实际的数据”。 通过定义视图可以将频繁使用的SELECT语句保存以提高效率。通过定义视图可以使用户看…

OSCP最新考试QA

枚举提示 初始枚举 对你的目标进行光线扫描。 例如&#xff0c;扫描您的考试机器上的10个常见端口。 在等待彻底和更长时间的扫描时&#xff0c;手动与找到的服务交互。 仔细列举 避免对多个目标进行大量扫描。 运行不安全扫描后还原计算机。 重新运行扫描以确保所有信…

进程线程间的通信

进程和线程之间有很多种方法进行通信&#xff0c;如下是需要掌握的通信方式&#xff1a; 无名管道&#xff08;pipe&#xff09;有名管道&#xff08;fifo&#xff09;信号&#xff08;signal&#xff09;共享内存&#xff08;mmap&#xff09; 本文章代码存放在GitHub中的UNIX…

libhv之hloop源码分析

1int hloop_run(hloop_t* loop) hloop_run总结&#xff1a; 1.loop状态判断和管理 2.loop的flags管理&#xff08;HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS&#xff0c;HLOOP_FLAG_RUN_ONCE&#xff0c;HLOOP_FLAG_AUTO_FREE&#xff09; 3.创建custom event通信fd&#xff0c;方…

Docker consul容器服务更新与发现

Docker consul容器服务更新与发现 一、什么事服务注册与发现二、什么是consul三、consul部署1、consul服务器2、registrator服务器3、consul-template 一、什么事服务注册与发现 服务注册与发现是微服务架构中不可或缺的重要组件。起初服务都是单节点的&#xff0c;不保障高可…

MyBatis学习笔记之高级映射及延迟加载

文章目录 环境搭建&#xff0c;数据配置多对一的映射的思路逻辑级联属性映射association分布查询 一对多的映射的思路逻辑collection分布 环境搭建&#xff0c;数据配置 t_class表 t_stu表 多对一的映射的思路逻辑 多对一&#xff1a;多个学生对应一个班级 多的一方是st…