任务(task)
一、 任务概述
- 线程(Thread)是创建并发的底层工具,因此有一定的局限性(不易得到返回值(必须通过创建共享域);异常的捕获和处理也麻烦;同时线程执行完毕后无法再次开启该线程),这些局限性会降低性能同时影响并发性的实现(不容易组合较小的并发操作实现较大的并发操作,会增加手工同步处理(加锁,发送信号)的依赖,容易出现问题)。
- 线程池的(ThreadPool)的QueueUserWorkItem方法很容发起一次异步的计算限制操作。但这个技术同样有着许多限制,最大的问题是没有内建的机制让你知道操作在什么时候完成,也没有机制在操作完成时获得返回值。
而Task类可以解决上述所有的问题。
- 任务(Task)表示一个通过或不通过线程实现的并发操作,任务是可组合的,使用延续(continuation)可将它们串联在一起,它们可以使用线程池减少启动延迟,可使用回调方法避免多个线程同时等待I/O密集操作。
二、 基础任务(Task)
微软在.NET 4.0 引入任务(Task)的概念。通过System.Threading.Tasks命名空间使用任务。它是在ThreadPool的基础上进行封装的。Task默认都是使用池化线程,它们都是后台线程,这意味着主线程结束时其它任务也会随之停止。
启动一个任务有多种方式,如以下示例:
//----------1.基础任务(Task)------------------------
//第一种线程启动方式: 实例化方式Start启动
{
Task task = new Task(()=> {
Test("one-ok");
});
}
//第二种方式:通过Task类静态方法Run进行启动
Task.Run(()=> {
Test("two-ok");
});
//第三种方式:通过TaskFactory 启动
TaskFactory taskFactory = new TaskFactory();
taskFactory.StartNew(()=> {
Test("three-ok");
});
//第四种方式:通过TaskFactory的StartNew方法启动
Task task1 = Task.Factory.StartNew(()=> {
Test("four-ok");
});
//第五种:通过Task对象的RunSynchronously方法启动(同步,由主线程执行,会卡主线程)
Task taskRunSync = new Task(()=> {
Console.WriteLine("线程Id:{0},执行方法:five-ok", Thread.CurrentThread.ManagedThreadId);
});
taskRunSync.RunSynchronously();
Thread.Sleep(1000);
ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1}", workerThreadsCount, completionPortThreadsCount);
Console.ReadKey();
三、 返回值(Task)&状态(Status)
Task有一个泛型子类Task,它允许任务返回一个值。调用Task.Run,传入一个Func代理或兼容的Lambda表达式,然后查询Result属性获得结果。如果任务没有完成,那么访问Result属性会阻塞当前线程,直至任务完成。
public static Task<TResult> Run<TResult>(Func<TResult> function);
而任务的Status属性可用于跟踪任务的执行状态,如下所示:
Reulst属性内部会调用Wait(等待);
任务的Status属性是一个TaskStatus枚举类型:
public TaskStatus Status { get; }
枚举值 | 说明 |
---|---|
Canceled | 任务已通过对其自身的 CancellationToken 引发 OperationCanceledException 对取消进行了确认,此时该标记处于已发送信号状态;或者在该任务开始执行之前,已向该任务的 CancellationToken 发出了信号。Created 该任务已初始化,但尚未被计划。 |
Created | 该任务已初始化,但尚未被计划。 |
Faulted | 由于未处理异常的原因而完成的任务。 |
RanToCompletion | 已完成执行的任务。 |
Running | 任务正在运行,尚未完成。 |
WaitingForActivation | 该任务正在等待 .NET Framework 基础结构在内部将其激活并进行计划。 |
WaitingForChildrenToComplete | 该任务已完成执行,正在隐式等待附加的子任务完成。 |
WaitingToRun | 该任务已被计划执行,但尚未开始执行。 |
//--------2.返回值(Task<TResult>)&状态(Status)-----------------
Task<int> taks22 = (Task<int>)Task.Run(()=> {
int total = 0;
for( int i=0;i<=100;i++)
{
total += i;
}
Thread.Sleep(2000);
return total;
});
Console.WriteLine("任务状态:{0}", taks22.Status);
Thread.Sleep(1000);
Console.WriteLine("任务状态:{0}", taks22.Status);
int totalCount = taks22.Result;//如果任务没有完成,则阻塞
Console.WriteLine("任务状态:{0}", taks22.Status);
Console.WriteLine("总数为:{0}", totalCount);
Console.ReadKey();
四、 任务集合返回值(WhenAll&WhenAny)
Task中有非常方便的对并行运行的任务集合获取返回值的方式,比如WhenAll和WhenAny。
1.WhenAll:等待提供的所有 Task 对象完成执行过程(所有任务全部完成)。
List<Task<int>> taskList = new List<Task<int>>();
TaskFactory taskFactory1 = new TaskFactory();
for (int i = 0; i < 5; i++)
{
int total = i;
Task<int> task = taskFactory1.StartNew(() =>Test1(total) );
taskList.Add(task);
}
Console.WriteLine("主线程Id:{0},继续执行A.....", Thread.CurrentThread.ManagedThreadId);
Task<int[]> taskReulstList = Task.WhenAll(taskList);//创建一个任务,该任务将集合中的所有 Task 对象都完成时完成
for (int i = 0; i < taskReulstList.Result.Length; i++)//这里调用了Result,所以会阻塞线程,等待集合内所有任务全部完成
{
Console.WriteLine("返回值:{0}", taskReulstList.Result[i]);//遍历任务集合内Task返回的值
}
Console.WriteLine("主线程Id:{0},继续执行B.....", Thread.CurrentThread.ManagedThreadId);
2. WhenAny:等待提供的任一 Task 对象完成执行过程(只要有一个任务完成)。
List<Task<int>> tasllistAnyAll = new List<Task<int>>();
TaskFactory taskFactory2 = new TaskFactory();
for (int i = 0; i < 5; i++)
{
int total = i;
Task<int> task = taskFactory2.StartNew(()=>Test1(total));
tasllistAnyAll.Add(task);
}
Console.WriteLine("主线程Id:{0},继续执行A.....", Thread.CurrentThread.ManagedThreadId);
//创建一个任务,该任务将在集合中的任意 Task 对象完成时完成
Task<Task<int>> taskReulstList1 = Task.WhenAny(tasllistAnyAll);
Console.WriteLine("返回值:{0}", taskReulstList1.Result.Result);//得到任务集合内最先完成的任务的返回值
五、 等待(Wait)&执行方式(TaskCreationOptions)
1.任务等待(Wait)
调用任务的Wait方法可以阻塞任务直至任务完成,类似于线程的join。
//--------5.等待(Wait)&执行方式(TaskCreationOptions)-----------------
Task task2 = Task.Run(()=> {
Console.WriteLine("线程执行Begin");
Thread.Sleep(2000);
Console.WriteLine("线程执行End");
});
Console.WriteLine("任务是否完成:{0}", task2.IsCompleted);
task2.Wait();//阻塞,直至任务完成
Console.WriteLine("任务是否完成:{0}", task2.IsCompleted);
Console.ReadKey();
注意:
线程调用Wait方法时,系统检测线程要等待的Task是否已经开始执行。如果是线程则会阻塞直到Task运行结束为止。但如果Task还没有开始执行任务,系统可能(取决于TaskScheduler)使用调用Wait的线程来执行Task,这种情况下调用Wait的线程不会阻塞,它会执行Task并立即返回。好处在于没有线程会被阻塞,所以减少了资源占用。不好的地方在于加入线程在调用Wait前已经获得了一个线程同步锁,而Task试图获取同一个锁,就会造成死锁的线程。
2. 任务执行方式(TaskCreationOptions)
我们知道为了创建一个Task,需要调用构造函数并传递一个Action或Action委托,如果传递的是期待一个Object的方法,还必须向Task的构造函数穿都要传给操作的实参。还可以选择向构造器传递一些TaskCreationOptions标记来控制Task的执行方式。
TaskCreationOptions为枚举类型
枚举值 | 说明 |
---|---|
None | 默认。 |
PreferFairness | 尽可能公平的方式安排任务,即先进先执行。 |
LongRunning | 指定任务将是长时间运行的,会新建线程执行,不会使用池化线程。 |
AttachedToParent | 指定将任务附加到任务层次结构中的某个父级 |
AttachedToParent | 任务试图和这个父任务连接将抛出一个InvalidOperationException |
HideScheduler | 强迫子任务使用默认调度而非父级任务调度 |
在默认情况下,Task内部是运行在池化线程上,这种线程会非常适合执行短计算密集作业。如果要执行长阻塞操作,则要避免使用池化线程。
在池化线程上运行一个长任务问题不大,但是如果要同时运行多个长任务(特别是会阻塞的任务),则会对性能产生影响。最好使用:TaskCreationOptions.LongRunning。
Console.WriteLine("主线程ID:{0}", Thread.CurrentThread.ManagedThreadId);
int workerThreadsCount, completionPortThreadsCount;
ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
//--------5.等待(Wait)&执行方式(TaskCreationOptions)-----------------
Task task2 = Task.Run(()=> {
Console.WriteLine("线程执行Begin");
Thread.Sleep(2000);
Console.WriteLine("线程执行End");
});
Console.WriteLine("任务是否完成:{0}", task2.IsCompleted);
task2.Wait();//阻塞,直至任务完成
Console.WriteLine("任务是否完成:{0}", task2.IsCompleted);
Console.ReadKey();
Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1},主线程Id:{2}", workerThreadsCount, completionPortThreadsCount, Thread.CurrentThread.ManagedThreadId);
Task task3 = Task.Factory.StartNew(()=> {
Console.WriteLine("长任务执行,线程Id:{0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
},TaskCreationOptions.LongRunning);
Thread.Sleep(1000);
ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount);
Console.WriteLine("剩余工作线程数:{0},剩余IO线程数{1},主线程Id:{2}", workerThreadsCount, completionPortThreadsCount, Thread.CurrentThread.ManagedThreadId);
Console.ReadKey();
注意:
- 如果使运行I/O密集任务,则可以使用TaskCompletionSource和异步函数(asynchronous
functions),通过回调(延续)实现并发性,而是不通过线程实现。 - 如果使运行计算密集性任务,则可以使用一个生产者/消费者队列,控制这些任务的并发数量,避免出现线程和进程阻塞的问题。
六、 延续(continuation)&延续选项(TaskContinuationOptions)
延续(continuation)会告诉任务在完成后继续执行下面的操作。延续通常由一个回调方法实现,它会在操作完成之后执行一次。给一个任务附加延续的方法有两种
1.GetAwaiter
任务的方法GetAwaiter是Framework 4.5新增加的,而C# 5.0的异步功能使用了这种方法,因此它非常重要。给一个任务附加延续如下:
//-------------GetAwaiter---------------
//任务的方法GetAwaiter是Framework 4.5新增加的,而C# 5.0的异步功能使用了这种方法,因此它非常重要。给一个任务附加延续如下:
Task<int> task5 = Task.Run(()=> {
int total = 0;
for (int i = 0; i < 100; i++)
{
total += i;
}
Thread.Sleep(2000);
return total;
});
var awaiter5 = task5.GetAwaiter();
awaiter5.OnCompleted(() => {
int result = awaiter5.GetResult(); //在延续中获取Task的执行结果
Console.WriteLine(result);
});
Console.ReadKey();
执行结果控制台会打印:5050。
调用GetAwaiter会返回一个等待者(awaiter)对象,它会让先导(antecedent)任务在任务完成(或出错)之后执行一个代理。已经完成的任务也可以附加一个延续,这事延续会马上执行。
注意:
1.等待者(awaiter)可以是任意对象,但必须包含特定的两个方法和一个Boolean类型属性。
public struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
{
public bool IsCompleted { get; }
public TResult GetResult();
public void OnCompleted(Action continuation);
}
2.先导任务出现错误,那么当延续代码调用awaiter.GetResult()时就会重新抛出异常。我们可以需要调用GetResult,而是直接访问先导任务的Result属性(task.Result)。
GetResult的好处是,当先导任务出现错误时,异常可以直接抛出而不封装在AggregateException中。
3.如果出现同步上下文,那么会自动捕捉它,然后延续提交到这个上下文中。在无需同步上下文的情况下通常不采用这种方法,使用ConfigureAwait代替它。它通常会使延续运行在先导任务所在的线程上,从而避免不必要的过载。
var awaiter = task.ConfigureAwait(false).GetAwaiter();
2.ContinueWith
另一种附加延续的方法是调用任务的ContinueWith方法:
//-------------ContinueWith---------------
Task<int> task6 = Task.Run(() => {
int total = 0;
for (int i = 0; i < 100; i++)
{
total += i;
}
Thread.Sleep(2000);
return total;
});
task6.ContinueWith(continuationAction=>
{
int result = continuationAction.Result;
Console.WriteLine(result);
});
Console.ReadKey();
ContinueWith本身会返回一个Task,它非常适用于添加更多的延续。然后如果任务出现错误,我们必须直接处理AggregateException。
如果想让延续运行在统一个线程上,必须指定 TaskContinuationOptions.ExecuteSynchronously;否则它会弹回线程池。ContinueWith特别适用于并行编程场景。
3.延续选项(TaskContinuationOptions)
在使用ContinueWith时可以指定任务的延续选项即TaskContinuationOptions,它的前六个枚举类型与之前说的TaskCreationOptions枚举提供的标志完全一样,补充后续几个枚举值:
ExecuteSynchronously是指同步执行,两个任务都在同一个=线程一前一后的执行。
ContinueWith结合TaskContinuationOptions使用的示例:
在这里插入代码片
执行结果会打印:报错,如果注释掉抛出异常的代码则会打印5050。
七、 TaskCompletionSource
另一种创建任务的方法是使用TaskCompletionSource。它允许创建一个任务,并可以任务分发给使用者,并且这些使用者可以使用该任务的任何成员。它的实现原理是通过一个可以手动操作的“附属”任务,用于指示操作完成或出错的时间。
TaskCompletionSource的真正作用是创建一个不绑定线程的任务(手动控制任务工作流,可以使你把创建任务和完成任务分开)。
这种方法非常适合I/O密集作业:可以利用所有任务的优点(它们能够生成返回值、异常和延续),但不会在操作执行期间阻塞线程。
例如,假设一个任务需要等待2秒,然后返回10,我们的方法会返回在一个2秒后完成的任务,通过给任务附加一个延续就可以在不阻塞任何线程的前提下打印这个结果,如下:
static Task<int> Demo(int millis)
{
//创建一个任务完成源
TaskCompletionSource<int> taskCompletionSource = new TaskCompletionSource<int>();
var timer = new System.Timers.Timer(millis)
{
AutoReset = false
};
timer.Elapsed += delegate
{
timer.Dispose();
taskCompletionSource.SetResult(10);//写入返回值
}; timer.Start(); Console.WriteLine("主线程继续执行....");
return taskCompletionSource.Task;//返回任务
}
static void Main(string[] args)
{
//-------------TaskCompletionSource---------------
//得到任务通过延续输出返回值
var awaiter = Demo(2000).GetAwaiter();
awaiter.OnCompleted(()=> {
Console.WriteLine(awaiter.GetResult());
});
Console.WriteLine("主线程继续执行....");
Console.ReadKey();
八、异步等待 (Task.Delay)
异步等待非常实用,因此它成为Task类的一个静态方法
//第1种
6 {
7 Task.Delay(2000).ContinueWith((o) =>
8 {
9 Console.WriteLine("线程Id:{0},异步等待2秒后执行的逻辑", Thread.CurrentThread.ManagedThreadId);
10 });
11 }
12 //第2种
13 {
14 Task.Delay(3000).GetAwaiter().OnCompleted(() =>
15 {
16 Console.WriteLine("线程Id:{0},异步等待3秒后执行的逻辑", Thread.CurrentThread.ManagedThreadId);
17 });
18 }
19 Console.WriteLine("主线程Id:{0},继续执行", Thread.CurrentThread.ManagedThreadId);
20 Console.ReadKey();
Task.Delay是Thread.Sleep的异步版本。而它们的区别如下(引自 禅道 ):
1.Thread.Sleep 是同步延迟,Task.Delay异步延迟。
2.Thread.Sleep 会阻塞线程,Task.Delay不会。
3.Thread.Sleep不能取消,Task.Delay可以。
4. Task.Delay() 比 Thread.Sleep() 消耗更多的资源,但是Task.Delay()可用于为方法返回Task类型;或者根据CancellationToken取消标记动态取消等待。
5. Task.Delay() 实质创建一个运行给定时间的任务, Thread.Sleep() 使当前线程休眠给定时间。
九、异常(AggregateException)
与线程不同,任务可以随时抛出异常。所以,如果任务中的代码抛出一个未处理异常,那么这个异常会自动传递到调用Wait()或Task的Result属性的代码上。
任务的异常将会自动捕获并抛给调用者。为确保报告所有的异常,CLR会将异常封装在AggregateException容器中,该容器公开的InnerExceptions属性中包含所有捕获的异常,从而更适合并行编程。
try
6 {
7 Task.Run(() =>
8 {
9 throw new Exception("错误");
10 }).Wait();
11 }
12 catch (AggregateException axe)
13 {
14 foreach (var item in axe.InnerExceptions)
15 {
16 Console.WriteLine(item.Message);
17 }
18 }
19 Console.ReadKey();
20 }
上述示例控制台会显示:错误
注意:
使用Task的IsFaulted和IsCanceled属性,就可以不重新抛出异常而检测出错的任务。
1.IsFaulted和IsCanceled都返回False,表示没有错误发生。
2.IsCanceled为True,则任务抛出了OperationCanceledOperation(取消线程正在执行的操作时在线程中抛出的异常)。
3.IsFaulted为True,则任务抛出另一种异常,而Exception属性包含了该错误。
1.Flatten
当子任务抛出异常时,通过调用Flatten方法,可以消除任意层次的嵌套以简化异常处理。
var parent = Task.Factory.StartNew(() =>
6 {
7 int[] numbers = { 0 };
8 var childFactory = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
9 childFactory.StartNew(() => 10 / numbers[0]);//除零
10 childFactory.StartNew(() => numbers[1]);//超出索引范围
11 childFactory.StartNew(() => throw null);//空引用
12 });
13 try
14 {
15 parent.Wait();
16 }
17 catch (AggregateException axe)
18 {
19 foreach (var item in axe.Flatten().InnerExceptions)
20 {
21 Console.WriteLine(item.Message);
22 }
23 }
24 Console.ReadKey();
2.Handle
如果需要只捕获特定类型异常,并重抛其它类型的异常,Handle方法为此提供了一种快捷方式。
Handle接受一个predicate(异常断言),并在每个内部异常上运行此断言。
1 public void Handle(Func<Exception, bool> predicate);
如果断言返回True,它认为该异常是“已处理”,当所有异常过滤之后:
1.如果所有异常是已处理的,异常不会抛出。
2.如果存在异常未处理,就会构造一个新的AggregateException对象来包含这些异常并抛出。
class Program
2 {
3 static void Main(string[] args)
4 {
5 var parent = Task.Factory.StartNew(() =>
6 {
7 int[] numbers = { 0 };
8 var childFactory = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
9 childFactory.StartNew(() => 10 / numbers[0]);//除零
10 childFactory.StartNew(() => numbers[1]);//超出索引范围
11 childFactory.StartNew(() => throw null);//空引用
12 });
13 try
14 {
15 try
16 {
17 parent.Wait();
18 }
19 catch (AggregateException axe)
20 {
21 axe.Flatten().Handle(ex =>
22 {
23 if (ex is DivideByZeroException)
24 {
25 Console.WriteLine("除零-错误处理完毕");
26 return true;
27 }
28 if (ex is IndexOutOfRangeException)
29 {
30 Console.WriteLine("超出索引范围-错误处理完毕");
31 return true;
32 }
33 return false;//所有其它 异常重新抛出
34 });
35
36 }
37 }
38 catch (AggregateException axe)
39 {
40 foreach (var item in axe.InnerExceptions)//捕获重新抛出的异常
41 {
42 Console.WriteLine(item.Message);
43 }
44 }
45 Console.ReadKey();
46 }
47 }