本章继续学习实现数据并行,本文主要介绍取消循环。
本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode
4、取消循环
在顺序循环中,可以使用 break 来跳出循环,而在并行循环的情况下,由于他们在多个线程上执行,因此无法使用 break 和 continue 关键字。
4.1、ParallelLoopState.Break
测试代码如下,这里我们把最大并行度设置为2 :
private void CancelByParallelBreak()
{
int length = commonPanel.GetInt32Parameter();
var L = TestFunction.GetTestList(length);
//设置最大并行度为2
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
var result = Parallel.ForEach(L, options, (i, state) =>
{
Debug.Log(L[i]);
state.Break();//跳出循环
Debug.Log($"Run Parallel Break At {Task.CurrentId}");
});
Debug.Log($"CancelByParallelBreak End! : {result.IsCompleted} | {result.LowestBreakIteration}");
}
此时,我们看到打印结果如下:
两个线程,各自打印了1次就中断了,并且结果返回的 IsCompleted 为 False。
当任何处理器遇到 Break()方法时,它将在 ParallelLoopState 对象的 LowestBreakIteration 属性中设置一个迭代数,这将成为迭代的最大次数或可以执行的最后一次迭代。其他所有任务将继续迭代,直至达到此次数。
因此,我们把代码做部分修改:
……
Debug.Log($"Log {L[i]} At :{Task.CurrentId}");
if (L[i] == 0)
{
state.Break();
Debug.Log($"Run Parallel Break At {Task.CurrentId}");
}
……
这里我们在其中一个线程中设置为值为 0,也就是在第一次循环中就中断循环。但得到的打印结果却有所不同:
可以看到,另一个线程执行了3次才中断循环。
4.2、ParallelLoopState.Stop
同样上述的方法,同样将并行度设置为2,我们把 Break 改成 Stop:
private void CancelByParallelStop()
{
int length = commonPanel.GetInt32Parameter();
var L = TestFunction.GetTestList(length);
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
var result = Parallel.ForEach(L, options, (i, state) =>
{
Debug.Log($"Log {L[i]} At :{Task.CurrentId}");
if (L[i] == 0)
{
state.Stop();
Debug.Log($"Run Parallel Stop At {Task.CurrentId}");
}
});
Debug.Log($"CancelByParallelStop End! : {result.IsCompleted} | {result.LowestBreakIteration}");
}
打印结果如下:
4.3、CancellationToken
和正常任务一样,也可以使用 CacellationToken 类来取消迭代。当取消令牌时,循环将完成当前并行运行的迭代,但不会开始新的迭代。现有迭代完成后,并行循环将抛出 OperationCanceledException 。代码示例如下:
private void CancelByParallelCancellationToken()
{
int length = commonPanel.GetInt32Parameter();
var L = TestFunction.GetTestList(length);
//将取消令牌设置为运行参数
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 2,
CancellationToken = cancellationTokenSource.Token
};
//开始运行循环
Debug.Log($"Start Run Parallel.ForEach : {length}");
Task.Run(() =>
{
var result = Parallel.ForEach(L, options, (i, state) =>
{
//等待 100 ms
Thread.Sleep(100);
Debug.Log($"Log {L[i]} In Task :{Task.CurrentId}");
});
Debug.Log($"Parallel.ForEach End : {result.IsCompleted} | {result.LowestBreakIteration}");
});
//等待250 ms 后取消循环
Task.Run(async () =>
{
await Task.Delay(250);
cancellationTokenSource.Cancel();
Debug.Log("Cancel Token !");
});
}
同样的,并行度为2。但这次代码上做了修改:之前的代码其实会在主线程同步,这次我们放到 Task 中执行循环任务,并且每打印一个值等待100ms。之后取消令牌也是在 Task 中执行,等待250ms 之后取消。
直接从代码上来推理,上述循环在打印了到第3次(200ms开始,300ms 结束)时,在 Sleep 的过程中,就已经取消任务了。由于已经安排的任务仍然会执行,因此第3次打印会生效,但第四次打印则不会执行:
可见打印结果和预期一致。
但值得注意的是:Parallel.ForEach End 这一行打印(24行)并没有打印出来!也就是取消令牌是直接取消了整个线程,而不是单单取消了循环!
未完待续。
本教程对应学习工程:魔术师Dix / HandsOnParallelProgramming · GitCode