基于多反应堆的高并发服务器【C/C++/Reactor】(中)主线程给子线程添加任务以及如何处理该任务

news2024/11/15 20:02:16

在看此篇文章,建议先看我的往期文章: 

基于多反应堆的高并发服务器【C/C++/Reactor】(中)在EventLoop的任务队列中添加新任务-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_41987016/article/details/135346492?spm=1001.2014.3001.5501一、了解socketpair函数(CodeGeex生成)

Linux 中的 socketpair 函数用于创建一个双向的 socket 连接,通常用于父子进程之间的通信。在上述代码中,socketpair 函数用于初始化一个事件循环(EventLoop)对象,并为其分配一个 socket pair,用于接收来自子进程的数据。

socketpair 函数的原型为:

#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sv[2]);

其中,family 参数指定套接字家族(如 AF_UNIXAF_INET),type 参数指定套接字类型(如 SOCK_STREAM SOCK_DGRAM),protocol 参数指定套接字协议(如 0),sv[2] 参数指向一个整数数组,用于存储创建的文件描述符对。

如果 socketpair 函数返回 0,则表示创建成功,如果返回 -1,则表示创建失败

二、socketpair 初始化

socketpair 是一个在Unix-like系统上创建套接字对的函数。它创建的两个套接字描述符(socketpair[0] socketpair[1])可以用于进程间通信(IPC)socketpair[1] 的作用如下:

  1. 双向通信socketpair[1] 可以用于与 socketpair[0] 进行全双工通信。这意味着,每一个套接字既可以读也可以写。例如,可以往 socketpair[0] 中写,从 socketpair[1] 中读;或者从socketpair[1] 中写,从 socketpair[0] 中读。
  2. 阻塞与非阻塞:如果往一个套接字(如 socketpair[0] )中写入后,再从该套接字读时会阻塞,只能在另一个套接字(如 socketpair[1] )上读成功。
  3. 进程间通信:读、写操作可以位于同一个进程,也可以分别位于不同的进程,如父子进程。如果是父子进程时,一般会功能分离,一个进程用来读,一个用来写。因为文件描述符socketpair[0] socketpair[1]是进程共享的,所以读的进程要关闭写描述符,反之,写的进程关闭读描述符。总的来说,socketpair[1] 在进程间通信中扮演着重要的角色,它使得两个进程可以通过套接字进行数据交换。

>>(1)主线程唤醒子线程 

  • Channel.h 
// 定义函数指针
typedef int(*handleFunc)(void* arg);
 
struct Channel {
    // 文件描述符
    int fd;
    // 事件
    int events;
    // 回调函数
    handleFunc readCallback;// 读回调
    handleFunc writeCallback;// 写回调
    // 回调函数的参数
    void* arg;
};
  • Channel.c 
#include "Channel.h"
 
struct Channel* channelInit(int fd, int events, handleFunc readFunc, handleFunc writeFunc, void* arg) {
    struct Channel* channel = (struct Channel*)malloc(sizeof(struct Channel));
    channel->fd = fd;
    channel->events = events;
    channel->readFunc = readFunc;
    channel->writeFunc = writeFunc;
    channel->arg = arg;
    return channel;
}
  • EventLoop.c

evLoop->socketPair[1]作了封装,得到了一个channel

struct Channel* channel = channelInit(evLoop->socketPair[1],ReadEvent, 
        readLocalMessage,NULL,evLoop);

那么对于evLoop->socketpair[1]这个文件描述符什么时候就能被被激活呢?

  • 通过一个写数据的函数,往socketPair[0]里边写数据的时候,通过socketPair[1]就能够接收到数据,此时这个socketPair[1]就被激活了,就去调用其读事件。
  • 这么做的原因是:底层的dispatcherpoll、epoll_wait或select,它们有可能是阻塞的,比如说它们检测的集合里边有文件描述符没有处于激活状态,我们往这个检测集合里安插了一个内线,通过socketPair[0]去写数据,只要一写数据,对应的poll、epoll_wait或select能够检测到socketPair[1]的读事件被激活了。
  • 如果检测到socketPair[1]的读事件被激活了,那么poll、epoll_wait或select阻塞函数就直接被解除阻塞了。它们解除阻塞了,子线程就能够正常工作了。子线程能够正常工作,子线程就能够处理任务队列里边的任务,通过主线程往evLoop->socketPair[0]发送数据(写数据),void taskWakeup(struct EventLoop* evLoop);
// 写数据
void taskWakeup(struct EventLoop* evLoop) {
    const char* msg = "我是要成为海贼王的男人!";
    write(evLoop->socketPair[0],msg,strlen(msg));
}

此时socketPair[1]被激活了,它所对应的读回调就会被调用。这样就能解除子线程的阻塞,让它去处理任务队列里边的任务

// 读数据 读事件回调函数在执行时需要指定参数
int readLocalMessage(void* arg) {
    struct EventLoop* evLoop = (struct EventLoop*)arg;
    char buffer[256];
    int ret = read(evLoop->socketPair[1],buffer,sizeof(buffer));
    if (ret > 0) {
        printf("read msg: %s\n",buffer);
    }
    return 0;
}

