文章目录
- 1 功能简介
- 线程池的初始化
- 线程池执行流程
- 2 线程池类的设计
- 线程类XThread
- XThread.h
- XThread.cpp
- 线程池类XThreadPool
- XThreadPool.h
- XThreadPool.cpp
- 任务基类task
- XTask.h
- 3 自定义任务的例子
- 自定义任务类ServerCMD
- ServerCMD.h
- ServerCMD.cpp
- 测试程序
- 运行效果
1 功能简介
本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。
-
工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。
-
任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。
线程池的初始化
线程池执行流程
2 线程池类的设计
线程类XThread
线程类的接口功能
Start() -> 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() -> 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() -> 此函数只进入事件循环,等待事件循环退出
Notify() -> 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() -> 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() -> 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
调用多次Activate表示加入多个任务,任务顺序被执行。
XThread.h
#pragma once
#include <vector>
/*线程类声明*/
class XThread;
/*任务类声明*/
class XTask;
/*线程池类*/
class XThreadPool
{
public:
//单例模式创建返回唯一对象
static XThreadPool* GetInstance();
//初始化所有线程并启动线程
void Init(int threadCount);
//分发线程
void Dispatch(XTask* task);
private:
//将构造函数的访问属性设置为 private
//将构造函数构造声明成私有不使用
//声明成私有不使用
XThreadPool(){} //无参构造
XThreadPool(const XThreadPool&); //拷贝构造
XThreadPool& operator= (const XThreadPool&); //赋值运算符重载
//线程数量
int threadCount = 0;
//用来标记下一个使用的线程号
int lastThread = -1;
//线程对象数组
std::vector<XThread *> threads;
//线程池对象
static XThreadPool* pInstance;
};
XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>
using namespace std;
XThread::XThread()
{
}
XThread::~XThread()
{
}
//sock 文件描述符,which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用: 管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
XThread *th = (XThread*)arg;
th->Notify(fd, which);
}
/*
* 函数名: XThread::Start
* 作用: 启动线程
* 解释: 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。
*/
void XThread::Start()
{
//安装线程,初始化event_base和管道监听事件用于激活
Setup();
//启动线程
thread th(&XThread::Main, this);
//线程分离
th.detach();
}
/*
* 函数名: XThread::Main
* 作用: 线程入口函数
* 解释: 此函数只进入事件循环,等待事件循环退出
*/
void XThread::Main()
{
cout << id << " XThread::Main() begin" << endl;
event_base_dispatch(base); //进入事件循环
event_base_free(base);
cout << id << " XThread::Main() end" << endl;
}
/*
* 函数名: XThread::Setup
* 作用: 安装线程
* 解释: 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
*/
bool XThread::Setup()
{
//windows用配对socket linux用管道
//创建的管道
int fds[2];
if(pipe(fds)){
cerr << "pipe failed!" << endl;
return false;
}
//读取绑定到event事件中,写入要保存
//保存管道的写fd
notify_send_fd = fds[1];
//创建一个新的事件处理器对象
this->base = event_base_new();
//创建一个新的事件对象
//添加管道监听事件读fd,用于激活线程执行任务
event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);
//将事件对象(struct event)添加到指定的事件处理器(event_base)中
event_add(ev, 0);
return true;
}
/*
* 函数名: XThread::Notify
* 作用: 线程激活执行任务
* 解释: 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{
//水平触发 只要没有接受完成,会再次进来
char buf[2] = {0};
int len = read(fd, buf, 1);
if (len <= 0)
return;
cout << id << " thread " << buf << endl;
//获取任务,并初始化任务
XTask* task = NULL;
tasks_mutex.lock();
if(tasks.empty()){ //队列为空
tasks_mutex.unlock();
return;
}
task = tasks.front(); //先进先出
tasks.pop_front();
tasks_mutex.unlock();
task->Init();
}
/*
* 函数名: XThread::Activate
* 作用: 激活线程
* 解释: 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
* 调用多次Activate表示加入多个任务,任务顺序被执行。
*/
void XThread::Activate()
{
char act[10] = {0};
int len = write(this->notify_send_fd, "c", 1);
if (len <= 0)
{
cerr << "XThread::Activate() failed!" << endl;
}
cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}
/*
* 函数名: XThread::AddTask
* 作用: 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{
if(!task)return;
task->base = this->base;
tasks_mutex.lock();
tasks.push_back(task);
tasks_mutex.unlock();
}
线程池类XThreadPool
线程类的接口功能
GetInstance() -> 单例模式创建返回唯一对象
Init() -> 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
Dispatch() -> 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务
XThreadPool.h
#pragma once
#include <vector>
/*线程类声明*/
class XThread;
/*任务类声明*/
class XTask;
/*线程池类*/
class XThreadPool
{
public:
//单例模式创建返回唯一对象
static XThreadPool* GetInstance();
//初始化所有线程并启动线程
void Init(int threadCount);
//分发线程
void Dispatch(XTask* task);
private:
//将构造函数的访问属性设置为 private
//将构造函数构造声明成私有不使用
//声明成私有不使用
XThreadPool(){} //无参构造
XThreadPool(const XThreadPool&); //拷贝构造
XThreadPool& operator= (const XThreadPool&); //赋值运算符重载
//线程数量
int threadCount = 0;
//用来标记下一个使用的线程号
int lastThread = -1;
//线程对象数组
std::vector<XThread *> threads;
//线程池对象
static XThreadPool* pInstance;
};
XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>
using namespace std;
//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance = NULL;
/*
* 函数名: XThreadPool::GetInstance
* 作用: 单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{
//当需要使用对象时,访问instance 的值
//空值:创建对象,并用instance 标记
//非空值: 返回instance 标记的对象
if( pInstance == NULL )
{
pInstance = new XThreadPool();
}
return pInstance;
}
/*
* 函数名: XThreadPool::Init
* 作用: 初始化所有线程并启动线程
* 解释: 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
*/
void XThreadPool::Init(int threadCount)
{
this->threadCount = threadCount;
this->lastThread = -1;
for (int i = 0; i < threadCount; i++)
{
XThread *t = new XThread();
t->id = i + 1;
cout << "Create thread " << i << endl;
//启动线程
t->Start();
threads.push_back(t);
this_thread::sleep_for(std::chrono::microseconds(10));
}
}
/*
* 函数名: XThreadPool::Dispatch
* 作用: 分发线程
* 解释: 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。
*/
void XThreadPool::Dispatch(XTask* task)
{
//轮询
if(!task)return;
int tid = (lastThread + 1) % threadCount;
lastThread = tid;
cout << "lastThread:" << lastThread << endl;
XThread *XTh = threads[tid];
//添加任务
XTh->AddTask(task);
//线程激活
XTh->Activate();
}
任务基类task
XTask.h
#pragma once
#include <iostream>
class XTask
{
public:
//事件处理器对象
struct event_base* base = NULL;
//客户端连接的socket
int sock = 0;
//初始化任务 纯虚函数
virtual bool Init() = 0;
};
3 自定义任务的例子
自定义任务类ServerCMD
线程类的接口功能
Init() -> 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() -> 读事件回调函数
EventCB() -> 客户端超时未发请求,断开连接退出任务
ServerCMD.h
#pragma once
#include "XTask.h"
class XFtpServerCMD : public XTask
{
public:
//初始化任务
virtual bool Init();
XFtpServerCMD();
~XFtpServerCMD();
};
ServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>
using namespace std;
/*
* 函数名: EventCB
* 作用: 超时事件回调函数
* 解释: 客户端超时未发请求,断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{
XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
//如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据
if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
{
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;
bufferevent_free(bev);
delete cmd;
}
}
/*
* 函数名: ReadCB
* 作用: 读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{
XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
char data[1024] = {0};
for (;;)
{
int len = bufferevent_read(bev, data, sizeof(data)-1);
if(len <= 0)break;
data[len] = '\0';
cout << data << endl << flush;
//测试代码,要清理掉
if(strstr(data, "quit"))
{
bufferevent_free(bev);
delete cmd;
break;
}
}
}
/*
* 函数名: XFtpServerCMD::Init
* 作用: 初始化任务
* 解释: 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。
*/
bool XFtpServerCMD::Init()
{
cout << "XFtpServerCMD::Init() sock:" << sock << endl;
//监听socket bufferevent
// base socket
bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);
bufferevent_enable(bev, EV_READ | EV_WRITE);
//添加超时
timeval rt = {10, 0}; //10秒
bufferevent_set_timeouts(bev, &rt, 0); //设置读超时回调函数
return true;
}
XFtpServerCMD::XFtpServerCMD()
{
}
XFtpServerCMD::~XFtpServerCMD()
{
}
测试程序
#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>
#include "XFtpServerCMD.h"
using namespace std;
#define SPORT 5001
/*
* 函数名: listen_cb
* 作用: 接收到连接的回调函数
* 解释: 通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
cout << "listen_cb" << endl;
XTask* task = new XFtpServerCMD();
task->sock = s;
XThreadPool::GetInstance()->Dispatch(task);
}
int main()
{
//忽略管道信号,发送数据给已关闭的socket
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
//1 初始化线程池
XThreadPool::GetInstance()->Init(5);
std::cout << "test thread pool!\n";
//创建libevent的上下文
event_base* base = event_base_new();
if (base)
{
cout << "event_base_new success!" << endl;
}
//监听端口
//socket ,bind,listen 绑定事件
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文
listen_cb, //接收到连接的回调函数
base, //回调函数获取的参数 arg
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket
10, //连接队列大小,对应listen函数
(sockaddr*)&sin, //绑定的地址和端口
sizeof(sin)
);
//事件分发处理
if(base)
event_base_dispatch(base);
if(ev)
evconnlistener_free(ev);
if(base)
event_base_free(base);
return 0;
}
运行效果
初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。