【linux 多进程并发】0302 Linux下多进程模型的网络服务器架构设计,实时响应多客户端请求

news2024/12/23 14:51:33

0302 多进程网络服务器架构

专栏内容

  • postgresql使用入门基础
  • 手写数据库toadb
  • 并发编程

个人主页:我的主页
管理社区:开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.

一、概述


在大规模数据处理中,会有大量的客户端接入同一台服务器,每个客户端都需要长时间提供服务。

服务器采用中心化的部署,而客户端往往分散在不同机器上,服务器与客户端之间跨网络通信,一般采用C/S架构。

而服务端的架构需要能应对大量并发客户端,同时可以给每个客户端独占的服务,这就用到了多任务的网络模型架构,下面我们来看看用多进程如何实现。

二、多路复用的网络模型


C/S架构中,处理大量的网络请求,需要一套基于多路复用的网络处理模型。

  • 可以同时处理网络连接请求和网络数据传递;
  • 减少程序的阻塞时间,避免无效的CPU消耗;
  • 适应不同的并发规模;

以此为目标实现如下网络模型。

2.1 服务端网络监听

多路复用模型这里采用了epoll方式,如果自己的平台不支持,可以换为select或者poll的方式。

在这里插入图片描述

代码如下:

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <arpa/inet.h>  
#include <sys/epoll.h>  
#include <fcntl.h>  
#include <errno.h>  
  
#define MAX_EVENTS 10  
#define BUFFER_SIZE 1024  
#define PORT 8080  
  
// 设置文件描述符为非阻塞模式  
int set_nonblocking(int fd) {  
    int flags, s;  
  
    flags = fcntl(fd, F_GETFL, 0);  
    if (flags == -1) {  
        perror("fcntl F_GETFL");  
        return -1;  
    }  
  
    flags |= O_NONBLOCK;  
    s = fcntl(fd, F_SETFL, flags);  
    if (s == -1) {  
        perror("fcntl F_SETFL");  
        return -1;  
    }  
  
    return 0;  
}  
  