>>(2)channel 添加到任务队列

第一步:channel 添加到任务队列 通过调用eventLoopAddTask(evLoop,channel,ADD);添加到了任务队列 

// channel 添加到任务队列
eventLoopAddTask(evLoop, channel,ADD);

第二步:遍历任务队列,从中取出每一个节点,根据节点里边的类型对这个节点做操作(待续~,在后续的文章中会讲到eventLoopProcessTask这个函数),若是type为添加(ADD),那么就把任务节点添加到任务队列中去

故在下面的这篇文章的基础上,增加一些代码内容:

基于多反应堆的高并发服务器【C/C++/Reactor】(中)EventLoop初始化和启动_一个eventloop 可以有多少个连接-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_41987016/article/details/135225734?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22135225734%22%2C%22source%22%3A%22weixin_41987016%22%7DEventLoop.h中添加 int sockPair[2];表示存储本地通信的fd,通过socketpair初始化

struct EventLoop{
    bool isQuit;// 开关
    struct Dispatcher* dispatcher;
    void* dispatcherData;
    
    // 任务队列
    struct ChannelElement* head;
    struct ChannelElement* tail;

    // 用于存储channel的map
    struct ChannelMap* channelMap;
    
    // 线程ID,Name,mutex
    pthread_t threadID;
    char threadName[32];
    pthread_mutex_t mutex;
    int socketPair[2]; //存储本地通信的fd 通过socketpair初始化
};

EventLoop.c中续写 

struct EventLoop* eventLoopInitEx(const char* threadName) {
    struct EventLoop* evLoop = (struct EventLoop*)malloc(sizeof(struct EventLoop));
    evLoop->isQuit = false; // 没有运行
    evLoop->dispatcher = &EpollDispatcher;
    evLoop->dispatcherData = evLoop->dispatcher->init(); 
    
    // 任务队列(链表)
    evLoop->head = evLoop->tail = NULL;

    // 用于存储channel的map
    evLoop->channelMap = channelMapInit(128);

    evLoop->threadId = pthread_self(); // 当前线程ID
    strcpy(evLoop->threadName,threadName == NULL ? "MainThread" : threadName); // 线程的名字
    pthread_mutex_init(&evLoop->mutex, NULL); 
    
    // 已续写
    int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, evLoop->socketPair);
    if(ret == -1) {
        perror("socketpair");
        exit(0);
    }
    // 指定规则:evLoop->socketPair[0] 发送数据,evLoop->socketPair[1]接收数据
    struct Channel* channel = channelInit(evLoop->socketPair[1],ReadEvent, 
        readLocalMessage,NULL,evLoop);
    // channel 添加到任务队列
    eventLoopAddTask(evLoop, channel,ADD);
    return evLoop;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1360526.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kubesphere和k8s的使用分享

文章目录 什么是kubernetesKubernetes的部分核心概念互式可视化管理平台与kubernetes的关系市面是常见的kubernetes管理平台 什么是kubesphereKubesphere默认安装的组件Kubesphere涉及的服务组件kubesphere的安装Kubesphere相关的内容 什么是kubernetes 就在这场因“容器”而起…

backtrader框架初探,轻松跑通策略并策略分析

网上有很多backtrader的文章&#xff0c;并有些将其与vnpy做比较&#xff0c;经过安装后发现&#xff0c;还是backtrader教程简单。 1、前期准备 # 安装akshare免费行情源 pip install akshare -i http://mirrors.aliyun.com/pypi/simple/ --trusted-hostmirrors.aliyun.com …

kubectl 源码分析

Cobra库 k8s各组件的cli部分都使用Cobra库实现&#xff0c;Cobra 中文文档 - 掘金 (juejin.cn)&#xff0c;获取方式如下&#xff1a; go get -u github.com/spf13/cobralatest cobra库中的Command结构体的字段&#xff0c;用于定义命令行工具的行为和选项。它们的作用如下&…

性能优化-OpenMP基础教程(五)-全面讲解OpenMP基本编程方法

本文主要介绍OpenMP编程的编程要素和实战&#xff0c;包括并行域管理详细实战、任务分担详细实战。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&#xff09;开发基础教程 &#x1f380;C…

Linux与安全

本心、输入输出、结果 文章目录 系统设计 - 我们如何通俗的理解那些技术的运行原理 - 第八部分&#xff1a;Linux、安全 前言 Linux 文件系统解释应该知道的 18 个最常用的 Linux 命令HTTPS如何工作&#xff1f; 数据是如何加密和解密的&#xff1f;为什么HTTPS在数据传输过程…

IntelliJ IDEA远程查看修改Ubuntu上AOSP源码

IntelliJ IDEA远程查看修改Ubuntu上的源码 本人操作环境windows10,软件版本IntelliJ IDEA 2023.2.3&#xff0c;虚拟机Ubuntu 22.04.3 LTS 1、Ubuntu系统安装openssh 查看是否安装&#xff1a; ssh -V 如果未安装&#xff1a; sudo apt install openssh-server # 开机自启…

