【Rust基础】使用Rocket构建基于SSE的流式回复

news2025/4/19 3:02:32

背景

我们正在使用Rust开发基于RAG的知识库系统,其中对于模型的回复使用了常用的SSE,Web框架使用Rocket,Rocket提供了一个简单的方式支持SSE,但没有会话保持、会话恢复等功能,因此我们自己简单实现这两个功能。

在这里插入图片描述

使用Rocket推送消息流

以下,是Rocket给出的示例:

#[get("/text/stream")]
fn stream() -> TextStream![String] {
    TextStream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for i in 0..10 {
            yield format!("n: {}", i);
            interval.tick().await;
        }
    }
}

我们需要改造这个示例,以满足将模型回复的消息推送给前端的需求。

首先,对于既然推流,需要知道将流推送给谁,也就是要推送到哪个会话中,所以我们在发起会话的时候,需要一个会话ID来标识一个唯一的会话。

我们使用sse.js这个库作为SSE的客户端,用于发起SSE连接,该库可以通过发起POST请求来建立连接,可以携带额外的数据和请求头。

使用以下结构来接收一个对话请求:

pub struct ChatMessageReq {
    /// 会话ID
    pub session_id: String,
    /// 消息内容
    pub content: String,
}

于是我们的接口需要修改为这样:

#[post("/chat", data = "<req>")]
async fn question(
    req: Json<ChatMessageReq>) -> (ContentType, TextStream<impl Stream<Item = String>>) 
{
	//TODO
}

其中TextStream<impl Stream<Item = String>>等价于TextStream![String]

需要注意的是,如果返回值没有指定ContentType,那么Rocket默认响应的ContentType是文本类型,而非stream类型,会导致前端无法解析。

接下来我们实现会话管理功能。

我们定义了一个名为SsePool的struct,来存储并管理SSE连接:

struct SsePool {
    /// 消息流传输通道
    channel: DashMap<String, Sender<SseEvent>>,
}

impl SsePool {
    /// 初始化连接池
    pub fn init() -> Self {
        Self {
            channel: DashMap::new(),
        }
    }

    /// 移除连接
    fn remove(&self, id: &String) {
        if let Some((_, sender)) = self.channel.remove(id) {
            drop(sender);
        }
    }

    /// 获取连接
    fn get_sender(&self, id: &String) -> Option<Sender<SseEvent>> {
        self.channel.get(id).map(|v| v.value().clone())
    }

    /// 新建channel
    pub fn new_channel(&self, id: String) -> (Sender<SseEvent>, Receiver<SseEvent>) {
        let (sender, receiver) = tokio::sync::mpsc::channel(10_0000);

        // 获取并移除旧sender
        let old_sender = self.channel.remove(&id).map(|(_, s)| s);

        // 插入新sender
        self.channel.insert(id, sender.clone());

        // 处理旧sender
        if let Some(old_sender) = old_sender {
            tokio::spawn(async move {
                // 发送终止信号
                let _ = old_sender.send(SseEvent::Abort).await;
            });
        }

        (sender, receiver)
    }

    /// 发送消息
    pub async fn send_message(&self, id: &String, message: ChatMessage) {
        if let Some(sender) = self.get_sender(id) {
            if let Err(e) = sender.send(SseEvent::ChatMessage(message)).await {
                log::warn!("消息发送失败,session id: {},失败原因:{}", &id, e);
            };
            // drop(sender);
        }
    }
}

其中channel使用的是tokio中mpsc的channel。

值得注意的是,new_channel中,新建连接时,需要向channel发送一条终止事件,确保已有的receiver关闭,返回新的receiver,这一点是用于后续的会话恢复使用。new_channel会返回receiver和sender,用于消息接收和发送。

当收到模型回复是,调用SsePool::send_message发送消息到channel,再头通过receiver接收消息,转发到前端。

可以把它初始化到静态变量中,方便全局调用:

static SSE_POOL: LazyLock<SsePool> = LazyLock::new(|| SsePool::init());

于是,我们的接口可以完善为以下内容:

#[post("/chat", data = "<req>")]
async fn question(
    req: Json<ChatMessageReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>)
 {
    // 请求新消息,并返回receiver
    let (_, _, mut receiver) = service::new_message(req).await.unwrap();
    let stream = TextStream! {
    	// 持续接收receiver的消息,然后推送到前端
        while let Some(item) = receiver.recv().await {
            match item{
                //模型回复的消息
                SseEvent::ChatMessage(message) => {
                    yield SseEvent::ChatMessage(message.clone()).to_message();
                    if SseEvent::is_done(&message) {
                    	// 推送消息
                        yield SseEvent::Abort.to_message();
                        break;
                    }
                },
                // 关闭通道
                SseEvent::Abort => {
                    yield SseEvent::Abort.to_message();
                    break;
                },
                _ => {}
            }
        }
        yield SseEvent::Abort.to_message();
        drop(receiver);
    };

    (ContentType::EventStream, stream)
}

至此,新会话的接口就完成了。

接下来是会话的恢复。

