1.线程池
1.1初期框架
thread.hpp
#include<iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
using func_t = std::function<void()>;
class Thread
{
public:
void Excute()
{
_func();
}
public:
Thread(func_t func, std::string name="none-name")
: _func(func), _threadname(name), _stop(true)
{}
static void *threadroutine(void *args) //注意:类成员函数,形参是有this指针的
{
Thread *self = static_cast<Thread *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;//线程tid
std::string _threadname;//线程名字
func_t _func;//线程所要执行的函数
bool _stop;//判断线程是否停止
};
}
ThreadPool.hpp
#include<vector>
#include<unistd.h>
#include<string>
#include<queue>
#include"Thread.hpp"
using namespace ThreadModule;
const int g_thread_num = 3;//默认线程数
// 线程池->一批线程,一批任务,有任务push、有任务pop,本质是: 生产消费模型
template <typename T>
class ThreadPool
{
public:
ThreadPool(int threadnum=g_thread_num)//构造函数
:_threadnum(threadnum)
, _waitnum(0)
, _isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);//初始化锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
}
void Print()
{
while(true)
{
std::cout<<"我是一个线程"<<std::endl;
sleep(1);
}
}
void InitThreadPool()
{
// 指向构建出所有的线程,并不启动
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
_threads.emplace_back(Print,name);//线程处理函数是Print,注意这里有问题
}
_isrunning = true;
}
void Start()//启动线程池
{
for(auto &thread:_threads)
{
thread.Start();
std::cout<<thread.name()<<"线程:启动成功"<<std::endl;
}
}
void Wait()
{
for(auto &thread:_threads)
{
thread.Join();
}
}
// bool Enqueue(const T &t)
// {
// }
~ThreadPool()//析构
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;//线程的数量
std::vector<Thread> _threads;//用vector来存线程
std::queue<T> _task_queue;//任务队列
pthread_mutex_t _mutex;//锁
pthread_cond_t _cond;//条件变量
int _waitnum;//有几个线程阻塞
bool _isrunning;//判断线程池是否在运行
};
main.cc
#include <iostream>
#include <string>
#include <memory>
#include "threadpool.hpp"
int main()
{
std::unique_ptr<ThreadPool<int>> tp(new ThreadPool<int>());
tp->InitThreadPool();
tp->Start();
sleep(5);
tp->Wait();
return 0;
}
此时会报错:无效使用非静态成员函数...
主要原因是成员函数包含this指针而thread.hpp中线程所要执行函数的参数为空:using func_t = std::function<void()>;,导致参数类型不匹配
有两种解决方法
方法一:在Print函数前面加上static
static void Print()
{
while(true)
{
std::cout<<"我是一个线程"<<std::endl;
sleep(1);
}
}
方法二:在初始化线程池时用bind绑定ThreadPool内部的Print方法,缺省地设置参数this,就是将this参数默认的绑定到Print方法上,这样一来就和thread.hpp中的参数匹配上了
void InitThreadPool()
{
// 指向构建出所有的线程,并不启动
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
//_threads.emplace_back(Print,name);//线程处理函数是Print
_threads.emplace_back(std::bind(&ThreadPool::Print,this),name);
}
_isrunning = true;
}
也是成功运行
就算后面我们需要更改线程的参数
那么也可以在初始化函数那里固定式的绑定参数了
不需要再去单独给线程设计参数对象了
一个类的成员方法设计成另一个类的回调方法,常见的实现就是这种
类的成员方法也可以成为另一个类的回调方法,方便我们继续类级别的互相调用
1.2代码完善
接下来就是如何入队列以及我们的新线程应该做什么任务...
处理任务:每一个线程进来的时候都需要去任务队列中获取任务,所以我们首当其冲的就要对任务队列给它锁住
任务队列的加锁、解锁以及线程的等待与唤醒(条件变量)
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
处理任务
void HandlerTask(std::string name)//线程处理任务
{
while (true)
{
//加锁
LockQueue();
//任务队列中不一定有数据,如果任务队列为空且线程池在跑,那么就阻塞住
while(_task_queue.empty()&&_isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
//如果任务队列是空的,然后线程池又退出了,那么就没必要运行了
if(_task_queue.empty() && !_isrunning)
{
UnlockQueue();
std::cout<<name<<"quit..."<<std::endl;
sleep(1);
break;
}
//不论线程池有没有退出,走到这说明一定有任务 ->处理任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();//解锁
t();
}
}
注意:这个任务是属于线程独占的任务,不能再任务队列的加锁、解锁之间处理
入任务队列
如果线程阻塞等待的数量大于0,就唤醒一个线程
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
if(_waitnum>0)
{
ThreadWakeup();
}
ret = true;
}
UnlockQueue();
return ret;
}
threadpool.hpp
任务还没写,所以t()先注释掉
#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<queue>
#include"LockGuard.hpp"
#include"Thread.hpp"
using namespace ThreadModule;
const int g_thread_num = 3;//默认线程数
// 线程池->一批线程,一批任务,有任务push、有任务pop,本质是: 生产消费模型
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
public:
ThreadPool(int threadnum=g_thread_num)//构造函数
:_threadnum(threadnum)
, _waitnum(0)
, _isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);//初始化锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
}
// static void Print()
// {
// while(true)
// {
// std::cout<<"我是一个线程"<<std::endl;
// sleep(1);
// }
// }
// void Print(std::string name)
// {
// while(true)
// {
// std::cout<<"我是一个线程,线程名是"<<name<<std::endl;
// sleep(1);
// }
// }
void InitThreadPool()
{
// 指向构建出所有的线程,并不启动
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
//_threads.emplace_back(Print,name);//线程处理函数是Print
//_threads.emplace_back(std::bind(&ThreadPool::Print,this,std::placeholders::_1),name);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1),name);
}
_isrunning = true;
}
void Start()//启动线程池
{
for(auto &thread:_threads)
{
thread.Start();
std::cout<<thread.name()<<"线程:启动成功"<<std::endl;
}
}
void HandlerTask(std::string name)//线程处理任务
{
while (true)
{
//加锁
LockQueue();
//任务队列中不一定有数据,如果任务队列为空且线程池在跑,那么就阻塞住
while(_task_queue.empty()&&_isrunning)
{
_waitnum++;
std::cout<<name<<"线程阻塞中..."<<std::endl;
ThreadSleep();
_waitnum--;
}
//如果任务队列是空的,然后线程池又退出了,那么就没必要运行了
if(_task_queue.empty() && !_isrunning)
{
UnlockQueue();
std::cout<<name<<"quit..."<<std::endl;
sleep(1);
break;
}
//不论线程池有没有退出,走到这说明一定有任务 ->处理任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();//解锁
//t();
}
}
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeupAll();
UnlockQueue();
}
void Wait()
{
for(auto &thread:_threads)
{
thread.Join();
}
}
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
if(_waitnum>0)
{
ThreadWakeup();
}
ret = true;
}
UnlockQueue();
return ret;
}
~ThreadPool()//析构
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;//线程的数量
std::vector<Thread> _threads;//用vector来存线程
std::queue<T> _task_queue;//任务队列
pthread_mutex_t _mutex;//锁
pthread_cond_t _cond;//条件变量
int _waitnum;
bool _isrunning;//判断线程池是否在运行
};
main.cc
#include <iostream>
#include <string>
#include <memory>
#include "Task.hpp"
#include "threadpool.hpp"
int main()
{
std::unique_ptr<ThreadPool<int>> tp(new ThreadPool<int>());
tp->InitThreadPool();
tp->Start();
sleep(2);
tp->Stop();
tp->Wait();
return 0;
}
2.加上日志与任务
LOG.hpp(日志)
#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include"LockGuard.hpp"
bool gIsSave = false;
const std::string logname = "log.txt";
// 1. 日志是有等级的
enum Level
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
void SaveFile(const std::string &filename, const std::string &message)
{
std::ofstream out(filename, std::ios::app);
if (!out.is_open())
{
return;
}
out << message;
out.close();
}
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
std::string GetTimeString()
{
time_t curr_time = time(nullptr);
struct tm *format_time = localtime(&curr_time);
if (format_time == nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 日志是有格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容
void LogMessage(std::string filename, int line,bool issave,int level, const char *format, ...)
{
std::string levelstr = LevelToString(level);
std::string timestr = GetTimeString();
pid_t selfid = getpid();
char buffer[1024];
va_list arg;
va_start(arg, format);
vsnprintf(buffer, sizeof(buffer), format, arg);
va_end(arg);
std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
LockGuard lockguard(&lock);
if (!issave)
{
std::cout << message;
}
else
{
SaveFile(logname, message);
}
}
#define LOG(level, format, ...) \
do \
{ \
LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
} while (0)
LockGuard.hpp
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
pthread_mutex_lock(_mutex); // 构造加锁
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
};
#endif
threadpool.hpp
#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<queue>
#include"LOG.hpp"
#include"LockGuard.hpp"
#include"Thread.hpp"
using namespace ThreadModule;
const int g_thread_num = 3;//默认线程数
// 线程池->一批线程,一批任务,有任务push、有任务pop,本质是: 生产消费模型
template <typename T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
public:
ThreadPool(int threadnum=g_thread_num)//构造函数
:_threadnum(threadnum)
, _waitnum(0)
, _isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);//初始化锁
pthread_cond_init(&_cond,nullptr);//初始化条件变量
LOG(INFO, "线程池构造成功");
}
// static void Print()
// {
// while(true)
// {
// std::cout<<"我是一个线程"<<std::endl;
// sleep(1);
// }
// }
// void Print(std::string name)
// {
// while(true)
// {
// std::cout<<"我是一个线程,线程名是"<<name<<std::endl;
// sleep(1);
// }
// }
void InitThreadPool()
{
// 指向构建出所有的线程,并不启动
for (int num = 0; num < _threadnum; num++)
{
std::string name = "thread-" + std::to_string(num + 1);
//_threads.emplace_back(Print,name);//线程处理函数是Print
//_threads.emplace_back(std::bind(&ThreadPool::Print,this,std::placeholders::_1),name);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1),name);
LOG(INFO, "线程 %s 初始化成功", name.c_str());
}
_isrunning = true;
}
void Start()//启动线程池
{
for(auto &thread:_threads)
{
thread.Start();
std::cout<<thread.name()<<"线程:启动成功"<<std::endl;
}
}
void HandlerTask(std::string name)//线程处理任务
{
LOG(INFO, "%s 正在运行...", name.c_str());
while (true)
{
//加锁
LockQueue();
//任务队列中不一定有数据,如果任务队列为空且线程池在跑,那么就阻塞住
while(_task_queue.empty()&&_isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
//如果任务队列是空的,然后线程池又退出了,那么就没必要运行了
if(_task_queue.empty() && !_isrunning)
{
UnlockQueue();
//std::cout<<name<<"quit..."<<std::endl;
sleep(1);
break;
}
//不论线程池有没有退出,走到这说明一定有任务 ->处理任务
T t = _task_queue.front();
_task_queue.pop();
UnlockQueue();//解锁
LOG(DEBUG, "%s 获得任务", name.c_str());
t();
LOG(DEBUG,"%s 处理任务中,结果是%s",name.c_str(), t.ResultToString().c_str());
}
}
void Stop()
{
LockQueue();
_isrunning = false;
ThreadWakeupAll();
UnlockQueue();
}
void Wait()
{
for(auto &thread:_threads)
{
thread.Join();
LOG(INFO, "%s 线程退出...", thread.name().c_str());
}
}
bool Enqueue(const T &t)
{
bool ret = false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
if(_waitnum>0)
{
ThreadWakeup();
}
LOG(DEBUG, "任务入队列成功");
ret = true;
}
UnlockQueue();
return ret;
}
~ThreadPool()//析构
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;//线程的数量
std::vector<Thread> _threads;//用vector来存线程
std::queue<T> _task_queue;//任务队列
pthread_mutex_t _mutex;//锁
pthread_cond_t _cond;//条件变量
int _waitnum;
bool _isrunning;//判断线程池是否在运行
};
thread.hpp
#include<iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
using func_t = std::function<void(std::string)>;
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
public:
Thread(func_t func, std::string name="none-name")
: _func(func), _threadname(name), _stop(true)
{}
static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
{
Thread *self = static_cast<Thread *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;//线程tid
std::string _threadname;//线程名字
func_t _func;//线程所要执行的函数
bool _stop;//判断线程是否停止
};
}
main.cc
#include <iostream>
#include <string>
#include <memory>
#include "LOG.hpp"
#include "threadpool.hpp"
#include "Task.hpp"
#include<ctime>
int main()
{
srand(time(nullptr) ^ getpid() ^ pthread_self());
std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));
tp->InitThreadPool();
tp->Start();
int tasknum=3;
while(tasknum)
{
int a = rand() % 12 + 1;
usleep(1000);
int b = rand() % 4 + 1;
Task t(a, b);
LOG(INFO, "主线程推送任务: %s", t.DebugToString().c_str());
tp->Enqueue(t);
sleep(1);
tasknum--;
}
tp->Stop();
tp->Wait();
return 0;
}