libevent高并发网络编程 - 06_基于libevent的C++线程池实现

news2025/1/13 3:10:06

文章目录

    • 1 功能简介
      • 线程池的初始化
      • 线程池执行流程
    • 2 线程池类的设计
      • 线程类XThread
        • XThread.h
        • XThread.cpp
      • 线程池类XThreadPool
        • XThreadPool.h
        • XThreadPool.cpp
      • 任务基类task
        • XTask.h
    • 3 自定义任务的例子
      • 自定义任务类ServerCMD
        • ServerCMD.h
        • ServerCMD.cpp
      • 测试程序
      • 运行效果

1 功能简介

本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。

  • 工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。

  • 任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。

线程池的初始化

请添加图片描述

线程池执行流程

请添加图片描述

2 线程池类的设计

线程类XThread

线程类的接口功能
Start() ->		管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() ->		设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() ->		此函数只进入事件循环,等待事件循环退出
    
Notify() ->		读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() ->	将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() ->	通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
    			调用多次Activate表示加入多个任务,任务顺序被执行。

XThread.h

#pragma once
#include <vector>

/*线程类声明*/
class XThread;

/*任务类声明*/
class XTask;

/*线程池类*/
class XThreadPool
{
public:
    //单例模式创建返回唯一对象
    static XThreadPool* GetInstance();

    //初始化所有线程并启动线程
    void Init(int threadCount);

    //分发线程
    void Dispatch(XTask* task);

private:
    //将构造函数的访问属性设置为 private
    //将构造函数构造声明成私有不使用
    //声明成私有不使用
    XThreadPool(){}                                 //无参构造
    XThreadPool(const XThreadPool&);                //拷贝构造
    XThreadPool& operator= (const XThreadPool&);    //赋值运算符重载
    
    //线程数量
    int threadCount = 0;

    //用来标记下一个使用的线程号
    int lastThread = -1;

    //线程对象数组
    std::vector<XThread *> threads;

    //线程池对象
    static XThreadPool* pInstance;
};

XThread.cpp

#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>

using namespace std;

XThread::XThread()
{

}

XThread::~XThread()
{

}

//sock 文件描述符,which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用:   管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{
    XThread *th = (XThread*)arg;
    th->Notify(fd, which);
}

/*
* 函数名: XThread::Start
* 作用:   启动线程
* 解释:   管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。
*/
void XThread::Start()
{
    //安装线程,初始化event_base和管道监听事件用于激活
    Setup();

    //启动线程
    thread th(&XThread::Main, this);

    //线程分离
    th.detach();
}

/*
* 函数名: XThread::Main
* 作用:   线程入口函数
* 解释:   此函数只进入事件循环,等待事件循环退出
*/
void XThread::Main()
{
    cout << id << " XThread::Main() begin" << endl;
    event_base_dispatch(base);  //进入事件循环
    event_base_free(base);
	cout << id << " XThread::Main() end" << endl;
}

/*
* 函数名: XThread::Setup
* 作用:   安装线程
* 解释:   设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
*/
bool XThread::Setup()
{
	//windows用配对socket linux用管道
    //创建的管道
    int fds[2];
    if(pipe(fds)){
        cerr << "pipe failed!" << endl;
		return false;    
    }

    //读取绑定到event事件中,写入要保存
    //保存管道的写fd
    notify_send_fd = fds[1];

    //创建一个新的事件处理器对象
    this->base = event_base_new();

    //创建一个新的事件对象
    //添加管道监听事件读fd,用于激活线程执行任务
    event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);
    //将事件对象(struct event)添加到指定的事件处理器(event_base)中
    event_add(ev, 0);

    return true;
}


/*
* 函数名: XThread::Notify
* 作用:   线程激活执行任务
* 解释:   读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{
    //水平触发 只要没有接受完成,会再次进来
    char buf[2] = {0};

	int len = read(fd, buf, 1);
    if (len <= 0)
		return;
	cout << id << " thread " << buf << endl;

    //获取任务,并初始化任务
    XTask* task = NULL;
    tasks_mutex.lock();
    if(tasks.empty()){  //队列为空
        tasks_mutex.unlock();
        return;
    }

    task = tasks.front();   //先进先出
    tasks.pop_front();
    tasks_mutex.unlock();
    task->Init();
}

/*
* 函数名: XThread::Activate
* 作用:   激活线程
* 解释:   通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
*         调用多次Activate表示加入多个任务,任务顺序被执行。
*/
void XThread::Activate()
{
    char act[10] = {0};

    int len = write(this->notify_send_fd, "c", 1);

    if (len <= 0)
	{
		cerr << "XThread::Activate() failed!" << endl;
	}
    cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}


