PLINQ 是语言集成查询(Language Integrate Query , LINQ)的并行实现(P 表示并行)。本章将继续介绍其编程的各个方面以及与之相关的一些优缺点。
本文的主要内容为 PLINQ 中的合并选项以及抛出和处理异常。
本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode
4、PLINQ 中的合并选项
如前文所述,当创建并行查询时,将对源集合进行分区,以便多个任务可以同时在各部分上工作。查询完成后,需要合并结果,以便将其提供给使用他的线程。
4.1、使用 NotBuffered 合并选项
使用 NoBuffered 合并选项时,并发任务的结果不被缓冲。一旦完成任何任务,他们就会将结果返回给使用的线程。代码演示如下:
private void RunWithNotBuffered()
{
var task = Task.Run(() =>
{
Debug.Log("RunWithNotBuffered Start !");
var L = ParallelEnumerable.Range(0, 10);
var notBufferedQuery = L.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(async x =>
{
await Task.Delay(x * 1000);
return x;
});
notBufferedQuery.ForEach(t=>
{
Debug.Log($"{t.Result} In notBufferedQuery !");
});
Debug.Log("RunWithNotBuffered End !");
});
}
打印结果如下:
首先,这个执行是无序的;然后虽然每一次选择都有等待,但是每有一个 Select 语句执行完成就会在 ForEach 里执行一次。
4.2、使用 AutoBuffered 合并选项
使用 AutoBuffered 合并选项时,并发任务的结果将被缓冲,并使缓冲区可定期用于使用他的线程。根据集合的大小,可能会返回多个缓冲区。设置此选项后,使用结果的线程将需要等待更长的时间才能获得第一个结果。这也是默认选项。
private void RunWithAutoBuffered()
{
var task = Task.Run(() =>
{
Debug.Log("RunWithAutoBuffered Start !");
var L = ParallelEnumerable.Range(0, 10);
var autoBufferedQuery = L.WithMergeOptions(ParallelMergeOptions.AutoBuffered)
.Select(async x =>
{
await Task.Delay(x * 1000);
return x;
});
autoBufferedQuery.ForEach(t =>
{
Debug.Log($"{t.Result} In autoBufferedQuery !");
});
Debug.Log("RunWithAutoBuffered End !");
});
}
这次打印的结果就比较有意思了:
可以看到第一个结果出来等待了很久(8s),但是后面的结果就很快出来了。
4.3、使用 FullyBuffered 合并选项
使用 FullyBuffered 合并选项时,并发任务的结果在进入使用它的线程之前会被完全缓冲。尽管获得第一个结果所花费的时间会更长,但是可以提高整体性能。
测试代码如下:
private void RunWithFullyBuffered()
{
var task = Task.Run(() =>
{
Debug.Log("RunWithFullyBuffered Start !");
var L = ParallelEnumerable.Range(0, 10);
var fullyBufferedQuery = L.WithMergeOptions(ParallelMergeOptions.FullyBuffered)
.Select(async x =>
{
await Task.Delay(x * 1000);
return x;
});
fullyBufferedQuery.ForEach(t =>
{
Debug.Log($"{t.Result} In fullyBufferedQuery !");
});
Debug.Log("RunWithFullyBuffered End !");
});
}
打印结果如下:
可以看到依次打印出了结果,这是因为我们在等待时设置的代码是 (x*1000) ,也就是 Select 项目都是同时开始的,然后依次结束。
并非所有查询运算符都支持全部的合并模式,运算符及其合并模式限制可参考以下网站:
PLINQ 中的合并选项 | Microsoft Learn详细了解:PLINQ 中的合并选项https://learn.microsoft.com/zh-cn/dotnet/standard/parallel-programming/merge-options-in-plinq#query-operators-that-support-merge-options 除上述运算符外,ForAll 始终为 NotBuffered ,而 OrderBy 始终为 FullyBuffered。如果在这些运算符上制定了任何自定义的合并选项,则它们都会被忽略。
5、使用 PLINQ 抛出和处理异常
简单地使用 TryCatch 语句就能获取到 PLINQ 中的异常:
private void RunException()
{
var range = ParallelEnumerable.Range(1, 10);
var query = range.Select(x => x / (x % 2 - 1));
try
{
query.ForAll(x => Debug.Log(x));
}
catch (AggregateException ex)
{
Debug.LogError(ex.Message);
var exs = ex.InnerExceptions;
foreach (var innerEx in exs)
{
Debug.LogError(innerEx.Message);
}
}
}
上述代码,只要是奇数就会出现除0的错误,直接运行打印如下:
可见运行了多个线程,并出现了多个除0错误。但是我发现这次的错误异常抛出,如果是打印 InnerException 的话,可能会收集不完整,显示不出堆栈信息。但是这个其实也能接受,毕竟只要知道一个错误就行,对于多线程来说都是一样的,并不用在意同一个错误出现了几次。
(未完待续)
本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode