C++多线程的性能优化

news2025/4/7 10:43:59

高效线程池设计与工作窃取算法实现解析

1. 引言

现代多核处理器环境下,线程池技术是提高程序并发性能的重要手段。本文解析一个采用工作窃取(Work Stealing)算法的高效线程池实现,通过详细代码分析和性能测试展示其优势。

2. 线程池核心设计

2.1 类结构概览

class ThreadPool {
public:
    using Task = std::function<void()>;
private:
    struct WorkerData {
        std::deque<Task> task队列;
        std::mutex queue_mutex;
    };
    // 成员变量
    std::vector<WorkerData> worker_data;  // 每个工作线程独立的数据
    std::vector<std::thread> workers;     // 工作线程集合
    std::atomic<bool> stop;               // 停止标志
    std::atomic<size_t> next_worker{0};   // 用于任务分发的轮询计数器
};

关键设计特点:

每个工作线程维护独立的任务队列(避免全局竞争)

使用无锁原子操作实现任务分发

双端队列(deque)支持高效的任务窃取

2.2 任务提交机制

void submit(Task task) {
    size_t index = next_worker.fetch_add(1) % worker_data.size();
    auto& worker = worker_data[index];
    std::lock_guard<std::mutex> lock(worker.queue_mutex);
    worker.task_queue.push_back(std::move(task));
}

提交策略分析:

1使用原子计数器实现轮询分发

2 将任务添加到对应线程的队列尾部

3 细粒度锁(每个线程独立锁)减少竞争

2.3 工作线程主循环

void worker_loop(size_t worker_id) {
    while (!stop.load()) {
        Task task = get_local_task(my_data);  // 优先处理本地任务
        
        if (!task) {
            task = steal_remote_task(worker_id, gen);  // 尝试窃取
        }

        if (task) task();
        else std::this_thread::yield();
    }
}

工作流程:

1 优先执行本地队列任务(LIFO)

2 本地无任务时随机窃取其他线程任务

3 无任务时主动让出CPU

3. 工作窃取算法实现

3.1 本地任务获取

Task get_local_task(WorkerData& data) {
    std::lock_guard<std::mutex> lock(data.queue_mutex);
    if (!data.task_queue.empty()) {
        Task task = std::move(data.task_queue.front());  // 从头部取
        data.task_queue.pop_front();
        return task;
    }
    return nullptr;
}

特点:

本地操作使用队列前端(FIFO)

保持任务执行顺序性

3.2 远程任务窃取

Task get_local_task(WorkerData& data) {
    std::lock_guard<std::mutex> lock(data.queue_mutex);
    if (!data.task_queue.empty()) {
        Task task = std::move(data.task_queue.front());  // 从头部取
        data.task_queue.pop_front();
        return task;
    }
    return nullptr;
}

关键设计:

随机选择窃取目标(避免热点)

尝试获取锁(非阻塞)

从目标队列尾部窃取(减少与本地线程竞争)

使用双端队列实现高效的头/尾操作

4. 性能测试分析

4.1 吞吐量测试

void performance_test() {
    constexpr int NUM_TASKS = 100000;
    constexpr int NUM_THREADS = 8;
    //...
}

典型输出结果:

void performance_test() {
    constexpr int NUM_TASKS = 100000;
    constexpr int NUM_THREADS = 8;
    //...
}

测试结论:

平均每个任务处理时间 ≈ 12.56μs

线程利用率接近线性扩展

4.2 工作窃取效果验证

典型输出:

Task distribution:
Thread 0 processed 2532 tasks
Thread 1 processed 2489 tasks 
Thread 2 processed 2496 tasks
Thread 3 processed 2483 tasks

验证结果:

尽管所有任务初始提交到线程0

工作窃取使任务均匀分布到所有线程

负载均衡效果显著

5. 关键优化技术

本地化优先:线程优先处理本地任务,减少同步开销

随机化窃取:避免多个线程争抢同一个目标队列

双端队列:

本地线程从头部取(FIFO)

窃取线程从尾部取(减少锁竞争)

细粒度锁:每个队列独立锁,而非全局锁

6. 适用场景分析

优势场景:

大量短期任务(μs级)

任务负载不均衡

CPU密集型与IO密集型混合

不适用场景:

任务间强依赖性

需要严格顺序执行

超长时任务(可能导致工作窃取失效)

7. 扩展优化方向

动态线程数量调整