php 数组中的元素进行排列组合

需求背景&#xff1a;计算出数组[A,B,C,D]各种排列组合&#xff0c;希望得到的是数据如下图 直接上代码&#xff1a; private function finish_combination($array, &$groupResult [], $splite ,){$result [];$finish_result [];$this->diffArrayItems($array, $…

springboot实现ChatGPT式调用(一次调用,持续返回)

下边实现了一个持续返回100以内随机数的接口&#xff0c;在接口超时之前会每隔1秒返回一个随机数 GetMapping(value "/getRandomNum", produces MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter getRandomNum() {SseEmitter emitter new SseEmitter();Th…

使用STM32的定时器和PWM实现LCD1602的背光控制

使用STM32的定时器和PWM功能来控制LCD1602的背光是一种常见的方法&#xff0c;它可以实现背光的亮度调节和闪烁效果。在本文中&#xff0c;我们将讨论如何利用STM32的定时器和PWM来实现LCD1602的背光控制&#xff0c;并提供相应的代码示例。 1. 硬件连接和初始化 首先&#x…

负责任的人工智能与人机环境系统智能

负责任的人工智能是指在人工智能系统的设计、开发、管理、使用和维护过程中&#xff0c;所有相关的角色&#xff08;包括设计者、开发者、管理者、使用者、维护者等等&#xff09;都承担其行为的道义、法律和社会责任。这意味着这些角色需要确保人工智能系统的设计与使用符合伦…

C++完成使用map Update数据 二进制数据

1、在LXMysql.h和LXMysql.cpp分别定义和编写关于pin语句的代码 //获取更新数据的sql语句 where语句中用户要包含where 更新std::string GetUpdatesql(XDATA kv, std::string table, std::string where); std::string LXMysql::GetUpdatesql(XDATA kv, std::string table, std…

window服务器thinkphp队列监听服务

经常使用linux的同学们应该对使用宝塔来做队列监听一定非常熟悉&#xff0c;但对于windows系统下&#xff0c;如何去做队列的监听&#xff1f;是一个很麻烦的事情。 本文将通过windows系统的服务来实现队列的监听。 对于thinkphp6 queue如何使用&#xff0c;不再赘述。其它系…

Java经典框架之Zookeeper

Zookeeper Java 是第一大编程语言和开发平台。它有助于企业降低成本、缩短开发周期、推动创新以及改善应用服务。如今全球有数百万开发人员运行着超过 51 亿个 Java 虚拟机&#xff0c;Java 仍是企业和开发人员的首选开发平台。 课程内容的介绍 1. Zookeeper的介绍和安装 2. …

JavaScript 基础学习笔记(四):循环语句、while循环、中止循环、无限循环、for 语句

目录 一、循环语句 1.1 while循环 1.2 中止循环 1.3 无限循环 二、综合案例-ATM存取款机 三、for 语句 一、循环语句 1.1 while循环 while : 在…. 期间&#xff0c; 所以 while循环 就是在满足条件期间&#xff0c;重复执行某些代码。 语法&#xff1a; while (条件表…

vue3-admin-element框架实现动态路由(根据接口返回)

第一步&#xff1a;在src-utils-handleRoutes&#xff0c;修改代码&#xff1a; export function convertRouter(routers) {let array routersrouters []for (let i in array) {for(let s in asyncRoutes){if (array[i].path asyncRoutes[s].path) {routers.push(asyncRout…

如何下载 ASTR 数据

ASTR (Advanced Spaceborne Thermal Emission and Reflection Radiometer) 卫星是由美国宇航局 (NASA) 和日本国家航空航天局 (JAXA) 合作开发和运营的。ASTR 主要用于地球观测&#xff0c;其主要仪器包括三个子系统&#xff1a; VNIR (Visible and Near Infrared) 子系统&…

C++矩阵例题分析(3):螺旋矩阵

一、审题 时间限制&#xff1a;1000ms 内存限制&#xff1a;256MB 各平台平均AC率&#xff1a;14.89% 题目描述 输出一个n*n大小的螺旋矩阵。 螺旋矩阵的样子&#xff1a; 输入描述 共一行&#xff0c;一个正整数n&#xff0c;表示矩阵变长的长度…

掌握 Postman Newman:快速启动 API 测试自动化

Postman 中的 Newman 是什么&#xff1f; Newman 是一个 CLI&#xff08;命令行界面&#xff09;工具&#xff0c;用于运行 Postman 中的集合&#xff08;Collection&#xff09;和环境&#xff08;Environment&#xff09;来进行自动化测试。它允许直接从命令行运行 Postman …

Docker数据卷详解

文章目录 数据卷1 cp命令2 数据卷2.1 数据卷类型2.2 宿主机数据卷2.3 命名的数据卷2.4 匿名数据卷2.5 清理数据卷2.6 数据卷容器 数据卷 ​ 当我们在使用docker容器的时候&#xff0c;会产生一系列的数据文件&#xff0c;这些数据文件在我们删除docker容器时是会消失的&#x…