C#中通道(Channels)的应用之(生产者-消费者模式)

news2025/1/16 10:02:46

一.生产者-消费者模式概述

生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。

二.Channels 概念

Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。

三.Channels 生产者-消费者模式实现

创建通道来作为生产者和消费者之间的共享缓冲区
  1. 无界通道
  • 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用 Channel.CreateUnbounded<T>() 方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
  1. 有界通道
  • 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
    使用 Channel.CreateBounded<T>(int capacity) 方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
  • 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{
    for (int i = 0; i < 100; i++)
    {
        await writer.WriteAsync(i.ToString());
        await Task.Delay(100); // 模拟数据生成的时间间隔
    }
    writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
  • 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。

async Task ConsumerAsync(ChannelReader<string> reader)
{
   while (await reader.WaitToReadAsync())
   {
      if (reader.TryRead(out var msgstring))
      {
         Console.WriteLine($"Consumed: {msgstring}");
        // 在这里处理数据
      }
   }
}

下面展示一个完整的生产者和消费者示例

  1. 启动 Program
// See https://aka.ms/new-console-template for more information

using System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;

Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();

switch (key.KeyChar)
{
   case '1':
       await SingleProducerSingleConsumer();
       break;

   case '2':
       await MultiProducerSingleConsumer();
       break;

   case '3':
       await SingleProduceMultipleConsumers();
       break;

   case '4':
       await MultiProducerMultipleConsumers();
       break;
   default:
       Console.WriteLine("请先选择运行模式!");
       break;
}

// 单生产单消费
static async Task SingleProducerSingleConsumer()
{
   var channel = Channel.CreateUnbounded<string>();

   var producer1 = new Producer(channel.Writer, 1, 2000);
   var consumer1 = new Consumer(channel.Reader, 1, 1500);

   Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费
   Task producerTask1 = producer1.ProducerAsync(); // 开始生产

   await producerTask1.ContinueWith(_ => channel.Writer.Complete());

   await consumerTask1;
}

// 多生产单消费
static async Task MultiProducerSingleConsumer()
{
   var channel = Channel.CreateUnbounded<string>();
   List<Task> producerTasks = new List<Task>();
   for (int i = 1; i <= 3; i++)
   {
       producerTasks.Add(Task.Run(async () => {
           var producer = new Producer(channel.Writer, i, 2000);
           await producer.ProducerAsync();
       }));

       await Task.Delay(500); // 暂停500毫秒,启动另外一个生产
   }
   var consumer1 = new Consumer(channel.Reader, 1, 250);
   Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费

   await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());
   await consumerTask1;
}

// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{
   var channel = Channel.CreateUnbounded<string>();

   var producer1 = new Producer(channel.Writer, 1, 100);
   List<Task> consumerTasks = new List<Task>();
   for (int i = 1; i <= 3; i++)
   {
       consumerTasks.Add(Task.Run(async () => {
           var consumer = new Consumer(channel.Reader, 1, 1500);
           await consumer.ConsumerAsync();
       }));
   }

   Task producerTask1 = producer1.ProducerAsync();

   await producerTask1.ContinueWith(_ => channel.Writer.Complete());

   await Task.WhenAll(consumerTasks.ToArray());
}


// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{
   var channel = Channel.CreateUnbounded<string>();
   List<Task> producerTasks = new List<Task>();
   for (int i = 1; i <=3; i++)
   {
       Console.WriteLine("线程"+i.ToString());
       producerTasks.Add(Task.Run(async () => {
           var producer = new Producer(channel.Writer, i, 100);
           await producer.ProducerAsync();
       }));
       await Task.Delay(500); // 暂停500毫秒,启动另外一个生产
   }
   List<Task> consumerTasks = new List<Task>();
   for (int i = 1; i < 3; i++)
   {
       consumerTasks.Add(Task.Run(async () => {
           var consumer = new Consumer(channel.Reader, 1, 1500);
           await consumer.ConsumerAsync();
       }));
   }

   await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());

   await Task.WhenAll(consumerTasks.ToArray());
}



  1. 生产者Producer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestChannels
{
   internal class Producer
   {
       private readonly ChannelWriter<string> _writer;
       private readonly int _identifier;
       private readonly int _delay;

       public Producer(ChannelWriter<string> writer, int identifier, int delay)
       {
           _writer = writer;
           _identifier = identifier;
           _delay = delay;
       }

       public async Task ProducerAsync()
       {
           Console.WriteLine($"开始 ({_identifier}): 发布消息");

           for (var i = 0; i < 10; i++)
           {
               await Task.Delay(_delay); // 停顿一下,方便观察数据

               var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";

               Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");

               await _writer.WriteAsync(msg);
           }

           Console.WriteLine($"发布 ({_identifier}): 完成");
       }
   }
}

  1. 消费者Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestChannels
{
   /// <summary>
   /// 消费
   /// </summary>
   internal class Consumer
   {
       private readonly ChannelReader<string> _reader;
       private readonly int _identifier;
       private readonly int _delay;

       public Consumer(ChannelReader<string> reader, int identifier, int delay)
       {
           _reader = reader;
           _identifier = identifier;
           _delay = delay;
       }

       public async Task ConsumerAsync()
       {
           Console.WriteLine($" 开始({_identifier}):消费 ");

           while (await _reader.WaitToReadAsync())
           {
               if (_reader.TryRead(out var timeString))
               {
                   await Task.Delay(_delay); // 停顿一下,方便观察数据

                   Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");
               }
           }

           Console.WriteLine($"消费 ({_identifier}): 完成");
       }
   }
}

运行

  • [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2277486.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4.寻找两个正序数组的中位数--力扣

给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 。 示例 1&#xff1a; 输入&#xff1a;nums1 [1,3], nums2 [2] 输出&#xff1a;2.00000 解释&…

2Spark Core

2Spark Core 1.RDD 详解1) 为什么要有 RDD?2) RDD 是什么?3) RDD 主要属性 2.RDD-API1) RDD 的创建方式2) RDD 的算子分类3) Transformation 转换算子4) Action 动作算子 3. RDD 的持久化/缓存4. RDD 容错机制 Checkpoint5. RDD 依赖关系1) 宽窄依赖2) 为什么要设计宽窄依赖 …

面试题刷题

i 或 i 基础几个9&#xff08;评价系统的指标&#xff09; Arrays.aslist 的bug 方法做了重写 这样就能使用了 list的迭代器 不能使用list.remove方法。需要使用迭代器的remove方法 正确操作 Hashcode hashcode是object对象的方法 是一个native方法 hashcode冲突案例和hashcod…

编译pytorch——cuda-toolkit-nvcc

链接 https://blog.csdn.net/wjinjie/article/details/108997692https://docs.nvidia.com/cuda/cuda-installation-guide-linux/#switching-between-driver-module-flavorshttps://forums.developer.nvidia.com/t/can-not-load-nvidia-drivers-on-ubuntu-22-10/239750https://…

Linux网络_套接字_UDP网络_TCP网络

一.UDP网络 1.socket()创建套接字 #include<sys/socket.h> int socket(int domain, int type, int protocol);domain (地址族): AF_INET网络 AF_UNIX本地 AF_INET&#xff1a;IPv4 地址族&#xff0c;适用于 IPv4 协议。用于网络通信AF_INET6&#xff1a;IPv6 地址族&a…

【Go】Go Gorm 详解

1. 概念 Gorm 官网&#xff1a;https://gorm.io/zh_CN/docs/ Gorm&#xff1a;The fantastic ORM library for Golang aims to be developer friendly&#xff0c;这是官网的介绍&#xff0c;简单来说 Gorm 就是一款高性能的 Golang ORM 库&#xff0c;便于开发人员提高效率 那…

51单片机 AT24C02(I2C总线)

存储器 随机存储 RAM 只读存储 ROM AT24C02芯片 是一种可以实现掉电不丢失的存储器&#xff0c;可用于保存单片机运行时想要永久保存的数据信息 存储材质&#xff1a;E2PROM 通讯接口&#xff1a;I2C总线 容量&#xff1a;256字节 I2C总线 一种通用的数据总线 两根通信线…

再见IT!

再见IT 学了三年半前端&#xff0c;今天可能真的要和我最爱的前端说拜拜了&#xff01;没办法大局为重&#xff01; 在这个AI乱飞和短视频风口的时代&#xff0c;只能说当下学习任何一个技术远比2020年学习起来要简单的多。往后技术的发展无疑是飞速的&#xff0c;智能的&…

【开源免费】基于Vue和SpringBoot的人口老龄化社区服务与管理平台(附论文)

本文项目编号 T 140 &#xff0c;文末自助获取源码 \color{red}{T140&#xff0c;文末自助获取源码} T140&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

回归预测 | MATLAB实SVM支持向量机多输入单输出回归预测

效果一览 基本介绍 回归预测 | MATLAB实SVM支持向量机多输入单输出回归预测 …………训练集误差指标………… 1.均方差(MSE)&#xff1a;166116.6814 2.根均方差(RMSE)&#xff1a;407.5741 3.平均绝对误差&#xff08;MAE&#xff09;&#xff1a;302.5888 4.平均相对百分误…

系统学习算法:专题四 前缀和

题目一&#xff1a; 算法原理&#xff1a; 这道题是一维前缀和的模板题&#xff0c;通过这道题我们可以了解什么是前缀和 题意很简单&#xff0c;就是先输入数组个数和查询次数&#xff0c;然后将数组的值放进数组&#xff0c;每次查询给2个数&#xff0c;第一个是起点&#x…

智能科技与共情能力加持,哈曼重新定义驾乘体验

2025年1月6日&#xff0c;拉斯维加斯&#xff0c;2025年国际消费电子展——想象一下&#xff0c;当您步入一辆汽车&#xff0c;它不仅能响应您的指令&#xff0c;更能理解您的需求、适应您的偏好&#xff0c;并为您创造一个独特且专属的交互环境。作为汽车科技领域的知名企业和…

[java基础-集合篇]LinkedBlockingQueue源码解析

关联较强的上一篇&#xff1a;[java基础-集合篇]有界阻塞队列ArrayBlockingQueue源码解析-CSDN博客 总的来说。LinkedBlockingQueue 是一个基于链表节点的自定大小的线程安全的阻塞队列。遵循FIFO&#xff0c;结构上一端进一端出的单向队列。 源码注释 翻译 An optionally-boun…

从论文到实践:Stable Diffusion模型一键生成高质量AI绘画

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;编程探索专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年12月24日10点02分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 AI绘画一键生成美图-变成画家 本地部…

业务幂等性技术架构体系之消息幂等深入剖析

在系统中当使用消息队列时&#xff0c;无论做哪种技术选型&#xff0c;有很多问题是无论如何也不能忽视的&#xff0c;如&#xff1a;消息必达、消息幂等等。本文以典型的RabbitMQ为例&#xff0c;讲解如何保证消息幂等的可实施解决方案&#xff0c;其他MQ选型均可参考。 一、…

【2024年华为OD机试】 (B卷,100分)- 跳房子I(Java JS PythonC/C++)

一、问题描述 题目描述 跳房子&#xff0c;也叫跳飞机&#xff0c;是一种世界性的儿童游戏。 游戏参与者需要分多个回合按顺序跳到第1格直到房子的最后一格。 跳房子的过程中&#xff0c;可以向前跳&#xff0c;也可以向后跳。 假设房子的总格数是count&#xff0c;小红每…

鸿蒙打包发布

HarmonyOS应用/元服务发布&#xff08;打包发布&#xff09; https://developer.huawei.com/consumer/cn/doc/harmonyos-guides-V13/ide-publish-app-V13?catalogVersionV13 密钥&#xff1a;包含非对称加密中使用的公钥和私钥&#xff0c;存储在密钥库文件中&#xff0c;格式…

JAVA:在IDEA引入本地jar包的方法(不读取maven目录jar包)

问题&#xff1a; 有时maven使用的jar包版本是最新版&#xff0c;但项目需要的是旧版本&#xff0c;每次重新install会自动将mavan的jar包覆盖到项目的lib目录中&#xff0c;导致项目报错。 解决&#xff1a; 在IDEA中手动配置该jar包对应的目录。 点击菜单File->Projec…

Mac上安装Label Studio

在Mac上安装Anaconda并随后安装Label Studio&#xff0c;可以按照以下步骤进行&#xff1a; 1. 在Mac上安装Anaconda 首先&#xff0c;你需要从Anaconda的官方网站下载适用于Mac的安装程序。访问Anaconda官网&#xff0c;点击“Download Anaconda”按钮&#xff0c;选择适合M…

docker-compose和docker仓库

一、docker-compose 1.概述 docker-compose是一个自动编排工具&#xff0c;可以根据dockerfile自动化部署docker容器。 主要功能 配置定义 使用YAML文件&#xff08;通常命名为docker - compose.yml&#xff09;来描述应用程序的服务、网络和卷等配置。 容器编排 可以同时…