c# 异步进阶———— paralel

news2024/12/26 3:39:17

前言

简单整理一下paralel,以上是并行的意思。

正文

我们在工作中常常使用task await 和 async,也就是将线程池进行了封装,那么还有一些更高级的应用。

是对task的封装,那么来看下paralel。

static void Main(string[] args)
{
	var ints= Enumerable.Range(1, 100);
	var result = Parallel.ForEach(ints, arg =>
	{
		Console.WriteLine(arg);
	});
	
	Console.Read();
}

可以看到结果是并行的。

 

那么来看下实现机制。

public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
{
	if (source == null)
	{
		throw new ArgumentNullException(nameof(source));
	}
	if (body == null)
	{
		throw new ArgumentNullException(nameof(body));
	}

	return ForEachWorker<TSource, object>(
		source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}

进行参数检验,然后交给了ForEachWorker。

这是一个基本的代码思路,就是复杂的方法中可以先校验参数,然后具体实现交给另外一个方法。

然后通过不同的类型,进行分类:

 

然后看下具体实现是什么?

进去看就是一个taskreplicator:

 

看下run在做什么。

public static void Run<TState>(ReplicatableUserAction<TState> action, ParallelOptions options, bool stopOnFirstFailure)
{
	int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;

	TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
	new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();

	Replica nextReplica;
	while (replicator._pendingReplicas.TryDequeue(out nextReplica))
		nextReplica.Wait();

	if (replicator._exceptions != null)
		throw new AggregateException(replicator._exceptions);
}
  1. 创建了一个taskreplictor,起到管理作用

  2. 然后创建了一个Replica,然后这个start 是关键

  3. 然后通过while,让每一个Replica 都运行完毕才推出,达到同步的效果

if (replicator._exceptions != null)
	throw new AggregateException(replicator._exceptions);

可以看一下这个,这个是一个比较好的技巧。如果一个运行管理,不用抛出异常,之间在管理中进行运行处理总结。

比如结果,异常等。

那么就看下这个start。

protected Replica(TaskReplicator replicator, int maxConcurrency, int timeout)
{
	_replicator = replicator;
	_timeout = timeout;
	_remainingConcurrency = maxConcurrency - 1;
	_pendingTask = new Task(s => ((Replica)s).Execute(), this);
	_replicator._pendingReplicas.Enqueue(this);
}

public void Start()
{
	_pendingTask.RunSynchronously(_replicator._scheduler);
}

将会运行Execute,是同步的,而不是异步的,也就是说第一个task将会运行在当前线程。

那么看Execute在做什么?

public void Execute()
{
	try
	{
		if (!_replicator._stopReplicating && _remainingConcurrency > 0)
		{
			CreateNewReplica();
			_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
		}

		bool userActionYieldedBeforeCompletion;

		ExecuteAction(out userActionYieldedBeforeCompletion);

		if (userActionYieldedBeforeCompletion)
		{
			_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
			_pendingTask.Start(_replicator._scheduler);
		}
		else
		{
			_replicator._stopReplicating = true;
			_pendingTask = null;
		}
	}
	catch (Exception ex)
	{
		LazyInitializer.EnsureInitialized(ref _replicator._exceptions).Enqueue(ex);
		if (_replicator._stopOnFirstFailure)
			_replicator._stopReplicating = true;
		_pendingTask = null;
	}
}

一段一段分析:

if (!_replicator._stopReplicating && _remainingConcurrency > 0)
{
	CreateNewReplica();
	_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
}

这里当_replicator 也就是任务复制器没有停止的时候。这里有两种情况会停止,一种是任务完成,一种是任务异常且设置参数异常时候停止。

_remainingConcurrency 指的是副本数,默认是int.max。

那么就复制一个副本。

protected override void CreateNewReplica()
{
	Replica<TState> newReplica = new Replica<TState>(_replicator, _remainingConcurrency, GenerateCooperativeMultitaskingTaskTimeout(), _action);
	newReplica._pendingTask.Start(_replicator._scheduler);
}

复制完副本后,那么就开始运行我们的action了。

protected override void ExecuteAction(out bool yieldedBeforeCompletion)
{
	_action(ref _state, _timeout, out yieldedBeforeCompletion);
}

这里传入了timeout,这个timeout并不是我们限制我们单个task的运行时间,而是当运行到一定时候后,这个task就停止运行,然后另外启动一个副本。

if (CheckTimeoutReached(loopTimeout))
{
	replicationDelegateYieldedBeforeCompletion = true;
	break;
}
if (userActionYieldedBeforeCompletion)
{
	_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
	_pendingTask.Start(_replicator._scheduler);
}
else
{
	_replicator._stopReplicating = true;
	_pendingTask = null;
}

这个是为了符合操作系统的调度思想,跑的越久的,基本上优先级会低些。

那么看下这个_action主要在做什么吧。

while (myPartition.MoveNext())
{
	KeyValuePair<long, TSource> kvp = myPartition.Current;
	long index = kvp.Key;
	TSource value = kvp.Value;

	// Update our iteration index
	if (state != null) state.CurrentIteration = index;

	if (simpleBody != null)
		simpleBody(value);
	else if (bodyWithState != null)
		bodyWithState(value, state);
	else if (bodyWithStateAndIndex != null)
		bodyWithStateAndIndex(value, state, index);
	else if (bodyWithStateAndLocal != null)
		localValue = bodyWithStateAndLocal(value, state, localValue);
	else
		localValue = bodyWithEverything(value, state, index, localValue);

	if (sharedPStateFlags.ShouldExitLoop(index)) break;

	// Cooperative multitasking:
	// Check if allowed loop time is exceeded, if so save current state and return.
	// The task replicator will queue up a replacement task. Note that we don't do this on the root task.
	if (CheckTimeoutReached(loopTimeout))
	{
		replicationDelegateYieldedBeforeCompletion = true;
		break;
	}
}

就是拉取我们的enumerator的数据,然后simpleBody(value),进行运行我们写的action。

总结一下,其实Parallel 核心就是一个任务复制器,然后创建多个副本,拉取我们的数据,进行执行我们设置的action。

里面的主要功能,Parallel做到了限制副本数,因为我们知道task并不是越多越好。

第二个,如果长时间运行,那么Parallel是做了优化的,当达到timeout的时候,那么会重新启动一个副本(可以理解为一个线程)

第三点,Parallel 有一个foreach 进行迭代器的处理,这里不仅仅是让任务可以并行。

而且具备c# foreach的基本功能。

static void Main(string[] args)
{
	var ints= Enumerable.Range(1, 100);
	var result = Parallel.ForEach(ints,    (arg, state)
		=>
	{
		if (state.IsStopped)
		{
			return;   
		}
		
		if (arg > 18)
		{
			state.Break();
		}
	});
	if (result.IsCompleted)
	{
		Console.WriteLine("完成");
	}
	Console.Read();
}

可以进行中断。 

还有一个函数,那就是stop,这个stop 比break 停止的快,break 要记录出,最小中断位置。

 

而stop 就是立马停止下来。

在上述中,我们知道可以传递一个taskschedule进行,那么这个taskschedule 是干什么的,对我们的任务调度有什么影响呢? 下一节,自我实现taskschedule。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/879640.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

猿辅导Motiff与IXDC达成战略合作,将在UI设计领域推动AI革新更多可能性

近日&#xff0c;“IXDC 2023国际体验设计大会”在北京国家会议中心拉开序幕&#xff0c;3000设计师、1000企业、200全球商业领袖&#xff0c;共襄为期5天的用户体验创新盛会。据了解&#xff0c;此次大会是以“设计领导力”为主题&#xff0c;分享全球设计、科技、商业的前沿趋…

如何手动创建可信任证书DB并配置 nss-config-dir

以阿里云免费邮箱为例 1. 如何下载证书链 证书链说明 使用 gnutls gnutls-cli --print-cert smtp.aliyun.com:465 < /dev/null > aliyun-chain.certs使用 openssl showcerts $ echo -n | openssl s_client -showcerts -connect smtp.aliyun.com:465 | sed -ne /-BE…

PHP8的字符串操作2-PHP8知识详解

今日继续分享《php8的字符串操作》昨天一天都没有写多少&#xff0c;内容多&#xff0c;今天继续&#xff1a; 昨天分享的是1、使用trim()、rtrim()和ltrim()函数去除字符串首尾空格和特殊字符。2、使用strlen()函数和mb_strlen()函数获取字符串的长度。 3、截取字符串 PHP对…

基于强化学习的自动化裁剪CIFAR-10 分类任务(提升模型精度+减少计算量)

基于强化学习的自动化裁剪&#xff0c;提升模型精度的同时减少计算量。 介绍 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RFnHlyQG-1691544546106)(./pic/APT-main.png)] 目前的强化学习工作很多集中在利用外部环境的反馈训练agent&#xff0c…

大数据:什么是数据分析及环境搭建

一、什么是数据分析 当今世界对信息技术的依赖程度在不断加深&#xff0c;每天都会有大量的数据产生&#xff0c;我们经常会感到数据越来越多&#xff0c;但是要从中发现有价值的信息却越来越难。这里所说的信息&#xff0c;可以理解为对数据集处理之后的结果&#xff0c;是从…

【Sklearn】基于逻辑回归算法的数据分类预测(Excel可直接替换数据)

【Sklearn】基于逻辑回归算法的数据分类预测&#xff08;Excel可直接替换数据&#xff09; 1.模型原理2.模型参数3.文件结构4.Excel数据5.下载地址6.完整代码7.运行结果 1.模型原理 逻辑回归是一种用于二分类问题的统计学习方法&#xff0c;尽管名字中含有“回归”&#xff0c…

ORCA优化器浅析——IMDRelation Storage type of a relation GP6与GP7对比

如上图所示IMDRelation作为Interface for relations in the metadata cache&#xff0c;其定义了Storage type of a relation表的存储类型&#xff0c;如下所示&#xff1a; enum Erelstoragetype {ErelstorageHeap,ErelstorageAppendOnlyCols,ErelstorageAppendOnlyRows,Erels…

如何使用CSS实现一个模态框(Modal)效果?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 使用CSS实现模态框&#xff08;Modal&#xff09;效果⭐ HTML 结构⭐ CSS 样式⭐ JavaScript⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎…

重磅:谷歌发布多平台应用 AI 编程神器

前几天&#xff0c; 谷歌发布了一个多平台应用开发神器&#xff1a;IDX 。 IDX 背靠 AI 编程神器 Codey&#xff0c;支持 React、Vue 等框架&#xff0c;还能补全、解释代码。 更有特色的一点就是&#xff1a;这是一款基于浏览器的开发全栈、用于多平台应用开发的工具。 这款开…

C语言题目的多种解法分享 2之字符串左旋和补充题

前言 有的时候&#xff0c;这个系列专栏中的解法之间并无优劣&#xff0c;只是给大家提供不同的解题思路 我决定将代码实现的过程写成注释&#xff0c;方便大家直接找到对应的函数&#xff0c;只有需要补充说明的知识才会单拿出来强调 这个系列的文章会更的比较慢&#xff0…

级联(数据字典)

二级级联&#xff1a; 一&#xff1a;新建两个Bean 父级&#xff1a; /*** Description 数据字典* Author WangKun* Date 2023/7/25 10:15* Version*/ Data AllArgsConstructor NoArgsConstructor TableName("HW_DICT_KEY") public class DictKey implements Seri…

学习笔记整理-JS-06-函数

一、函数基本使用 1. 什么是函数 函数就是语句的封装&#xff0c;可以让这些代码方便地被复用。函数具有"一次定义&#xff0c;多次调用"的优点。使用函数&#xff0c;可以简化代码&#xff0c;让代码更具有可读性。 2. 函数的定义和调用 和变量类似&#xff0c;函…

C++:模拟实现list及迭代器类模板优化方法

文章目录 迭代器模拟实现 本篇模拟实现简单的list和一些其他注意的点 迭代器 如下所示是利用拷贝构造将一个链表中的数据挪动到另外一个链表中&#xff0c;构造两个相同的链表 list(const list<T>& lt) {emptyinit();for (auto e : lt){push_back(e);} }void test_…

【ES】【elasticsearch】分布式搜索

文章目录 ☀️安装elasticsearch☀️1.部署单点es&#x1f338;1.1.创建网络&#x1f338;1.2.下载镜像&#x1f338;1.3.运行 ☀️2.部署kibana&#x1f338;2.1.部署&#x1f338;2.2.DevTools ☀️3.安装IK分词器&#x1f338;3.1.在线安装ik插件&#xff08;较慢&#xff0…

ARM汇编快速入门

本文主要分享如何快速上手ARM汇编开发的经验、汇编开发中常见的Bug以及Debug方法、用的Convolution Dephtwise算子的汇编实现相对于C版本的加速效果三方面内容。 前言 神经网络模型能够在移动端实现快速推理离不开高性能算子&#xff0c;直接使用ARM汇编指令来进行算子开发无疑…

ad+硬件每日学习十个知识点(32)23.8.12 (元器件封装、PCB封装、3D的PCB封装)

文章目录 1.元器件封装属性值说明2.PCB封装标准说明&#xff08;M、N、L&#xff09;3.电阻的PCB封装&#xff08;阻焊层&#xff09;4.电感的PCB封装&#xff08;CD、CDRH&#xff09;1.CD31的意思是&#xff0c;直径3mm&#xff0c;高度1mm![在这里插入图片描述](https://img…

【SQL应知应会】索引(二)• MySQL版

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 索引 • MySQL版 前言一、索引1.简介2.创建2.1 索引…

Gradio——快速部署可视化人智能应用

前言 Gradio是一个开源的Python库&#xff0c;用于快速构建机器学习和数据科学演示的应用。它可以帮助你快速创建一个简单漂亮的用户界面&#xff0c;以便向客户、合作者、用户或学生展示你的机器学习模型。此外&#xff0c;还可以通过自动共享链接快速部署模型&#xff0c;并获…

IntelliJ IDEA热部署:JRebel插件的安装与使用

热部署 概述JRebel 概述 热部署&#xff0c;指修改代码后&#xff0c;无需停止应用程序&#xff0c;即可使修改后的代码生效&#xff0c;其有利于提高开发效率。 热部署方式&#xff1a; 手动热部署&#xff1a;修改代码后&#xff0c;重新编译项目&#xff0c;然后启动应用程…

【软件测试】Linux系统下安装jdk配置环境变量(详细步骤)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、安装环境 操作…