如何用rust实现一个异步channel

news2024/12/26 9:17:35

目录

  • 前言
  • 思路
    • 实现功能
    • 代码实现
  • 测试
    • 先引测试版包
    • 测试代码
    • 结果与分析
    • 思考
  • 尾语

前言

使用通信来共享内存,而不是通过共享内存来通信

上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里,几乎所有的go程序里都有channel的身影。
rust的异步和go的goroutine有异曲同工之妙,甚至可以把 tokio::spawn 理解为go关键字。但在rust中好像并没有异步channel的实现。本着求人不如求己的原则,决定diy一个类似go的channel。

思路

先看一下发送流程

唤醒发送任务
缓存已满,添加任务到
插入成功
重新发送
发送消息
尝试添加到缓存中
缓存空出
发送队列
唤起接收任务
End

再看一下接收流程

唤醒接受任务
缓存为空,添加任务至
取值成功
重新取值
接收消息
尝试从缓存中取消息
缓存入值
接收队列
唤起发送任务
End

总体来说流程清晰易懂,不管接收还是发送,都是先尝试从缓存队列中操作值,不成功则加入到对应队列,等待再次执行。反之则唤起相关任务,结束操作。

实现功能

  1. 首先需要实现一个存放值的环形缓冲区,并且每个单元应该是单独加锁的,从而避免全局锁。
  2. 需要两个任务队列,用来存放在饥饿模式(从缓存操作失败)下的 发送任务和接受任务。
  3. 按照rust习惯,将发送者和接受者拆开,并各自实现future
  4. 因为唤醒不是同步的,需要通过一个唤醒器来唤醒沉默的任务。
  5. 使用原子操作替代锁

代码实现

具体的就不写了,放在github上了
github地址:https://github.com/woshihaoren4/wd_tools/tree/main/src/channel

测试

这里主要和async-channel测试一下

  • async-channel 是最常见的异步channel,在crateio上有两千万的下载。

先引测试版包

cargo.toml

[dependencies]
tokio = {version = "1.22.0",features=["full"]}
wd_tools = {version = "0.8.3",features = ["sync","chan"]}
async-channel = "1.8.0"
  • wd_tools 是我们的channel,这里引用的sync chan两个feature,前者用于测试,后者是chan实现。

测试代码

测试场景:设置缓存长度为10,发100万数据,接100万数据。在1发送者1接受者,1发送者10接受者,10发送者1接受者,10发送者10接受者四种情况下的收发性能。

use std::fmt::Debug;
use wd_tools::channel as wd;
use async_channel as ac;

#[tokio::main]
async fn main(){
    let ts = TestService::new(10);
    println!("test start ------------> wd_tools");
    ts.send_to_recv("1-v-1",true,100_0000,1,100_0000,1,|x|x).await;
    ts.send_to_recv("1-v-10",true,100_0000,1,10_0000,10,|x|x).await;
    ts.send_to_recv("10-v-1",true,10_0000,10,100_0000,1,|x|x).await;
    ts.send_to_recv("10-v-10",true,10_0000,10,10_0000,10,|x|x).await;
    println!("wd_tools <------------- test over");
    println!("test start ------------> async-channel");
    ts.send_to_recv("1-v-1",false,100_0000,1,100_0000,1,|x|x).await;
    ts.send_to_recv("1-v-10",false,100_0000,1,10_0000,10,|x|x).await;
    ts.send_to_recv("10-v-1",false,10_0000,10,100_0000,1,|x|x).await;
    ts.send_to_recv("10-v-10",false,10_0000,10,10_0000,10,|x|x).await;
    println!("async-channel <------------ test over");
}

struct TestService<T>{
    wd_sender : wd::Sender<T>,
    wd_receiver : wd::Receiver<T>,
    ac_sender : ac::Sender<T>,
    ac_receiver : ac::Receiver<T>
}

impl<T:Unpin+Send+Sync+Debug+'static> TestService<T>{
    pub fn new(cap:usize)->TestService<T>{
        let (wd_sender,wd_receiver) = wd::Channel::new(cap);
        let (ac_sender,ac_receiver) = ac::bounded(cap);
        TestService{wd_sender,wd_receiver,ac_sender,ac_receiver}
    }

    pub fn send<G:Fn(usize)->T+Send+Sync+'static>(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize,generater:G){
        let wd_sender = self.wd_sender.clone();
        let ac_sender = self.ac_sender.clone();

        wg.defer_args1(|is_wd|async move{
            for i in 0..max {
                let t = generater(i);
                if is_wd {
                    wd_sender.send(t).await.expect(" 发送失败");
                }else{
                    ac_sender.send(t).await.expect(" 发送失败");
                }
            }
        },is_wd);
    }
    pub fn recv(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize){
        let wd_receiver = self.wd_receiver.clone();
        let ac_receiver = self.ac_receiver.clone();

        wg.defer_args1(|is_wd|async move{
            for _i in 0..max {
                if is_wd {
                    wd_receiver.recv().await.expect(" 接收失败");
                }else{
                    ac_receiver.recv().await.expect(" 接收失败");
                }
            }
        },is_wd);
    }

    pub async fn send_to_recv<G:Fn(usize)->T+Send+Sync+Clone+'static>(&self,info:&'static str, is_wd:bool, sbase:usize, sgroup:usize, rbase:usize, rgroup:usize, generater:G){
        let now = std::time::Instant::now();
        let wg = wd_tools::sync::WaitGroup::default();
        let wg_send = wd_tools::sync::WaitGroup::default();
        let wg_recv = wd_tools::sync::WaitGroup::default();

        for _ in 0..sgroup{
            self.send(wg_send.clone(),is_wd,sbase,generater.clone());
        }
        for _ in 0..rgroup{
            self.recv(wg_recv.clone(),is_wd,rbase);
        }

        wg.defer(move ||async move{
            let now = std::time::Instant::now();
            wg_send.wait().await;
            println!("test[{}] ---> send use time:{}ms",info,now.elapsed().as_millis());
        });
        wg.defer(move ||async move{
            let now = std::time::Instant::now();
            wg_recv.wait().await;
            println!("test[{}] ---> recv use time:{}ms",info,now.elapsed().as_millis());
        });

        wg.wait().await;
        println!("test[{}] ---> all use time:{}ms",info,now.elapsed().as_millis());
    }
}

结果与分析

测试10次,取平均值做表,如下
在这里插入图片描述
如上图,得结论

  • 在1发收者和10发收者的情况下,两种channel效率相差不多。
  • 在发送者和接受者数量不等时,wd_tools::channel的性能明显优于async-channel

思考

分析结论之前先看一下async-channel的实现。虽然async-channel也是异步,但它并不依赖某个异步运行时来进行任务的上线文切换,而是使用concurrent-queueevent-listener进行消息调度,底层依赖于std::thread::park_timeout

相比event-listener的调度方式,直接管理tokio的Context则更适用于异步环境。尤其是存在大量等待的场景。如上面测试,接受者和发送者数量不等,需要长时间等待的情况。实际开发中,接受者或者发送者可能长时间处于饥饿的情况下,wd_tools::channel不会产生多余的资源开销,毕竟上下文被挂起了,也就不会被cpu执行。

当然实际是复杂的,因情而异,使用的CPU数量(线程数),缓存长度,异步任务数同样会影响消息队列的性能,尤其是不需要等待的场景下async-channel性能更优。

wd_tools::channel则更适合tokio异步环境。并且不会引起线程park,而产生其他影响。

尾语

wd_tools::channel 目前只是一个初级版本,还有很多地方待优化,比如过多的状态判断,对缓存区直接轮训加锁,而没有采用优化算法, 唤醒器完全可以通过一定优化策略替换带。
但这个思路是没错的,欢迎有想法的同志加入进来。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/695055.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

台灯太亮会影响视力吗?选灯一定要注意这几个点!

灯太亮对眼睛有没有影响&#xff0c;取决于灯“亮”的程度和使用的时间。如果是偶尔有需求&#xff0c;灯过于亮&#xff0c;使用时间不长的话对眼睛倒是没有太大的影响。但如果是长时间使用的&#xff0c;就不能使用过亮的灯了&#xff0c;容易导致睫状肌代偿性收缩、导致眼睛…

RISC-V处理器的设计与实现(三)—— 上板验证

文章目录 RISC-V处理器的设计与实现&#xff08;一&#xff09;—— 基本指令集_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现&#xff08;二&#xff09;—— CPU框架设计_Patarw_Li的博客-CSDN博客 RISC-V处理器的设计与实现&#xff08;三&#xff09;—— 上板验…

人机混合智能概述

人机混合智能是指将人类的智能和计算机的智能结合起来&#xff0c;实现更加智能化的决策和行动。人机混合智能的发展历史可以追溯到20世纪50年代早期&#xff0c;当时计算机还是庞大的机器&#xff0c;只能由专业人员操作。但随着计算机技术的不断发展&#xff0c;出现了更为普…

JavaScript之鼠标事件、坐标轴、定位、clientXY、offsetXY、layerXY、pageXY、screenXY

文章目录 MouseEvent的事件类别阻止鼠标的默认事件去除单击右键菜单阻止图像默认拖拽阻止文字的拖拽和选择阻止表单提交及重置打印输出MouseEvent对象内容clientX和clientY与x和yoffsetXYlayerXYpageXYscreenXY总结 MouseEvent的事件类别 序号事件描述1mousedown鼠标按下2mouse…

多元回归预测 | Matlab鲸鱼算法(WOA)优化极限学习机ELM回归预测,WOA-ELM回归预测,多变量输入模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元回归预测 | Matlab鲸鱼算法(WOA)优化极限学习机ELM回归预测,WOA-ELM回归预测,多变量输入模型 评价指标包括:MAE、RMSE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 部分源码 %% 清空环…

【C/C++】使用类和对象 设计立方体案例

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

6.2.2 复制、删除与移动: cp, rm, mv

要复制文件&#xff0c;使用cp(copy)指令&#xff0c;移动目录与文件&#xff0c;使用mv(move)&#xff0c;这个指令直接拿来作更名的动作&#xff08;rename&#xff09;&#xff0c;移除是rm(remove)。 cp &#xff08;复制文件或目录&#xff09; 复制&#xff08;cp&#…

(07)装拆箱,自定义泛型,泛型约束,foreach,枚举器,迭代器,文件目录操作,TreeView,递归

一、作业问题 1.CompareTo是按什么规则标准进行比较的&#xff1f; 当前区域性执行单词 (区分大小写和区分区域性) 比较。 有关单词、字符串和序号排序 的详细信息&#xff0c;请参阅 System.Globalization.CompareOptions。 并不是按照…

每天一点Python——day42

#第四十二天 #判断字典中关键字是否存在in 存在返回Ture&#xff1b;反之为False not in 不存在返回True&#xff1b;反之为False#例&#xff1a; b{师傅:1000,师祖:10000,徒弟:500} print(师傅in b) print(师傅 not in b) #字典元素的删除del 字典名[健名]#例 a{张三:100,李四…

为什么现代的低代码开发平台都不支持导出源代码?

摘要&#xff1a;本文由葡萄城技术团队于CSDN原创并首发。转载请注明出处&#xff1a;葡萄城官网&#xff0c;葡萄城为开发者提供专业的开发工具、解决方案和服务&#xff0c;赋能开发者。 初次接触低代码的程序员大多会纠结一个问题&#xff0c;为什么功能越强大的低代码开发平…

C语言进阶--自定义类型详解

目录 一.结构体 1.1.结构的声明 1.2.结构的自引用 1.3.结构体变量的定义和初始化 1.4.结构成员的访问 1.5.结构体内存对齐 1.6.修改默认对齐数 1.7.offsetof宏 1.8.结构体传参 1.9.位段 二.枚举 2.1.枚举的定义 2.2.枚举的使用 2.3.枚举的优点 三.联合(共用体) …

ODrive电路设计中的接地环路

对于要进行通信的电气设备,大多数时候它们需要公共接地连接。最佳实践是将接地连接回一个点,称为“星形接地”。如果有多个接地路径,则会形成“接地环路”。接地环路和导线电感可能会导致 ODrive 等大电流电子设备出现问题。作为可能出错的示例,请查看下图。 问题: 问题在…

【计算机网络】数据链路层--点对点协议PPP

1.概念 2.构成 3.封装成帧 - 帧格式 4.透明传输 4.1字节填充法&#xff08;面向字节的异步链路&#xff09; 4.2.比特填充法&#xff08;面向比特的同步链路&#xff09; 5.差错检测 6.工作状态 7.小结

使用Vite 搭建高可用的服务端渲染SSR工程

在非常早期的 Web 开发中&#xff0c;大家还在使用 JSP 这种古老的模板语法来编写前端的页面&#xff0c;然后直接将 JSP 文件放到服务端&#xff0c;在服务端填入数据并渲染出完整的页面内容&#xff0c;可以说那个时代的做法是天然的服务端渲染。但随着 AJAX 技术的成熟以及各…

Typescript中的interface,type和class的相同点和不同点

感觉他们很像是不是&#xff1f; 他们确实有一些相同点&#xff1a; 相同点&#xff1a; 它们都可以用来描述对象的形状&#xff0c;即属性和方法。它们都可以被继承或实现&#xff0c;形成新的类型或类。它们都可以使用泛型参数&#xff0c;增加类型的灵活性和复用性。 不同…

jenkins shell脚本问题

问题描述&#xff1a; mac电脑配置了jenkins,同样的脚本&#xff0c;mac 电脑终端执行没有问题&#xff0c;复制到jenkins时&#xff0c;jenkins shell命令识别不了 -n指令。 解决方案&#xff1a; jenkins 系统配置中&#xff0c;找到shell 模块&#xff0c;配置上本地的路…

继骨传导耳机之后,新发布开放式耳机又成断货王!2年3代爆款,南卡怎么吸引年轻人?

今年618后&#xff0c;南卡的开放式耳机OE Pro成了新一代“断货王”&#xff0c;火爆程度直逼南卡的骨传导耳机Pro系列。 仔细想想&#xff0c;南卡已做出了3代爆款&#xff1a;骨传导Pro系列、骨传导Noe系列&#xff0c;南卡开放式OE系列&#xff0c;并且每一代都带动了该系列…

四、Docker镜像详情

学习参考&#xff1a;尚硅谷Docker实战教程、Docker官网、其他优秀博客(参考过的在文章最后列出) 目录 前言一、Docker镜像1.1 概念1.2 UnionFS&#xff08;联合文件系统&#xff09;1.3 Docker镜像加载原理1.4 重点理解 二、docker commit 命令2.1 是什么&#xff1f;2.2 命令…

分布式调用与高并发处理 Zookeeper分布式协调服务

一、Zookeeper概述 1.1 集中式和分布式 单机架构 一个系统业务量很小的时候所有的代码都放在一个项目中就好了&#xff0c;然后这个项目部署在一台服务器上&#xff0c;整个项目所有的服务都由这台服务器提供。 缺点&#xff1a; 服务性能存在瓶颈&#xff0c;用户增长的时候…

LENOVO联想笔记本电脑 拯救者Y520-15IKBN(80Y5)原装Win10系统文件,恢复出厂OEM系统

lenovo联想笔记本电脑&#xff0c;拯救者Y520-15IKBN(1050、1050Ti) (80Y5)出厂状态Windows10系统&#xff0c;原装OEM系统镜像 系统自带所有驱动、出厂主题壁纸LOGO、Office办公软件、联想电脑管家等预装程序 所需要工具&#xff1a;16G或以上的U盘 文件格式&#xff1a;IS…