int main() {  
    int listen_fd, conn_fd, nfds, epoll_fd;  
    struct sockaddr_in server_addr;  
    struct epoll_event ev, events[MAX_EVENTS];  
    char buffer[BUFFER_SIZE];  
    ssize_t count;  
  
    // 创建监听socket  
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);  
    if (listen_fd == -1) {  
        perror("socket");  
        exit(EXIT_FAILURE);  
    }  
  
    // 设置非阻塞模式  
    if (set_nonblocking(listen_fd) == -1) {  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 绑定地址和端口  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_addr.s_addr = INADDR_ANY;  
    server_addr.sin_port = htons(PORT);  
    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {  
        perror("bind");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 开始监听  
    if (listen(listen_fd, SOMAXCONN) == -1) {  
        perror("listen");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 创建epoll实例  
    epoll_fd = epoll_create1(0);  
    if (epoll_fd == -1) {  
        perror("epoll_create1");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 添加监听socket到epoll实例  
    ev.events = EPOLLIN;  
    ev.data.fd = listen_fd;  
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {  
        perror("epoll_ctl: listen_fd");  
        close(listen_fd);  
        close(epoll_fd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 主循环  
    while (1) {  
        nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);  
        if (nfds == -1) {  
            perror("epoll_wait");  
            exit(EXIT_FAILURE);  
        }  
  
        for (int n = 0; n < nfds; ++n) {  
            if (events[n].data.fd == listen_fd) {  
                // 新的连接  
                conn_fd = accept(listen_fd, NULL, NULL);  
                if (conn_fd == -1) {  
                    perror("accept");  
                    continue;  
                }  
  
                // 设置非阻塞模式  
                if (set_nonblocking(conn_fd) == -1) {  
                    close(conn_fd);  
                    continue;  
                }  
  
                // 添加新的连接socket到epoll实例  
                ev.events = EPOLLIN | EPOLLET;  
                ev.data.fd = conn_fd;  
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) == -1) {  
                    perror("epoll_ctl: conn_fd");  
                    close(conn_fd);  
                }  
            } else {  
                // 处理读事件  
                conn_fd = events[n].data.fd;  
                while ((count = read(conn_fd, buffer, BUFFER_SIZE)) > 0) {  
                    // 处理接收到的数据(这里简单回显)  
                    write(conn_fd, buffer, count);  
                }  
  
                if (count == -1 && errno != EAGAIN) {  
                    // 出现错误或连接关闭  
                    close(conn_fd);  
                } else if (count == 0) {  
                    // 连接关闭  
                    close(conn_fd);  
                }  
  
                // 从epoll实例中移除已关闭的socket  
                if (count <= 0 && errno != EAGAIN) {  
                    ev.events = 0;  
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, conn_fd, &ev);  
                }
            }  
        }  
    }  
  
    close(listen_fd);  
    close(epoll_fd);  
    return 0;  
}

说明

  • TCP服务端的基本步骤创建socket,设置为非阻塞模式,绑定IP与端口,开启监听;
  • 这里服务端的socket需要设置为非阻塞模式,因为我们是在单进程中处理多个连接,每个连接不能阻塞等待;
  • 然后加入到epoll监听池中,开始epoll事件的等待;这里只处理接收事件;
  • 如果有服务端socket的接收事件,那么说明有客户端连接消息,进行accep,创建客户端连接的socket;
  • 同样将客户端连接的socket设置为非阻塞,理由同上;加入epoll临听池中,同样也只处理接收事件;
  • 如果有客户端连接的socket上的接收事件,那么说明客户端正在给服务端发消息;
  • 收到客户端消息后,这里只是简单处理,原样再发给客户端;
  • 如果客户端关闭或出错,将客户端连接关闭,并从epoll临听池中移除;

2.2 客户端测试

现在我们来编写一个简单的客户端模拟程序,测试一下多路复用的网络框架。

/*
 * ex020302_client.c
 */

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <arpa/inet.h>  
  
#define SERVER_IP "127.0.0.1"  
#define SERVER_PORT 4808  
#define BUFFER_SIZE 1024  

#define CLIENT_SEND_CNT 20

int main() 
{  
    int sockfd;  
    struct sockaddr_in server_addr;  
    char buffer[BUFFER_SIZE] = {0};  
    const char *message = "Hello, Server!";  
  
    // 创建套接字  
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    {  
        perror("socket creation failed");  
        exit(EXIT_FAILURE);  
    }  
  
    // 配置服务器地址信息  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_port = htons(SERVER_PORT);  
  
    // 将IP地址从字符串转换为二进制形式  
    if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) <= 0) 
    {  
        perror("Invalid address/ Address not supported");  
        close(sockfd);  
        exit(EXIT_FAILURE);  
    }  
  
    // 连接到服务器  
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) 
    {  
        perror("Connection Failed");  
        close(sockfd);  
        exit(EXIT_FAILURE);  
    }  
  
    for(int i = 0; i < CLIENT_SEND_CNT; i++)
    {
        // 发送消息到服务器  
        send(sockfd, message, strlen(message), 0);  
        printf("Message sent: %s\n", message);  
    
        // 接收服务器的响应  
        int bytes_received = recv(sockfd, buffer, BUFFER_SIZE - 1, 0);  
        if (bytes_received < 0) 
        {  
            perror("Error in receiving");  
        } 
        else if (bytes_received == 0) 
        {  
            printf("Server closed the connection\n");  
        } 
        else 
        {  
            buffer[bytes_received] = '\0'; // 确保字符串以空字符结尾  
            printf("Message received from server: %s\n", buffer);  
        }  

        sleep(1);
    }
  
    // 关闭套接字  
    close(sockfd);  
  
    return 0;  
}

说明

  • TCP客户端建立的基本步骤,创建socket,初始化服务端地址,连接服务器;
  • 然后向服务端发送相同的消息;
  • 每次发送完成后,等待接收消息;

2.3 客户端测试

可以看到,服务端处理客户端的请求时,都是按照接收到的顺序进行串行处理;

当客户端的数量达到成百上千时,对客户端的响应时间就会出现非常明显的延迟,

这种延迟会随着业务的复杂度而放大。

这时就需要充分利用多核CPU硬件资源,来进行并发任务的处理。

三、多进程服务处理


上面是在单个任务进程中处理了监听和大量任务连接的网络处理,各客户端连接的服务会相互影响,实际是串行化处理的。

要让大量的客户端能同时被响应,需要采用多任务的方式,那么在上面的网络模型基础上加入多进程,服务端为每个客户端连接准备一个独立的进程,这样就可以及时响应。

3.1 多进程架构

首先我们利用前面几个章节的介绍,来搭建一个多进程的代码架构,由主进程根据需要进行创建子进程,并且由主进程进行全局的控制。

在这里插入图片描述

/*
 * ex020302_netprocess.c
 */
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <arpa/inet.h>  
#include <sys/epoll.h>  
#include <fcntl.h>  
#include <errno.h>  
  
#define MAX_EVENTS 10  
#define BUFFER_SIZE 1024  
#define PORT 4808  

void daemon_fork()
{
	int pid = -1;
	pid = fork();
	if(pid < 0)
	{
		printf("fork error[%s]\n",strerror(errno));
		exit(-1);
	}
	else if(pid > 0)
	{
		// parent exit.
		exit(0);
	}
	else 
	{
		// child daemon
		return;
	}
}

void subprocess(int sock)
{
    int pid = -1;
	pid = fork();
	if(pid < 0)
	{
		printf("fork error[%s]\n",strerror(errno));
		exit(-1);
	}
	else if(pid > 0)
	{
		// parent.
        close(sock);
		return;
	}
	else 
	{
		// child 
        close(listen_fd);
        processMsg(sock);
		exit(0);
	}
}

说明

  • daemon服务程序函数,这个前一章节已经介绍过了,服务端以后台进程的方式运行;
  • 子进程任务处理函数;这里创建的是任务子进程,并在子进程中调用消息处理函数;
  • 这里需要注意的是,在子进程中要关闭服务端的socket,同时在父进程中要关闭客户端连接的socket; 因为父子进程会复制内存空间,但是在各自的进程中,已经不再需要;

3.2 并发网络处理模型

现在就可以将上面的多路复用网络处理放入多进程架构中,处理逻辑进行如下切分:

  • 服务端监听socket初始化,多路复用器的初始化等,都放在主进程中,作为服务端网络初始化的一部分;
  • 每个客户端连接的socket,以及它的读写消息处理逻辑,放在子进程中;这样每个客户端连接对应一个后台服务子进程;
  • 创建子进程的时机,也就是在主进程中接收到新连接时,创建新连接成功后,就可以新建子进程进行处理;
  • 而子进程的退出时间,就是客户端断开连接,或者处理出错时;
void initializeServerNet()
{
    struct sockaddr_in server_addr;  
  
    // 创建监听socket  
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);  
    if (listen_fd == -1) 
    {  
        perror("socket");  
        exit(EXIT_FAILURE);  
    }  
   
    // 绑定地址和端口  
    server_addr.sin_family = AF_INET;  
    server_addr.sin_addr.s_addr = INADDR_ANY;  
    server_addr.sin_port = htons(PORT);  
    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) 
    {  
        perror("bind");  
        close(listen_fd);  
        exit(EXIT_FAILURE);  
    }

    // 开始监听
    if (listen(listen_fd, SOMAXCONN) == -1)
    {
        perror("listen");
        close(listen_fd);
        exit(EXIT_FAILURE);
    }
}

void closeServerFd()
{
    close(listen_fd);  
}

void dispatchLoop()
{
    int conn_fd;  

    // 主循环  
    while (1) 
    {  
        // 新的连接  
        conn_fd = accept(listen_fd, NULL, NULL);  
        if (conn_fd == -1) 
        {  
            perror("accept");  
            sleep(1);
            continue;  
        }

        subprocess(conn_fd);
    }  
}

void processMsg(int sock)
{
    char buffer[BUFFER_SIZE];  
    ssize_t count;  

    printf("serv-process:%d start.\n");

    while ((count = read(sock, buffer, BUFFER_SIZE)) > 0)
    {
        // 处理接收到的数据(这里简单回显)
        write(sock, buffer, count);
    }

    if (count == -1 && errno != EAGAIN)
    {
        // 出现错误或连接关闭
        close(sock);
    }
    else if (count == 0)
    {
        // 连接关闭
        close(sock);
    }

    printf("serv-process:%d exit.\n");
}

那么主程序实现如下:

void daemon_fork();
void subprocess(int sock);
void processMsg(int sock);

void initializeServerNet();
void closeServerFd();
void dispatchLoop();

int listen_fd;

int main(int argc ,char *argv[])
{
	daemon_fork();

	initializeServerNet();
    
  dispatchLoop();

  closeServerFd();
	return 0;
}
  • 在主进程中先进程服务端初始化;
  • 然后就可以开始监听,并接收客户端的连接;
  • 当有客户端连接时,就创建客户端连接,并启动子进程与该客户端进行网络通信;
  • 子进程在客户端断开连接或出错时,就会退出;

2.3 客户端测试

可以看到将客户端发送次数调大后,开启的客户端越多,服务端启动的子进程也就会越多;

此时,可以看到服务端每个进程的CPU使用率并不是很高;

但是随着客户端数量越来越多,服务端进程数量超过CPU核数时,就会增加系统的负担;

四、总结


本文主要介绍了基于多进程架构的网络服务器的设计与实现,在多进程架构中每个客户端会有一个服务端的进程专门处理通信,增加了对客户端消息的响应效率,提升了并发处理能力。

结尾


非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!

作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。

注:未经同意,不得转载!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2230360.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3+vant实现视频播放(含首次禁止进度条拖拽,视频看完后恢复,保存播放视频进度,刷新及下次进入继续播放,判断视频有无全部看完等)

1、效果图 2、 <div><videocontrolsclass"video_player"ref"videoPlayer":src"videoSrc"timeupdate"handleTimeUpdate"play"onPlay"pause"onPause"ended"onVideoEnded"></video><…

C++设计模式创建型模式———简单工厂模式、工厂方法模式、抽象工厂模式

文章目录 一、引言二、简单工厂模式三、工厂方法模式三、抽象工厂模式四、总结 一、引言 创建一个类对象的传统方式是使用关键字new &#xff0c; 因为用 new 创建的类对象是一个堆对象&#xff0c;可以实现多态。工厂模式通过把创建对象的代码包装起来&#xff0c;实现创建对…

【数据库系统概论】第3章 关系数据库标准语言SQL(二)数据查询(超详细)

目录 一、单表查询 1. 简单的数据查询 &#xff08;1&#xff09;选择表中若干列 &#xff08;2&#xff09;选择表中若干行&#xff08;元祖&#xff09; 2. 聚合函数与分组查询 聚集函数 GROUP BY分组查询 二、联接查询 1、连接概述 2. 内联接&#xff08;INNER JO…

Android Framework AMS(10)广播组件分析-1

该系列文章总纲链接&#xff1a;专题总纲目录 Android Framework 总纲 本章关键点总结 & 说明&#xff1a; 说明&#xff1a;本章节主要解读应用层广播组件的发送广播和接收处理广播 2个过程&#xff0c;以及从APP层到AMS调用之间的打通。关注思维导图中左上部分即可。 有…

磁盘空间不足导致postgreSQL启动失败

背景&#xff1a; 智慧庭审平台安装了ivr/xvr等vr应用后&#xff0c;磁盘空间不足导致postgreSQL数据库一直重启 排查 到服务器下使用 systemctl status hik.postgresql96linux64.rdbms.1.service 查看进程报错信息 这次报的是 FATAL: could not write lock file "po…

C++进阶:C++11的新特性

✨✨所属专栏&#xff1a;C✨✨ ✨✨作者主页&#xff1a;嶔某✨✨ C11的发展历史 2011年&#xff0c;C标准委员会发布了C11标准&#xff0c;这是C的一次巨大飞跃&#xff0c;引入了许多重要的新特性&#xff0c;如智能指针、lambda表达式、并发编程支持等。这一版本的发布对C社…

LongVU :Meta AI 的解锁长视频理解模型,利用自适应时空压缩技术彻底改变视频理解方式

Meta AI在视频理解方面取得了令人瞩目的里程碑式成就&#xff0c;推出了LongVU&#xff0c;这是一种开创性的模型&#xff0c;能够理解以前对人工智能系统来说具有挑战性的长视频。 研究论文 "LongVU&#xff1a;用于长视频语言理解的时空自适应压缩 "提出了一种革命…

Ubuntu 22.04安装部署

一、部署环境 表 1‑1 环境服务版本号系统Ubuntu22.04 server lts运行环境1JDK1.8前端WEBNginx1.8数据库postgresqlpostgresql13postgis3.1pgrouting3.1消息队列rabbitmq3.X(3.0以上)运行环境2erlang23.3.3.1 二、安装系统 2.1安装 1.安装方式&#xff0c;选第一条。 2.选择…

无需手动部署的正式版comfyUI是否就此收费?开源等同免费?

