四、单线程多路IO复用+多线程业务工作池

news2025/1/16 17:37:35

文章目录

  • 一、前言
    • 1 编译方法
  • 二、单线程多路IO复用+多线程业务工作池结构
  • 三、重写`Client_Context`类
  • 四、编写`Server`类

一、前言

我们以及讲完单线程多路IO复用 以及任务调度与执行的C++线程池,接下来我们就给他结合起来。

由于项目变大,尝试解耦项目,使用CMake ,可以看这篇文章现代CMake使用,使C++代码解耦

本节代码均可在仓库TinyWebServer 中找到

1 编译方法

# 进入Server目录下
mkdir build
cd build
cmake ..
cmake --build .

二、单线程多路IO复用+多线程业务工作池结构

简单来说就是把,读写任务交给线程池。

单线程多路IO复用+多线程业务工作池

三、重写Client_Context

上一节的Client_Context类并不能做到线程安全,以及管理客户端状态。所以做以下改变。

// client_context.h

class ClientContext {
public:
    ClientContext() : active(true) {}
    void pushMessage(const string &msg);
    bool hasMessages() const;
    string popMessage();
    void setWriteReady(bool ready);
    bool isWriteReady() const;
    bool isActive() const;
    void deactivate();

private:
    queue<string> send_queue;	// 消息队列
    bool write_ready = false;	// 是否可写
    mutable mutex mtx;			// const下也可锁
    atomic<bool> active;		// 活跃检测
};
// client_context.cpp

#include "client_context.h"

// ClientContext implementation
void ClientContext::pushMessage(const string &msg) {
    lock_guard<mutex> lock(mtx);
    send_queue.push(msg);
}

bool ClientContext::hasMessages() const {
    lock_guard<mutex> lock(mtx);
    return !send_queue.empty();
}
string ClientContext::popMessage() {
    lock_guard<mutex> lock(mtx);
    string msg = send_queue.front();
    send_queue.pop();
    return msg;
}
void ClientContext::setWriteReady(bool ready) {
    lock_guard<mutex> lock(mtx);
    write_ready = ready;
}
bool ClientContext::isWriteReady() const {
    lock_guard<mutex> lock(mtx);
    return write_ready;
}
bool ClientContext::isActive() const { return active; }
void ClientContext::deactivate() { active = false; }

四、编写Server

封装成类,隐藏细节。

#pragma once
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <memory>
#include <unordered_map>

#include "client_context.h"
#include "thread_pool.h"

const int MAX_EVENTS = 10;
const int BUFFER_SIZE = 1024;

class Server {
public:
    Server(int port);
    void run();

private:
    void handleNewConnection();
    void handleClientEvent(epoll_event &event);
    void handleRead(int client_fd);
    void handleWrite(int client_fd);
    void removeClient(int client_fd);
    void modifyEpollEvent(int fd, uint32_t events);

    int server_fd;
    int epoll_fd;
    ThreadPool pool;
    unordered_map<int, shared_ptr<ClientContext>> clients;
    mutex clients_mutex;
};
#include "server.h"
#include <cstdint>

// Server implementation
Server::Server(int port) : pool(4) {
    server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (server_fd < 0) {
        throw runtime_error("Socket creation failed");
    }

    sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(port);

    if (bind(server_fd, (sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        close(server_fd);
        throw runtime_error("Bind failed");
    }

    if (listen(server_fd, SOMAXCONN) < 0) {
        close(server_fd);
        throw runtime_error("Listen failed");
    }

    epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        close(server_fd);
        throw runtime_error("epoll_create1 failed");
    }

    epoll_event event;
    event.data.fd = server_fd;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {
        close(server_fd);
        close(epoll_fd);
        throw runtime_error("epoll_ctl failed");
    }
}

void Server::run() {
    vector<epoll_event> events(MAX_EVENTS);

    while (true) {
        int event_count = epoll_wait(epoll_fd, events.data(), MAX_EVENTS, -1);

        if (event_count < 0) {
            cerr << "epoll_wait failed: " << endl;
            break;
        }

        for (int i = 0; i < event_count; i++) {
            if (events[i].data.fd == server_fd) {
                handleNewConnection();
            } else {
                handleClientEvent(events[i]);
            }
        }
    }
}

void Server::handleNewConnection() {
    while (true) {
        sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        int client_fd = accept4(server_fd, (sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK);
        if (client_fd < 0) {
            if (errno == EAGAIN || EWOULDBLOCK) {
                cout << "No more new connections to accept" << endl;
                break;
            } else {
                cerr << "Accept failed: " << endl;
                break;
            }
        }

        epoll_event event;
        event.data.fd = client_fd;
        event.events = EPOLLIN | EPOLLET;
        int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);
        if (epoll_ctl_result < 0) {
            cerr << "epoll_ctl failed for client socket: " << endl;
            close(client_fd);
        } else {
            {
                lock_guard<mutex> lock(clients_mutex);
                clients[client_fd] = make_shared<ClientContext>();
            }
        }
    }
}

void Server::handleClientEvent(epoll_event &event) {
    int client_fd = event.data.fd;
    if (event.events & (EPOLLERR | EPOLLHUP)) {
        cout << "Error or hangup event for client: " << client_fd << endl;
        removeClient(client_fd);
    } else {
        if (event.events & EPOLLIN) handleRead(client_fd);
        if (event.events & EPOLLOUT) handleWrite(client_fd);   
    }
}

void Server::handleRead(int client_fd) {
    pool.enqueue([this, client_fd] {
        shared_ptr<ClientContext> client;
        {
            lock_guard<mutex> lock(clients_mutex);
            auto it = clients.find(client_fd);
            if (it == clients.end() || !it->second->isActive()) {
                cout << "Client " << client_fd << " not found or not active, skipping read handling" << endl;
                return;
            }
            client = it->second;
        }

        string buffer(BUFFER_SIZE, 0);
        while (true) {
            int read_len = read(client_fd, buffer.data(), buffer.size());
            if (read_len < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                    break;
                else {
                    cerr << "Read failed on socket " << client_fd << endl;
                    removeClient(client_fd);
                    break;
                }
            } else if (read_len == 0) {
                cout << "Client disconnected: " << client_fd << endl;
                removeClient(client_fd);
                break;
            } else {
                cout << "Received from client " << client_fd << ": " << buffer.substr(0, read_len) << endl;
                string message = "Echo: " + buffer.substr(0, read_len);
                client->pushMessage(message);
                client->setWriteReady(true);
                modifyEpollEvent(client_fd, EPOLLIN | EPOLLOUT);
            }
        }
    });
}

void Server::handleWrite(int client_fd) {
    pool.enqueue([this, client_fd] {
        shared_ptr<ClientContext> client;
        {
            lock_guard<mutex> lock(clients_mutex);
            auto it = clients.find(client_fd);
            if (it == clients.end() || !it->second->isActive()) {
                cout << "Client " << client_fd << " not found or not active, skipping write handling" << endl;
                return;
            }
            client = it->second;
        }

        if (!client->isWriteReady()) return;

        bool keep_writing = true;
        while (keep_writing && client->hasMessages()) {
            string message = client->popMessage();
            size_t total_sent = 0;
            while (total_sent < message.size()) {
                int write_len = write(client_fd, message.data() + total_sent, message.size() - total_sent);
                if (write_len < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        client->pushMessage(message.substr(total_sent));
                        keep_writing = false;
                        break;
                    } else {
                        cerr << "Write error on socket " << client_fd << endl;
                        removeClient(client_fd);
                        return;
                    }
                } else total_sent += write_len;
            }
            if (total_sent == message.size()) 
                cout << "Sent to client " << client_fd << ": " << message << endl;
        }

        if (!client->hasMessages()) {
            client->setWriteReady(false);
            modifyEpollEvent(client_fd, EPOLLIN);
        }
    });
}

void Server::removeClient(int client_fd) {
    shared_ptr<ClientContext> client;
    {
        lock_guard<mutex> lock(clients_mutex);
        auto it = clients.find(client_fd);
        if (it != clients.end()) {
            client = it->second;
            clients.erase(it);
        }
    }

    if (client) {
        client->deactivate();
        int epoll_ctl_result = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
        if (epoll_ctl_result < 0) 
            cerr << "Failed to remove client from epoll: " << endl;
        close(client_fd);
    }
}

void Server::modifyEpollEvent(int fd, uint32_t events) {
    epoll_event event;
        event.data.fd = fd;
        event.events = events | EPOLLET;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0) 
            cerr << "Failed to modify epoll event for fd " << fd << endl;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1949781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于opencv的答题卡识别

文章目录 一、背景需求二、处理步骤图片预处理检测到答题卡轮廓透视变换找每个圆圈的轮廓轮廓排序判断是否答题正确 一、背景需求 传统的手动评分方法耗时且容易出错&#xff0c;自动化评分可以可以显著提高评分过程的速度和准确性、减少人工成本。 答题卡图片处理效果如下&am…

使用法国云手机进行面向法国的社媒营销

在当今数字化和全球化的时代&#xff0c;社交媒体已经成为企业营销和拓展市场的重要工具。对于想进入法国市场的企业来说&#xff0c;如何在海外社媒营销中脱颖而出、抓住更多的市场份额&#xff0c;成为了一个关键问题。法国云手机正为企业提供全新的营销工具&#xff0c;助力…

观测云产品更新 | 异常追踪、场景图表、快照、监控等

观测云更新 异常追踪 1、新增【分析看板】&#xff1a;可视化展示不同指标数据。 2、新增【日程】管理和【通知策略】&#xff1a;对 Issue 的内容范围做进一步通知分配。 场景 1、图表&#xff1a;新增【时间偏移】设置。启用时间偏移后&#xff0c;当查询相对时间区间时&a…

项目架构知识点总结

项目架构知识点总结 【一】重要注解【1】SpringBootApplication&#xff08;1&#xff09;⭐️ComponentScan 注解&#xff08;2&#xff09;⭐️EnableAutoConfiguration 注解&#xff08;3&#xff09;⭐️SpringBootConfiguration 注解&#xff08;4&#xff09;Inherited 注…

昇思25天学习打卡营第01天|昇思MindSpore大模型基础j介绍

昇思MindSpore和华为昇思MindSpore大模型学习打卡系列文章&#xff0c;本文仅供参考~ 文章目录 前言一、昇思MindSpore是什么&#xff1f;二、执行流程三、设计理念四、层次结构五、Huawei昇腾AI全栈 前言 随着计算机大模型的不断发展&#xff0c;Ai这门技术也越来越重要&#…

常见CSS属性(二)——浮动

一、浮动简述 浏览器在解析html文档时&#xff0c;正常的顺序是从上往下、从左往右解析。这个正常的解析过程&#xff0c;叫做正常文档流(标准文档流)&#xff0c;而浮动就是使得元素脱离文档流&#xff0c;“浮”在浏览器上。 浮动会使元素脱离文档流&#xff0c;不占位置&…

在 MinIO 使用 SVE 将 ARM 带入人工智能数据基础设施领域

MinIO 性能如此之高的原因之一是&#xff0c;我们做了其他人不会或不能做的细粒度工作。从 SIMD 加速到 AVX-512 优化&#xff0c;我们已经完成了艰巨的任务。ARM CPU 架构的最新发展&#xff0c;特别是可扩展矢量扩展 &#xff08;SVE&#xff09;&#xff0c;为我们提供了比前…

网页秒表小工具

网页秒表小工具 效果展示 HTML代码 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>简洁秒表</title><style>body {font-family: Arial, sans-serif;display: flex;justify-content:…

现货白银交易中spring形态的应用

在现货白银市场中交易想取得成功并从市场中获利&#xff0c;掌握一些工具是必不可少的&#xff0c;而今天我们要介绍的现货白银的交易工具就是spring形态。 对于spring这个英文&#xff0c;我们都很熟悉&#xff0c;它有春天的意思&#xff0c;但这里所说的spring形态并不是指春…

重塑生态体系 深挖应用场景 萤石诠释AI时代智慧生活新图景

7月24日&#xff0c;“智动新生&#xff0c;尽在掌控”2024萤石夏季新品发布会在杭州举办。来自全国各地的萤石合作伙伴、行业从业者及相关媒体&#xff0c;共聚杭州&#xff0c;共同见证拥抱AI的萤石&#xff0c;将如何全新升级&#xff0c;AI加持下的智慧生活又有何不同。 发…

架构设计面试经验总结

文章收录在网站&#xff1a;http://hardyfish.top/ 文章收录在网站&#xff1a;http://hardyfish.top/ 文章收录在网站&#xff1a;http://hardyfish.top/ 文章收录在网站&#xff1a;http://hardyfish.top/ 学习架构设计知识的思路总结为以下几点&#xff1a; 想要学习架构…

Python + PyQt 搭建可视化页面(PyCharm)

Python PyQt 搭建可视化页面&#xff08;PyCharm&#xff09; 配置PyQt5环境 1.1 安装PyQt5和PyQt5-tools pip install PyQt5pip install PyQt5-tools1.2 QtDesigner和PyUIC环境的配置 配置QTDesigner&#xff0c;用来打开QT可视化开发工具 在PyCharm中依次打开&#xff1a…

经典文献阅读之--World Models for Autonomous Driving(自动驾驶的世界模型:综述)

Tip: 如果你在进行深度学习、自动驾驶、模型推理、微调或AI绘画出图等任务&#xff0c;并且需要GPU资源&#xff0c;可以考虑使用UCloud云计算旗下的Compshare的GPU算力云平台。他们提供高性价比的4090 GPU&#xff0c;按时收费每卡2.6元&#xff0c;月卡只需要1.7元每小时&…

2024-07-24 Linux C語言使用inotify进行文件变化检测

一、在Linux中&#xff0c;用C语言检测文件内容变化的方法有几种&#xff0c;最常用的包括以下几种&#xff1a; 轮询&#xff08;Polling&#xff09;&#xff1a;周期性地读取文件并检查内容是否变化。inotify&#xff1a;使用Linux内核提供的inotify接口&#xff0c;这是一…

【AIGC】构建自己的谷歌搜索引擎服务并使用

一、谷歌 谷歌的搜索引擎需要自己创建服务才能启用检索api。&#xff08;需自行翻墙和创建自己的谷歌账号&#xff09; 1.1 API服务创建 1&#xff09;登陆https://console.cloud.google.com/: 2&#xff09; 选择新建项目&#xff0c;取号项目名即可&#xff08;比如:Olin…

scrapy爬取城市天气数据

scrapy爬取城市天气数据 一、创建scrapy项目二、修改settings,设置UA,开启管道三、编写爬虫文件四、编写items.py五、在weather.py中导入WeatherSpiderItem类六、管道中存入数据,保存至csv文件七、完整代码一、创建scrapy项目 先来看一下爬取的字段情况: 本次爬取城市天…

谷粒商城实战笔记-60-商品服务-API-品牌管理-效果优化与快速显示开关

文章目录 一&#xff0c;显示状态列改为switch开关二&#xff0c;监听状态改变 首先&#xff0c;把ESLint语法检查关掉&#xff0c;因为这个语法检查过于严格&#xff0c;在控制台输出很多错误信息&#xff0c;干扰开发。 在build目录下下webpack.base.conf.js中&#xff0c;把…

类、名称空间和类库

类(class)构成程序的主体 名称空间(namespace)以树型结构组织类&#xff0c;例如Button和Path类 如何知道类属于哪个名称空间呢&#xff1a; 点击帮助-查看帮助 在该界面下搜索&#xff1a; 查看名称空间 搜索System 名称空间&#xff0c;可以查找与操作系统打交道的重要的名…

使用kali进行端口扫描

目录 一、使用nping工具向目标主机的指定端口发送自定义数据包 二、使用Nmap工具进行端口扫描 三、使用Zenmap工具进行扫描 一、使用nping工具向目标主机的指定端口发送自定义数据包 nping工具允许用户产生各种网络数据包&#xff08;TCP&#xff0c;UDP&#xff0c;ICMP&am…

【YashanDB知识库】服务端是GBK编码,导致从22.2.12.100升级到22.2.13.100失败问题

问题现象 问题单&#xff1a;22.2.12.100升级到22.2.13.100失败 现象&#xff1a;如下图&#xff0c;从22.2.12.100升级到22.2.13.100失败&#xff0c;报错。 问题风险及影响 版本升级失败&#xff0c;影响上线 问题发生版本 客户版本&#xff1a;22.2.12.100 现在版本已…