使用rust写一个Web服务器——多线程版本

news2024/10/3 11:15:30

文章目录

    • 模拟慢请求
    • 多线程Web服务器实现
      • 为每个请求单独生成一个线程
      • 限制创建线程的数量
      • ThreadPool的初始化
      • ThreadPool的存储
      • ThreadPool的设计
    • 关闭和资源清理
      • 为ThreadPool实现Drop
      • 停止工作线程
      • 测试

仓库地址: 1037827920/web-server: 使用rust编写的简单web服务器 (github.com)

模拟慢请求

一个单线程版本的web服务器只能一次处理一个请求,可是如果一个请求持续的时间太长,就会导致其他请求有可能饥饿,下面使用sleep方式让每次请求持续5s,模拟真实的慢请求:

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    // 监听本地8080端口
    let listener = TcpListener::bind("localhost:8080").unwrap();
    
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        
        // 处理连接
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
	let buf_reader = BufReader::new(&mut stream);
    // 使用next而不是lines,因为我们只需要读取第一行,判断具体的request方法
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    
	// match方法不会像之前的方法那样自动做引用或解引用,因此我们需要显式调用
    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), // 请求 / 资源
        "GET /sleep HTTP/1.1" => { // 请求 /sleep 资源
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // 读取文件内容
    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    // 格式化HTTP Response
    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    // 将response写入stream
    stream.write_all(response.as_bytes()).unwrap();
}

运行代码后访问localhost:8080/sleep,然后紧接着继续运行localhost:8080,会发现后者的请求必须等待前者完成后才能被处理,下面使用线程池改善吞吐量

多线程Web服务器实现

线程池: 包含一组已经生成的线程,它们时刻等待着接收并处理新的任务,当程序接收到新任务时,它会将线程池中的一个线程指派给该任务,在该线程忙着处理时,新来的任务交给池中剩余的线程进行处理,最终,当执行任务的线程处理完后,它会被重新放入到线程池中,准备处理新任务。注意: 需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。

因此,需要对线程池进行一定的架构设计,首先是设定最大线程数的上限,其次是维护一个请求队列。池中的线程去队列中依次弹出请求并处理。

为每个请求单独生成一个线程

修改main函数,每次处理一个任务就创建一个新的线程并执行任务

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
	let listener = TcpListener::bind("localhost:8080").unwrap();
    
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        
        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
	let buf_reader = BufReader::new(&mut stream);
    // 使用next而不是lines,因为我们只需要读取第一行,判断具体的request方法
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    
	// match方法不会像之前的方法那样自动做引用或解引用,因此我们需要显式调用
    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), // 请求 / 资源
        "GET /sleep HTTP/1.1" => { // 请求 /sleep 资源
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // 读取文件内容
    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    // 格式化HTTP Response
    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    // 将response写入stream
    stream.write_all(response.as_bytes()).unwrap();
}

这样简单粗暴就能实现多线程的Web服务器,但是这样不能达到限制线程池中线程数的效果

限制创建线程的数量

利用线程池,继续修改main函数

fn main() {
	let listener = TcpListener::bind("localhost:8080").unwrap();
	// 首先创建一个包含4个线程的线程池
    let pool = ThreadPool::new(4);
    
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        
        // 分发执行请求
        pool.execute(|| {
            handle_connection(stream)
        });
    }
}

可以看出,我们至少要实现ThreadPool这个结构体和execute方法

ThreadPool的初始化

首先要确定使用new还是build来初始化ThreadPool实例,new往往用于简单初始化一个实例,而build往往会完成更加复杂的构建工作,我们并不需要在初始化线程池的同时创建相应的线程,因此new更合适。

在src/lib.rs写入以下代码:

pub struct ThreadPool;

impl ThreadPool {
    /// # 函数功能
    /// 创建一个新的线程池
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }
    
    /// # 函数功能
    /// 执行传入的函数f
    pub fn execute<F>(&self, f: F)
    where
    	F: FnOnce() + Send + 'static
    {
        todo!();
    }
}

在src/main.rs中导入lib.rs的ThreadPool:

