任务(task)
异步编程(async&await)
并发编程概述
前言
说实话,在我软件开发的头两年几乎不考虑并发编程,请求与响应把业务逻辑尽快完成一个星期的任务能两天完成绝不拖三天(剩下时间各种浪),根本不会考虑性能问题(能接受范围内)。但随着工作内容的变化,一些问题,它的解决方案已经让我避不开并发编程这一块知识点了。为了一劳永逸,此系列与并发编程有关的系列文章诞生,希望对各有有所帮助。
基础术语
- 同步(synchronization):关于协调线程或进程之间的活动,并确保被多个线程或进程访问的数据一直有效,同步允许线程和进程一致地操作。
- 并发(concurrency):同时做多间事情,是关于程序的各个方面的合作和串联工作,以实现目标。
- 进程(Process):一个具有一定独立功能的程序关于某个数据集合的一次运行活动,是系统进行资源分配和调度运行的基本单位,当一个程序开始运行时,它在系统中奖开启一个或多个进程,一个进程又有多个线程组成。
- 线程(thread):代表程序中的单个执行逻辑流程,是一个独立处理的执行路径,是轻量级的进程。
- 多线程(multithreading):多个线程来执行程序。并发的一种形式,但不是唯一的方式。
- 并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。多线程的一种。
- 异步编程:并发的一种形式,采用future模式或回调(callback)机制,以避免产生不必要的线程。
异步编程
异步编程并不一定要用多线程去实现,多线程只是其中一种实现手段。在.Net中,新版funture类型有Task和Task。
老式异步编程API中采用回调或事件(event)。异步编程的核心理念是异步操作。
- 异步操作:启动了的操作将会在一段时间后完成。这个操作执行时,不会阻塞原来的线程。启动了这个操作的线程,可以继续执行其他任务。当操作完成时,会通知它的future或调用回调函数,以便让程序指导操作已经结束。
- 响应式编程:一种声明式的编程模式,程序在该模式中对事件做出响应,区别于异步编程是因为它是基于异步事件(asynchronous
evnt)。并发编程的一种形式。
I/O密集与计算密集
- I/O密集(I/O-bound):如果一个操作将大部分时间用于等待一个条件的产生,那么它就被成为I/O密集操作。
- 计算密集(compute-bound):如果一个操作将大部分时间用于执行CPU密集操作,那么它被称为计算密集操作。
并发编程
优秀软件的关键特征就是具有并发性,程序在同一时间做着更多的事情,而不是过去我们看到的一种单请求单响应。智能化、高用户体验的程序已经离不并发编程。
并行编程(Parallel Framework)
前言
并行编程:通过编码方式利用多核或多处理器称为并行编程,多线程概念的一个子集。
并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。多线程的一种。
并行编程分为如下几个结构:
1.并行的LINQ或PLINQ
2.Parallel类
3.任务并行结构
4.并发集合
5.SpinLock和SpinWait
这些是.NET 4.0引入的功能,一般被称为PFX(Parallel Framework,并行框架)。
Parallel类和任务并行结构称为TPL(Task Parallel Library,任务并行库)。
并行框架(PFX)
1.并行框架基础
当前CPU技术达到瓶颈,而制造商将关注重点转移到提高内核技术上,而标准单线程代码并不会因此而自动提高运行速度。
利用多核提升程序性能通常需要对计算密集型代码进行一些处理:
1.将代码划分成块。
2.通过多线程并行执行这些代码块。
3.结果变为可用后,以线程安全和高性能的方式整合这些结果。
传统多线程结构虽然实现功能,但难度颇高且不方便,特别是划分和整理的步骤(本质问题是:多线程同时使用相同数据时,出于线程安全考虑进行锁定的常用策略会引发大量竞争)。
而并行框架(Parallel Framework)专门用于在这些应用场景中提供帮助。
2.并行框架组成
PFX:高层由两个数据并行API组成:PLINQ或Parallel类。底层包含任务并行类和一组另外的结构为并行编程提供帮助。
基础并行语言集成查询(PLINQ)
语言集成查询(Language Integrated Query,LINQ)提供了一个简捷的语法来查询数据集合。而这种由一个线程顺序处理数据集合的方式我们称为顺序查询(sequential query)。
并行语言集成查询(Parallel LINQ)是LINQ的并行版。它将顺序查询转换为并行查询,在内部使用任务,将集合中数据项的处理工作分散到多个CPU上,以并发处理多个数据项。
PLINQ将自动并行化本地的LINQ查询,System.Linq.ParallelEnumerable类(它定义在System.Core.dll中,需要引用System.Linq)公开了所有标准LINQ操作符的并行版本。这些所有方法是依据System.Linq.ParallelQuery扩展而来。
1.LINQ to PLINQ
要让LINQ查询调用并行版本,必须将自己的顺序查询(基于IEnumerable或IEnumerable)转换成并行查询(基于ParallelQuery或ParallelQuery),使用ParallelEnumerable的AsParallel方法实现,如示例:
1.PLINQ执行模型
Parallel类
Parallel类是对线程的一个很好的抽象。该类位于System.Threading.Tasks命名空间中,提供了数据和任务并行性。
PFX通过Parallel类中的三个静态方法,提供了一种基本形式的结构化并行机制:
1.Parallel.Invoke
Parallel.Invoke:用于并行执行一组委托,示例如下:
任务并行
对于任务并行的内容,请戳 任务(Task) 和 异步编程(async&await)。
2.Parallel.For
Parallel.For:执行C# for循环的并行化等价循环,示例如下:
class ParallelDemo
{
static void Main(string[] args)
{
//顺序循环
{
for (int i = 0; i < 10; i++)
{
Test(i);
}
}
Console.WriteLine("并行化for开始");
//顺序执行转换为并行化
{
Parallel.For(0, 10, i => Test(i));
}
//顺序执行转换为并行化(更简单的方式)
{
Parallel.For(0, 10, Test);
}
Console.ReadKey();
}
static void Test(int i)
{
Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
}
}
3.Parallel.ForEach
Parallel.ForEach:执行C# foreach循环的并行化等价循环,示例如下:
static void Main(string[] args)
{
顺序循环
//{
// for (int i = 0; i < 10; i++)
// {
// Test(i);
// }
//}
//Console.WriteLine("并行化for开始");
顺序执行转换为并行化
//{
// Parallel.For(0, 10, i => Test(i));
//}
顺序执行转换为并行化(更简单的方式)
//{
// Parallel.For(0, 10, Test);
//}
string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
//顺序循环
{
foreach (string num in data)
{
Test(num);
}
}
Console.WriteLine("并行化foreach开始");
//顺序执行转换为并行化
{
Parallel.ForEach(data, num => Test(num));
}
Console.ReadKey();
Console.ReadKey();
}
static void Test(int i)
{
Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
}
static void Test(string i)
{
Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
}
4.索引&跳出(ParallelLoopState)
有时迭代索引很有用处,但是切忌不可像顺序循环的用法使用共享变量(循环内i++)的方式使用,因为共享变量值在并行上下文中是线程不安全的。
同样的,因为并行For或ForEach中的循环体是一个委托,所以无法使用break语句提前退出循环,必须调用ParallelLoopState对象上的Break或Stop方法。
以ForEach为例,ForEach重载的其中之一如下,它包含Acton的其中有三个参数(TSourec=子元素,ParallelLoopState=并行循环状态,long=索引):
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
``
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp2
{
class ParallelDemo
{
static void Main(string[] args)
{
顺序循环
//{
// for (int i = 0; i < 10; i++)
// {
// Test(i);
// }
//}
//Console.WriteLine("并行化for开始");
顺序执行转换为并行化
//{
// Parallel.For(0, 10, i => Test(i));
//}
顺序执行转换为并行化(更简单的方式)
//{
// Parallel.For(0, 10, Test);
//}
string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
//顺序循环
//{
// foreach (string num in data)
// {
// Test(num);
// }
//}
Console.WriteLine("并行化foreach开始");
//顺序执行转换为并行化
{
Parallel.ForEach(data, (num , state, i)=>
{
Console.WriteLine($"当前索引为:{i},状态为:{state}");
Test(num);
if (num == "six")
state.Break();
});
}
Console.ReadKey();
Console.ReadKey();
}
static void Test(int i)
{
Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
}
static void Test(string i)
{
Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
}
}
}
并发集合概述
.NET 4.0在System.Collections.Concurrent命名空间中提供了一组新的集合。所有这些集合都完全是线程安全的:
这些集合不仅是为使用带锁的普通集合提供了快捷方式,而且可以在一般的多线程中使用并发集合,但需要注意:
1.并发集合针对并行编程进行了调整。只有在高度并发的应用场景中,传统集合的性能才能胜过它们。
2.线程安全的集合不能确保使用它的代码也是安全的。
3.如果枚举一个并发集合的同时,另一个线程要修改它,不会抛出任何异常,相反,得到旧内容与新内容的混合。
4.不存在任何List的并发版本。
5.它们的内存利用率没有非并发的Stack和Queue类高效,但对于并发访问的效果更好。
1.结构概述
这些并发集合与传统集合的区别是:它们公开了特殊方法来执行原子测试和行动操作,而这些方法都是通过IProducerConsumerCollection接口提供的。
IProducerConsumerCollection接口代表一个线程安全的生产者/消费者集合,这三个类继承并实现了IProducerConsumerCollection接口:
ConcurrentStack、ConcurrentQueue、ConcurrentBag。
它们实现的TryAdd和TryTake方法用于测试一个添加/删除操作能否执行,如果可以,则执行添加/删除操作。测试与行动不需要对传统集合上锁。
ConcurrentBag用于保存对象的无需集合,适用于调用Take或TryTake时不关心获取那个元素的额情况。
相对于并发队列或堆栈,在多线程同时调用一个ConcurrentBag的Add时,不存在竞争,但队列或堆栈并行调用Add会引起一些竞争,所以ConcurrentBag上调用Take方法非常高效。
BlockingCollection类似阻塞集合,适用于等待新元素的出现,可以把它看作一个容器,使用一个阻塞集合封装所有实现IProducerConsumerCollection的集合,并且允许从封装的集合中去除元素,若没有元素,操作会阻塞
2.基础方法
常用的一些方法,整理自 zy__ :
ConcurrentQueue:完全无锁,但面临资源竞争失败时可能会陷入自旋并重试操作。
Enqueue:在队尾插入元素
TryDequeue:尝试删除队头元素,并通过out参数返回
TryPeek:尝试将对头元素通过out参数返回,但不删除该元素。
ConcurrentStack:完全无锁,但面临资源竞争失败时可能会陷入自旋并重试操作。
Push:向栈顶插入元素
TryPop:从栈顶弹出元素,并且通过out 参数返回
TryPeek:返回栈顶元素,但不弹出。
ConcurrentBag:一个无序的集合,程序可以向其中插入元素,或删除元素。在同一个线程中向集合插入,删除元素的效率很高。
Add:向集合中插入元素
TryTake:从集合中取出元素并删除
TryPeek:从集合中取出元素,但不删除该元素。
BlockingCollection:一个支持界限和阻塞的容器
Add :向容器中插入元素
TryTake:从容器中取出元素并删除
TryPeek:从容器中取出元素,但不删除。
CompleteAdding:告诉容器,添加元素完成。此时如果还想继续添加会发生异常。
IsCompleted:告诉消费线程,生产者线程还在继续运行中,任务还未完成。
ConcurrentDictionary:对于读操作是完全无锁的,当很多线程要修改数据时,它会使用细粒度的锁。
AddOrUpdate:如果键不存在,方法会在容器中添加新的键和值,如果存在,则更新现有的键和值。
GetOrAdd:如果键不存在,方法会向容器中添加新的键和值,如果存在则返回现有的值,并不添加新值。
TryAdd:尝试在容器中添加新的键和值。
TryGetValue:尝试根据指定的键获得值。
TryRemove:尝试删除指定的键。
TryUpdate:有条件的更新当前键所对应的值。
GetEnumerator:返回一个能够遍历整个容器的枚举器。