任务优先级支持

批量任务提交

窃取失败时的自适应等待策略

8. 结论

本文实现的线程池通过工作窃取算法有效解决了传统线程池的负载不均问题。测试表明,该设计在保持低延迟的同时,能实现良好的负载均衡,特别适合现代多核处理器环境下的高并发场景。
完整代码

#include <vector>
#include <deque>
#include <thread>
#include <mutex>
#include <atomic>
#include <functional>
#include <random>
#include <algorithm>
#include <iostream>
#include <chrono>

class ThreadPool {
public:
    using Task = std::function<void()>;

    explicit ThreadPool(size_t num_threads) 
        : stop(false), worker_data(num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this, i] { worker_loop(i); });
        }
    }

    ~ThreadPool() {
        stop.store(true);
        for (auto& t : workers) t.join();
    }

    void submit(Task task) {
        // 轮询选择工作线程
        size_t index = next_worker.fetch_add(1) % worker_data.size();
        auto& worker = worker_data[index];
        std::lock_guard<std::mutex> lock(worker.queue_mutex);
        worker.task_queue.push_back(std::move(task));
    }

private:
    struct WorkerData {
        std::deque<Task> task_queue;
        std::mutex queue_mutex;
    };

    std::vector<WorkerData> worker_data;
    std::vector<std::thread> workers;
    std::atomic<bool> stop;
    std::atomic<size_t> next_worker{0};

    void worker_loop(size_t worker_id) {
        WorkerData& my_data = worker_data[worker_id];
        std::random_device rd;
        std::mt19937 gen(rd());

        while (!stop.load()) {
            Task task = get_local_task(my_data);
            
            if (!task) {
                task = steal_remote_task(worker_id, gen);
            }

            if (task) {
                task();
            } else {
                std::this_thread::yield();
            }
        }
    }

    Task get_local_task(WorkerData& data) {
        std::lock_guard<std::mutex> lock(data.queue_mutex);
        if (!data.task_queue.empty()) {
            Task task = std::move(data.task_queue.front());
            data.task_queue.pop_front();
            return task;
        }
        return nullptr;
    }

    Task steal_remote_task(size_t worker_id, std::mt19937& gen) {
        std::uniform_int_distribution<size_t> dist(0, worker_data.size()-1);
        size_t start = dist(gen);
        
        for (size_t i = 0; i < worker_data.size(); ++i) {
            size_t idx = (start + i) % worker_data.size();
            if (idx == worker_id) continue;

            auto& target = worker_data[idx];
            std::unique_lock<std::mutex> lock(target.queue_mutex, std::try_to_lock);
            
            if (lock.owns_lock() && !target.task_queue.empty()) {
                Task task = std::move(target.task_queue.back());
                target.task_queue.pop_back();
                return task;
            }
        }
        return nullptr;
    }
};

// 测试方案
void performance_test() {
    constexpr int NUM_TASKS = 100000;
    constexpr int NUM_THREADS = 8;
    
    ThreadPool pool(NUM_THREADS);
    std::atomic<int> counter{0};
    
    auto start = std::chrono::high_resolution_clock::now();

    // 提交任务
    for (int i = 0; i < NUM_TASKS; ++i) {
        pool.submit([&] {
            // 模拟IO密集型任务
            std::this_thread::sleep_for(std::chrono::microseconds(10));
            counter.fetch_add(1);
        });
    }

    // 等待任务完成
    while (counter.load() < NUM_TASKS) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    
    std::cout << "Processed " << NUM_TASKS << " tasks in " 
              << duration.count() << " ms using "
              << NUM_THREADS << " threads" << std::endl;
}

void stealing_effect_test() {
    constexpr int NUM_TASKS = 10000;
    constexpr int NUM_THREADS = 4;
    
    ThreadPool pool(NUM_THREADS);
    std::mutex cout_mutex;
    std::vector<int> task_counts(NUM_THREADS, 0);

    // 将所有任务提交到第一个工作线程
    for (int i = 0; i < NUM_TASKS; ++i) {
        pool.submit([&, i] {
            std::this_thread::sleep_for(std::chrono::microseconds(100));
            {
                std::lock_guard<std::mutex> lock(cout_mutex);
                // 记录任务被哪个线程执行
                static thread_local int executed_by = -1;
                if (executed_by == -1) {
                    executed_by = std::hash<std::thread::id>{}(std::this_thread::get_id()) % NUM_THREADS;
                }
                task_counts[executed_by]++;
            }
        });
    }

    // 等待任务完成
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    std::cout << "\nTask distribution:\n";
    for (int i = 0; i < NUM_THREADS; ++i) {
        std::cout << "Thread " << i << " processed " 
                  << task_counts[i] << " tasks\n";
    }
}