当前端切换会话或刷新页面时,我们希望能够继续收到未回复完成的消息,所以需要一个用于会话恢复的接口。同样的,接口需要会话ID来区分恢复哪一个会话。

#[post("/resume-stream", data = "<req>")]
async fn resume_stream(
    req: Json<ResumeStreamReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>)
 {
    // 会话ID
    let session_id = req.session_id.clone();
    
    let stream = TextStream! {
    	// 尝试恢复会话,并返回receiver,如果能够返回receiver说明会话未完成,否则已经完成
        if let Some(mut receiver) = service::resume_stream(&req.session_id)
            .await
            .unwrap()
        {
        	// 持续接收未回复完成的消息
            while let Some(item) = receiver.recv().await {
                match item {
                    // 模型回复的消息
                    SseEvent::ChatMessage(message) => {
                        yield SseEvent::ChatMessage(message.clone()).to_message();
                        if SseEvent::is_done(&message) {
                            yield SseEvent::Abort.to_message();
                            break;
                        }
                    }
                    // 关闭通道
                    SseEvent::Abort => {
                        yield SseEvent::Abort.to_message();
                        break;
                    }
                    _ => {}
                }
            }
            drop(receiver);
        }
        yield SseEvent::Abort.to_message();
    };

    (ContentType::EventStream, stream)
}

service::resume_stream中,首先检查对应会话ID的channel是否存在,存在则新建channel返回receiver,不存在则表明已经回复完成。

pub async fn resume_stream(
    session_id: &String,
) -> AppResult<Option<Receiver<SseEvent>>> {
    if let None = chat::get_connection(session_id) {
        return Ok(None);
    }
    // 获取会话对应的channel,如果channel存在则标识消息仍在回复中
    let (_, receiver) = chat::new_connection(session_id);
    Ok(Some(receiver))
}

至此,便实现了会话恢复,刷新页面后仍能后接收strem消息。

总结

使用Rust写这些业务代码的速度,终归是没有Java快,一些常用的库,没有Java系列封装的简单易用,不过应用占用资源确实比Java小很多。

本次使用的一些库:

  • tokio:异步运行环境,以及mpsc的channel,
  • dashmap:支持并发的hashmap,但是使用不当容易造成死锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2337677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大前端基础学习

一、cs架构和bs架构 c&#xff1a;客户端&#xff0c; b&#xff1a;浏览器&#xff08;无需安装&#xff0c;无需更新&#xff0c;可跨平台&#xff09;√ s&#xff1a;server服务端&#xff0c;帮我们保 存信息&#xff0c;传递信息 二、 altshift向下键向下复制一行 …

Axios 的 POST 请求:QS 处理数据的奥秘与使用场景解析

在现代前端开发中&#xff0c;Axios 已经成为了进行 HTTP 请求的首选库之一&#xff0c;它的简洁易用和强大功能深受开发者喜爱。当使用 Axios 进行 POST 请求时&#xff0c;我们常常会遇到一个问题&#xff1a;是否需要使用 QS 库来处理请求数据&#xff1f;什么时候又可以不用…

Linux 防火墙( iptables )

目录 一、 Linux 防火墙基础 1. 防火墙基础概念 &#xff08;1&#xff09;防火墙的概述与作用 &#xff08;2&#xff09;防火墙的结构与匹配流程 &#xff08;3&#xff09;防火墙的类别与各个防火墙的区别 2. iptables 的表、链结构 &#xff08;1&#xff09;规则表 …

【redis进阶三】分布式系统之主从复制结构(1)

目录 一 为什么要有分布式系统&#xff1f; 二 分布式系统涉及到的非常关键的问题&#xff1a;单点问题 三 学习部署主从结构的redis (1)创建一个目录 (2)进入目录拷贝两份原有redis (3)使用vim修改几个选项 (4)启动两个从节点服务器 (5)建立复制&#xff0c;要想配…

EM储能网关ZWS智慧储能云应用(9) — 远程OTA升级

ZWS智慧储能云平台支持远程OTA固件升级&#xff0c;可以针对具体的储能设备进行升级&#xff0c;升级储能网关、EMS主控软件、PCS、BMS等。 简介 储能系统通常高度集成化&#xff0c;一体化设计&#xff0c;将EMS、BMS&#xff08;电池管理系统&#xff09;、PCS&#xff08…

ubuntu24.04LTS安装向日葵解决方案

去向日葵官方下载ubuntu使用的deb包 向日葵 输入如下命令安装&#xff0c;将具体版本修改成自己下载的版本 andrew in ~/下载 λ sudo dpkg -i SunloginClient_15.2.0.63064_amd64.deb 正在选中未选择的软件包 sunloginclient。 (正在读取数据库 ... 系统当前共安装有 290947…

达梦官方管理工具SQLark:自动识别外键约束、check约束与虚拟列,助力高效生成测试数据

在数据库管理和应用开发过程中&#xff0c;高质量的测试数据对于系统调试和POC测试至关重要。达梦官方推出的新一代管理工具 SQLark百灵连接&#xff0c;其数据生成功能&#xff0c;可以为应用开发者、DBA 以及测试人员带来极大便利&#xff0c;能够轻松应对各类复杂的测试场景…

不关“猫”如何改变外网IP?3种免重启切换IP方案

每次更换外网IP都要重启路由器&#xff1f;太麻烦了&#xff01;那么&#xff0c;不关猫怎么改变外网IP&#xff1f;无论是为了网络调试、爬虫需求&#xff0c;还是解决IP限制问题&#xff0c;频繁重启设备既耗时又影响效率。其实&#xff0c;更换外网IP并不一定要依赖“重启大…

C#进阶学习(五)单向链表和双向链表,循环链表(中)双向链表

目录 一、双向链表的声明 1. 节点类声明 2. 链表类框架声明 3、实现其中的每一个函数 增删操作&#xff08;核心方法组&#xff09; 删除操作&#xff08;核心方法组&#xff09; 查询操作&#xff08;辅助方法&#xff09; 维护方法&#xff08;内部逻辑&#xff09; …

重学Redis:Redis常用数据类型+存储结构(源码篇)

一、SDS 1&#xff0c;SDS源码解读 sds (Simple Dynamic String)&#xff0c;Simple的意思是简单&#xff0c;Dynamic即动态&#xff0c;意味着其具有动态增加空间的能力&#xff0c;扩容不需要使用者关心。String是字符串的意思。说白了就是用C语言自己封装了一个字符串类型&a…

js原型和原型链

js原型&#xff1a; 1、原型诞生的目的是什么呢&#xff1f; js原型的产生是为了解决在js对象实例之间共享属性和方法&#xff0c;并把他们很好聚集在一起&#xff08;原型对象上&#xff09;。每个函数都会创建一个prototype属性&#xff0c;这个属性指向的就是原型对象。 …

OpenHarmony - 小型系统内核(LiteOS-A)(五)

OpenHarmony - 小型系统内核&#xff08;LiteOS-A&#xff09;&#xff08;五&#xff09; 六、文件系统 虚拟文件系统 基本概念 VFS&#xff08;Virtual File System&#xff09;是文件系统的虚拟层&#xff0c;它不是一个实际的文件系统&#xff0c;而是一个异构文件系统之…

PyTorch进阶学习笔记[长期更新]

第一章 PyTorch简介和安装 PyTorch是一个很强大的深度学习库&#xff0c;在学术中使用占比很大。 我这里是Mac系统的安装&#xff0c;相比起教程中的win/linux安装感觉还是简单不少&#xff08;之前就已经安好啦&#xff09;&#xff0c;有需要指导的小伙伴可以评论。 第二章…

proteus8.17 环境配置

Proteus介绍 Proteus 8.17 是一款功能强大的电子设计自动化&#xff08;EDA&#xff09;软件&#xff0c;广泛应用于电子电路设计、仿真和分析。以下是其主要特点和新功能&#xff1a; ### 主要功能 - **电路仿真**&#xff1a;支持数字和模拟电路的仿真&#xff0c;包括静态…

Java对接Dify API接口完整指南

Java对接Dify API接口完整指南 一、Dify API简介 Dify是一款AI应用开发平台&#xff0c;提供多种自然语言处理能力。通过调用Dify开放API&#xff0c;开发者可以快速集成智能对话、文本生成等功能到自己的Java应用中。 二、准备工作 获取API密钥 登录Dify平台控制台在「API密…

极狐GitLab GEO 功能介绍

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;关于中文参考文档和资料有&#xff1a; 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 Geo (PREMIUM SELF) Geo 是广泛分布的开发团队的解决方案&#xff0c;可作为灾难恢复策略的一部分提供热备份。Geo 不是 开箱…

云原生(Cloud Native)的详解、开发流程及同类软件对比

以下是云原生&#xff08;Cloud Native&#xff09;的详解、开发流程及同类软件对比&#xff1a; 一、云原生核心概念 定义&#xff1a; 云原生&#xff08;Cloud Native&#xff09;是基于云环境设计和运行应用程序的方法论&#xff0c;强调利用云平台的弹性、分布式和自动化…

学习笔记:减速机工作原理

学习笔记&#xff1a;减速机工作原理 一、减速机图片二、减速比概念三、减速机的速比与扭矩之间的关系四、题外内容--电机扭矩 一、减速机图片 二、减速比概念 即减速装置的传动比&#xff0c;是传动比的一种&#xff0c;是指减速机构中&#xff0c;驱动轴与被驱动轴瞬时输入速…

《UE5_C++多人TPS完整教程》学习笔记36 ——《P37 拾取组件(Pickup Widget)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P37 拾取组件&#xff08;Pickup Widget&#xff09;》 的学习笔记&#xff0c;该系列教学视频为计算机工程师、程序员、游戏开发者、作家&#xff08;Engineer, Programmer, Game Developer, Author&#xff09; Steph…

《空间复杂度(C语言)》

文章目录 前言一、什么是空间复杂度&#xff1f;通俗理解&#xff1a; 二、空间复杂度的数学定义三、常见空间复杂度举例&#xff08;含C语言代码&#xff09;&#x1f539; O(1)&#xff1a;常数空间&#x1f539; O(n)&#xff1a;线性空间&#x1f539; O(n^2)&#xff1a;平…