文章目录
- 线程池概述:
- 线程池示例:
- 代码细节
- 代码
- 结果展示
线程池概述:
一种线程使用模式。
- 线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
-
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
-
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
-
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
线程池示例:
-
- 创建固定数量线程池,循环从任务队列中获取任务对象,
-
- 获取到任务对象后,执行任务对象中的任务接口
代码细节
- 线程创建和等待封装
- 日志封装
- to_string: 可以把数字转换为字符串
- _c++11(可变参数模板)
- 内存池–执行任务
- 执行任务我们知道创建线程时需要传void*类型的函数(我们执行任务)它只有一个参数;我们需要在类内设置成静态函数(去掉this指针);而内存池的this的this指针提高args传参
int pthread_create(pthread_t *__restrict__ __newthread, const pthread_attr_t *__restrict__ __attr, void *(*__start_routine)(void *), void *__restrict__ __arg)
-
类内成员变量访问我们通过类成员函数就可以了
-
执行任务和发布任务(线程池本质也是一个生产消费模型)我们此次需要条件变量和线程锁来保证线程安全(同步与互斥关系)
-
锁封装时保证这一把锁是一个就可以了(指针或者引用都可以)
代码
- 线程锁封装
LockGuard.hpp
#pragma once
// 线程锁封装
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t* mtx):_mtx(mtx)
{}
void lock()
{
//std::cout << "要进行加锁" << std::endl;
pthread_mutex_lock(_mtx);
}
void unlock()
{
//std::cout << "要进行解锁" << std::endl;
pthread_mutex_unlock(_mtx);
}
~Mutex()
{}
public:
pthread_mutex_t* _mtx;
};
// RAII风格的加锁方式
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mtx):_mx(mtx)
{
_mx.lock();
}
~LockGuard()
{
_mx.unlock();
}
private:
Mutex _mx;
};
- 日志
Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
using std::cout;
using std::endl;
using std::string;
// 日志是有日志级别的
#define DEBUG 0 // 调试
#define NORMAL 1 // 正常
#define WARNING 2 // 警告
#define ERROR 3 // 错误
#define FATAL 4 // 致命错误
static const size_t BUFFER_NUM = 1024;
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"};
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void LogMessage(int level, const char *format, ...)
{
char stdBuffer[BUFFER_NUM]; // 标准部分
const time_t timestamp = time(nullptr);
struct tm *L_Time = localtime(×tamp);
string time;
time += "日期-时间:" + std::to_string(L_Time->tm_year+1900) + "/" + std::to_string(L_Time->tm_mon) + "/" + std::to_string(L_Time->tm_mday) + "-" + std::to_string(L_Time->tm_hour) + ":" + std::to_string(L_Time->tm_min) + ":" + std::to_string(L_Time->tm_sec);
std::to_string(L_Time->tm_sec);
snprintf(stdBuffer, sizeof stdBuffer, "[%s][%s]",
gLevelMap[level], time.c_str());
char logBuffer[BUFFER_NUM]; // 自定义部分
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
printf("%s%s\n", stdBuffer, logBuffer);
}
- 任务(封装)
Task.hpp
#pragma once
// 制作任务(封装)
#include "thread.hpp"
#include "Log.hpp"
using func_t = std::function<int(int, int)>;
class Task
{
public:
Task() {} // 便于获取任务
Task(int x, int y, func_t func) : _x(x), _y(y), _func(func) // 制作任务
{
}
int operator()(string name) // 仿函数
{
//cout << "名字:" << name << "结果:" << _x << '*' << _y << '=' << _func(_x, _y) << endl;
LogMessage(NORMAL, "%s处理完成: %d*%d=%d | %s | %d",
name.c_str(), _x, _y, _func(_x, _y), __FILE__, __LINE__);
}
public:
int _x;
int _y;
func_t _func;
};
- 多线程封装
thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
using std::cout;
using std::endl;
using std::string;
static const size_t NAME_NUM = 1024;
typedef void*(*fun_t)(void*);
class ThreadDate
{
public:
void *_args;
string _name;
};
class Thread
{
public:
Thread(int num, fun_t callback, void *args) : _func(callback)
{
char nameBuffer[NAME_NUM];
snprintf(nameBuffer, sizeof nameBuffer, "Thread-%d", num);
_tdata._name = nameBuffer;
_tdata._args = args;
}
void start() // 创造线程
{
pthread_create(&_tid, nullptr, _func, (void *)&_tdata);
}
void join() // 等待线程
{
pthread_join(_tid, nullptr);
}
string &name() // 线程名字
{
return _tdata._name;
}
private:
fun_t _func;
ThreadDate _tdata;
pthread_t _tid;
};
- 线程池封装
threadPool.hpp
#pragma once
#include <vector>
#include <queue>
#include <thread>
#include "thread.hpp"
#include "LockGuard.hpp"
#include "Log.hpp"
static const size_t g_thread_num = 4;
template <class T>
class ThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &_lock;
}
bool isEmpty()
{
return _taskQueue.empty();
}
void waitCond()
{
pthread_cond_wait(&_cond, &_lock);
}
T getTask()
{
T t = _taskQueue.front();
_taskQueue.pop();
return t;
}
public:
ThreadPool(int threadNum = g_thread_num)
{
// 初始化
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
for (size_t i = 1; i <= threadNum; ++i)
{
_threads.push_back(new Thread(i, Routine, this));
}
}
void run()
{
for (auto &iter : _threads)
{
iter->start();
// std::cout << iter->name() << " 启动成功" << std::endl;
LogMessage(NORMAL, "%s: 启动成功 | %s | %d", iter->name().c_str(), __FILE__, __LINE__);
}
}
// void joins() // 测试
// {
// for (auto &iter : _threads)
// {
// iter->join();
// }
// }
static void *Routine(void *args) // 消费过程
{
ThreadDate *td = (ThreadDate *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->_args; // 类this指针
while (true)
{
T task;
{
LockGuard LockGuard(tp->getMutex()); // 上锁
while (tp->isEmpty())
tp->waitCond(); // 条件变量
task = tp->getTask(); // 任务队列是共享的-> 将任务从共享,拿到自己的私有空间
} // 局部变量出作用域解锁
task(td->_name); // 仿函数执行任务
}
}
// 生产过程
void PushTask(const T &task)
{
LockGuard LockGuard(&_lock);
_taskQueue.push(task); // 私有->公有
pthread_cond_signal(&_cond); // 唤醒Routine 开始执行任务
}
~ThreadPool()
{
for (auto &iter : _threads)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread *> _threads;
std::queue<T> _taskQueue;
// 线程池本质也是一个生产消费模型
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
- 主函数测试
#include <unistd.h>
#include <ctime>
#include "threadPool.hpp"
#include "Task.hpp"
int main()
{
srand((u_int64_t)time(nullptr) ^ getpid() ^ 2023210); // 随机数种子
ThreadPool<Task> *tp = new ThreadPool<Task>();
tp->run();
while (true)
{
// 生产的过程,制作任务的时候,要花时间
int x = rand() % 100 + 1;
usleep(2023);
int y = rand() % 210 + 1;
Task t(x, y, [](int x, int y) // lambda
{ return x * y; });
// cout << "制作任务完成: " << x << "*" << y << "=?" << endl;
LogMessage(WARNING, "制作任务完成: %d+%d=? | %s | %d", x, y, __FILE__, __LINE__);
// 推送任务到线程池中
tp->PushTask(t); // 超市-> 私有-》共有
sleep(1);
}
return 0;
}