C# ConcurrentQueue 使用详解

news2025/2/25 1:34:53

总目录


前言

在C#多线程编程中,数据共享如同走钢丝——稍有不慎就会引发竞态条件(Race Condition)或死锁。传统Queue<T>在并发场景下需要手动加锁,而ConcurrentQueue<T>作为.NET Framework 4.0 引入的线程安全集合,采用无锁算法(Lock-Free),能显著提升高并发场景下的性能。今天,我们就来深入探讨一下 ConcurrentQueue<T> 的使用方法和特性。


一、基本信息

1. 基本概念

  • ConcurrentQueue<T> 是一个线程安全的先进先出(FIFO)队列,属于 System.Collections.Concurrent 命名空间。它遵循先进先出(FIFO)的原则,允许多个线程同时对队列进行操作,而无需额外的锁机制。
  • 用于在生产者和消费者场景中高效地处理数据。但需要注意的是,它并不保证元素在同一个线程内入队顺序和出队顺序完全一致。

2. 核心特性速览

1) 线程安全保证

  • 无锁设计:通过CAS(Compare-And-Swap)原子操作实现高效并发
    • 无锁编程ConcurrentQueue<T> 使用了无锁编程技术,减少了锁的开销,提高了性能。
    • 原子操作:队列的入队和出队操作是原子性的,这意味着即使在多线程环境下,操作也不会被打断
  • FIFO原则:先进先出(但线程间顺序不绝对保证,在多线程环境下,队列的顺序可能会受到线程调度的影响。)
  • 高吞吐量:实测在16线程并发下吞吐量可达普通锁队列的3倍+
  • 内存高效:采用链表结构动态扩展,避免数组复制的开销

2) 性能对比(基准测试)

操作类型ConcurrentQueueQueue+Lock
100万次入队45 ms210 ms
100万次出队38 ms195 ms

3. 适用场景

  • 生产者 - 消费者模式(日志记录、任务分发)
    • 在生产者 - 消费者模式中,多个生产者线程同时向队列中放入任务(元素),多个消费者线程从队列中取出任务执行。ConcurrentQueue可以完美适配这种场景,确保数据的安全传递和并发操作的效率。例如,多个网络请求到达服务器(生产者),服务器将这些请求放入ConcurrentQueue,然后多个工作线程从队列中取出请求进行处理(消费者)。
  • 任务调度系统
    • 当需要调度多个任务按照顺序执行时,ConcurrentQueue可以用来存储任务的顺序。多个调度器线程可以从队列中取出任务并分配到合适的资源上执行,保证任务的有序性和并发性。

二、基本操作

1. 初始化队列

var queue = new ConcurrentQueue<string>();

2. 入队操作(Enqueue)

  • Enqueue方法用于向队列中添加元素。例如:
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
  • 在多线程环境下,多个线程可以同时调用Enqueue方法,而不需要担心数据冲突问题。
// 多线程安全添加
Parallel.For(0, 1000, i => {
    queue .Enqueue($"Item_{i}");
});

2. 出队操作(TryDequeue)

  • TryDequeue方法尝试从队列中取出一个元素。示例代码如下:
int value;
if (queue.TryDequeue(out value))
{
    Console.WriteLine(value);
}
// 或
if (queue.TryDequeue(out int value2))
{
    Console.WriteLine(value2);
}
  • 如果队列中有元素,TryDequeue会成功取出元素并将队列修改为相应的状态,返回true
  • 如果队列为空,则返回falsevalue保持其初始值。这一特性使得它在多线程并发访问队列时非常方便,不需要像普通队列那样额外进行线程同步处理。

3. 查看队首元素(TryPeek)

  • TryPeek方法可以查看队列的第一个元素而不将其移除队列。例如:
ConcurrentQueue<int> queue= new ConcurrentQueue<int>();
for (int i = 0; i < 10000; i++)
{
    queue.Enqueue(i);
}
int result = 0;
if (!queue.TryPeek(out result))
{
    Console.WriteLine("TryPeek failed when it should have succeeded");
}
else if (result!= 0)
{
    Console.WriteLine($"Expected TryPeek result of 0, got {result}");
}

4. TryGetNonEnumeratedCount 与 Count

1)TryGetNonEnumeratedCount 的作用

TryGetNonEnumeratedCount 是 .NET 6+ 引入的通用集合操作方法,其作用如下:

  • 尝试在不枚举集合的情况下获取元素数量
  • 对于实现了ICollection接口的类型(如ConcurrentQueue<T>ConcurrentBag<T>),直接返回Count属性值
  • 避免某些集合类型(如普通IEnumerable)需要枚举才能计数的性能损耗

2)与Count的区别

特性TryGetNonEnumeratedCountCount 属性
适用范围所有IEnumerable类型具体集合类型
返回值类型bool(是否成功获取)int(直接返回数量)
实现机制通过接口检查优化路径直接访问内部计数器
对未实现ICollection的集合可能返回false并需要枚举不可用

3) 示例

var queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);

// 传统方式(直接访问 Count 属性)
Console.WriteLine($"Count: {queue.Count}"); 

// 新方式(实现 ICollection 接口的通用方法)
if (queue.TryGetNonEnumeratedCount(out int count)) {
    Console.WriteLine($"Non-enumerated count: {count}");
}

对于ConcurrentQueue<T>,两种方式本质相同。但在编写通用集合处理代码时,TryGetNonEnumeratedCount能更好地兼容各种集合类型,避免对未实现ICollection接口的集合进行低效枚举

5. 其他操作

1)清空队列

// 清空队列(.NET 5+)
queue.Clear();  // 注意:非原子操作!

2)IsEmpty

判断集合是否为空(同样存在瞬时性,可能不准确)。

TryDequeue 可能失败,需结合循环或超时机制

while (!queue.IsEmpty)
{
	if (queue.TryDequeue(out int item)) Process(item);
}

3)批量操作

// 转换为数组
var snapshot = concurrentQueue.ToArray();

// 复制到目标数组
string[] buffer = new string[100];
concurrentQueue.CopyTo(buffer, 0);

三、为什么需要 ConcurrentQueue?

在多线程环境中,普通的队列(如 Queue<T>)可能会引发线程安全问题。例如,当多个线程同时对队列进行读写操作时,可能会导致数据丢失、异常或程序崩溃。而 ConcurrentQueue<T> 内部实现了高效的线程同步机制,确保了在并发场景下的数据安全。

1. 非线程安全案例

using System.Collections;

class Program
{
    static void Main()
    {
        // 非线程安全版本(错误示例)
        var unsafeQueue = new Queue<int>();
        Parallel.For(0, 1000, i => {
            unsafeQueue.Enqueue(i); // 会导致数据丢失或抛出异常
        });
        Console.WriteLine($"非安全集合数量: {unsafeQueue.Count}"); // 结果通常小于1000
    }
}

运行结果

  • 运行代码时,unsafeQueue .Count 通常会小于 1000,甚至可能抛出异常。
  • 结果不确定:由于线程竞争是随机的,每次运行的结果可能不同。

2. 为什么不安全?

1) 问题根源

  • 线程不安全的 Queue
    • Queue 是普通的先进先出(FIFO)集合,但不保证多线程并发操作的安全性。
    • 当多个线程同时调用 Enqueue() 时,可能发生以下问题:
      • 数据覆盖:多个线程可能同时修改队列的底层数组和内部索引(如 _size 和 _tail),导致写入位置冲突,部分数据被覆盖。
      • 容量扩展竞争:当队列需要扩容时,多个线程可能同时触发内部数组的重新分配,导致数据丢失或数组损坏。
      • 计数不一致:Count 属性的值可能因线程间竞争而无法正确累加。
  • Parallel.For 的并发写入
    • Parallel.For(0, 1000, i => { … }) 会创建多个线程并行执行 Enqueue(i)。

2)错误场景

假设两个线程同时执行 Enqueue()

  • 线程 A 和线程 B 同时读取队列的当前尾部索引 _tail,假设此时 _tail = 5。
  • 线程 A 将值写入索引 5,然后更新 _tail 为 6。
  • 线程 B 也将值写入索引 5(因为它在步骤 1 中读到的 _tail 是 5),覆盖线程 A 写入的数据。
  • 最终队列实际写入的数据少于预期,且 Count 的值可能小于 1000。

3. 解决方案

1)使用线程安全的 ConcurrentQueue<T>

var safeQueue = new ConcurrentQueue<int>();
Parallel.For(0, 1000, i => 
{
    safeQueue.Enqueue(i);  // 线程安全
});
Console.WriteLine($"安全集合数量: {safeQueue.Count}");  // 结果为 1000
  • ConcurrentQueue 内部通过无锁算法或细粒度锁保证线程安全。

2)手动同步(lock 语句)

var unsafeQueue = new Queue<int>();
object lockObj = new object();

Parallel.For(0, 1000, i => 
{
    lock (lockObj) 
    {       // 强制串行化写入
        unsafeQueue.Enqueue(i);
    }
});
  • 通过锁强制每次 Enqueue 操作串行执行,但会牺牲并发性能。

4. QueueConcurrentQueue

  1. Queue的区别
    • 在普通的Queue<T>中,如果不是线程安全的环境,在多线程同时进行入队和出队操作时可能会产生数据混乱等问题,需要手动进行加锁等操作来保证线程安全。而ConcurrentQueue<T>是线程安全的,不需要额外的锁操作就能正确处理并发情况。
  2. 性能优势
    • 在高并发场景下,ConcurrentQueue的非阻塞算法(无锁)相比使用锁的传统队列有更好的性能。例如,普通使用锁的入队和出队操作(如下代码),在高并发时会导致线程频繁阻塞和唤醒:
    • ConcurrentQueue通过原子操作避免了线程阻塞,提高了并发处理效率。
    public class LockedQueue<T>
    {
        private Queue<T> _queue = new Queue<T>();
        private object _lock = new object();
        public void Enqueue(T item)
        {
            lock (_lock)
            {
                _queue.Enqueue(item);
            }
        }
        public bool TryDequeue(out T result)
        {
            lock (_lock)
            {
                if (_queue.Count > 0)
                {
                    result = _queue.Dequeue();
                    return true;
                }
                result = default;
                return false;
            }
        }
    }
    

5. 使用示例

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Program
{
    static void Main()
    {
        ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

        // 生产者线程
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Enqueue(i);
                Console.WriteLine($"Enqueued: {i}");
            }
        });

        // 消费者线程
        Task consumer = Task.Run(() =>
        {
            while (true)
            {
                if (queue.TryDequeue(out int result))
                {
                    Console.WriteLine($"Dequeued: {result}");
                }
            }
        });

        Task.WaitAll(producer, consumer);
    }
}

在这个示例中,生产者线程负责向队列中添加数据,消费者线程负责从队列中移除数据。由于 ConcurrentQueue<T> 的线程安全性,我们无需担心线程冲突问题。

四、典型应用场景

1. 生产者-消费者模式(带优雅关闭)

public class PipelineExample
{
    private readonly ConcurrentQueue<DataPacket> _queue = new();
    private readonly CancellationTokenSource _cts = new();

    public void StartProcessing(int consumerCount)
    {
        // 生产者线程
        Task.Run(() =>
        {
            while (!_cts.IsCancellationRequested)
            {
                var data = ReceiveNetworkPacket();
                _queue.Enqueue(data);
            }
        });

        // 消费者线程池
        Parallel.For(0, consumerCount, i =>
        {
            while (true)
            {
                if (_queue.TryDequeue(out var data))
                {
                    ProcessData(data);
                }
                else if (_cts.IsCancellationRequested)
                {
                    break;
                }
                else
                {
                    SpinWait.SpinUntil(() => !_queue.IsEmpty || _cts.IsCancellationRequested);
                }
            }
        });
    }

    public void Stop() => _cts.Cancel();
}
ConcurrentQueue<SensorData> dataQueue = new();

// 生产者线程
Task.Run(() => 
{
    while (true) 
    {
        var data = ReadSensor();
        dataQueue.Enqueue(data);
        Thread.Sleep(100);
    }
});

// 消费者线程
Task.Run(() => 
{
    while (true) 
    {
        if (dataQueue.TryDequeue(out SensorData data)) 
        {
            SaveToDatabase(data);
        }
        else 
        {
            Thread.Sleep(50); // 降低CPU占用
        }
    }
});

2. 高并发日志系统设计

public static class AsyncLogger
{
    private static readonly ConcurrentQueue<string> _logQueue = new();
    private static readonly AutoResetEvent _signal = new(false);
    
    static AsyncLogger()
    {
        Task.Run(() =>
        {
            using var writer = new StreamWriter("app.log");
            while (true)
            {
                _signal.WaitOne();
                while (_logQueue.TryDequeue(out var message))
                {
                    writer.WriteLine($"[{DateTime.UtcNow:O}] {message}");
                }
                writer.Flush();
            }
        });
    }

    public static void Log(string message)
    {
        _logQueue.Enqueue(message);
        _signal.Set();
    }
}

五、注意事项

  1. 元素顺序的相对性
    • 虽然ConcurrentQueue遵循FIFO原则,但是由于并发操作的存在,同一个线程内先入队的元素可能会后出队。在编写代码时需要考虑到这种情况,避免对元素顺序有过于严格的预期。
    • 虽然号称FIFO,但在以下场景可能出现顺序异常:
    // 线程A
    cq.Enqueue(1); // 时间戳T1
    cq.Enqueue(2); // T2
    
    // 线程B
    cq.Enqueue(3); // T1.5
    
    // 可能出队顺序:1 → 3 → 2
    
  2. 内存管理
    • 在高频率入队和出队操作中,要注意内存的使用情况,因为队列中的元素可能会随着时间不断积累(如果没有及时消费),可能会导致内存占用过高。
    • 对象池模式:复用出队对象,减少GC压力
    • 容量监控:定期检查cq.Count,设置阈值报警
// 对象池示例
var objectPool = new ObjectPool<DataModel>(() => new DataModel());
var item = objectPool.Get();
try {
    // 使用item...
} finally {
    objectPool.Return(item);
}
  1. 避免频繁计数Count 属性需要遍历链表,复杂度O(n)

六、 替代方案

当需要线程安全的先进先出集合时,ConcurrentQueue<T>通常是首选。但在以下场景需考虑替代方案:

  • 优先级队列PriorityQueue(.NET 6+)
  • 延迟处理System.Threading.Channels
  • 跨进程通信MemoryMappedFile + 环形缓冲区
  • 在需要阻塞操作时考虑结合 BlockingCollection

与其他并发容器的对比

特性ConcurrentQueueBlockingCollectionChannels
阻塞操作✔️✔️ (.NET Core+)
边界控制✔️✔️
内存效率
适用场景非阻塞队列有界集合异步管道

结语

回到目录页:C#/.NET 知识汇总
希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。


参考资料:
ConcurrentQueue<T> 类

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2304640.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring中关于Bean的复习(IOC和DI)

文章目录 1.spring程序开发步骤1.1 导入spring开发的基本包坐标1.2 编写Dao接口和实现类1.3 创建spring核心配置文件1.4 在spring配置文件中配置UserDaoImpl1.5 使用Spring的Api获得Bean实例 2. Bean实例化的三种方式2.1 无参构造方法实例化2.2 工厂静态方法实例化2.3 工厂实例…

Docker内存芭蕾:优雅调整容器内存的极限艺术

title: “&#x1f4be; Docker内存芭蕾&#xff1a;优雅调整容器内存的极限艺术” author: “Cjs” date: “2025-2-23” emoji: “&#x1fa70;&#x1f4a5;&#x1f4ca;” 当你的容器变成内存吸血鬼时… &#x1f680; 完美内存编排示范 &#x1f4dc; 智能内存管家脚本…

一周学会Flask3 Python Web开发-flask3上下文全局变量session,g和current_app

锋哥原创的Flask3 Python Web开发 Flask3视频教程&#xff1a; 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili flask3提供了session,g和current_app上下文全局变量来方便我们操作访问数据。 以下是一个表格&#xff0c;用于比较Flask中的…

【蓝桥杯单片机】客观题

一、第十三届省赛&#xff08;一&#xff09; 二、第十三届省赛&#xff08;二&#xff09;

QT中经常出现的用法:组合

在 C 中&#xff0c;一个类包含另一个类的对象称为组合&#xff08; Composition &#xff09;。这是一种常见的设计模式&#xff0c;用 于表示一个类是由另一个类的对象组成的。这种关系通常表示一种 " 拥有 " &#xff08; "has-a" &#xff09;的关系。…

下载CentOS 10

1. 进入官网&#xff1a;https://www.centos.org/ 2. 点击右上角的Download进入下载页面。 3. 选择对应的CPU架构&#xff0c;点击ISOs下面的Mirrors开始下载。

简说spring 的设计模式

spring 的设计模式&#xff08;23种…) &#xff08;面试题&#xff09;说说BeanFactory和FactoryBean的实现原理和区别&#xff1f; spring 中你还知道哪些设计模式&#xff1f;&#xff1f; 1.简单工厂模式 实质&#xff1a; 由一个工厂类根据传入的参数&#xff0c;动态决…

【 Avalonia UI 语言国际化 I18n】图文结合教学,保姆级教学,语言国际化就是这么简单(.Net C#)

完整项目地址 github : https://github.com/Crazy-GrowUp/AvaloniaI18nTest/tree/master gitee :https://gitee.com/jack_of_disco/avalonia-i18n-test 0.项目新建 Properties 文件夹 对应的项目配置文件里面就会增加 <Folder Include"Properties\" /> 1.项…

Spring Boot 日志管理(官网文档解读)

摘要 本篇文章详细介绍了SpringBoot 日志管理相关的内容&#xff0c;文章主要参考官网文章的描述内容&#xff0c;并在其基础上进行一定的总结和拓展&#xff0c;以方便学习Spring Boot 的小伙伴能快速掌握Spring Boot 日志管理相关的内容。 日志实现方式 Sping Boot 的日志管…

【玩转 Postman 接口测试与开发2_020】(完结篇)DIY 实战:随书示例 API 项目本地部署保姆级搭建教程(含完整调试过程)

《API Testing and Development with Postman》最新第二版封面 文章目录 最新版《Postman 接口测试与开发实战》示例 API 项目本地部署保姆级搭建教程1 前言2 准备工作3 具体部署3.1 将项目 Fork 到自己名下3.2 创建虚拟环境并安装依赖3.3 初始运行与项目调试 4 示例项目的用法…

网络运维学习笔记 021 HCIA-Datacom新增知识点02 SDN与NFV概述

SDN与NFV概述 经典IP网络是一个分布式的、对等控制的网络。 每台网络设备存在独立的数据平面、控制平面和管理平面。 设备的控制平面对等的交互路由协议&#xff0c;然后独立的生成数据平面指导报文转发。 它的优势在于设备与协议解耦&#xff0c;厂家间的兼容性较好且故障场景…

【Linux】多线程 -> 线程同步与基于BlockingQueue的生产者消费者模型

线程同步 条件变量 当一个线程互斥地访问某个变量时&#xff0c;它可能发现在其它线程改变状态之前&#xff0c;它什么也做不了。 例如&#xff1a;一个线程访问队列时&#xff0c;发现队列为空&#xff0c;它只能等待&#xff0c;直到其它线程将一个节点添加到队列中。这…

Docker Mysql 数据迁移

查看启动命令目录映射 查看容器名称 docker ps查看容器的启动命令 docker inspect mysql8.0 |grep CreateCommand -A 20如下图所示:我这边是把/var/lib/mysql 目录映射到我宿主机的/mnt/mysql/data目录下,而且我的数量比较大使用方法1的话时间比较久,所以我采用方法2 如果没…

四步彻底卸载IDEA!!!

各位看官早安午安晚安呀 如果您觉得这篇文章对您有帮助的话 欢迎您一键三连&#xff0c;小编尽全力做到更好 欢迎您分享给更多人哦 大家好&#xff0c;我们今天来学习四步彻底卸载IDEA&#xff01;&#xff01;&#xff01; 首先我要提醒各位 如果你想删除 IDEA 相关&#xf…

HTTP实验(ENSP模拟器实现)

HTTP概述 HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;&#xff0c;设计HTTP最初的目的是为了提供一种发布和接收HTML页面的方法。 HTTP定义了多种请求方法&#xff0c;常用的包括&#xff1a; GET&#xff1a;请求资源。 POST&…

【网络安全】常见的web攻击

1、SQL注入攻击 定义&#xff1a; 攻击者在HTTP请求中注入恶意的SQL代码&#xff0c;当服务器利用参数构建SQL语句的时候&#xff0c;恶意的SQL代码被一起构建,并在数据库中执行。 示例&#xff1a; 用户登录&#xff1a; 输入用户名xx&#xff0c; 密码 or 1 …

登录-05.JWT令牌-介绍

一.JWT令牌 JWT令牌是一种简洁的、自包含的格式&#xff0c;用于在通讯双方之间以json数据格式安全的传输数据。说白了&#xff0c;JWT令牌就是将json格式的数据进行封装&#xff0c;从而实现安全传输。 所谓简洁&#xff0c;就是指JWT令牌就是一个简单的字符串。 所谓自包含…

K8S下redis哨兵集群使用secret隐藏configmap内明文密码方案详解

#作者&#xff1a;朱雷 文章目录 一、背景环境及方案说明1.1、环境说明1.2、方案一&#xff1a;使用配置文件设置密码1.3、方案二&#xff1a;使用args 的命令行传参设置密码 二、redis secret configmap deployment参考2.1 创建secret-redis.yaml参考2.2 修改configmap配置参…

Spring框架基本使用(Maven详解)

前言&#xff1a; 当我们创建项目的时候&#xff0c;第一步少不了搭建环境的相关准备工作。 那么如果想让我们的项目做起来方便快捷&#xff0c;应该引入更多的管理工具&#xff0c;帮我们管理。 Maven的出现帮我们大大解决了管理的难题&#xff01;&#xff01; Maven&#xf…

鸿蒙NEXT应用App测试-专项测试(DevEco Testing)

注意&#xff1a;大家记得先学通用测试在学专项测试 鸿蒙NEXT应用App测试-通用测试-CSDN博客 注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注…