​ ​ 关于ComfyUI的正式版是否会收费的问题是很多AI玩家都关心的问题。 一旦ComfyUI正式版发布&#xff0c;我们是否需要为它买单&#xff1f;不再开源 同时这也引出了一个核心问题&#xff1a;开源究竟等不等于免费&#xff1f; ComfyUI正式版到底是什么&#xff1f;它会收…

云计算作业二Spark:问题解决备忘

安装spark 教程源地址&#xff1a;https://blog.csdn.net/weixin_52564218/article/details/141090528 镜像下载 教程给的官网下载地址很慢&#xff0c;https://archive.apache.org/dist/spark/spark-3.1.1/ 这里的镜像快很多&#xff1a; 清华软件源&#xff1a;https://mi…

C语言 | Leetcode C语言题解之第524题通过删除字母匹配到字典里最长单词

题目&#xff1a; 题解&#xff1a; char * findLongestWord(char * s, char ** d, int dSize){char *result "";int max -1;for (int i 0; i < dSize; i) {char *p s, *q d[i];int j 0, k 0;while (p[j] ! \0 && q[k] ! \0) {if (p[j] q[k]) {k…

【含文档】基于ssm+jsp的学科竞赛系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: apache tomcat 主要技术: Java,Spring,SpringMvc,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定义了四个…

【5.5】指针算法-三指针解决颜色分类

一、题目 给定一个包含红色、白色和蓝色&#xff0c;一共n个元素的数组&#xff0c;原地对它们进行排序&#xff0c;使得相同颜色的元素相邻&#xff0c;并按照红色、白色、蓝色顺序排列。 此题中&#xff0c;我们使用整数0、1和2分别表示红色、白色和蓝色。 示例 1&#xff1…

双向链表及如何使用GLib的GList实现双向链表

双向链表是一种比单向链表更为灵活的数据结构&#xff0c;与单向链表相比可以有更多的应用场景&#xff0c;本文讨论双向链表的基本概念及实现方法&#xff0c;并着重介绍使用GLib的GList实现单向链表的方法及步骤&#xff0c;本文给出了多个实际范例源代码&#xff0c;旨在帮助…

web——warmup——攻防世界

这道题还是没有做出来。。&#xff0c;来总结一下 1.ctrlU显示源码 2.看见body里有source.php 打开这个source.php 看见了源码 highlight_file(FILE); 这行代码用于高亮显示当前文件的源码&#xff0c;适合调试和学习&#xff0c;但在生产环境中通常不需要。 class emmm 定义…

【MATLAB代码】三个CT模型的IMM例程,各CT旋转速率不同,适用于定位、导航、目标跟踪

三个CT模型&#xff0c;各CT模型下的运动旋转速率不同&#xff0c;适用于定位、导航、目标跟踪 文章目录 代码构成运行结果源代码代码讲解概述代码结构1. 初始化2. 仿真参数设置3. 生成量测数据4. IMM迭代5. 绘图 主要功能函数部分1. 卡尔曼滤波函数2. 模型综合函数3. 模型概率…

sklearn 实现随机森林分类器 - python 实现

python sklearn 实现随机森林分类器 from sklearn.ensemble import RandomForestClassifier from sklearn.datasets import load_iris # 加载数据集 irisload_iris() x,yiris.data,iris.target print("x y shape:",x.shape,y.shape) # 创建并训练模型 model Random…

GetX的一些高级API

目录 前言 一、一些常用的API 二、局部状态组件 1.可选的全局设置和手动配置 2.局部状态组件 1.ValueBuilder 1.特点 2.基本用法 2.ObxValue 1.特点 2.基本用法 前言 这篇文章主要讲解GetX的一些高级API和一些有用的小组件。 一、一些常用的API GetX提供了一些高级…

WPF+MVVM案例实战(十一)- 环形进度条实现

文章目录 1、运行效果2、功能实现1、文件创建与代码实现2、角度转换器实现3、命名空间引用3、源代码下载1、运行效果 2、功能实现 1、文件创建与代码实现 打开 Wpf_Examples 项目,在Views 文件夹下创建 CircularProgressBar.xaml 窗体文件。 CircularProgressBar.xaml 代码实…

SYN590RH

一般描述 SYN590RH是SYNOXO全新开发设计的一款宽电压范围&#xff0c;低功耗&#xff0c;高性能&#xff0c;无需外置AGC电容&#xff0c;灵敏度达到典型-110 dBm,400MHz~450MHz频率范围应用的单芯片ASK或00 K射频接收器。 SYN590RH是一款典型的即插即用型单片高…