int main() {
    performance_test();
    stealing_effect_test();
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2326648.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UI设计系统:如何构建一套高效的设计规范?

UI设计系统&#xff1a;如何构建一套高效的设计规范&#xff1f; 1. 色彩系统的建立与应用 色彩系统是设计系统的基础之一&#xff0c;它不仅影响界面的整体美感&#xff0c;还对用户体验有着深远的影响。首先&#xff0c;设计师需要定义主色调、辅助色和强调色&#xff0c;并…

【计算机网络】记录一次校园网无法上网的解决方法

问题现象 环境&#xff1a;实训室教室内时间&#xff1a;近期突然出现 &#xff08;推测是学校在施工&#xff0c;部分设备可能出现问题&#xff09;症状&#xff1a; 连接校园网 SWXY-WIFI 后&#xff1a; 连接速度极慢偶发无 IP 分配&#xff08;DHCP 失败&#xff09;即使分…

第二十一章:Python-Plotly库实现数据动态可视化

Plotly是一个强大的Python可视化库&#xff0c;支持创建高质量的静态、动态和交互式图表。它特别擅长于绘制三维图形&#xff0c;能够直观地展示复杂的数据关系。本文将介绍如何使用Plotly库实现函数的二维和三维可视化&#xff0c;并提供一些优美的三维函数示例。资源绑定附上…

系统思考反馈

最近交付的都是一些持续性的项目&#xff0c;越来越感觉到&#xff0c;系统思考和第五项修炼不只是简单的一门课程&#xff0c;它们能真正融入到我们的日常工作和业务中&#xff0c;帮助我们用更清晰的思维方式解决复杂问题&#xff0c;推动团队协作&#xff0c;激发创新。 特…

【C++】vector常用方法总结

&#x1f4dd;前言&#xff1a; 在C中string常用方法总结中我们讲述了string的常见用法&#xff0c;vector中许多接口与string类似&#xff0c;作者水平有限&#xff0c;所以这篇文章我们主要通过读vector官方文档的方式来学习vector中一些较为常见的重要用法。 &#x1f3ac;个…

2025年数智化电商产业带发展研究报告260+份汇总解读|附PDF下载

原文链接&#xff1a;https://tecdat.cn/?p41286 在数字技术与实体经济深度融合的当下&#xff0c;数智化产业带正成为经济发展的关键引擎。 从云南鲜花产业带的直播热销到深圳3C数码的智能转型&#xff0c;数智化正重塑产业格局。2023年数字经济规模突破53.9万亿元&#xff…

Linux中常用服务器监测命令(性能测试监控服务器实用指令)

1.查看进程 ps -ef|grep 进程名以下指令需要先安装:sysstat,安装指令: yum install sysstat2.查看CPU使用情况(间隔1s打印一个,打印6次) sar -u 1 63.#查看内存使用(间隔1s打印一个,打印6次) sar -r 1 6

基于 GEE 的区域降水数据可视化:从数据处理到等值线绘制

目录 1 引言 2 代码功能概述 3 代码详细解析 3.1 几何对象处理与地图显示 3.2 加载 CHIRPS 降水数据 3.3 筛选不同时间段的降水数据 3.4 绘制降水时间序列图 3.5 计算并可视化短期和长期降水总量 3.6 绘制降水等值线图 4 总结 5 完整代码 6 运行结果 1 引言 在气象…

曲线拟合 | Matlab基于贝叶斯多项式的曲线拟合

效果一览 代码功能 代码功能简述 目标&#xff1a;实现贝叶斯多项式曲线拟合&#xff0c;动态展示随着数据点逐步增加&#xff0c;模型后验分布的更新过程。 核心步骤&#xff1a; 数据生成&#xff1a;在区间[0,1]生成带噪声的正弦曲线作为训练数据。 参数设置&#xff1a…

Qt6调试项目找不到Bluetooth Component蓝牙组件

错误如图所示 Failed to find required Qt component "Bluetooth" 解决方法&#xff1a;搜索打开Qt maintenance tool 工具 打开后&#xff0c;找到这个Qt Connectivity&#xff0c;勾选上就能解决该错误

JAVA- 锁机制介绍 进程锁

进程锁 基于文件的锁基于Socket的锁数据库锁分布式锁基于Redis的分布式锁基于ZooKeeper的分布式锁 实际工作中都是集群部署&#xff0c;通过负载均衡多台服务器工作&#xff0c;所以存在多个进程并发执行情况&#xff0c;而在每台服务器中又存在多个线程并发的情况&#xff0c;…

Java Spring Boot 与前端结合打造图书管理系统:技术剖析与实现

目录 运行展示引言系统整体架构后端技术实现后端代码文件前端代码文件1. 项目启动与配置2. 实体类设计3. 控制器设计4. 异常处理 前端技术实现1. 页面布局与样式2. 交互逻辑 系统功能亮点1. 分页功能2. 搜索与筛选功能3. 图书操作功能 总结 运行展示 引言 本文将详细剖析一个基…

深入剖析JavaScript多态:从原理到高性能实践

摘要 JavaScript多态作为面向对象编程的核心特性&#xff0c;在动态类型系统的支持下展现了独特的实现范式。本文深入解析多态的三大实现路径&#xff1a;参数多态、子类型多态与鸭子类型&#xff0c;详细揭示它们在动态类型系统中的理论基础与实践意义。结合V8引擎的优化机制…

GalTransl开源程序支持GPT-4/Claude/Deepseek/Sakura等大语言模型的Galgame自动化翻译解决方案

一、软件介绍 文末提供程序和源码下载 GalTransl是一套将数个基础功能上的微小创新与对GPT提示工程&#xff08;Prompt Engineering&#xff09;的深度利用相结合的Galgame自动化翻译工具&#xff0c;用于制作内嵌式翻译补丁。支持GPT-4/Claude/Deepseek/Sakura等大语言模型的…

TGES 2024 | 基于空间先验融合的任意尺度高光谱图像超分辨率

Arbitrary-Scale Hyperspectral Image Super-Resolution From a Fusion Perspective With Spatial Priors TGES 2024 10.1109/TGRS.2024.3481041 摘要&#xff1a;高分辨率高光谱图像&#xff08;HR-HSI&#xff09;在遥感应用中起着至关重要的作用。单HSI超分辨率&#xff…

算法基础_基础算法【高精度 + 前缀和 + 差分 + 双指针】

算法基础_基础算法【高精度 前缀和 差分 双指针】 ---------------高精度---------------791.高精度加法题目介绍方法一&#xff1a;代码片段解释片段一&#xff1a; 解题思路分析 792. 高精度减法题目介绍方法一&#xff1a;代码片段解释片段一&#xff1a; 解题思路分析 7…

Python数据类型-list

列表(List)是Python中最常用的数据类型之一&#xff0c;它是一个有序、可变的元素集合。 1. 列表基础 创建列表 empty_list [] # 空列表 numbers [1, 2, 3, 4, 5] # 数字列表 fruits [apple, banana, orange] # 字符串列表 mixed [1, hello, 3.14, True] # 混合类型…

如何使用cpp操作香橙派GPIO --使用<wiringPi.h>

香橙派是国产SBC &#xff0c;对标树莓派。不过国内的开发环境确实挺惨的&#xff0c;没多少帖子讨论。楼主决定从今天起&#xff0c;不定期更新香橙派的教程。 今天的教程是如何使用香橙派下载wiringOP 并使用CPP操作GPIO 操作GPIO 下载wiringPi 检查git 版本克隆wiringPi…

nacos-sdk-go v2.29 中一个拼写错误,我定位了3个小时 ……

文章目录 问题背景问题现象问题定位解决方案经验总结 问题背景 今天在给项目增加服务注册和发现功能时,选择了 nacos 作为服务注册中心。在使用 nacos-sdk-go v2.29 版本进行开发时,遇到了一个令人啼笑皆非的问题,足足花了3个小时才找到原因。 问题现象 在实现服务订阅通知功…

Linux中的文件寻址

Linux的层级结构 在Linux中一切皆文件 其中 要注意在命令行中看实际选择写哪一种路径 相对路径 绝对路径名称的简写&#xff0c;省略了用户当前所在的系统位置此名称只有在管理当前所在系统目录中子文件时才能使用系统中不以/开有的文件名称都为相对路径在程序操作时会自动…