参考https://shanhai.huawei.com/#/page-forum/post-details?postId=43796
完整代码
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <atomic>
// 任务优先级结构体
struct PriorityTask {
int priority;
std::function<void()> func;
// 优先级比较,优先级数值越小,优先级越高
bool operator<(const PriorityTask& other) const {
return priority > other.priority;
}
};
// 线程池类
class ThreadPool {
public:
ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while (true) {
PriorityTask task;
{
std::unique_lock<std::mutex> lock(this->queueMutex);
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.top());
this->tasks.pop();
}
task.func();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex);
// Don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace(PriorityTask{priority, [task]() { (*task)(); }});
}
condition.notify_one();
return res;
}
private:
// Need to keep track of threads so we can join them
std::vector< std::thread > workers;
// The task queue
std::priority_queue< PriorityTask > tasks;
// Synchronization
std::mutex queueMutex;
std::condition_variable condition;
std::atomic<bool> stop;
};
// 使用示例
int main() {
ThreadPool pool(4);
auto result1 = pool.enqueue(1, []() -> int {
std::cout << "Executing task 1" << std::endl;
return 1;
});
auto result2 = pool.enqueue(0, []() -> int {
std::cout << "Executing task 2" << std::endl;
return 2;
});
std::cout << "Task 1 result: " << result1.get() << std::endl;
std::cout << "Task 2 result: " << result2.get() << std::endl;
return 0;
}
任务定义
// 任务优先级结构体
struct PriorityTask {
int priority;
std::function<void()> func;
// 优先级比较,优先级数值越小,优先级越高
bool operator<(const PriorityTask& other) const {
return priority > other.priority;
}
};
回调函数是一种编程模式,允许一个函数在其他函数完成某些操作后被调用。回调函数通常用于异步编程,即在等待某些操作完成时,可以执行其他任务,然后在操作完成时调用回调函数。
例如,在网络编程中,当发送一个请求后,可以注册一个回调函数,当接收到响应时,这个回调函数会被调用。这样,程序可以在等待响应时执行其他任务,而不是阻塞等待。
在C++中,可以使用std::function对象来存储和调用回调函数。例如:
在这个例子中,sendRequest函数接受一个URL和一个回调函数作为参数。在发送请求并接收响应后,它会调用回调函数onResponse,并传递状态码和响应。
需要注意的是,回调函数的使用需要谨慎,因为如果回调函数在被调用前就被销毁,可能会导致未定义行为。因此,通常需要确保回调函数的生命周期足够长,或者使用std::shared_ptr等机制来管理回调函数的生命周期。
总的来说,回调函数是一种强大的编程模式,可以用于实现异步编程、事件驱动编程等复杂功能,通过使用std::function对象,可以方便地存储和调用回调函数。
线程池初始化
class ThreadPool {
public:
ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while (true) {
PriorityTask task;
{
std::unique_lock<std::mutex> lock(this->queueMutex);
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.top());
this->tasks.pop();
}
task.func();
}
});
}
}
task = std::move(this->tasks.top())
std:thread 初始化
#include <thread>
#include <iostream>
void printHello() {
std::cout << "Hello, world!" << std::endl;
}
int main() {
std::thread t(printHello);
t.join();
return 0;
}
在C++中,std::thread是一个用于表示并发执行线程的类,它是C++标准库中的一部分。std::thread对象可以通过构造函数来初始化,并且可以通过调用其成员函数来控制线程的执行。
以下是一个简单的例子,展示了如何初始化和使用std::thread对象:
#include <thread>
#include <iostream>
void printHello() {
std::cout << "Hello, world!" << std::endl;
}
int main() {
std::thread t(printHello);
t.join();
return 0;
}
在这个例子中,std::thread t(printHello);创建了一个std::thread对象t,并将其初始化为执行printHello函数。t.join();等待线程t完成执行,然后继续执行后面的代码。
需要注意的是,std::thread对象在创建后会立即启动线程,并执行指定的任务。如果在线程完成执行之前销毁std::thread对象,可能会导致未定义行为。因此,通常需要确保std::thread对象的生命周期足够长,或者使用std::shared_ptr等机制来管理线程的生命周期。
总的来说,std::thread对象可以通过构造函数来初始化,并且可以通过调用其成员函数来控制线程的执行。通过使用std::thread,可以方便地实现多线程编程,并行执行任务。
线程池入队
template<class F, class... Args>
auto enqueue(int priority, F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex);
// Don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace(PriorityTask{priority, [task]() { (*task)(); }});
}
condition.notify_one();
return res;
}
这段代码是一个线程池类的成员函数,用于将一个新的任务添加到任务队列中。
首先,它使用模板参数F和Args来接受任务的函数和参数。然后,它使用std::bind来创建一个可调用对象,该对象在被调用时会执行函数f并传递参数args。
接着,它使用std::make_shared来创建一个std::packaged_task对象,该对象可以在另一个线程中执行任务。std::packaged_task对象包装了一个可调用对象,并提供了一个get_future方法,该方法返回一个std::future对象,该对象可以用来获取任务的结果。
然后,它使用std::unique_lock来获取queueMutex互斥锁,以确保在修改任务队列时不会有其他线程干扰。
在获取锁之后,它检查stop标志,如果线程池已经停止,那么就抛出一个运行时错误。然后,它将新的任务添加到任务队列中,并使用condition.notify_one()来唤醒一个等待condition条件变量的线程。
最后,它返回std::future对象,该对象可以用来获取任务的结果。
总的来说,这个函数的主要作用是将一个新的任务添加到任务队列中,并返回一个std::future对象,该对象可以用来获取任务的结果。
task { (*task)(); }是一个lambda表达式,它是一个没有参数的函数对象。这个lambda表达式在被调用时会执行task中的任务。
在这个例子中,task是一个std::shared_ptr,它指向一个std::packaged_task对象。std::packaged_task对象包装了一个可调用对象,该对象在被调用时会执行函数f并传递参数args。
当task { (*task)(); }被调用时,它会解引用task智能指针,然后调用std::packaged_task对象的operator(),这会执行包装的任务。
这种用法的主要目的是将一个任务封装成一个可调用对象,这样就可以将任务作为参数传递给其他函数,或者存储在数据结构中,以便在未来的某个时间点执行。
需要注意的是,lambda表达式是一种匿名函对象,它可以捕获外部变量并在其自身的函数体中使用它们。在这个例子中,lambda表达式捕获了task智能指针,并在其函数体中解引用并调用它。这样就可以在新的线程中执行任务。
在lambda表达式中,&task { (*task)(); }会捕获task智能指针的引用,而不是值。这意味着,如果task智能指针在lambda表达式之外被销毁,那么在lambda表达式中使用task时就会出现悬挂引用,这是一种未定义行为。
因此,正确的做法是使用task { (*task)(); }来捕获task智能指针的值,这样就可以确保task智能指针在lambda表达式执行期间一直有效。
需要注意的是,捕获值会导致lambda表达式拷贝task智能指针
线程池销毁
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
这段代码是一个线程池的析构函数,它的主要目的是在销毁线程池时正确地关闭所有的工作线程。
首先,它使用std::unique_lock来获取queueMutex互斥锁,以确保在修改stop标志时不会有其他线程干扰。然后,它将stop标志设置为true,表示线程池需要停止运行。
接着,它调用condition.notify_all()来唤醒所有正在等待condition条件变量的线程。这些线程通常是工作线程,它们在等待新的任务出现在任务队列中。
最后,它遍历所有的工作线程,并调用join()方法来等待每个线程的结束。join()方法会阻塞当前线程,直到指定的线程结束执行。这样可以确保在销毁线程池时,所有的工作线程都已经正确地结束了。
总的来说,这段代码的目的是在销毁线程池时,正确地关闭所有的工作线程,并确保没有任何线程在运行。