线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
C++ 11标准支持线程,但是未支持线程池,本文章使用C++11实现最简单线程池,无优先级
C++ 11 实现简单线程池
使用到c++11的新特性有
std::unique_lock
std::mutex
std::atomic_bool
std::thread
std::future
std::condition_variable
std::function
std::forward
std::packaged_task
std::make_shared
auto、decltype 自动类型推导
线程安全队列
基于std::queue<T> 实现,保证访问内部对象的线程安全性
/**
* @brief 线程安全队列
* @author GGX
* @date 2023-08-13
*/
template <typename T>
class CSateQueue
{
public:
CSateQueue() = default;
~CSateQueue() {}
/**
* @brief 队列是否为空
* @param 无
* @return true 空,false 非空
* @author GGX
* @date 2023-08-19
*/
bool empty() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
/**
* @brief 队列是否为空
* @param 无
* @return size_t 队列大小
* @author GGX
* @date 2023-08-19
*/
size_t size() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
/**
* @brief 压栈
* @param 对象
* @return 无
* @author GGX
* @date 2023-08-19
*/
void push(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
/**
* @brief 出栈
* @param 对象
* @return true 成功,false 失败
* @author GGX
* @date 2023-08-19
*/
bool front(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty())
{
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
private:
// 利用模板函数构造队列
std::queue<T> m_queue;
// 互斥量,可修改
mutable std::mutex m_mutex;
};
线程池
常驻线程,有任务时,才启动线程
/**
* @brief 线程池
* @author GGX
* @date 2023-08-13
*/
class ThreadPool
{
public:
/**
* @brief 构造函数,默认六个线程
* @param [in] size 线程数目
* @return
* @author GGX
* @date 2023-08-19
*/
explicit ThreadPool(size_t size = 6): m_size(size)
{
m_active = false;
}
~ThreadPool()
{
stop();
}
/**
* @brief 启动线程池
* @param 无
* @return 无
* @author GGX
* @date 2023-08-19
*/
void start()
{
//标记线程启动
m_active = true;
m_threads.reserve(m_size);
for (size_t i = 0; i < m_size; i++)
{
//创建线程池
m_threads.emplace_back(ThreadPool::work, this);
}
std::cout << "ThreadPool start" << std::endl;
}
/**
* @brief 停止线程池
* @param 无
* @return 无
* @author GGX
* @date 2023-08-19
*/
void stop()
{
//标记线程停止
m_active = false;
//唤醒所有线程
m_cv.notify_all();
for (thread &th : m_threads)
{
//优雅的退出线程
if (th.joinable())
{
th.join();
}
}
std::cout << "ThreadPool stop" << std::endl;
}
void work()
{
std::function<void()> func; // 定义基础函数类func
bool dequeued = false; // 是否正在取出队列中元素
while (m_active)
{
dequeued = false;
{
std::unique_lock<std::mutex> lock(m_mutex);
//判断任务队列是否为空,线程是否已停止
auto funcR = [](CSateQueue<std::function<void()>> &tasks,
std::atomic_bool & bActive)->bool
{
return !tasks.empty() || !bActive;
};
//等待线程被唤醒
m_cv.wait(lock, std::bind(funcR, std::ref(m_tasks), std::ref(m_active)));
if (!m_active)
{
break;
}
dequeued = m_tasks.front(func);
}
//取到任务,则运行
if (dequeued)
{
func();
}
}
}
/**
* @brief 提交任务,让线程执行
* @param [in] f 任务
* @param [in] ...args 可变参
* @return auto 返回值类型,自动推导
* @author GGX
* @date 2023-08-19
*/
template <typename F, typename ...Args>
auto submit(F && f, Args && ...args)->std::future<decltype(f(args...))>
{
//返回值类型
using returnType = decltype(f(args...))();
//函数包
std::function<returnType> func =
std::bind(std::forward<F>(f), std::forward<Args>(args)...);
//包装任何可调用函数对象
auto fu_ptr = std::make_shared<std::packaged_task<returnType>> (func);
std::function<void()> task = [fu_ptr]
{
(*fu_ptr)();
};
//函数放入队列
m_tasks.push(task);
//唤醒一个线程
m_cv.notify_one();
//返回结果
return fu_ptr->get_future();
}
private:
size_t m_size; //线程大小
std::atomic_bool m_active; //线程是否运行,原子变量
vector<thread> m_threads; //线程容器
std::condition_variable m_cv;//条件变量,唤醒线程
std::mutex m_mutex; //互斥量,保证线程安全
CSateQueue<std::function<void()>> m_tasks; //任务队列
};
调用示例
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <queue>
#include <functional>
#include <future>
#include <random>
using namespace std;
/**
* @brief 线程安全队列
* @author GGX
* @date 2023-08-13
*/
template <typename T>
class CSateQueue
{
public:
CSateQueue() = default;
~CSateQueue() {}
/**
* @brief 队列是否为空
* @param 无
* @return true 空,false 非空
* @author GGX
* @date 2023-08-19
*/
bool empty() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
/**
* @brief 队列是否为空
* @param 无
* @return size_t 队列大小
* @author GGX
* @date 2023-08-19
*/
size_t size() const
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
/**
* @brief 压栈
* @param 对象
* @return 无
* @author GGX
* @date 2023-08-19
*/
void push(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
/**
* @brief 出栈
* @param 对象
* @return true 成功,false 失败
* @author GGX
* @date 2023-08-19
*/
bool front(T &t)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty())
{
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
private:
// 利用模板函数构造队列
std::queue<T> m_queue;
// 互斥量,可修改
mutable std::mutex m_mutex;
};
/**
* @brief 线程池
* @author GGX
* @date 2023-08-13
*/
class ThreadPool
{
public:
/**
* @brief 构造函数,默认六个线程
* @param [in] size 线程数目
* @return
* @author GGX
* @date 2023-08-19
*/
explicit ThreadPool(size_t size = 6): m_size(size)
{
m_active = false;
}
~ThreadPool()
{
stop();
}
/**
* @brief 启动线程池
* @param 无
* @return 无
* @author GGX
* @date 2023-08-19
*/
void start()
{
//标记线程启动
m_active = true;
m_threads.reserve(m_size);
for (size_t i = 0; i < m_size; i++)
{
//创建线程池
m_threads.emplace_back(ThreadPool::work, this);
}
std::cout << "ThreadPool start" << std::endl;
}
/**
* @brief 停止线程池
* @param 无
* @return 无
* @author GGX
* @date 2023-08-19
*/
void stop()
{
//标记线程停止
m_active = false;
//唤醒所有线程
m_cv.notify_all();
for (thread &th : m_threads)
{
//优雅的退出线程
if (th.joinable())
{
th.join();
}
}
std::cout << "ThreadPool stop" << std::endl;
}
void work()
{
std::function<void()> func; // 定义基础函数类func
bool dequeued = false; // 是否正在取出队列中元素
while (m_active)
{
dequeued = false;
{
std::unique_lock<std::mutex> lock(m_mutex);
//判断任务队列是否为空,线程是否已停止
auto funcR = [](CSateQueue<std::function<void()>> &tasks,
std::atomic_bool & bActive)->bool
{
return !tasks.empty() || !bActive;
};
//等待线程被唤醒
m_cv.wait(lock, std::bind(funcR, std::ref(m_tasks), std::ref(m_active)));
if (!m_active)
{
break;
}
dequeued = m_tasks.front(func);
}
//取到任务,则运行
if (dequeued)
{
func();
}
}
}
/**
* @brief 提交任务,让线程执行
* @param [in] f 任务
* @param [in] ...args 可变参
* @return auto 返回值类型,自动推导
* @author GGX
* @date 2023-08-19
*/
template <typename F, typename ...Args>
auto submit(F && f, Args && ...args)->std::future<decltype(f(args...))>
{
//返回值类型
using returnType = decltype(f(args...))();
//函数包
std::function<returnType> func =
std::bind(std::forward<F>(f), std::forward<Args>(args)...);
//包装任何可调用函数对象
auto fu_ptr = std::make_shared<std::packaged_task<returnType>> (func);
std::function<void()> task = [fu_ptr]
{
(*fu_ptr)();
};
//函数放入队列
m_tasks.push(task);
//唤醒一个线程
m_cv.notify_one();
//返回结果
return fu_ptr->get_future();
}
private:
size_t m_size; //线程大小
std::atomic_bool m_active; //线程是否运行,原子变量
vector<thread> m_threads; //线程容器
std::condition_variable m_cv;//条件变量,唤醒线程
std::mutex m_mutex; //互斥量,保证线程安全
CSateQueue<std::function<void()>> m_tasks; //任务队列
};
std::random_device rd; // 真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
auto rnd = std::bind(dist, mt);
/**
* @brief 测试线程池
* @author GGX
* @date 2023-08-13
*/
class CTest
{
public:
void simulate_hard_computation()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << "multiply thid: " << std::this_thread::get_id() << std::endl;
std::cout << a << " * " << b << " = " << res << std::endl;
}
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << "multiply_output thid: " << std::this_thread::get_id() << std::endl;
std::cout << a << " * " << b << " = " << out << std::endl;
}
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << "multiply_return thid: " << std::this_thread::get_id() << std::endl;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
};
void example()
{
CTest a;
ThreadPool pool(6);
pool.start();
//对象方法,因此需要绑定对象
std::function<void(int, int)> multiply =
std::bind(&CTest::multiply, &a, std::placeholders::_1, std::placeholders::_2);
for (int i = 1; i < 3; i++)
{
for (int j = 0; j < 3; j++)
{
//提交调用
pool.submit(multiply, i, j);
}
}
auto future = pool.submit(multiply, 3, 10);
future.get();
int output_ref;
std::function<void(int&, int, int)> multiply_output =
std::bind(&CTest::multiply_output, &a, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
// 使用ref传递的输出参数提交函数
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
// 使用return参数提交函数
std::function<int(int, int)> multiply_return =
std::bind(&CTest::multiply_return, &a, std::placeholders::_1, std::placeholders::_2);
auto future2 = pool.submit(multiply_return, 5, 3);
// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
pool.stop();
}
int main()
{
std::cout << "std::thread::hardware_concurrency() "
<< std::thread::hardware_concurrency() << std::endl << std::endl;
example();
return 0;
}
输出