目录
1.概念
2.封装原生线程方便使用
3.线程池工作日志
4.线程池需要处理的任务
5.进程池的实现
6.线程池运行测试
7.优化线程池(单例模式 )
单例模式概念
优化后的代码
8.测试单例模式
1.概念
线程池:* 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
* 线程池的应用场景:* 1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
* 2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
* 3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.线程池包含:
- 线程池:就是预先创建一定数量的线程,并将它们放入一个“池子”中。当程序需要执行新任务时,它会从线程池中取出一个空闲的线程来执行该任务,而不是每次都创建一个新的线程。
- 任务队列:用于存放待执行的任务。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。
本质上就是一个生产者和消费者模型。
2.封装原生线程方便使用
Thread.hpp:
#pragma once #include <iostream> #include <string> #include <functional> #include <pthread.h> namespace ThreadMoudle { // typedef std::function<void()> func_t; //using就是给该类型重命名,有参数接收线程名 //便于知道是什么线程做了什么事情 using func_t = std::function<void(const std::string&)>;//函数对象 class Thread { public: void Excute() { _isrunning = true; _func(_name); _isrunning = false; } public: Thread(const std::string &name, func_t func):_name(name), _func(func) { } static void *ThreadRoutine(void *args) // 新线程都会执行该方法! { Thread *self = static_cast<Thread*>(args); // 获得了当前对象 self->Excute(); return nullptr; } bool Start() { int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this); if(n != 0) return false; return true; } std::string Status() { if(_isrunning) return "running"; else return "sleep"; } void Stop() { if(_isrunning) { ::pthread_cancel(_tid); _isrunning = false; } } void Join() { ::pthread_join(_tid, nullptr); } std::string Name() { return _name; } ~Thread() { } private: std::string _name; pthread_t _tid; bool _isrunning; func_t _func; // 线程要执行的回调函数 }; }
3.线程池工作日志
关于日志的添加
在做大型项目的时候一般都要有日志
一般公司内部都有自己的日志库
日志是软件运行的记录信息,向显示器打印,向文件中打印。具有特定的格式
[日志等级][pid][filename][filenumber][time] 日志内容(支持可变参数)
日志等级:DEBUG INFO WANNING ERROR FATAL--致命的
这里我们自己写一个日志:
为了保证在多线程模式下,日志资源的安全,我们需要上锁,这是对锁的封装,方便调用:
LockGuard.hpp
#pragma once #include <pthread.h> class LockGuard { public: LockGuard(pthread_mutex_t *mutex):_mutex(mutex) { pthread_mutex_lock(_mutex); } ~LockGuard() { pthread_mutex_unlock(_mutex); } private: pthread_mutex_t *_mutex; };
日志代码:Log.hpp
#include<iostream> #pragma once #include <iostream> #include <sys/types.h> #include <unistd.h> #include <ctime> #include <cstdarg> #include <fstream> #include <cstring> #include <pthread.h> #include "LockGuard.hpp" namespace log_ns { enum//日志等级 { DEBUG = 1, INFO, WARNING, ERROR, FATAL }; //将日志等级转为字符串 std::string LevelToString(int level) { switch (level) { case DEBUG: return "DEBUG"; case INFO: return "INFO"; case WARNING: return "WARNING"; case ERROR: return "ERROR"; case FATAL: return "FATAL"; default: return "UNKNOWN"; } } //获取当前时间 std::string GetCurrTime() { time_t now = time(nullptr);//该函数可获取此时刻时间戳 //localtime通过时间戳可转化为当前的时间的年月日时分秒 struct tm *curr_time = localtime(&now); char buffer[128]; snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d", curr_time->tm_year + 1900, curr_time->tm_mon + 1, curr_time->tm_mday, curr_time->tm_hour, curr_time->tm_min, curr_time->tm_sec); return buffer; } class logmessage { public: std::string _level;//等级 pid_t _id;//进程的id std::string _filename;//文件名 int _filenumber;//文件编号 std::string _curr_time;//时间 std::string _message_info;//日志内容 }; #define SCREEN_TYPE 1 //向显示器打印 #define FILE_TYPE 2 //往文件写 //给个缺省的文件名方便测试 const std::string glogfile = "./log.txt"; //保证在多线程模式下,日志资源的安全(上锁) pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER; // log.logMessage("", 12, INFO, "this is a %d message ,%f, %s hellwrodl", x, , , ); class Log { public: Log(const std::string &logfile = glogfile) : _logfile(logfile), _type(SCREEN_TYPE) { } //调用该函数可以选择向哪里打印 void Enable(int type) { _type = type; } void FlushLogToScreen(const logmessage &lg) {//向显示器打印的方法 printf("[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curr_time.c_str(), lg._message_info.c_str()); } void FlushLogToFile(const logmessage &lg) {//向文件打印的方法 //文件操作的接口,std::ios::app表“append”模式 //所有的输出都会被追加到文件的末尾,而不是覆盖文件的现有内容。 std::ofstream out(_logfile, std::ios::app); if (!out.is_open()) return; char logtxt[2048]; snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s", lg._level.c_str(), lg._id, lg._filename.c_str(), lg._filenumber, lg._curr_time.c_str(), lg._message_info.c_str()); out.write(logtxt, strlen(logtxt)); out.close(); } void FlushLog(const logmessage &lg) { //对日志做保护本质是对打印的资源做保护 LockGuard lockguard(&glock); switch (_type)//选择向哪里打印 { case SCREEN_TYPE: FlushLogToScreen(lg); break; case FILE_TYPE: FlushLogToFile(lg); break; } } //记录的日志信息 void logMessage(std::string filename, int filenumber, int level, const char *format, ...) { logmessage lg; lg._level = LevelToString(level); lg._id = getpid();//当前进程自己的pid lg._filename = filename; lg._filenumber = filenumber; lg._curr_time = GetCurrTime(); //捕获可变参数c语言的处理方法 va_list ap; //可变参初始化 va_start(ap, format); //取出可变参数 char log_info[1024]; //把可变参变为转为字符串存放在log_info vsnprintf(log_info, sizeof(log_info), format, ap); //销毁 va_end(ap); lg._message_info = log_info; // 打印出来日志 FlushLog(lg); } ~Log() { } private: int _type;//表示向什么设备上打印 std::string _logfile;//如果向文件里打印,那么就要接收文件路径 }; //包含Log.hpp就能直接使用了,对外直接使用下面的三个接口就行了 Log lg; //使用宏封装调用接口,__FILE__ 和 __LINE__ 是两个预定义的宏, //它们分别用于在编译时提供当前源文件的名称和当前行号。 #define LOG(Level, Format, ...) \ do \ { \ lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \ } while (0) //向显示器打印 #define EnableScreen() \ do \ { \ lg.Enable(SCREEN_TYPE); \ } while (0) //向文件打印 #define EnableFILE() \ do \ { \ lg.Enable(FILE_TYPE); \ } while (0) };
在这个
LOG
宏中,##__VA_ARGS__
的用途主要是为了解决当LOG
宏被调用时没有提供除了Level
和Format
之外的额外参数时的问题。如果没有##
操作符,当LOG
宏以少于三个参数的形式被调用时(例如LOG(INFO, "Message");
),编译器会收到一个关于__VA_ARGS__
展开成空字符串后,逗号多余的错误,因为logMessage
函数的调用会像这样:lg.logMessage(__FILE__, __LINE__, Level, Format, ,);
,注意这里的两个逗号之间什么都没有。使用
##
操作符后,如果__VA_ARGS__
为空,则逗号会被省略,从而避免了编译错误。因此,当LOG
宏没有提供额外的参数时,logMessage
函数的调用会正确地成为lg.logMessage(__FILE__, __LINE__, Level, Format);
。总结来说,虽然在这个特定的例子中
##
操作符并没有直接连接两个标记,但它通过允许__VA_ARGS__
在宏展开时可能为空(从而省略了多余的逗号),确保了宏的灵活性和正确性。
4.线程池需要处理的任务
Task.hpp:
#pragma once #include<iostream> #include<functional> class Task { public: Task() { } Task(int x, int y) : _x(x), _y(y) { } void Excute() { _result = _x + _y; } void operator ()() { Excute(); } std::string debug() { std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?"; return msg; } std::string result() { std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result); return msg; } private: int _x; int _y; int _result; };
5.进程池的实现
TheadPool.hpp:线程池的实现还是相对简单的,需要注意的地方在代码中已经做了注释
#pragma once #include <iostream> #include <unistd.h> #include <string> #include <vector> #include <queue> #include <functional> #include "Thread.hpp" #include "Log.hpp" #include "LockGuard.hpp" using namespace ThreadMoudle; using namespace log_ns;//日志 static const int gdefaultnum = 5; void test() { while (true) { std::cout << "hello world" << std::endl; sleep(1); } } template <typename T> class ThreadPool { public: void LockQueue() { pthread_mutex_lock(&_mutex);//锁住队列 } void UnlockQueue() { pthread_mutex_unlock(&_mutex);//解锁 } void Wakeup()//唤醒操作 { pthread_cond_signal(&_cond); } void WakeupAll()//全部唤醒 { pthread_cond_broadcast(&_cond); } void Sleep()//阻塞等待 { //线程被挂起,锁被归还 pthread_cond_wait(&_cond, &_mutex); } bool IsEmpty()//判断队列是否为空 { return _task_queue.empty(); } //处理任务 (处理任务队列中的任务 void HandlerTask(const std::string &name) // this { while (true)//线程不退出 { // 取任务 LockQueue(); //保护临界资源 //任务为空且线程处于运行状态,就该去休眠 while (IsEmpty() && _isrunning) { _sleep_thread_num++; LOG(INFO, "%s thread sleep begin!\n", name.c_str()); Sleep(); LOG(INFO, "%s thread wakeup!\n", name.c_str()); _sleep_thread_num--; } // 任务为空线程没有运行,那么就该退出了 if (IsEmpty() && !_isrunning) { UnlockQueue(); LOG(INFO, "%s thread quit\n", name.c_str()); break; } // 有任务 //T为任务类型,取任务 T t = _task_queue.front(); _task_queue.pop(); UnlockQueue(); // 处理任务 t(); // 处理任务,此处不用也不能在临界区中处理 //任务被取出来从任务队列中移走,放在一个临时空间中 //此处的任务只属于该线程,处理任务和临界资源的访问是两件事 //这样做提高了效率,不然处理任务就成了串行执行了 // std::cout << name << ": " << t.result() << std::endl; LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str()); } } void Init()//初始化,给封装的原生线程的pthread_create进行传参 { //HandlerTask有隐含的this指针,func_t只是单参数的,所以不能接收 //bind 被用来创建一个可调用对象 func,封装该类HandlerTask 成员函数 //和对该成员函数所属对象的引用(即 this 指针)。 //然后,这个可调用对象被传递给 th 类的构造函数,并存储在 std::vector<th> 中。 func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); //构建线程对象 for (int i = 0; i < _thread_num; i++) { std::string threadname = "thread-" + std::to_string(i + 1); //将任务也构造到线程中 _threads.emplace_back(threadname, func); //日志 LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str()); } } void Start()//启动线程 { _isrunning = true;//true则启动 for (auto &thread : _threads) { LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str()); thread.Start();//这是封装的原生线程中的成员函数用于 //该成员函数封装了pthread_create,用于创建且运行线程 } } //构造 ThreadPool(int thread_num = gdefaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); } public: void Stop() { LockQueue(); _isrunning = false;//false表示每运行则停止 WakeupAll();//如果设为false后有线程在休眠那么就退出不了了 UnlockQueue();//所以执行停止要将所有线程 全都唤醒 LOG(INFO, "Thread Pool Stop Success!\n"); } void Equeue(const T &in)//把任务放到任务队列中 { LockQueue(); //生产工作 if (_isrunning)//保证线程池是运行状态才执行生产工作 { _task_queue.push(in); if (_sleep_thread_num > 0)//有线程休眠才进行唤醒 Wakeup(); } UnlockQueue(); } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: int _thread_num;//线程数量 std::vector<Thread> _threads;//组织多个线程 std::queue<T> _task_queue;//任务队列 bool _isrunning;//线程池是否在运行 int _sleep_thread_num;//在休眠的线程便于唤醒 pthread_mutex_t _mutex;//互斥锁 pthread_cond_t _cond;//条件变量 };
6.线程池运行测试
test.cc:
#include "ThreadPool.hpp" #include "Task.hpp" #include "Log.hpp" using namespace log_ns;//日志 int main() { EnableScreen();//向显示器打印日志 ThreadPool<Task> *tp = new ThreadPool<Task>(); tp->Init();//线程初始化 tp->Start();//线程创建好了,开始运行 int cnt = 10; while(cnt) { // 不断地向线程池推送任务 sleep(1); Task t(1,1); tp->Equeue(t);//把任务放到任务队列中 LOG(INFO, "equeue a task, %s\n", t.debug().c_str()); sleep(1); cnt--; } tp->Stop(); LOG(INFO, "thread pool stop!\n"); return 0; }
运行效果:
7.优化线程池(单例模式 )
单例模式概念
什么是设计模式?
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
饿汉实现方式和懒汉实现方式
[洗完的例子]
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度饿汉方式实现单例模式:
template <typename T> class Singleton { static T data; public: static T* GetInstance() { return &data; } };
只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例
懒汉方式实现单例模式:template <typename T> class Singleton { static T* inst; public: static T* GetInstance() { if (inst == NULL) { inst = new T(); } return inst; } };
在多线程环境中,如果多个线程同时调用
GetInstance()
方法,并且此时inst
还未被初始化(即inst == NULL
),那么这些线程可能会同时进入if
语句块,从而导致inst
被多次初始化。这不仅违反了单例模式的原则(即确保一个类仅有一个实例),还可能引发资源泄露或其他不可预测的行为。懒汉方式实现单例模式(线程安全版本)
// 懒汉模式, 线程安全 template <typename T> class Singleton { volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化. static std::mutex lock; public: static T* GetInstance() { if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能. lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new. if (inst == NULL) { inst = new T(); } lock.unlock(); } return inst; } };
注意事项:
1. 加锁解锁的位置
2. 双重 if 判定, 避免不必要的锁竞争
3. volatile关键字防止过度优化volatile
关键字告诉编译器,被修饰的变量可能会在程序控制之外被修改,因此编译器在编译过程中不能对该变量的访问进行优化,比如不能将其缓存到寄存器中,从而每次访问都直接从内存中读取。这在某些嵌入式系统或硬件编程中很有用,因为硬件状态可能会随时改变。
优化后的代码
这里我们使用懒汉模式:
在单例模式中,禁止拷贝构造(以及拷贝赋值操作)是确保类的唯一实例不被意外复制,在多线程下引发资源泄露、竞争条件或数据不一致等问题。
变化部分:
创建线程池,只能通过获取单例的方法执行:
直接设置创建单例的指针,指向创建的线程池。需要对单例上锁。
完整代码:
#pragma once #include <iostream> #include <unistd.h> #include <string> #include <vector> #include <queue> #include <functional> #include "Thread.hpp" #include "Log.hpp" #include "LockGuard.hpp" using namespace ThreadMoudle; using namespace log_ns;//日志 static const int gdefaultnum = 5; void test() { while (true) { std::cout << "hello world" << std::endl; sleep(1); } } template <typename T> class ThreadPool { private: void LockQueue() { pthread_mutex_lock(&_mutex);//锁住队列 } void UnlockQueue() { pthread_mutex_unlock(&_mutex);//解锁 } void Wakeup()//唤醒操作 { pthread_cond_signal(&_cond); } void WakeupAll()//全部唤醒 { pthread_cond_broadcast(&_cond); } void Sleep()//阻塞等待 { //线程被挂起,锁被归还 pthread_cond_wait(&_cond, &_mutex); } bool IsEmpty()//判断队列是否为空 { return _task_queue.empty(); } //处理任务 (处理任务队列中的任务 void HandlerTask(const std::string &name) // this { while (true)//线程不退出 { // 取任务 LockQueue(); //保护临界资源 //任务为空且线程处于运行状态,就该去休眠 while (IsEmpty() && _isrunning) { _sleep_thread_num++; LOG(INFO, "%s thread sleep begin!\n", name.c_str()); Sleep(); LOG(INFO, "%s thread wakeup!\n", name.c_str()); _sleep_thread_num--; } // 任务为空线程没有运行,那么就该退出了 if (IsEmpty() && !_isrunning) { UnlockQueue(); LOG(INFO, "%s thread quit\n", name.c_str()); break; } // 有任务 //T为任务类型,取任务 T t = _task_queue.front(); _task_queue.pop(); UnlockQueue(); // 处理任务 t(); // 处理任务,此处不用也不能在临界区中处理 //任务被取出来从任务队列中移走,放在一个临时空间中 //此处的任务只属于该线程,处理任务和临界资源的访问是两件事 //这样做提高了效率,不然处理任务就成了串行执行了 // std::cout << name << ": " << t.result() << std::endl; LOG(DEBUG, "hander task done, task is : %s\n", t.result().c_str()); } } void Init()//初始化,给封装的原生线程的pthread_create进行传参 { //HandlerTask有隐含的this指针,func_t只是单参数的,所以不能接收 //bind 被用来创建一个可调用对象 func,封装该类HandlerTask 成员函数 //和对该成员函数所属对象的引用(即 this 指针)。 //然后,这个可调用对象被传递给 th 类的构造函数,并存储在 std::vector<th> 中。 func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); //构建线程对象 for (int i = 0; i < _thread_num; i++) { std::string threadname = "thread-" + std::to_string(i + 1); //将任务也构造到线程中 _threads.emplace_back(threadname, func); //日志 LOG(DEBUG, "construct thread %s done, init success\n", threadname.c_str()); } } void Start()//启动线程 { _isrunning = true;//true则启动 for (auto &thread : _threads) { LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str()); thread.Start();//这是封装的原生线程中的成员函数用于 //该成员函数封装了pthread_create,用于创建且运行线程 } } //构造函数私有 ThreadPool(int thread_num = gdefaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); } ThreadPool(const ThreadPool<T> &) = delete;//单例模式下禁止拷贝构造 //不然会有线程安全问题 void operator=(const ThreadPool<T> &) = delete;//同理赋值操作也是不能有的 public: void Stop() { LockQueue(); _isrunning = false;//false表示每运行则停止 WakeupAll();//如果设为false后有线程在休眠那么就退出不了了 UnlockQueue();//所以执行停止要将所有线程 全都唤醒 LOG(INFO, "Thread Pool Stop Success!\n"); } // 多线程获取单例的方法 static ThreadPool<T> *GetInstance()//引用了静态成员变量,该函数也得是静态的 { if (_tp == nullptr)//为空才能创建该对象,线程池只需要创建一次 { //创建线程池的过程必须是串行的,上锁! LockGuard lockguard(&_sig_mutex); if (_tp == nullptr) { LOG(INFO, "create threadpool\n"); // thread-1 thread-2 thread-3.... _tp = new ThreadPool(); _tp->Init(); _tp->Start(); } else { LOG(INFO, "get threadpool\n"); } } return _tp; } void Equeue(const T &in)//把任务放到任务队列中 { LockQueue(); //生产工作 if (_isrunning)//保证线程池是运行状态才执行生产工作 { _task_queue.push(in); if (_sleep_thread_num > 0)//有线程休眠才进行唤醒 Wakeup(); } UnlockQueue(); } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: int _thread_num;//线程数量 std::vector<Thread> _threads;//组织多个线程 std::queue<T> _task_queue;//任务队列 bool _isrunning;//线程池是否在运行 int _sleep_thread_num;//在休眠的线程便于唤醒 pthread_mutex_t _mutex;//互斥锁 pthread_cond_t _cond;//条件变量 // 单例模式 // volatile static ThreadPool<T> *_tp; static ThreadPool<T> *_tp;//线程池所对应的指针,静态成员只能在类外完成初始化(单例) static pthread_mutex_t _sig_mutex;//单例的锁 }; //静态成员只能在类外完成初始化 template <typename T> ThreadPool<T>* ThreadPool<T>::_tp = nullptr; template <typename T> pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER;
8.测试单例模式
#include "ThreadPool.hpp" #include "Task.hpp" #include "Log.hpp" using namespace log_ns; int main() { EnableFILE();//向文件打印 int cnt =10; while(cnt) { // 不断地向线程池推送任务 sleep(1); Task t(1,1); ThreadPool<Task>::GetInstance()->Equeue(t);//单例模式下的创建 LOG(INFO, "equeue a task, %s\n", t.debug().c_str()); sleep(1); cnt--; } ThreadPool<Task>::GetInstance()->Stop(); LOG(INFO, "thread pool stop!\n"); return 0; }
运行效果: