- 核心概念
- 创建Channel
- 无界通道
- 有界通道
- `FullMode`选项
- 生产者-消费者模式
- 生产者写入数据
- 消费者读取数据
- 完整示例
- 高级配置
- 优化选项:
- 取消操作:通过 `CancellationToken` 取消读写。
- 错误处理
- 适用场景
- `Channel`的类型
- 创建`Channel`
- 写入和读取消息
- 使用场景
- 示例代码
- 注意事项
在
C#
中,
System.Threading.Channels
提供了
高效的异步生产-消费模型,适用于多任务间的数据传递。以下是其核心概念及使用方法的总结:
核心概念
Channel<T>
:异步消息队列,支持多生产者和多消费者。
ChannelWriter<T>
:用于异步写入数据(WriteAsync
),完成后需调用 Complete()
。
ChannelReader<T>
:用于异步读取数据,支持 ReadAsync
或 ReadAllAsync
遍历。
创建Channel
无界通道
var channel = Channel.CreateUnbounded<int>();
容量无限,适用于不确定数据量的场景。
有界通道
var options = new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait // 满时等待
};
var channel = Channel.CreateBounded<int>(options);
FullMode
选项
-
Wait
(默认):写入时阻塞直到有空间。 -
DropOldest
/DropNewest
:丢弃最旧/最新数据。 -
DropWrite
:丢弃当前写入的数据。
生产者-消费者模式
生产者写入数据
async Task Producer(ChannelWriter<int> writer)
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
await Task.Delay(100);
}
writer.Complete(); // 标记完成
}
消费者读取数据
async Task Consumer(ChannelReader<int> reader)
{
// 方式1: ReadAllAsync遍历
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"Received: {item}");
}
// 方式2: 手动循环
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out var item))
{
Console.WriteLine($"Received: {item}");
}
}
}
完整示例
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var channel = Channel.CreateUnbounded<int>();
var producer = Producer(channel.Writer);
var consumer = Consumer(channel.Reader);
await Task.WhenAll(producer, consumer);
}
static async Task Producer(ChannelWriter<int> writer)
{
try
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
await Task.Delay(100);
}
}
catch (Exception ex)
{
writer.Complete(ex); // 传递异常
}
finally
{
writer.Complete();
}
}
static async Task Consumer(ChannelReader<int> reader)
{
try
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"Processed: {item}");
}
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}
高级配置
优化选项:
var options = new UnboundedChannelOptions()
{
SingleWriter = true, // 单一生产者优化
SingleReader = false // 允许多消费者
};
取消操作:通过 CancellationToken
取消读写。
await writer.WriteAsync(item, cancellationToken);
错误处理
生产者异常时,调用 writer.Complete(ex)
通知消费者。
消费者通过 try-catch
捕获遍历时的异常。
适用场景
数据流水线处理。
高吞吐量的异步任务。
多任务间的负载均衡。
在C#
中,System.Threading.Channels
是一个强大的异步通信机制,主要用于实现生产者-消费者模式。它提供了线程安全的通道(Channel
),用于在不同线程之间传递数据。以下是关于C# Channel
的详细介绍:
Channel
的类型
Channel
有两种类型:
有界通道(Bounded Channel
):具有固定容量,当通道已满时,可以根据指定的策略处理新消息。
无界通道(Unbounded Channel
):没有容量限制,适合生产者和消费者速度匹配的场景。
创建Channel
使用Channel.CreateBounded<T>
创建有界通道,需要指定容量和满时的处理策略(如Wait
、DropNewest
、DropOldest
等)。
使用Channel.CreateUnbounded<T>
创建无界通道。
写入和读取消息
生产者通过channel.Writer.WriteAsync()
方法写入消息。
消费者通过channel.Reader.ReadAsync()
或channel.Reader.WaitToReadAsync()
读取消息。
使用场景
Channel
主要用于生产者-消费者模式,可以实现高效的异步数据处理。它支持多线程操作,并可以通过SingleReader
和SingleWriter
属性限制通道的读写行为。
示例代码
以下是一个简单的生产者-消费者示例:
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait
});
Task producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
}
channel.Writer.Complete();
});
Task consumer = Task.Run(async () =>
{
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var item))
{
Console.WriteLine($"Consumed: {item}");
}
}
});
await Task.WhenAll(producer, consumer);
注意事项
- 缓冲区溢出:生产者写入速度过快可能导致缓冲区溢出。
- 正确关闭
Channel
:在数据完全消费后关闭Channel
,避免数据丢失。