C#--在多线程中使用任务并行库(TPL)--15

news2025/1/17 12:49:43

目录

一.任务并行库的概念以及定义

二.主要特性

三.代码使用示例

1.最基础的Parallel.For使用方式

2.使用 ParallelOptions 来控制并行执行

3.Parallel.ForEach的使用(用于处理集合)

4.带有本地变量的并行循环(用于需要累加或统计的场景)

5.结合Task和Parallel的高级示例

6.使用ParallelOptions结合异步操作

总结


一.任务并行库的概念以及定义

在C#中,任务并行库(Task Parallel Library,简称TPL)是.NET Framework和.NET Core中提供的一组用于简化并行编程和异步编程的公共类型和API,旨在帮助开发人员更容易的利用多核处理器的能力,提高应用程序的性能和响应速度

任务(Task):

TPL中的核心概念是任务,它表示一个异步操作,可以理解为某项工作或操作的抽象封装.

任务可以异步的执行,并可以返回结果,报告进度,处理取消和异常等

数据并行(Data Parallelism):

通过并行执行集合中的各个元素的操作,如使用Parallel.For和Parallel.ForEach方法,可以同时处理大量数据,提高数据处理的效率

任务并行(Task Parallelism):

利用任务来并发的执行不同的方法或操作,适用于多个操作相互独立,可以同时执行的场景

二.主要特性

  • 1.简化并行编程:TPL提供了更高级别的抽象,开发者无需直接管理线程的创建,同步等复杂细节
  • 2.任务调度:内置的任务调度器会智能地将任务映射到线程池中的线程上,优化资源使用
  • 3.异常处理:提供了机制来捕获和处理并行执行中产生的异常,确保应用程序的稳定性
  • 4.任务组合:支持将多个任务链接到一起,形成任务的连续执行和组合(如 ContinueWith方法)
  • 5.取消和超时:提供了取消令牌(Cancellation Token),允许在需要时取消任务的执行
  • 6.异步和编程支持:与 async 和 await 关键字结合使用,可以编写简单的异步代码,提升应用程序的响应性

三.代码使用示例

1.最基础的Parallel.For使用方式

 Console.WriteLine("\n基础的 Parallel.For 示例:");
 // 这里的 (0, 5) 表示从0开始到4结束(不包含5)
 // i => { } 是一个委托,表示对每个数字要执行的操作
 Parallel.For(0, 5, i =>
 {
     Console.WriteLine($"正在并行处理数字 {i}");
     Thread.Sleep(100); // 模拟一些耗时的工作
 });

代码输出结果示例:

因为是并行处理,所以执行顺序不是固定的.

2.使用 ParallelOptions 来控制并行执行

在C#中, ParallelOptions 是用于配置并行操作行为的选项类,主要用于控制Parallel类提供的并行循环和任务的执行方式

Parallel类包含如:Parallel.For ,Parallel.ForEach和Parallel.Invoke等方法.它们可以利用多核处理器并行执行任务

ParallelOptions的主要属性:

  • MaxDegreeOfParallelism:指定并行操作的最大并行度,即同时运行的最大线程数,默认情况下MaxDegreeOfParallelism为-1,表示不限制并行度,线程数由.NET框架根据可用的处理器核心数量自动调度
  • CancellationToken:用于接收取消请求的令牌,可以在并行操作中响应取消操作.支持任务的取消操作,当需要在某些条件下中止并行任务时,可以使用CancellationTokenSource生成令牌,并在操作中监视CancellationToken的取消请求
  • TaskScheduler:指定任务的调度器,控制任务的执行上下文,如果不设置默认使用TaskScheduler.Default
Console.WriteLine("\n使用 ParallelOptions 的 Parallel.For:");
// 创建一个 CancellationTokenSource,用于发送取消请求
CancellationTokenSource cts = new CancellationTokenSource();
//配置ParallelOptions
var options = new ParallelOptions
{
    // Environment.ProcessorCount 获取CPU的核心数
    // MaxDegreeOfParallelism 控制最大同时执行的任务数
    MaxDegreeOfParallelism = 2,// 这里限制最多同时执行2个任务
    CancellationToken = cts.Token // 将取消令牌传递给并行循环
};

try
{
    // 使用配置了选项的并行循环
    Parallel.For(0, 5, options, i =>
    {
        Console.WriteLine($"使用受限并行度处理数字 {i},线程Id: {Task.CurrentId}");
        // 可选:在工作中检查取消请求
        options.CancellationToken.ThrowIfCancellationRequested();
        Thread.Sleep(100);
        Console.WriteLine($"完成处理索引 {i}");
    });
}
catch (OperationCanceledException)
{

    Console.WriteLine("操作已被取消。");
}
finally
{
    cts.Dispose();
}

代码输出结果示例:

代码中限制了最多同时执行两个线程的任务

当两个线程中的其中一个线程完成任务并释放后才会处理下一个任务

3.Parallel.ForEach的使用(用于处理集合)

  Console.WriteLine("\n使用 Parallel.ForEach 示例:");
  // 创建一个简单的字符串列表
  List<string> fruits = new List<string>
  {
      "苹果", "香蕉", "橙子", "葡萄"
  };

  // 并行处理列表中的每一项
  Parallel.ForEach(fruits, fruit =>
  {
      Console.WriteLine($"正在处理水果: {fruit},线程Id: {Task.CurrentId}");
      Thread.Sleep(100);
      Console.WriteLine($"完成处理水果: {fruit}");
  });

代码示例中的Parallel.ForEach方法概述:

第一个参数: fruits

  • 类型: IEnumberable<T>,在这是时IEnumberable<string>
  • 代表要迭代的集合,也就是我们要并行处理的元素序列(在当前示例中指代fruits)

第二个参数: fruit=>{...}

  • 类型:Action<T> 这里是Action<string>
  • 这是一个委托,表示要对每个元素执行的操作.在这里我们使用了 Lambda表达式 定义这个操作
  • fruit: Lambda表达式的参数,代表当前正在处理的集合元素.在每次的迭代中,fruit将被赋值为fruits集合中的一个元素

代码输出结果示例:

4.带有本地变量的并行循环(用于需要累加或统计的场景)

  Console.WriteLine("\n带本地变量的 Parallel.For 示例:");
  int sum = 0; // 定义一个全局计数器
  Parallel.For(0, 10,
      // 初始化方法:为每个并行任务创建一个本地计数器
      () => 0,

      // 本体方法:处理每个数字并更新本地计数器
      (i, loop, localSum) =>
      {
          localSum += i; // 将当前数字加到本地计数器
          Console.WriteLine($"线程 {Task.CurrentId} 处理数字 {i}, 当前本地和为 {localSum}");
          return localSum; // 返回更新后的本地计数器
      },

      // 最终方法:处理每个线程的本地计数器最终值
      (finalLocalSum) =>
      {
          Console.WriteLine($"一个线程完成{Task.CurrentId},最终本地和为: {finalLocalSum}");
          Interlocked.Add(ref sum, finalLocalSum); // 使用原子操作将本地计数器加到全局计数器,确保线程安全,避免竞态条件
      }
  );
Console.WriteLine($"所有线程完成,最终和为: {sum}");

代码概述:使用Parallel.For进行并行循环,通过引用线程本地变量,累加从0到9的数字,并在每个线程完成时输出其本地计算的总和

当前代码使用的Parallel.For的完整签名为:

public static ParallelLoopResult For<TLocal>(
    int fromInclusive,
    int toExclusive,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally
)

参数解析:

  • fromInclusive:(起始索引,包含)
    • 类型: int
    • 作用:并行循环的起始索引,包含该值
  • toExclusive:(结束索引,不包含)
    • 类型:int
    • 作用:并行循环的结束索引,不包含该值
  • localInit(本地初始化器):
    • 类型:Func<TLocal>
    • 作用:一个函数,定义了每个线程(任务)的本地变量的初始值
    • 在示例代码中()=>0,表示每个线程的本地计数器初始值为0
  • body(循环主体):
    • 类型:Func<int,ParallelLoopState,TLocal,TLocal>
      • 参数说明:
        • int i:当前处理的元素索引
        • ParallelLoopState:用于控制循环的状态(如中断,停止)
        • TLocal local 线程的本地变量
      • 返回值 TLocal,即更新后的本地变量
      • 作用:定义了并行循环中要执行的操作,并可以更新本地变量
        (i, loop, localSum) =>
        {
            localSum += i;
            Console.WriteLine($"线程 {Task.CurrentId} 处理数字 {i}, 当前本地和为 {localSum}");
            return localSum;
        }
        
        在示例代码中:
        • 将当前索引i加到本地计数器localSum中
        • 输出当前线程处理的数字和(本地和)
        • 返回更新后的localSum
    • localFinally(本地最终操作)
      • 类型:Action<TLocal>
      • 作用:定义每个线程完成其任务后执行的动作,接收该线程的最终本地变量值

 代码执行流程:

1.初始化阶段:

  • 并行循环开始前,Parallel.For会为每个参与的线程调用localInit函数,初始化本地变量
  • 在示例代码中,每个线程的localSum的初始值为0

2.并行执行阶段:

  • 对于从0到9的每个索引i,Parallel.For会并行执行body函数
  • 线程轮流或同时处理不同的i值,更新其各自的localSum
  • 在body函数中
    • localSum+=i; 将当前的索引i加到本地变量localSum中
    • return localSum:返回更新后的本地变量,以便在下一次循环中使用

3.最终处理阶段:

  • 当一个线程完成了其被分配的索引之后,Parallel.For会调用localFinally函数,传递该线程的最终localSum值.

代码输出结果示例:

 根据以上的输出结果,我们可以注意到线程35处理了两个数字

第一次处理数字5时,局部和是5

接着又处理了数字9,局部和更新为5+9=14

由此我们可以得知,在并行执行中,任务的分配可能并不均匀,某些线程可能会被分配到多个任务(数字),因此会出现局部和超过单个数字值的情况

在示例代码中,存在 Interlocked.Add(ref sum,finalLocalSum)

这是一个线程安全的代码,用于在多线程的环境下安全地修改变量的值,避免竞态条件

Interlocked.Add是一种用于实现原子操作的同步机制,可以确保多个线程在并发修改同意变量时,不会产生竞态条件,从而保证数据的一致性和正确性,通过使用这样的原子操作,可以在多线程环境中安全的更新共享变量

5.结合Task和Parallel的高级示例
 

Console.WriteLine("\nTask和Parallel结合使用示例:");

// 创建一个任务数组,每个任务使用Parallel处理不同的数据集
var tasks = new List<Task>();

// 创建第一个并行处理任务
tasks.Add(Task.Run(() =>
{
    Parallel.For(0, 5, i =>
    {
        Console.WriteLine($"任务1处理数字 {i}, 线程Id: {Task.CurrentId}");
        Thread.Sleep(50);
    });
}));

// 创建第二个并行处理任务
tasks.Add(Task.Run(async () =>
{
    await Task.Delay(100); // 模拟一些异步操作
    Parallel.ForEach(new[] { "数据1", "数据2", "数据3" }, item =>
    {
        Console.WriteLine($"任务2处理项目: {item}, 线程Id: {Task.CurrentId}");
        Thread.Sleep(50);
    });
}));

// 等待所有任务完成
await Task.WhenAll(tasks);
Console.WriteLine("示例5所有任务已完成");

代码概述 :

通过Task和Parallel结合,实现多个(两个)任务下的并行处理

代码输出结果示例:

6.使用ParallelOptions结合异步操作

 Console.WriteLine("\n带有异步操作的Parallel示例:");
 var parallelOptions = new ParallelOptions
 {
     MaxDegreeOfParallelism = Environment.ProcessorCount//  为 CPU 核心数(自动去调度剩余空闲CPU)
 };

 await Task.Run(() =>
 {
     Parallel.For(0, 3, parallelOptions, async i =>
     {
         await Task.Delay(100); // 异步等待
         Console.WriteLine($"异步并行处理索引 {i}, 线程Id: {Environment.CurrentManagedThreadId}"); //通过Environment.CurrentManagedThreadId获取线程Id
     });
 });

 Console.WriteLine("示例6所有操作已完成");

 代码输出结果示例:

在输出结果中先打印"示例6所有操作已完成"说明Task中的任务并行任务在异步执行

总结

任务并行库(TPL)提供了丰富的方法来简化多线程和异步编程:

  • Parallel.For和Parallel.ForEach:用于并行执行循环或遍历集合
  • ParallelOptions:允许配置并行操作的行为,例如最大并行度和取消操作支持
  • 本地变量:在并行操作中使用本地变量,可以避免线程间的数据竞争,提高性能和安全性
  • 结合Task:通过并行操作嵌套在任务中,可以创建复杂的并行和异步流程

在使用TPL中,需要注意以下几点:

  • 线程安全:在并行操作中访问共享资源时,需确保线程安全,使用锁或线程安全的操作(如Interlocked类)
  • 资源管理:过高的并行度可能导致资源竞争和性能下降,应根据实际情况设置合适的并行长度
  • 异常处理:并行操作中的异常需要去妥善的处理,避免在线程崩溃导致程序不稳定
  • 取消和超时:使用CancellationToken支持任务的取消,在需要能够及时中断长时间的并行操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278004.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python 寻找数据拐点

import numpy as np import cv2 from scipy.signal import find_peaks# 示例数据 y_data [365.63258786, 318.34824281, 258.28434505, 228.8913738, 190.87220447, 158.28434505, 129.53035144, 111.95846645, 111.95846645, 120.26517572, 140.71246006, 161.79872204, 180.…

论文笔记-arXiv2025-A survey about Cold Start Recommendation

论文笔记-arXiv2025-Cold-Start Recommendation towards the Era of Large Language Models: A Comprehensive Survey and Roadmap 面向大语言模型&#xff08;LLMs&#xff09;时代的冷启动推荐&#xff1a;全面调研与路线图1.引言2.前言3.内容特征3.1数据不完整学习3.1.1鲁棒…

设计模式03:行为型设计模式之策略模式的使用情景及其基础Demo

1.策略模式 好处&#xff1a;动态切换算法或行为场景&#xff1a;实现同一功能用到不同的算法时和简单工厂对比&#xff1a;简单工厂是通过参数创建对象&#xff0c;调用同一个方法&#xff08;实现细节不同&#xff09;&#xff1b;策略模式是上下文切换对象&#xff0c;调用…

飞机电气系统技术分析:数字样机技术引领创新

现代飞机正向着更安全、环保和经济的方向发展&#xff0c;飞机系统的设计日益复杂&#xff0c;对各子系统的性能和可靠性也提出了更高要求。作为飞机的重要组成部分&#xff0c;电气系统&#xff08;Electrical System&#xff0c;ES&#xff09;不仅负责为各类机载设备提供稳定…

(01)FreeRTOS移植到STM32

一、以STM32的裸机工程模板 任意模板即可 二、去官网上下载FreeRTOS V9.0.0 源码 在移植之前&#xff0c;我们首先要获取到 FreeRTOS 的官方的源码包。这里我们提供两个下载 链 接 &#xff0c; 一 个 是 官 网 &#xff1a; http://www.freertos.org/ &#xff0c; 另…

【Unity-Game4Automation PRO 插件】

Game4Automation PRO 插件 是一个用于 Unity 引擎 的工业自动化仿真工具&#xff0c;它提供了对工业自动化领域的仿真和虚拟调试支持&#xff0c;特别是在与工业机器人、生产线、PLC 系统的集成方面。该插件旨在将工业自动化的实时仿真与游戏开发的高质量 3D 可视化能力结合起来…

element select 绑定一个对象{}

背景&#xff1a; select组件的使用&#xff0c;适用广泛的基础单选 v-model 的值为当前被选中的 el-option 的 value 属性值。但是我们这里想绑定一个对象&#xff0c;一个el-option对应的对象。 <el-select v-model"state.form.modelA" …

mybatis延迟加载、缓存

目录 一、所需表 二、延迟加载 1.延迟加载概念 2.立即加载和延迟加载的应用场景 3.多对一延迟加载查询演示 (1)实体类 User Account (2)AccountMapper接口 (3)AccountMapper.xml (4)UserMapper接口 (5)UserMapper.xml (6)在总配置文件(mybatis-config.xml)中开启延…

VIVADO FIFO (同步和异步) IP 核详细使用配置步骤

VIVADO FIFO (同步和异步) IP 核详细使用配置步骤 目录 前言 一、同步FIFO的使用 1、配置 2、仿真 二、异步FIFO的使用 1、配置 2、仿真 前言 在系统设计中&#xff0c;利用FIFO&#xff08;first in first out&#xff09;进行数据处理是再普遍不过的应用了&#xff0c…

一、1-2 5G-A通感融合基站产品及开通

1、通感融合定义和场景&#xff08;阅读&#xff09; 1.1通感融合定义 1.2通感融合应用场景 2、通感融合架构和原理&#xff08;较难&#xff0c;理解即可&#xff09; 2.1 感知方式 2.2 通感融合架构 SF&#xff08;Sensing Function&#xff09;&#xff1a;核心网感知控制…

某政务行业基于 SeaTunnel 探索数据集成平台的架构实践

分享嘉宾&#xff1a;某政务公司大数据技术经理 孟小鹏 编辑整理&#xff1a;白鲸开源 曾辉 导读&#xff1a;本篇文章将从数据集成的基础概念入手&#xff0c;解析数据割裂给企业带来的挑战&#xff0c;阐述数据集成的重要性&#xff0c;并对常见的集成场景与工具进行阐述&…

【MySQL】使用C语言链接

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;MySQL 目录 一&#xff1a;&#x1f525; MySQL connect &#x1f98b; Connector / C 使用&#x1f98b; mysql 接口介绍&#x1f98b; 完整代码样例 二&#xff1a;&#x1f525; 共勉 一&#…

《Java核心技术II》并行流

并行流 从集合中获取并行流&#xff1a;Stream paralleWords words.parallelStream(); parallel方法将任意顺序流转换为并行流&#xff1a;Stream paralleWords Stream.of(wordArray).parallel(); 以下是不好的示范&#xff0c;假设对字符串的所有短单词计数&#xff1a; …

【Rust自学】13.2. 闭包 Pt.2:闭包的类型推断和标注

13.2.0. 写在正文之前 Rust语言在设计过程中收到了很多语言的启发&#xff0c;而函数式编程对Rust产生了非常显著的影响。函数式编程通常包括通过将函数作为值传递给参数、从其他函数返回它们、将它们分配给变量以供以后执行等等。 在本章中&#xff0c;我们会讨论 Rust 的一…

ETW HOOK[InfinityHook]技术解析

文章目录 概述分析过程参考资料 概述 ETW是操作系统为了对系统调用、异常等信息做了一个日志操作&#xff0c;本质就是在进行调用这些中断、异常、系统调用时会走向这个代码函数区域日志保存的功能。而ETW HOOK就是在驱动层微软的PatchGuard并未对其做到很好的检测&#xff0c…

码编译安装httpd 2.4,测试

下载链接&#xff1a;https://dlcdn.apache.org/httpd/httpd-2.4.62.tar.gz [rootopenEuler-1 ~]# yum install gcc gcc-c make -y [rootopenEuler-1 ~]# ll /root total 9648 -rw-------. 1 root root 920 Jan 10 17:15 anaconda-ks.cfg -rw-r--r-- 1 root root 9872432…

步入响应式编程篇(一)

响应式编程 为什么要有响应式编程&#xff1f;响应式编程的用法Flow api的用法处理器 为什么要有响应式编程&#xff1f; 传统编码&#xff0c;操作流程常见的是命令式编程范式&#xff0c;如对于一个请求或操作来说&#xff0c;都是串行执行&#xff0c;直到异常或执行结束&a…

C++—18、C++ 中如何写类

一、类的功能阐述 今天我们将用目前学到的类的基础知识从头开始编写一个类。只编写一个基本的Log类&#xff0c;来演示到目前为止我们学过的一些基本特性。随着接下来的学习你会看到从一个类的基本版本到一个更高级版本的过程和区别。高级版本可以做同样的事情&#xff0c;但可…

SW - 查看装配图中的零件的全路径名称

文章目录 SW - 查看装配图中的零件的全路径名称概述笔记END SW - 查看装配图中的零件的全路径名称 概述 装配图中&#xff0c;如果本机有多个不同版本的同名零件(e.g. v1/p1零件, v2/p1零件)&#xff0c;在装配图中想确认是哪个版本的零件。 如果编辑错了文件&#xff0c;或者…

【开源分享】nlohmann C++ JSON解析库

文章目录 1. Nlohmann JSON 库介绍2. 编译和使用2.1 获取库2.2 包含头文件2.3 使用示例2.4 编译 3. 优势4. 缺点5. 总结参考 1. Nlohmann JSON 库介绍 Nlohmann JSON 是一个用于 C 的现代 JSON 库&#xff0c;由 Niels Lohmann 开发。它以易用性和高性能著称&#xff0c;支持 …