/*
* 函数名: XThread::AddTask
* 作用:   将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{
    if(!task)return;

    task->base = this->base;

    tasks_mutex.lock();
    tasks.push_back(task);
    tasks_mutex.unlock();
}

线程池类XThreadPool

线程类的接口功能
GetInstance() ->	单例模式创建返回唯一对象
Init() ->			创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组 
Dispatch() ->		从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务

XThreadPool.h

#pragma once
#include <vector>

/*线程类声明*/
class XThread;

/*任务类声明*/
class XTask;

/*线程池类*/
class XThreadPool
{
public:
    //单例模式创建返回唯一对象
    static XThreadPool* GetInstance();

    //初始化所有线程并启动线程
    void Init(int threadCount);

    //分发线程
    void Dispatch(XTask* task);

private:
    //将构造函数的访问属性设置为 private
    //将构造函数构造声明成私有不使用
    //声明成私有不使用
    XThreadPool(){}                                 //无参构造
    XThreadPool(const XThreadPool&);                //拷贝构造
    XThreadPool& operator= (const XThreadPool&);    //赋值运算符重载
    
    //线程数量
    int threadCount = 0;

    //用来标记下一个使用的线程号
    int lastThread = -1;

    //线程对象数组
    std::vector<XThread *> threads;

    //线程池对象
    static XThreadPool* pInstance;
};

XThreadPool.cpp

#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>

using namespace std;

//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance = NULL;

/*
* 函数名: XThreadPool::GetInstance
* 作用:   单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{
    //当需要使用对象时,访问instance 的值
    //空值:创建对象,并用instance 标记
    //非空值: 返回instance 标记的对象
    if( pInstance == NULL )
    {
        pInstance = new XThreadPool();
    }
    
    return pInstance;
}

/*
* 函数名: XThreadPool::Init
* 作用:   初始化所有线程并启动线程
* 解释:   创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组       
*/
void XThreadPool::Init(int threadCount)
{
    this->threadCount = threadCount;
    this->lastThread = -1;
    for (int i = 0; i < threadCount; i++)
    {
        XThread *t = new XThread();
        t->id = i + 1;
        cout << "Create thread " << i << endl;

        //启动线程
        t->Start();
        threads.push_back(t);
        this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

/*
* 函数名: XThreadPool::Dispatch
* 作用:   分发线程
* 解释:   从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。     
*/
void XThreadPool::Dispatch(XTask* task)
{
    //轮询
    if(!task)return;
    int tid = (lastThread + 1) % threadCount;
    lastThread = tid;
    cout << "lastThread:" << lastThread << endl;

    XThread *XTh = threads[tid];

    //添加任务
    XTh->AddTask(task);

    //线程激活
    XTh->Activate();
}

任务基类task

XTask.h

#pragma once
#include <iostream>

class XTask
{
public:
    //事件处理器对象
    struct event_base* base = NULL;

    //客户端连接的socket
    int sock = 0;

    //初始化任务 纯虚函数
    virtual bool Init() = 0;
};

3 自定义任务的例子

自定义任务类ServerCMD

线程类的接口功能
Init() ->		初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() ->		读事件回调函数 
EventCB() ->	客户端超时未发请求,断开连接退出任务

ServerCMD.h

#pragma once

#include "XTask.h"

class XFtpServerCMD : public XTask
{
public:
    //初始化任务
    virtual bool Init();

    XFtpServerCMD();
    ~XFtpServerCMD();
};

ServerCMD.cpp

#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>

using namespace std;


/*
* 函数名: EventCB
* 作用:   超时事件回调函数
* 解释:   客户端超时未发请求,断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{
    XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
    //如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据
    if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
    {
        cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;
		bufferevent_free(bev);
		delete cmd;
    }
}


/*
* 函数名: ReadCB
* 作用:   读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{
    XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
    char data[1024] = {0};

    for (;;)
    {
        int len = bufferevent_read(bev, data, sizeof(data)-1);
        if(len <= 0)break;
        data[len] = '\0';
        cout << data << endl << flush;

        //测试代码,要清理掉
        if(strstr(data, "quit"))
        {
            bufferevent_free(bev);
            delete cmd;
            break;
        }
    }
}


/*
* 函数名: XFtpServerCMD::Init
* 作用:   初始化任务
* 解释:   初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。
*/
bool XFtpServerCMD::Init()
{
    cout << "XFtpServerCMD::Init() sock:" << sock << endl;
	//监听socket bufferevent
	// base socket
    bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);
    bufferevent_enable(bev, EV_READ | EV_WRITE);

    //添加超时
    timeval rt = {10, 0};       //10秒
    bufferevent_set_timeouts(bev, &rt, 0);  //设置读超时回调函数
	return true;
}

XFtpServerCMD::XFtpServerCMD()
{

}

XFtpServerCMD::~XFtpServerCMD()
{

}

测试程序

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>

#include "XFtpServerCMD.h"

using namespace std;
#define SPORT 5001

/*
* 函数名: listen_cb
* 作用:   接收到连接的回调函数
* 解释:   通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
	cout << "listen_cb" << endl;
	XTask* task = new XFtpServerCMD();
	task->sock = s;
	XThreadPool::GetInstance()->Dispatch(task);
}

int main()
{
	//忽略管道信号,发送数据给已关闭的socket
	if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
		return 1;

	//1 初始化线程池
    XThreadPool::GetInstance()->Init(5);

	std::cout << "test thread pool!\n"; 
	//创建libevent的上下文
	event_base* base = event_base_new();
	if (base)
	{
		cout << "event_base_new success!" << endl;
	}

	//监听端口
	//socket ,bind,listen 绑定事件
	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SPORT);

	evconnlistener* ev = evconnlistener_new_bind(base,	// libevent的上下文
		listen_cb,										//接收到连接的回调函数
		base,											//回调函数获取的参数 arg
		LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,		//地址重用,evconnlistener关闭同时关闭socket
		10,												//连接队列大小,对应listen函数
		(sockaddr*)&sin,								//绑定的地址和端口
		sizeof(sin)
		);

	//事件分发处理
	if(base)
		event_base_dispatch(base);
	if(ev)
		evconnlistener_free(ev);
	if(base)
		event_base_free(base);


    return 0;
}

运行效果

初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/547539.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为机试(JAVA)真题Od【A卷+B卷】2023

目录 华为OD机试是什么&#xff1f;华为OD面试流程&#xff1f;华为OD机试通过率高吗&#xff1f;华为OD薪资待遇&#xff1f;华为OD晋升空间&#xff1f; 大家好&#xff0c;我是哪吒。 本专栏包含了最新最全的华为OD机试真题&#xff0c;有详细的分析和Java代码解答。已帮助…

【信息安全案例】——信息内容安全(学习笔记)

&#x1f4d6; 前言&#xff1a;在数字化时代&#xff0c;信息内容安全问题越来越引起人们的关注。信息内容安全主要包括对数据的机密性、完整性和可用性的保护&#xff0c;以及对用户隐私的保护等方面。针对信息内容安全的威胁&#xff0c;采取科学有效的安全措施和技术手段至…

每日学术速递5.20

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CV 1.Improved baselines for vision-language pre-training 标题&#xff1a;改进视觉语言预训练的基线 作者&#xff1a;Enrico Fini, Pietro Astolfi, Adriana Romero-Soriano, Jak…

10-《简单算法》

10-《简单算法》 一、时间复杂度二、空间复杂度三、排序算法1.比较排序1.1冒泡排序&#xff1a;1.2选择排序&#xff1a;1.3插入排序&#xff1a;1.4归并排序(非常重要)1.5快速排序&#xff08;非常重要&#xff09;1.6堆排序1.7排序算法稳定性 2.线性排序2.1桶排序2.2计数排序…

数据结构初阶(3)(链表:链表的基本概念、链表的类型、单向不带头非循环链表的实现、链表的优缺点 )

接上次博客&#xff1a;和数组处理有关的一些OJ题&#xff1b;ArrayList 实现简单的洗牌算法&#xff08;JAVA)(ArrayList&#xff09;_di-Dora的博客-CSDN博客 目录 链表的基本概念 链表的类型 单向、不带头、非循环链表的实现 遍历链表并打印节点值&#xff1a; 在链…

uni-app小程序uni.navigateBack返回上一个页面并传递参数.返回上个页面并刷新

返回上一个打开的页面并传递一个参数。有种办法就是使用 假如从B页面返回A页面&#xff1a; var pages getCurrentPages(); var prevPage pages[pages.length - 2]; //上一个页面 prevPage.setData({ mdata:1 })经过测试&#xff0c;在uni.app中使用B页面使用setData设置A页…

【Spring篇】AOP案例

&#x1f353;系列专栏:Spring系列专栏 &#x1f349;个人主页:个人主页 一、案例&#xff1a;业务层接口执行效率 1.需求分析 这个需求也比较简单&#xff0c;前面我们在介绍 AOP 的时候已经演示过 : 需求 : 任意业务层接口执行均可显示其执行效率&#xff08;执行时长&…

如何选对适合你的FPGA?快速掌握选型技巧!

FPGA厂家和芯片型号众多&#xff0c;在开发过程中&#xff0c;特别是新产品新项目时&#xff0c;都会面临FPGA选型的问题。 如何选择出适合的FPGA型号非常关键&#xff0c;需要评估需求、功能、成本、存储器、高速收发器等各种因素&#xff0c;选出性能与成本平衡的FPGA芯片。…

从零玩转设计模式之外观模式-waiguanmos

title: 从零玩转设计模式之外观模式 date: 2022-12-12 15:49:05.322 updated: 2022-12-23 15:34:40.394 url: https://www.yby6.com/archives/waiguanmos categories: - 设计模式 tags: - 设计模式 什么是外观模式 外观模式是一种软件设计模式&#xff0c;它提供了一种将多个…

进阶必看:高速PCB Layout设计的技术指南

当今电子行业中&#xff0c;高速PCB电路越来越广泛&#xff0c;已成为当代PCB工程师的重要技能&#xff0c;而在高速PCB电路中&#xff0c;高速PCB Layout设计是一项高难度高技术的工作&#xff0c;其设计质量直接关系到电路的性能。所以做好PCB Layout设计是非常非常重要的。 …

Boost开发指南-1.2progress_display

Progress_display progress_display可以在控制台上显示程序的执行进度&#xff0c;如果程序执行很耗费时间&#xff0c;那么它能够提供一个友好的用户界面&#xff0c;不至于让用户在等待中失去耐心。 progress_display位于名字空间boost&#xff0c;为了使用progress_displa…

内网自建代理ChatGPT

使用GPT比较频繁&#xff0c;一开始翻墙还能接受&#xff0c;但是用美国节点访问其他国外网站&#xff0c;确实比较麻烦。因此决定自己转发一个出来。 一、获取OpenAI授权密钥 首先&#xff0c;进入platform.openai.com-Personal-View API keys 不过OpenAI的key并不是免费的&…

VMware虚拟机三种网络模式详解之Bridged(桥接模式)

VMware虚拟机三种网络模式详解 Bridged&#xff08;桥接模式&#xff09; 由于Linux目前很热门&#xff0c;越来越多的人在学习Linux&#xff0c;但是买一台服务放家里来学习&#xff0c;实在是很浪费。那么如何解决这个问题&#xff1f;虚拟机软件是很好的选择&#xff0c;常…

登高作业安全带穿戴识别系统 yolov5

登高作业安全带穿戴识别系统通过yolov5python网络框架模型技术&#xff0c;登高作业安全带穿戴识别算法模型实现对登高作业人员是否穿戴安全带进行监测并及时发出警报。YOLO系列算法是一类典型的one-stage目标检测算法&#xff0c;其利用anchor box将分类与目标定位的回归问题结…

前端web入门-HTML-day02

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 列表 无序列表 有序列表 定义列表 表格 基本使用 合并单元格 跨列合并 跨行合并 表单 input …

网络安全里主要的岗位有哪些?小白如何快速入门学习黑客?

入门Web安全、安卓安全、二进制安全、工控安全还是智能硬件安全等等&#xff0c;每个不同的领域要掌握的技能也不同。 当然入门Web安全相对难度较低&#xff0c;也是很多人的首选。主要还是看自己的兴趣方向吧。 本文就以下几个问题来说明网络安全大致学习过程&#x1f447; 网…

软件设计师 数据库刷题项并包含知识点总结

**两级映像 有概念模式和内模式跟物理独立性相关&#xff0c;有外模式和概念模式跟逻辑独立性相关 ** 属性列就是RS共同拥有的ABC&#xff0c;一般去除后面的&#xff0c;所以就只有前面三个ABC&#xff0c;元组就是有没有自然连接成功的&#xff0c;就是R.AS.A R.BS.B… 选项里…

[人工智能原理]

软件工程 定义 采用工程概念、原理、技术、方法来开发、维护软件&#xff0c;把经过时间考验而证明正确的管理技术和当前能够得到的最好技术方法结合起来&#xff0c;经济开发出高质量软件并有效的维护 基本目标 目标 可用性 正确性 合算性 原则 采用适合的开发范型、开…

计算机操作系统(慕课版)第一章课后题答案

第一章 操作系统引论 一、简答题 1.在计算机系统上配置OS的目标是什么&#xff1f;作用表现在哪几个方面&#xff1f; 在计算机系统上配置OS&#xff0c;主要目标是实现&#xff1a;方便性、有效性、可扩充性和开放性&#xff1b; OS的作用主要表现在以下3个方面&#xff1a; 1…

matplotlib后端@backend@高清图输出格式控制@SVG格式输出

文章目录 notebookmatplotlib&#x1f388;matplotlib backendbackendfrontend后端类型AGG配置后端Note不区分大小写三种配置方式在matplotlibrc文件中使用rcParams["backend"]参数&#xff1a;使用MPLBACKEND环境变量&#xff1a;使用matplotlib.use()函数&#xff…