朋友们、伙计们,我们又见面了,本期来给大家带来线程池相关的知识点,如果看完之后对你有一定的启发,那么请留下你的三连,祝大家心想事成!
C 语 言 专 栏:C语言:从入门到精通
数据结构专栏:数据结构
个 人 主 页 :stackY、
C + + 专 栏 :C++
Linux 专 栏 :Linux
目录
1. 线程池
1.1 预备工作
1.2 模块设计
2. 基础框架
2.1 测试版接口
2.2.1 Main.cc测试
3. 引入任务
3.1 执行任务(ThreadRun)
3.2 添加任务
3.3 Main.cc测试
4. 接入日志功能
5. 设计单例
1. 线程池
我们之前要用到线程的地方都是当有任务来的时候我们才创建线程,就会导致效率比较低下,所以我们想实现一个线程池,预先提现创建好一批线程,然后在线程池中会有一个任务队列,我们也可以向任务队列中添加任务,然后我们预先创建好的线程就开始依次从任务队列中获取任务并执行,这样子效率就高不少了;
1.1 预备工作
我们的线程池因为是多线程,所以必须要有锁,所以我们先将锁进行简单的封装:
LockGuard.hpp:
#pragma once #include <pthread.h> // 不定义锁,默认认为外部会给我们传入锁对象 class Mutex { public: Mutex(pthread_mutex_t *lock):_lock(lock) {} void Lock() { pthread_mutex_lock(_lock); } void Unlock() { pthread_mutex_unlock(_lock); } ~Mutex() {} private: pthread_mutex_t *_lock; }; class LockGuard { public: LockGuard(pthread_mutex_t *lock): _mutex(lock) { _mutex.Lock(); } ~LockGuard() { _mutex.Unlock(); } private: Mutex _mutex; };
然后我们线程池也需要用到线程,所以我们使用之前封装好的线程(Thread.hpp),同时我们还想引入一下日志功能(Log.hpp);
关于日志的详细实现请移步至:负载均衡在线OJ里面有对日志的介绍,这里将其复用一下,稍作修改;
1.2 模块设计
我们实现线程池使用分模块编写,每个文件实现对应的功能,让代码不会冗余在一起;
- ThreadPool.hpp:实现线程池的主要逻辑
- Thread.hpp:封装原生线程库
- LockGuard.hpp:封装互斥锁
- Log.hpp:日志功能
- Main.cc:对线程池进行测试
- Makefile:自动化构建代码
2. 基础框架
- 我们的线程池中需要有一批线程,所以我们使用vector来管理这批线程,还可以设置线程池中需要有多少个线程;
- 然后需要有一个储存任务的队列、锁、条件变量;
- 我们使用模版编程来实现自定义任务类型;
- 初步实现的功能就是一个启动线程池和一个向线程池中push任务。
2.1 测试版接口
- 我们想有一个运行任务的接口(ThreadRun),然后还想要一个等待线程的接口(Wait);
- 我们要运行线程池就先需要创建一批线程,这个工作我们在构造函数中实现;
- 有了线程之后,我们要启动线程就遍历线程池,然后一次调用他们的启动接口即可;
- 为了先进行测试,我们的ThreadRun接口就先写一段测试代码;
- 等待线程的接口同样也是遍历然后调用join;
2.2.1 Main.cc测试
这样子已经实现了一个非常简易版的线程池了;
3. 引入任务
上面实现的代码是用来测试基本逻辑的,我们的任务队列中是没有任务的,所以接下来我们需要引入一批任务:
Task.hpp:
#pragma once #include <iostream> #include <string> #include <unistd.h> const int defaultvalue = 0; enum { ok = 0, div_zero, mod_zero, unknow }; const std::string opers = "+-*/%"; class Task { public: Task() { } Task(int x, int y, char op) : data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok) { } void Run() { switch (oper) { case '+': result = data_x + data_y; break; case '-': result = data_x - data_y; break; case '*': result = data_x * data_y; break; case '/': { if (data_y == 0) code = div_zero; else result = data_x / data_y; } break; case '%': { if (data_y == 0) code = mod_zero; else result = data_x % data_y; } break; default: code = unknow; break; } } void operator()() { Run(); } std::string PrintTask() { std::string s; s = std::to_string(data_x); s += oper; s += std::to_string(data_y); s += "=?"; return s; } std::string PrintResult() { std::string s; s = std::to_string(data_x); s += oper; s += std::to_string(data_y); s += "="; s += std::to_string(result); s += " ["; s += std::to_string(code); s += "]"; return s; } ~Task() { } private: int data_x; int data_y; char oper; // + - * / % int result; int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4 };
关于任务设计大家也可以自己设计,不想设计的直接拷贝即可,这里的任务就是对数据进行四种运算;
3.1 执行任务(ThreadRun)
有了任务之后,我们想要执行我们的任务,在执行之前我们想要实现两个接口,一个是任务队列没有任务时,线程就一直等待,另一个是有了任务就将线程进行唤醒并执行任务;
在执行任务接口中首先肯定需要取任务(在任务队列中取任务)如果任务队列中没有任务,就需要进行等待,取到任务之后就将任务分配给线程进行处理任务即可,取任务的这个过程是需要用锁进行保护的;
3.2 添加任务
添加任务也是需要用锁保护的,另外将任务添加到任务队列中时需要将线程唤醒去执行任务;
3.3 Main.cc测试
本次测试我们就使用我们引入的任务;
Main.cc:
#include <iostream> #include <memory> #include <ctime> #include "ThreadPool.hpp" #include "Task.hpp" int main() { std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>()); tp->Start(); srand((uint64_t)time(nullptr) ^ getpid()); while (true) { int x = rand() % 100 + 1; usleep(1234); int y = rand() % 200; usleep(1234); char oper = opers[rand() % opers.size()]; Task t(x, y, oper); tp->Push(t); // std::cout << "make task: " << t.PrintTask() << std::endl; sleep(1); } tp->Wait(); return 0; }
4. 接入日志功能
#pragma once #include <iostream> #include <vector> #include <string> #include <queue> #include <unistd.h> #include "Thread.hpp" #include "LockGuard.hpp" #include "Log.hpp" #include "Task.hpp" using namespace ns_log; static const int defaultnum = 5; // 线程数据 class ThreadData { public: ThreadData(const std::string &name) : threadname(name) { } ~ThreadData() { } public: std::string threadname; }; template <class T> class ThreadPool { public: ThreadPool(int num = defaultnum) : _thread_num(num) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); // 创建指定个数的线程 for (int i = 0; i < _thread_num; i++) { std::string threadname = "thread-"; threadname += std::to_string(i + 1); ThreadData td(threadname); // 创建 // Thread<ThreadData> t(threadname, std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td); // _threads.push_back(t); // 两种写法都可行,这种写法可以减少拷贝 _threads.emplace_back(threadname, std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td); LOG(INFO) << td.threadname << " is created..." << "\n"; } } // 启动线程池 bool Start() { for (auto &thread : _threads) { thread.Start(); LOG(INFO) << thread.ThreadName() << " is running..." << "\n"; } return true; } void ThreadWait() // 等待 { pthread_cond_wait(&_cond, &_mutex); } void ThreadWakeup() // 唤醒 { pthread_cond_signal(&_cond); } // 执行任务 void ThreadRun(ThreadData &td) { while (true) { T t; // 取任务 { // 加锁 LockGuard lockguard(&_mutex); while (_q.empty()) { // 没有任务时就等待 ThreadWait(); } t = _q.front(); _q.pop(); } // 处理任务 t(); LOG(DEBUG) << td.threadname << " handler task " << t.PrintTask() << " ,result is: " << t.PrintResult() << "\n"; } } // 添加任务 void Push(const T &in) { // 加锁 LockGuard lockguard(&_mutex); // 添加 _q.push(in); // 唤醒 ThreadWakeup(); } // 等待线程 void Wait() { for (auto &thread : _threads) { thread.Join(); } } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: ThreadPool(const ThreadPool<T> &tp) = delete; const ThreadPool<T> &operator=(const ThreadPool<T>) = delete; std::queue<T> _q; // 任务队列 std::vector<Thread<ThreadData>> _threads; // 线程池 int _thread_num; // 线程个数 pthread_mutex_t _mutex; // 锁 pthread_cond_t _cond; // 条件变量 };
5. 设计单例
除了上面实现的功能,我们也可以将我们的线程池设置成为单例模式,这里采用的是懒汉模式;
需要注意的是,因为是多线程,所以在获取单例的时候存在线程安全问题,如果同时有多个线程来获取单例,就会创建多个单例,所以也需要用到互斥锁;
所有源码:https://gitee.com/yue-sir-bit/linux/tree/master/ThreadPool