文章目录
- 九、多线程
- 10. 线程池
- 未完待续
九、多线程
10. 线程池
这里我没实现一些 懒汉单例模式 的线程池,并且包含 日志打印 的线程池:
Makefile:
threadpool:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f threadpool
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
// 类型别名
using func_t = std::function<void(std::string)>;
// 线程类
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func, std::string name = "none-name")
:_func(func)
,_threadname(name)
,_stop(true)
{}
// 执行任务
static void *threadroutine(void *args)
{
Thread *self = static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
// 启动线程
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
// 停止线程
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
// 等待线程结束
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread()
{}
private:
pthread_t _tid;
std::string _threadname;
func_t _func;
bool _stop;
};
}
#endif
LockGuard.hpp:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
// 构造函数加锁
LockGuard(pthread_mutex_t *mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
// 析构函数解锁
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#endif
Log.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"
// 宏定义,用于定义日志格式
#define LOG(level, format, ...) do{LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__);}while (0)
// 将日志输入到文件
#define EnableFile() do{gIsSave = true;}while (0)
// 将日志输出到显示器
#define EnableScreen() do{gIsSave = false;}while (0)
bool gIsSave = false;
// 日志文件名
const std::string logname = "log.txt";
// 枚举日志级别
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
// 保存日志到文件
void SaveFile(const std::string &filename, const std::string &message)
{
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
{
return;
}
out << message;
out.close();
}
// 日志级别转字符串
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
// 获取当前时间字符串
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time);
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
// 日志锁,同一时刻只能写一个日志
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 日志信息
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
// 日志级别
std::string levelstr = LevelToString(level);
// 时间
std::string timestr = GetTimeString();
// 进程id
pid_t selfid = getpid();
// 日志内容
char buffer[1024];
va_list arg;
va_start(arg, format);
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg);
// 日志格式化
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
LockGuard lockguard(&lock);
// 输出日志
if (!issave)
{
std::cout << message;
}
else
{
SaveFile(logname, message);
}
}
Task.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
class Task
{
public:
Task()
{}
Task(int a, int b)
:_a(a)
,_b(b)
,_result(0)
{}
// 执行加法功能
void Excute()
{
_result = _a + _b;
}
// 结果
std::string ResultToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
}
// 问题
std::string DebugToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "= ?";
}
// 重载()运算符
void operator()()
{
Excute();
}
private:
int _a;
int _b;
int _result;
};
ThreadPool.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
using namespace ThreadModule;
// 线程池默认线程数
const static int gdefaultthreadnum = 10;
template <typename T>
class ThreadPool
{
private:
// 线程互斥锁
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
// 线程互斥解锁
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
// 线程等待
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
// 线程唤醒
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
// 唤醒全部线程
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
// 私有构造函数
ThreadPool(int threadnum = gdefaultthreadnum)
:_threadnum(threadnum)
,_waitnum(0)
,_isrunning(false)
{
// 初始化锁
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 日志
LOG(INFO, "ThreadPool Construct()");
}
// 初始化线程池
void InitThreadPool()
{
// 创建一批线程
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
// 日志
LOG(INFO, "init thread %s done", name.c_str());
}
_isrunning = true;
}
// 启动线程池
void Start()
{
for (auto &thread : _threads)
{
thread.Start();
}
}
// 任务处理函数
void HandlerTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!
{
// 日志
LOG(INFO, "%s is running...", name.c_str());
while (true)
{
// 加锁
LockQueue();
while (_task_queue.empty() && _isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
// 退出情况
if (_task_queue.empty() && !_isrunning)
{
UnlockQueue();
break;
}
// 取出任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();
// 日志
LOG(DEBUG, "%s get a task", name.c_str());
// 执行任务
t();
// 日志
LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
}
}
// 禁用拷贝构造和赋值操作
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
ThreadPool(const ThreadPool<T> &) = delete;
public:
static ThreadPool<T> *GetInstance()
{
// 首次使用时,创建线程池单例
if (nullptr == _instance)
{
// 对于多线程创建单例时加锁,保证线程安全
LockGuard lockguard(&_lock);
if (nullptr == _instance)
{
// 创建线程池实例
_instance = new ThreadPool<T>();
_instance->InitThreadPool();
_instance->Start();
LOG(DEBUG, "创建线程池单例");
return _instance;
}
}
// 已经创建过线程池单例,直接返回
LOG(DEBUG, "获取线程池单例");
return _instance;
}
// 停止线程池
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeupAll();
UnlockQueue();
}
// 等待线程池退出
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
LOG(INFO, "%s is quit...", thread.name().c_str());
}
}
// 向线程池中添加任务
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if (_isrunning)
{
_task_queue.push(t);
if (_waitnum > 0)
{
ThreadWakeup();
}
LOG(DEBUG, "enqueue task success");
ret = true;
}
UnlockQueue();
return ret;
}
// 析构自动释放锁资源
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
// 线程池中线程个数
int _threadnum;
// 线程
std::vector<Thread> _threads;
// 任务队列
std::queue<T> _task_queue;
// 互斥锁
pthread_mutex_t _mutex;
// 条件变量
pthread_cond_t _cond;
// 等待线程数
int _waitnum;
// 线程池是否运行
bool _isrunning;
// 线程池单例
static ThreadPool<T> *_instance;
// 全局锁
static pthread_mutex_t _lock;
};
// 初始化静态变量
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;
// 全局锁
template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
Main.cc:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <iostream>
#include <string>
#include <memory>
#include <ctime>
int main()
{
// 日志
LOG(DEBUG, "程序已经加载");
sleep(2);
// 创建线程池单例
ThreadPool<Task>::GetInstance();
sleep(2);
// 获取单例
ThreadPool<Task>::GetInstance();
sleep(2);
ThreadPool<Task>::GetInstance();
sleep(2);
ThreadPool<Task>::GetInstance();
sleep(2);
// 等待线程结束
ThreadPool<Task>::GetInstance()->Wait();
sleep(2);
return 0;
}
结果演示: