目录
一.任务并行库的概念以及定义
二.主要特性
三.代码使用示例
1.最基础的Parallel.For使用方式
2.使用 ParallelOptions 来控制并行执行
3.Parallel.ForEach的使用(用于处理集合)
4.带有本地变量的并行循环(用于需要累加或统计的场景)
5.结合Task和Parallel的高级示例
6.使用ParallelOptions结合异步操作
总结
一.任务并行库的概念以及定义
在C#中,任务并行库(Task Parallel Library,简称TPL)是.NET Framework和.NET Core中提供的一组用于简化并行编程和异步编程的公共类型和API,旨在帮助开发人员更容易的利用多核处理器的能力,提高应用程序的性能和响应速度
任务(Task):
TPL中的核心概念是任务,它表示一个异步操作,可以理解为某项工作或操作的抽象封装.
任务可以异步的执行,并可以返回结果,报告进度,处理取消和异常等
数据并行(Data Parallelism):
通过并行执行集合中的各个元素的操作,如使用Parallel.For和Parallel.ForEach方法,可以同时处理大量数据,提高数据处理的效率
任务并行(Task Parallelism):
利用任务来并发的执行不同的方法或操作,适用于多个操作相互独立,可以同时执行的场景
二.主要特性
- 1.简化并行编程:TPL提供了更高级别的抽象,开发者无需直接管理线程的创建,同步等复杂细节
- 2.任务调度:内置的任务调度器会智能地将任务映射到线程池中的线程上,优化资源使用
- 3.异常处理:提供了机制来捕获和处理并行执行中产生的异常,确保应用程序的稳定性
- 4.任务组合:支持将多个任务链接到一起,形成任务的连续执行和组合(如 ContinueWith方法)
- 5.取消和超时:提供了取消令牌(Cancellation Token),允许在需要时取消任务的执行
- 6.异步和编程支持:与 async 和 await 关键字结合使用,可以编写简单的异步代码,提升应用程序的响应性
三.代码使用示例
1.最基础的Parallel.For使用方式
Console.WriteLine("\n基础的 Parallel.For 示例:");
// 这里的 (0, 5) 表示从0开始到4结束(不包含5)
// i => { } 是一个委托,表示对每个数字要执行的操作
Parallel.For(0, 5, i =>
{
Console.WriteLine($"正在并行处理数字 {i}");
Thread.Sleep(100); // 模拟一些耗时的工作
});
代码输出结果示例:
因为是并行处理,所以执行顺序不是固定的.
2.使用 ParallelOptions 来控制并行执行
在C#中, ParallelOptions 是用于配置并行操作行为的选项类,主要用于控制Parallel类提供的并行循环和任务的执行方式
Parallel类包含如:Parallel.For ,Parallel.ForEach和Parallel.Invoke等方法.它们可以利用多核处理器并行执行任务
ParallelOptions的主要属性:
- MaxDegreeOfParallelism:指定并行操作的最大并行度,即同时运行的最大线程数,默认情况下MaxDegreeOfParallelism为-1,表示不限制并行度,线程数由.NET框架根据可用的处理器核心数量自动调度
- CancellationToken:用于接收取消请求的令牌,可以在并行操作中响应取消操作.支持任务的取消操作,当需要在某些条件下中止并行任务时,可以使用CancellationTokenSource生成令牌,并在操作中监视CancellationToken的取消请求
- TaskScheduler:指定任务的调度器,控制任务的执行上下文,如果不设置默认使用TaskScheduler.Default
Console.WriteLine("\n使用 ParallelOptions 的 Parallel.For:");
// 创建一个 CancellationTokenSource,用于发送取消请求
CancellationTokenSource cts = new CancellationTokenSource();
//配置ParallelOptions
var options = new ParallelOptions
{
// Environment.ProcessorCount 获取CPU的核心数
// MaxDegreeOfParallelism 控制最大同时执行的任务数
MaxDegreeOfParallelism = 2,// 这里限制最多同时执行2个任务
CancellationToken = cts.Token // 将取消令牌传递给并行循环
};
try
{
// 使用配置了选项的并行循环
Parallel.For(0, 5, options, i =>
{
Console.WriteLine($"使用受限并行度处理数字 {i},线程Id: {Task.CurrentId}");
// 可选:在工作中检查取消请求
options.CancellationToken.ThrowIfCancellationRequested();
Thread.Sleep(100);
Console.WriteLine($"完成处理索引 {i}");
});
}
catch (OperationCanceledException)
{
Console.WriteLine("操作已被取消。");
}
finally
{
cts.Dispose();
}
代码输出结果示例:
代码中限制了最多同时执行两个线程的任务
当两个线程中的其中一个线程完成任务并释放后才会处理下一个任务
3.Parallel.ForEach的使用(用于处理集合)
Console.WriteLine("\n使用 Parallel.ForEach 示例:");
// 创建一个简单的字符串列表
List<string> fruits = new List<string>
{
"苹果", "香蕉", "橙子", "葡萄"
};
// 并行处理列表中的每一项
Parallel.ForEach(fruits, fruit =>
{
Console.WriteLine($"正在处理水果: {fruit},线程Id: {Task.CurrentId}");
Thread.Sleep(100);
Console.WriteLine($"完成处理水果: {fruit}");
});
代码示例中的Parallel.ForEach方法概述:
第一个参数: fruits
- 类型: IEnumberable<T>,在这是时IEnumberable<string>
- 代表要迭代的集合,也就是我们要并行处理的元素序列(在当前示例中指代fruits)
第二个参数: fruit=>{...}
- 类型:Action<T> 这里是Action<string>
- 这是一个委托,表示要对每个元素执行的操作.在这里我们使用了 Lambda表达式 定义这个操作
- fruit: Lambda表达式的参数,代表当前正在处理的集合元素.在每次的迭代中,fruit将被赋值为fruits集合中的一个元素
代码输出结果示例:
4.带有本地变量的并行循环(用于需要累加或统计的场景)
Console.WriteLine("\n带本地变量的 Parallel.For 示例:");
int sum = 0; // 定义一个全局计数器
Parallel.For(0, 10,
// 初始化方法:为每个并行任务创建一个本地计数器
() => 0,
// 本体方法:处理每个数字并更新本地计数器
(i, loop, localSum) =>
{
localSum += i; // 将当前数字加到本地计数器
Console.WriteLine($"线程 {Task.CurrentId} 处理数字 {i}, 当前本地和为 {localSum}");
return localSum; // 返回更新后的本地计数器
},
// 最终方法:处理每个线程的本地计数器最终值
(finalLocalSum) =>
{
Console.WriteLine($"一个线程完成{Task.CurrentId},最终本地和为: {finalLocalSum}");
Interlocked.Add(ref sum, finalLocalSum); // 使用原子操作将本地计数器加到全局计数器,确保线程安全,避免竞态条件
}
);
Console.WriteLine($"所有线程完成,最终和为: {sum}");
代码概述:使用Parallel.For进行并行循环,通过引用线程本地变量,累加从0到9的数字,并在每个线程完成时输出其本地计算的总和
当前代码使用的Parallel.For的完整签名为:
public static ParallelLoopResult For<TLocal>(
int fromInclusive,
int toExclusive,
Func<TLocal> localInit,
Func<int, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally
)
参数解析:
- fromInclusive:(起始索引,包含)
- 类型: int
- 作用:并行循环的起始索引,包含该值
- toExclusive:(结束索引,不包含)
- 类型:int
- 作用:并行循环的结束索引,不包含该值
- localInit(本地初始化器):
- 类型:Func<TLocal>
- 作用:一个函数,定义了每个线程(任务)的本地变量的初始值
- 在示例代码中()=>0,表示每个线程的本地计数器初始值为0
- body(循环主体):
- 类型:Func<int,ParallelLoopState,TLocal,TLocal>
- 参数说明:
- int i:当前处理的元素索引
- ParallelLoopState:用于控制循环的状态(如中断,停止)
- TLocal local 线程的本地变量
- 返回值 TLocal,即更新后的本地变量
- 作用:定义了并行循环中要执行的操作,并可以更新本地变量
在示例代码中:(i, loop, localSum) => { localSum += i; Console.WriteLine($"线程 {Task.CurrentId} 处理数字 {i}, 当前本地和为 {localSum}"); return localSum; }
- 将当前索引i加到本地计数器localSum中
- 输出当前线程处理的数字和(本地和)
- 返回更新后的localSum
- 参数说明:
- localFinally(本地最终操作)
- 类型:Action<TLocal>
- 作用:定义每个线程完成其任务后执行的动作,接收该线程的最终本地变量值
- 类型:Func<int,ParallelLoopState,TLocal,TLocal>
代码执行流程:
1.初始化阶段:
- 并行循环开始前,Parallel.For会为每个参与的线程调用localInit函数,初始化本地变量
- 在示例代码中,每个线程的localSum的初始值为0
2.并行执行阶段:
- 对于从0到9的每个索引i,Parallel.For会并行执行body函数
- 线程轮流或同时处理不同的i值,更新其各自的localSum
- 在body函数中
- localSum+=i; 将当前的索引i加到本地变量localSum中
- return localSum:返回更新后的本地变量,以便在下一次循环中使用
3.最终处理阶段:
- 当一个线程完成了其被分配的索引之后,Parallel.For会调用localFinally函数,传递该线程的最终localSum值.
代码输出结果示例:
根据以上的输出结果,我们可以注意到线程35处理了两个数字
第一次处理数字5时,局部和是5
接着又处理了数字9,局部和更新为5+9=14
由此我们可以得知,在并行执行中,任务的分配可能并不均匀,某些线程可能会被分配到多个任务(数字),因此会出现局部和超过单个数字值的情况
在示例代码中,存在 Interlocked.Add(ref sum,finalLocalSum)
这是一个线程安全的代码,用于在多线程的环境下安全地修改变量的值,避免竞态条件
Interlocked.Add是一种用于实现原子操作的同步机制,可以确保多个线程在并发修改同意变量时,不会产生竞态条件,从而保证数据的一致性和正确性,通过使用这样的原子操作,可以在多线程环境中安全的更新共享变量
5.结合Task和Parallel的高级示例
Console.WriteLine("\nTask和Parallel结合使用示例:");
// 创建一个任务数组,每个任务使用Parallel处理不同的数据集
var tasks = new List<Task>();
// 创建第一个并行处理任务
tasks.Add(Task.Run(() =>
{
Parallel.For(0, 5, i =>
{
Console.WriteLine($"任务1处理数字 {i}, 线程Id: {Task.CurrentId}");
Thread.Sleep(50);
});
}));
// 创建第二个并行处理任务
tasks.Add(Task.Run(async () =>
{
await Task.Delay(100); // 模拟一些异步操作
Parallel.ForEach(new[] { "数据1", "数据2", "数据3" }, item =>
{
Console.WriteLine($"任务2处理项目: {item}, 线程Id: {Task.CurrentId}");
Thread.Sleep(50);
});
}));
// 等待所有任务完成
await Task.WhenAll(tasks);
Console.WriteLine("示例5所有任务已完成");
代码概述 :
通过Task和Parallel结合,实现多个(两个)任务下的并行处理
代码输出结果示例:
6.使用ParallelOptions结合异步操作
Console.WriteLine("\n带有异步操作的Parallel示例:");
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount// 为 CPU 核心数(自动去调度剩余空闲CPU)
};
await Task.Run(() =>
{
Parallel.For(0, 3, parallelOptions, async i =>
{
await Task.Delay(100); // 异步等待
Console.WriteLine($"异步并行处理索引 {i}, 线程Id: {Environment.CurrentManagedThreadId}"); //通过Environment.CurrentManagedThreadId获取线程Id
});
});
Console.WriteLine("示例6所有操作已完成");
代码输出结果示例:
在输出结果中先打印"示例6所有操作已完成"说明Task中的任务并行任务在异步执行
总结
任务并行库(TPL)提供了丰富的方法来简化多线程和异步编程:
- Parallel.For和Parallel.ForEach:用于并行执行循环或遍历集合
- ParallelOptions:允许配置并行操作的行为,例如最大并行度和取消操作支持
- 本地变量:在并行操作中使用本地变量,可以避免线程间的数据竞争,提高性能和安全性
- 结合Task:通过并行操作嵌套在任务中,可以创建复杂的并行和异步流程
在使用TPL中,需要注意以下几点:
- 线程安全:在并行操作中访问共享资源时,需确保线程安全,使用锁或线程安全的操作(如Interlocked类)
- 资源管理:过高的并行度可能导致资源竞争和性能下降,应根据实际情况设置合适的并行长度
- 异常处理:并行操作中的异常需要去妥善的处理,避免在线程崩溃导致程序不稳定
- 取消和超时:使用CancellationToken支持任务的取消,在需要能够及时中断长时间的并行操作