use <project_name>::ThreadPool;

ThreadPool的存储

ThreadPool作为一个线程池,肯定是要能够存储线程的对吧,继续修改ThreadPool,添加threads字段,使其能够存储线程

use std::thread::{self, Thread};

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    /// # 函数功能
    /// 创建一个新的线程池
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        
        // 使用with_capacity可以提前分配好内存空间,比Vec::new的性能好
        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // 创建线程并将其存储在vector中
            todo!();
        }

        ThreadPool { threads }
    }
    
    /// # 函数功能
    /// 执行传入的函数f
    pub fn execute<F>(&self, f: F)
    where
    	F: FnOnce() + Send + 'static
    {
        todo!();
    }
}

ThreadPool的设计

使用thread::spawn是生成线程的最好方式,但是它会立即执行传入的任务,我们需要的是创建线程和执行任务是要分离的。也就是说,我们可以先创建线程后这个线程就进入loop循环等待,直到有执行任务的信号过来这个线程才会执行任务。

可以考虑创建一个Worker结构体,存放id和对应的线程。作为ThreadPool和任务线程联系的桥梁,通过channel,ThreadPool持有Sender,通过execute方法将任务发送给Worker,而Worker持有Receiver,在loop循环中接收ThreadPool发送过来的任务。

ThreadPool结构体:

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

impl ThreadPool {
    /// # 函数功能
    /// 创建一个新的线程池
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        
        // 获得Sender和Receiver
        let (sender, receiver) = mpsc::channel();
        
        // receiver会在多线程中移动,因此要保证线程安全,需要使用Arc和Mutex。Arc可以允许多个Worker同时持有Receiver,而Mutex可以确保一次只有一个Worker能从Receiver中获取任务,防止任务被多次执行
        let receiver = Arc::new(Mutex::new(receiver));
        
        let mut workers = Vec::with_capacity(size);
        
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        
        ThreadPool { workers, sender }
    }    
    /// # 函数功能
    /// 执行传入的函数f
    pub fn execute<F>(&self, f: F) 
    where
    	F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        // Sender往通道中发送任务
        self.sender.send(job).unwrap();
    }
}

Worker结构体:

// 闭包的大小编译是未知的,使用Box可以在堆上动态分配内存,从而存储闭包
type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // Receiver会阻塞直到有任务
            let job = receiver.lock().unwrap().recv().unwrap();
            
            println!("Workder {id} got a job; executing");
            // 执行任务
            job();
        });
        // 让每个Worker都拥有自己的唯一id
        Worker { id, thread }
    }
}

关闭和资源清理

为ThreadPool实现Drop

当线程池被Drop时,需要等待所有的子线程完成它们的工作,然后再退出:

struct Worker {
    id: usize,
    // 因为Worker中的thread字段的JoinHandle类型没有实现copy trait,可以修改Worker的thread字段,使用Option,然后通过take可以拿走内部值的所有权,同时留下一个None
    thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            
            println!("Workder {id} got a job; executing");
            job();
        });
        // 让每个Worker都拥有自己的唯一id
        Worker { 
            id, 
            thread: Some(thread)
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shuting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

停止工作线程

虽然调用了join,但是目标线程依然不会停止,原因在于它们在无限地loop循环等待,需要channel的drop机制:释放sender后,receiver会收到错误,然后再退出

pub struct ThreadPool {
    workers: Vec<Worker>,
    // 增加Option封装,这样可以用take拿走所有权
    sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        
        let (sender, receiver) = mpsc::channel();
        
        let receiver = Arc::new(Mutex::new(receiver));
        
        let mut workers = Vec::with_capacity(size);
        
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        
        ThreadPool { 
            workers, 
            sender: Some(sender)
        }
    }
    pub fn execute<F>(&self, f: F) 
    where
    	F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // 主动调用drop关闭sender
        drop(self.sender.take());
        
        for worker in &mut self.workers {
            println!("Shuting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

当sender被关闭后,将关闭对应的channel,所以loop的receiver就会收到一个错误,根据错误再进一步的错误:

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();
            
            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing");
                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });
        
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

测试

为了验证代码的正确性,修改main:

fn main() {
    let listener = TcpListener::bind("localhost:8080").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2186099.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

①EtherCAT转ModbusTCP, EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关

EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 协议转换通信网关 EtherCAT 转 ModbusTCP GW系列型号 MS-GW15 简介 MS-GW15 是 EtherCAT 和 Modbus TCP 协议转换网关&#xff0c;为用户提供一种 …

map_set的使用

map_set的使用 关联式容器树形结构的关联式容器setset的介绍set的使用 multisetmultiset的介绍multiset的使用 mapmap的介绍map的使用键值对 multimapmultimap的介绍 &#x1f30f;个人博客主页&#xff1a;个人主页 关联式容器 在初阶阶段&#xff0c;我们已经接触过STL中的部…

黑科技外绘神器:一键扩展图像边界

黑科技外绘神器&#xff1a;一键扩展图像边界 Diffusers Image Outpaint✨是一个开源工具&#xff0c;能智能扩展图像边界&#xff0c;创造完美视觉效果&#x1f3de;️。用户可自定义风格&#xff0c;生成高清图像&#x1f929;&#xff0c;应用场景广泛&#xff0c;释放你的…

大模型~合集6

我自己的原文哦~ https://blog.51cto.com/whaosoft/11566566 # 深度模型融合&#xff08;LLM/基础模型/联邦学习/微调等&#xff09; 23年9月国防科大、京东和北理工的论文“Deep Model Fusion: A Survey”。 深度模型融合/合并是一种新兴技术&#xff0c;它将多个深度学习模…

爬虫——爬取小音乐网站

爬虫有几部分功能&#xff1f;&#xff1f;&#xff1f; 1.发请求&#xff0c;获得网页源码 #1.和2是在一步的 发请求成功了之后就能直接获得网页源码 2.解析我们想要的数据 3.按照需求保存 注意&#xff1a;开始爬虫前&#xff0c;需要给其封装 headers {User-…

本地化测试对游戏漏洞修复的影响

本地化测试在游戏开发的质量保证过程中起着至关重要的作用&#xff0c;尤其是在修复bug方面。当游戏为全球市场做准备时&#xff0c;它们通常会被翻译和改编成各种语言和文化背景。这种本地化带来了新的挑战&#xff0c;例如潜在的语言错误、文化误解&#xff0c;甚至是不同地区…

C++ 双端队列(deque)的深入理解

前言&#xff1a; 双端队列deque看起来是一个相当牛的容器&#xff0c;表面看起来将list和vector进行结合起来&#xff0c;形成了一个看起来很完美的容器&#xff0c;但是事实不是这样&#xff0c;要是deque如此完美&#xff0c;数据结构也就没list和vector的事情了&#xff0c…

多系统萎缩患者必看!这些维生素助你对抗病魔

亲爱的朋友们&#xff0c;今天我们来聊聊一个相对陌生但重要的健康话题——多系统萎缩&#xff08;MSA&#xff09;。这是一种罕见的神经系统疾病&#xff0c;影响着患者的自主神经系统、运动系统和平衡功能。面对这样的挑战&#xff0c;科学合理的饮食和营养补充显得尤为重要。…

暴力数据结构——AVL树

1.认识AVL树 AVL树最先发明的⾃平衡⼆叉查找树,AVL可以是⼀颗空树,或者具备下列性质的⼆叉搜索树&#xff1a; • 它的左右⼦树都是AV树&#xff0c;且左右⼦树的⾼度差的绝对值不超过1 • AVL树是⼀颗⾼度平衡搜索⼆叉树&#xff0c; 通过控制⾼度差去控制平衡 AVL树整体结点…

路由交换实验指南

案例 01&#xff1a;部署使用 eNSP 平台实验需求&#xff1a; 安装华为 eNSP 网络模拟平台打开 eNSP 平台&#xff0c;新建拓扑并绘制网络能够成功启动交换机、计算机设备 实验步骤&#xff1a; 安装华为 eNSP 网络模拟平台启动安装程序 配置安装内容 防护墙允许 eNSP 程序的…

IDTL:茶叶病害识别数据集(猫脸码客 第205期)

Identifying Disease in Tea Leaves茶叶病害识别数据集 一、引言 在农业领域&#xff0c;茶叶作为一种重要的经济作物&#xff0c;其生产过程中的病害防治是确保茶叶质量和产量的关键环节。然而&#xff0c;传统的病害识别方法主要依赖于人工观察和经验判断&#xff0c;这不仅…

从零开始实现RPC框架---------项目介绍及环境准备

一&#xff0c;介绍 RPC&#xff08;Remote Procedure Call&#xff09;远程过程调⽤&#xff0c;是⼀种通过⽹络从远程计算机上请求服务&#xff0c;⽽不需要 了解底层⽹络通信细节。RPC可以使⽤多种⽹络协议进⾏通信&#xff0c; 如HTTP、TCP、UDP等&#xff0c; 并且在 TCP/…

匿名方法与Lambda表达式+泛型委托

匿名方法 和委托搭配使用&#xff0c;方便我们快速对委托进行传参&#xff0c;不需要我们定义一个新的函数&#xff0c;直接用delegate关键字代替方法名&#xff0c;后面跟上参数列表与方法体。 格式&#xff1a;delegate(参数列表){方法体} lambda表达式 是匿名方法的升级…

Brave编译指南2024 MacOS篇-环境配置(四)

引言 在上一篇文章中&#xff0c;我们成功获取了Brave浏览器的源代码。现在&#xff0c;我们将进入编译过程的关键阶段&#xff1a;环境配置。正确的环境配置对于成功编译Brave浏览器至关重要&#xff0c;它能确保所有必要的工具和依赖项都已就位&#xff0c;并且版本兼容。 …

JAVAIDEA初始工程的创建

四结构 建工程综述* 初始*&#xff1a; 1、先建个空项目&#xff0c; 2、打开文件中的项目结构新建module模块&#xff08;模块下有src&#xff09; 修改模块名&#xff1a; 也是Refactor&#xff0c;Rename&#xff0c;但是要选第三个同时改模块和文件夹名字 导入模块&am…

【Python】ftfy 使用指南:修复 Unicode 编码问题

ftfy&#xff08;fixes text for you&#xff09;是一个专为修复各种文本编码错误而设计的 Python 工具。它的主要目标是将损坏的 Unicode 文本恢复为正确的 Unicode 格式。ftfy 并非用于处理非 Unicode 编码&#xff0c;而是旨在修复因为编码不一致、解码错误或混合编码导致的…

【Python】path:简化文件路径处理的 Python 库

path 是一个 Python 库&#xff0c;提供了对文件系统路径的简洁抽象&#xff0c;使文件和目录操作更加直观和 Pythonic。该库建立在 pathlib 的基础上&#xff0c;扩展了文件路径处理的功能&#xff0c;使得开发者能够更高效地进行文件操作&#xff0c;如文件读写、目录遍历、路…

Redis缓存穿透雪崩击穿及解决

封装缓存空对象解决缓存穿透与逻辑过期解决缓存击穿工具类 Slf4j Component public class CacheClient {private final StringRedisTemplate stringRedisTemplate;public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate stringRedisTemplat…

《Linux从小白到高手》理论篇(十一):Linux的系统环境管理

值此国庆佳节&#xff0c;深宅家中&#xff0c;闲来无事&#xff0c;就多写几篇博文。本篇详细深入介绍Linux的系统环境管理。 环境变量 linux系统下&#xff0c;如果你下载并安装了应用程序&#xff0c;很有可能在键入它的名称时出现“command not found”的提示内容。如果每…

震撼!AI造声新标杆,20字生成完美音频

震撼&#xff01;AI造声新标杆&#xff0c;20字生成完美音频 EzAudio是一款革命性的文本到音频生成AI&#x1f3b6;&#xff0c;快速生成高质量音频&#xff0c;告别机械音&#x1f50a;。它能将文字瞬间变成音乐和配音&#xff0c;为创作增添无限可能✨&#xff01;快来体验这…