C# Channel的入门与应用

news2024/11/23 12:44:05

C# Channel的入门与应用

1. 入门

Channel 是微软在 .NET Core 3.0 以后推出的新的集合类型,该类型位于 System.Threading.Channels 命名空间下,具有异步 API 、高性能、线程安全等等的特点。目前,Channel 最主要的应用场景是生产者-消费者模型。如下图所示,生产者负责向队列中写入数据,消费者负责从队列中读出数据。在此基础上,通过增加生产者或者消费者的数目,对这个模型做进一步的扩展。我们平时使用到的 RabbitMQ 或者 Kafka,都可以认为是生产者-消费者模型在特定领域内的一种应用,甚至于我们还能从中读出一点广义上的读写分离的味道。

class Producer<T>
{
    private readonly Queue<T> _queue;
    public Producer(Queue<T> queue) { _queue = queue; }
}

class Consumer<T>
{
    private readonly Queue<T> _queue;
    public Consumer(Queue<T> queue) { _queue = queue; }
}

这个思路理论上是没有问题的,可惜实际操作起来槽点满满。譬如,生产者应该只负责写,消费者应该只负责读,可当你亲手把一个队列传递给它们的时候,想要保持这种职责上的纯粹属实是件困难的事情,更不必说,在使用队列的过程中,生产者会有队列“满”的忧虑,消费者会有队列“空”的烦恼,如果再考虑多个生产者、多个消费者、多线程/锁等等的因素,显然,这并不是一个简单的问题。为了解决这个问题,微软先后增加了 BlockingCollection 和 BufferBlock 两种数据结构,这里以前者为例,下面是一个典型的生产者-消费者模型:

var bc = new BlockingCollection<int>();

// 生产者
var producer = Task.Run(() => {
    for (var i = 0; i < Count; i++) {
        bc.Add(i);
        Console.WriteLine("Producer Write Item: {0}", i);
    }
    bc.CompleteAdding();
});

// 消费者
var consumer = Task.Run(() => {
    while (!bc.IsCompleted) {
        if (bc.TryTake(out var item)) {
            Console.WriteLine("Consumer Read Item: {0}", item);
        }
    }
});

await Task.WhenAll(producer, consumer);

在这里插入图片描述
测试了读写 10000 条数据的场景下,三种数据结构各自的性能表现,显而易见 Channel 的性能是最好的.

2. 应用

// 创建一个有限容量的 Channel
var boundedChannel = Channel.CreateBounded<int>(100);

// 创建一个无限容量的 Channel
var unboundedChannel = Channel.CreateUnbounded<string>();

在生产者-消费者模型中,一个容量有限的固定,一定会无可避免地出现队列“满”的情形,此时,我们就需要制定某种策略或者机制来完善整个模型。对于这个问题,Channel 的解决方案是 BoundedChannelFullMode :

var boundedChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(100) {FullMode = BoundedChannelFullMode.Wait});

这是一个枚举类型,事实上,它共有 Wait、DropNewest、DropOldest、DropWrite 四个取值,默认为 Wait。其中:

  • Wait:当队列已满时,写入数据时会返回 false,直到队列内有空间时可以继续写入。
  • DropNewest:移除最新的数据,即从队列尾部开始移除元素。
  • DropOldest:移除最旧的数据,即从队列头部开始移除元素。
  • DropWrite:可以写入数据,但是数据会被立即丢弃。

除了队列“满”或者队列“空”的问题,我们还考虑过多线程环境下的生产者-消费者模型可能会遇到的问题。值得庆幸的是, Channel 天生就支持多线程,我们可以通过 ChannelOptions 的 SingleWriter 和 SingleReader 来指定 Channel 是否是单一的消费者或者生产者,默认情况下,这两个值都是 false :

var boundedChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(100) {
        SingleWriter = true,
        SingleReader = false,
        FullMode = BoundedChannelFullMode.Wait
});

通过以上代码片段,我们就可以创建出一个单生产者、多消费者的 Channel ,对于 Channel 而言,其最重要的两个成员分别是 Writer 和 Reader , 前者对应生产者,类型定义为:ChannelWriter;后者对应消费者,类型定义为:ChannelReader,这一次,我们做到了真正意义上的读写分离:

// 生产者生产数据
channel.Writer.TryWrite("大漠孤烟直,长河落日圆。");

// 消费者消费数据
// 模式一:一次读一个
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        // 在这里写具体的处理逻辑
    }
}

// 模式二:一次全部读出来
while (await channel.Reader.WaitToReadAsync())
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        // 在这里写具体的处理逻辑
    }
}

下面这三个方法做了一件什么样的事情呢?我个人以为,这其实就是我们上面提到的数据流,首先,我们通过 GetFiles() 方法获得指定目录内的文件信息;然后,这些信息交给 Analyse() 方法去做处理,这里做的事情是统计出 markdown 格式文件的字符串,以及筛选出那些非 markdown 格式的文件或者子目录;最后,通过 Merge() 函数,我们将上一步的结果进行汇总输出。

// GetFiles
Task<Channel<string>> GetFiles(string root) {
    var filePathChannel = Channel.CreateUnbounded<string>();
    var directoryInfo = new DirectoryInfo(root);

    foreach (var file in directoryInfo.EnumerateFileSystemInfos()) {
        filePathChannel.Writer.TryWrite(file.FullName);
    }

    filePathChannel.Writer.Complete();
    return Task.FromResult(filePathChannel);
}

// Analyse
async Task<Channel<string>[]> Analyse(Channel<string> rootChannel) {
    var counterChannel = Channel.CreateUnbounded<string>();
    var errorsChannel = Channel.CreateUnbounded<string>();

    while (await rootChannel.Reader.WaitToReadAsync()) {
        await foreach (var filePath in rootChannel.Reader.ReadAllAsync()) {
            var fileInfo = new FileInfo(filePath);
            if (fileInfo.Extension == ".md") {
                var totalWords = File.ReadAllText(filePath).Length;
                counterChannel.Writer.TryWrite($"文章 [{fileInfo.Name}] 共 {totalWords} 个字符.");
            } else {
                errorsChannel.Writer.TryWrite($"路径 [{filePath}] 是文件夹或者格式不正确.");
            }
        }
    }

    counterChannel.Writer.Complete();
    errorsChannel.Writer.Complete();

    return new Channel<string>[] { counterChannel, errorsChannel };
}

// Merge
async Task<Channel<string>> Merge(params Channel<string>[] channels) {
    var mergeTasks = new List<Task>();
    var outputChannel = Channel.CreateUnbounded<string>();

    foreach (var channel in channels) {
        var thisChannel = channel;
        var mergeTask = Task.Run(async () => {
            while (await thisChannel.Reader.WaitToReadAsync()) {
                await foreach (var item in thisChannel.Reader.ReadAllAsync()) {
                    outputChannel.Writer.TryWrite(item);
                }
            }
        });

        mergeTasks.Add(mergeTask);
    }

    await Task.WhenAll(mergeTasks);
    outputChannel.Writer.Complete();

    return outputChannel;
}

// Run
var filePathChannel = await GetFiles(@"/hugo-blog/content/posts/");
var analysedChannels = await Analyse(filePathChannel);
var mergedChannel = await Merge(analysedChannels);
while (await mergedChannel.Reader.WaitToReadAsync()) {
    await foreach (var item in mergedChannel.Reader.ReadAllAsync()) {
        Console.WriteLine(item);
    }
}

从某种意义上来讲,这是一种“分治”策略,即:把一个大任务分解为若干个小任务,再将这些小任务的结果合并起来。很多年前,我曾在一本讲并行编程的书上见过类似的代码片段,那个时候我已经对 Google 的 MapReduce 略有耳闻,后来又接触到了 Parallel ,我突然意识到,如果 Map() 和 Reduce() 两个函数运行在一台远程服务器上,那么这个过程可以认为是 RPC,而运行在远程服务器上的这些函数,其实是在并行地执行着某种运算,那么这个过程可以认为是并行计算。当这些并行计算,使用的是世界各地的可伸缩计算资源时,那么这个过程其实就是云计算。所以说,写作这个过程还是挺有意思的,对不对?

原文链接


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1511147.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

​FastIce-Tech 企业官网开源模版:专为中小企业设计的轻量级网址

标题&#xff1a;FastIce-Tech 企业官网开源模版&#xff1a;专为中小企业设计的轻量级网址 中小企业在建立企业官网时常常面临着时间、资源和技术的限制。为了解决这些问题&#xff0c;FastIce-Tech 企业官网开源模版应运而生。它是一个基于 Vue.js、ElementUI 和 Vue-Router …

帮管客CRM(jiliyu)接口SQL注入漏洞

文章目录 前言声明一、漏洞描述二、影响版本三、漏洞复现四、修复建议 前言 帮管客CRM客户管理系统专注于为企业提供crm客户关系管理、crm管理系统、crm软件产品及企业销售管理流程解决方案服务,助力企业业绩增长。 声明 请勿利用文章内的相关技术从事非法测试&#xff0c;由…

7-2 然后是几点

题目链接&#xff1a;7-2 然后是几点 一. 题目 1. 题目 2. 输入输出格式 3. 输入输出样例 4. 限制 二、代码 1. 代码实现 #include <stdio.h>int time_calc(int start_time, int used_time){int start_hour start_time / 100;int start_minute start_time % 100;…

利用ffmpeg对两个音频文件进行混音处理

前言 最近&#xff0c;拿到了一个语音识别程序&#xff0c;想测试一下它识别的准确性。原本程序有一段自己的测试音频&#xff0c;准确性还可以&#xff0c;但是&#xff0c;自己想增加一下测试素材的复杂性。想到了在原本的测试音频中引入干扰数据&#xff08;噪点&#xff…

灵魂指针,教给(三)

欢迎来到白刘的领域 Miracle_86.-CSDN博客 系列专栏 C语言知识 先赞后看&#xff0c;已成习惯 创作不易&#xff0c;多多支持&#xff01; 目录 一、 字符指针变量 二、数组指针变量 2.1 数组指针变量是什么 2.2 数组指针变量如何初始化 三、二维数组传参本质 四、函数…

C语言 —— 图形打印

题目1&#xff1a; 思路&#xff1a; 如果我们要打印一个实心正方形&#xff0c;其实就是一个二维数组&#xff0c;i控制行&#xff0c;j控制列&#xff0c;行列不需要控制&#xff0c;arr[i][j]直接打印星号即可。 对于空心正方形&#xff0c;我们只需要控制行和列的条件&…

【C语言程序设计】C语言求圆周率π(三种方法)

题目一&#xff1a; 利用公式①计求π的近似值&#xff0c;要求累加到最后一项小于10^(-6)为止。 程序代码&#xff1a; #include <stdio.h> #include <stdlib.h> #include <math.h> int main(){float s1;float pi0;float i1.0;float n1.0;while(fabs(i)&…

PaddleOCR表格识别运行实例

目录 PaddleOCR 开源项目地址 一、数据集 1. 训练数据下载 2.数据集介绍 &#xff08;1&#xff09;PubTabNet数据集 &#xff08;2&#xff09; 好未来表格识别竞赛数据集 &#xff08;3&#xff09;WTW中文场景表格数据集 二、训练步骤 1.数据放置 2.环境配置 &…

【递归搜索回溯专栏】专题二:二叉树中的深搜----二叉树剪枝

本专栏内容为&#xff1a;递归&#xff0c;搜索与回溯算法专栏。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;递归搜索回溯专栏 &#x1f69a;代码仓库&#xff1a;小小unicorn的代…

Vue3全家桶 - Vue3 - 【4】侦听器

侦听器 一、 组合式API&#xff1a; 1.1 watch()函数 创建侦听器: 语法:// 先导入 watch 函数 import { watch } from vue watch(source, callback, options)source&#xff1a; 需要侦听的数据源&#xff0c;可以是 ref&#xff08;包括计算属性&#xff09;、一个响应式对…

mangoDB:2024安装

mangoDB:2024安装 mangoDB: 下载链接 取消勾选 配置环境变量 启动服务 同级目录下创建一个db文件夹 然后执行命令&#xff0c;启动服务 mongod --dbpath D:\environment\mango\db访问http://localhost:27017/ 出现下面的就是安装成功 2然后在管理员权限下给mango服务重…

【日常记录】【工具】随机生成图片的网站 Lorem Picsum

文章目录 1、介绍2、获取固定宽高的图片3、处理图片缓存4、 Emmet 缩写语法 1、介绍 Lorem Picsum 是一个免费的图片占位符服务&#xff0c;可以用于网站、应用程序或任何需要占位符图片的地方。它提供了一个简单的 API&#xff0c;可以通过 HTTP 请求获取随机图片&#xff0c;…

嵌入式驱动学习第三周——设备号与字符设备的注册、分配、释放

前言 这一篇博客来谈谈字符设备的注册、分配与释放。 嵌入式驱动学习专栏将详细记录博主学习驱动的详细过程&#xff0c;未来预计四个月将高强度更新本专栏&#xff0c;喜欢的可以关注本博主并订阅本专栏&#xff0c;一起讨论一起学习。现在关注就是老粉啦&#xff01; 目录 前…

CentOS Linux - Oracle Primavera P6安装及分享

引言 根据计划&#xff0c;近期我制作了多套基于ORACLE Primavera P6 最新发布的23.12版本预构建了虚拟机环境&#xff0c;里面包含了全套P6 最新版应用服务&#xff0c;相比于之前常使用的WindowsServer&#xff0c;这次使用了Linux作为运行平台。 此虚拟机仅用于演示、培训和…

掌握Redis,看完这篇文章就够了

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Redis是什么&#xff1f;二、Redis安装三、Redis相关数据类型 四、基础操作&#xff08;使用了python连接redis&#xff09;1.字符串2.键对应操作3.哈希&am…

实时查询银行卡归属地的API接口,快速获取卡片发卡地信息

快速查询银行卡发卡地信息是一项非常实用的功能&#xff0c;对于进行业务合作、风险评估等方面都有很大的帮助。在本文中&#xff0c;我们将介绍一个实时查询银行卡归属地的API接口&#xff0c;并提供相应的代码示例。 该API接口可以通过输入银行卡号&#xff0c;查询该卡片的…

Tictoc3例子

在tictoc3中&#xff0c;实现了让 tic 和 toc 这两个简单模块之间传递消息&#xff0c;传递十次后结束仿真。 首先来介绍一下程序中用到的两个函数&#xff1a; 1.omnetpp中获取模块名称的函数 virtual const char *getName() const override {return name ? name : "&q…

Rust 安装与版本更新

Rust 简介 Rust &#xff0c;一门赋予每个人构建可靠且高效软件能力的语言&#xff0c;主打内存安全。 2024年2月&#xff0c;在一份 19 页的报告《回归基础构件&#xff1a;通往安全软件之路》中&#xff0c;白宫国家网络主任办公室&#xff08;ONCD&#xff09;呼吁开发者使…

linux ,Windows部署

Linux部署 准备好虚拟机 连接好查看版本&#xff1a;java -version安装jdk 解压命令&#xff1a;tar -zxvf 加jdk的压缩文件名cd /etc 在编辑vim profile文件 在最底下写入&#xff1a; export JAVA_HOME/root/soft/jdk1.8.0_151&#xff08;跟自己的jdk保持一致&#xff0…

【网站项目】012医院住院管理系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…