DAG计算框架:实现业务编排

news2025/1/13 13:25:14

文章目录

  • DAG
  • 如何实现DAG计算框架
    • Node的实现
    • Engine的实现
    • Graph的实现
    • 具体某个节点
    • 如何使用

在工作几年之后,大部分人如果还在继续做着 CRUD的简单重复工作,即使领导不提出对你更高的期望,自身也会感到焦虑吧。学如逆水行舟不进则退,年龄在增,技术深度也需要不断精进,否则就很可能面临淘汰。因此找个时间静下心来,为自己做一个技术规划是非常有必要的。

在工作中想要做好技术规划,就必须抓住一个软件系统的演进见律:

函数->类->组件->脚本->服务->系统->分栈/层->配置化/标准化->自动化->平台化->产品化->规模化

软件工程的本质就是应对规模化所带来的复杂性。

因此如何将复杂的东西变简单,以便于承接更大的规模化发展,这本身就是技术的本质,因此是极其有技术含量的事。

DAG

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG Directed Acyclic Graph),在工作中,大部分规则引擎都会用到DAG
在这里插入图片描述

设计时,一般只需要在请求到来时,根据变化的配置信息对DAG进行初始化,根据上下文中的信息(一般用自定义的ctx携带信息),每个node决定是否真的执行具体的task(或者跳过),将业务组件最大程度的复用和内聚。

如何实现DAG计算框架

Node的实现

通过继承扩展实现,业务开发需要实现两个函数EnableRun,其中所有参数检查逻辑在Enable中完成, Enable返回false代表不启用此NodeRun函数是真正执行
业务逻辑的函数实现,这样对于一个具体Node的所有业务逻辑都被高度内聚在了一个文件中实现。

type Node struct {
	Ctx *BizCtx // 自定义的上下文
	Name string //节点名字,一般表示业务单元的标识(一个业务流程是一个Node)
	g *Graph  //整个DAG的控制对象
	ID int64 // 保证唯一的ID
	Deps []string // 所有的父节点
	Nexts []*Node // 所有的子节点 
	Mask int64 // 表示依赖的演码,用于标识当前节点是否可达(可执行)
}

//根据相关参数构建节点
func NewNode(...)*Node{....}
// 获取上下文
func (n *Node)GetCtx()*BizCtx{return n.BizCtx}
// 参数校验
func (n *Node)Enable()bool{...}
// 节点实际运行
func (n *Node)Run(){}
// 获取节点名字
func (n *Node)Name()string{return n.Name}
// 节点依赖,可理解成当前节点的父节点,本示例中,父节点都执行成功后,才可执行当前节点
func (n *Node)Deps()[]string{return n.Deps}
// 所有的子节点
func (n *Node)Nexts()[]*Node{return n.Nexts}
// 节点的配置,不同业务,可能有自己的一些配置参数,如分流参数,奖励参数
func (n *Node)Conf()*NodeConf{}

Engine的实现

提供DAG计算框架的运行时资源管理,协程池管理计算资源原,对象池管理内存资源。

type Engine struct { // engine的生命周期是进程级的
ctxPool *sync.Pool
gPool *sync.Pool
runPool *goPool // 某种协程池实现,接受两个函数,一个函数执行和一个回调函数
graph *Grpah // graph代表着一个真正的DAG,是请求级的生命周期。
}

// 函数式选项模式获得一个Engine对象
func NewEngine(opt Options)*Engine{...}
// 初始化一个图
func (e *Engine)BuildGraph()*Graph{...}

Graph的实现

真正实现DAG调度的组件,请求范围内的生命周期

var RootNode = &Node{ID:-1}
var EndNode = &Node{ID:-2}
type Graph struct{
	e *Engine // Graph 和Engine互相包含
	id int64
	taskChannel chan int64 // 需要执行的节点的ID
	ackChanel   chan int64   // 异步回调的确认chan
	doneChannel chan struct{}  // 执行完成或者异常时,终止DAG的通道
	NameTable   map[string]*Node // 节点名与节点的映射
	IDTable     map[int64]*Node // 节点id与节点的映射
}

// 添加节点,拼接实际的图
// 这里默认添加顺序遵循了添加当前节点时,已经添加完了当前节点依赖的所有父节点
func (g *Graph)ADD(node *Node)*Graph{
	g.NameTable[node.Name()] = node
	// Mask在构建Node时会设置和Node的ID字段相同,然后与所有父节点的ID异或,得到新的Mask值
	// 后面执行时候,当前节点执行成功,就与其所有子节点异或,更新了子节点的Mask值
	// 当子节点的Mask值和其ID值又相等时,说明当前节点的所有父节点都执行成功了,可以执行当前节点了
	for nodeName <- node.Deps(){
		preNode := g.NameTable[nodeName]
		node.Mask ^= preNode.ID
		preNode.Nexts = append(preNode.Nexts, node)
	}
}

func (g *Graph)Run() err {
//在遍历过程中对第一层没有依赖的node添加一个 rootNode,其 ID== -1
//在遍历的过程中对最后没有出度的节点添加上特殊的 终止Nodle,其ID=== -2
// 即默认让-1和-2分别作为根节点和终止节点
g.taskChannel <- -1
for{
	select{
	case taskID <-g.taskChannel:
		// 遇到了终止节点,当前图可以终止执行了
		if taskID == -2 {close(g.doneChannel)}
		node := g.IDTable[taskID]
		if node.Enable(){
			// 使用Engine管理协程执行
			g.e.runPool(func(){node.Run()}, func(id int64){g.ackChannel<-id})
		}
	//这里也可以基于协程池做异步控制,ack
	case taskID <- g.ackChanel:
		node := g.IDTable[taskID]
		// 当前节点致辞哪个成功后,通知所有子节点
		for nextNode <- node.Nexts(){
			nextNode.Mask ^= node.ID//利用相同数字异或结果为0的特性维护任务依赖状态
			// 该子节点可以放入可执行的channel中了
			if nextNode.Mask == nextNode.ID { 
				gg.taskChannel <- nextNode.ID
			}
	case <-g.doneChannel:
		g.Close()
		return

具体某个节点

这里以一个RecoveryNode为例,其可能放于文件:/nodes/recovery.go一个单独文件中,其他具体的节点也都放于单独的文件中,但都共同放在nodes文件夹中。

type RecoveryNode struct{
	 *Node // 继承节点的能力
	.... // 其他与当前节点相关的业务自定义字段
}
//注册名字
func (n *Node)Name()string{return "recovery"}

//注册依赖,假如当前节点依赖了分流和奖励节点
func (n *Node)Deps()[]string{
	return []string{"shunt", "reward"}//可以把Node的name定义为常量进行传递会更好,避免出错
}

func NewRecoveryNode(/**这里也可以传参数**/)*Node{
	return &RecoveryNode{Node:NewNode()}
}

func (n *RecoveryNode)Enable()bool{
	// 利用了ctx的WithValue能力,如下shunt.path就是一个key,其中shunt可以理解成是命名空间,表示是shunt节点中设置的path key,取其值
	// 这里表示分流节点中的通过路径不是1时,可以执行当前节点
	return n.Node.GetCtx().GetString("shunt.path","") != "1"
}

func (n *Node)Run(){
	count := n.Node.GetCtx().GetInt64("reward.count", 20)
	list:=Recovery(count)//示意而已,不用在乎业务具体逻辑
	n.Node.GetCtx().SetInt64List("recovery.success"", true)
}

// ... 其他一些方法的实现

如何使用

var e *Engine
func init(){
	e = NewEngine{}
}

func handler(){
	g := e.BuildGraph().
			ADD(NewShuntNode()).
			ADD(NewRewardNode()).
			ADD(NewRecoveryNode()).
			ADD(NewPackDataNode())
	
	err := g.Run()
	print(err)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2068373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解锁Spring Boot、Prometheus与Grafana三合一:打造你的专属自定义指标炫酷大屏!

1. 集成Prometheus到Spring Boot 需要在Spring Boot应用中集成Prometheus。可以通过micrometer库来实现。micrometer是一个应用程序监控库&#xff0c;它支持多种监控系统&#xff0c;包括Prometheus。 一招制胜&#xff01;Spring Boot、Prometheus和Grafana三剑合璧&#x…

数仓工具使用Docker部署DolphinScheduler 3.2.0 (分布式任务调度系统)-单机模式

Apache DolphinScheduler——开源大数据调度器神器 Apache DolphinScheduler(海豚调度),国人之光&#xff0c;是许多国人雷锋开源在Apache的顶级项目&#xff0c;主要功能就是负责任务的调度处理&#xff0c;Apache DolphinScheduler是一个分布式去中心化&#xff0c;易扩展的可…

C++--map和set

目录 1. 关联式容器 2. 键值对 3. 树形结构的关联式容器 3.1 set 3.2 map 3.3 multiset 3.4 multimap 4.底层结构 4.3红黑树与AVL树的比较 1. 关联式容器 前面我们已经接触过 STL 中的部分容器&#xff0c;比如&#xff1a; vector 、 list 、 deque、 forward_list(…

自建CDN/WAF解决方案--GoEdge

目录 概述 核心功能 典型应用场景 优点 适用对象 安装 使用 域名准备 DNSPOD的API秘钥申请 添加DNS厂商账号 添加集群 添加节点 添加网站 工作原理 概述 GoEdge 是一款高性能的、支持多种功能的反向代理服务器&#xff0c;通常用于流量管理、负载均衡、安全防护等…

layui2.9 树组件默认无法修改节点图标,修改过程记录下

官方文档树组件 data 参数值&#xff0c;未提供icon属性配置 需要修改源码 layui.js, 搜索图片中标记部分 查找到之后&#xff0c;修改为 <i class“‘(i.icon || “layui-icon layui-icon-file”)’”> 如图&#xff1a; 修改完成后&#xff0c;即可在data中添加icon…

Mysql双主双从

双主双从 1.安装Mysql1.1 查看linux版本1.2 下载Mysql安装包1.3 上传并解压1.4 安装Mysql1.5 编辑端口号1.6 Mysql启动命令1.7 更新密码 2.搭建Mysql主从复制2.1 搭建Master主服务器2.1.1 修改mysql配置文件2.1.2 重启Mysql服务2.1.3 创建Slave用户, 并授权2.1.4 查看主服务器当…

RTOS实战项目之实现多任务系统

文章目录 一、RTOS引入二、任务的引入2.1 任务的定义2.2 理解C函数的内部机制2.3 ARM架构2.4 汇编指令2.5 怎么保存函数的现场①要保存什么②保存现场的几种场景 三、FreeRTOS中怎么创建任务四、通过链表深入理解调度机制4.1 优先级与状态4.2 调度方法 五、创建任务—伪造现场5…

Python青少年简明教程:赋值语句

Python青少年简明教程&#xff1a;赋值语句 变量赋值是指将一个值分配给变量的过程。Python 支持多种形式的赋值&#xff08;assignment&#xff09;&#xff0c;包括基本赋值、多重赋值、链式赋值和解包赋值等。 为了深入理解Python赋值语句机制&#xff0c;先了解一下id()函数…

[Meachines] [Easy] Legacy nmap 漏洞扫描脚本深度发现+MS08-067

信息收集 IP AddressOpening Ports10.10.10.4TCP:135,139,445 $ nmap -p- 10.10.10.4 --min-rate 1000 -sC -sV -Pn PORT STATE SERVICE VERSION 135/tcp open msrpc Microsoft Windows RPC 139/tcp open netbios-ssn Microsoft Windows n…

战略合作篇白皮书:深度革新,赋能企业跃迁

01背景 企业数字化转型已经成为当今商业环境中不可避免的趋势&#xff0c;主要有以下几个原因&#xff1a; 技术发展&#xff1a;随着信息技术的迅猛发展和普及&#xff0c;企业面临着数字化转型的迫切需求。云计算、大数据、人工智能等技术正在改变商业模式和运营方式&#xf…

【C++第十四章】进阶模板

【C第十四章】进阶模板 非类型模板参数&#x1f9d0; 我们创建一个类&#xff0c;可以用模板开一个大小的为N的数组&#xff0c;这样优于用宏来定义N&#xff0c;因为可以在创建对象时可以根据需求更改数组大小。我们称在模板定义中使用的不依赖于模板类型的参数为非类型模板参…

当前A股平均市盈率

再写一篇【不务正业】的 2023-08-23A股平均市盈率 来自乐咕乐股网 当前A股市盈率是否为低点&#xff1f; 不言而喻 ‌当前A股市场的市盈率确实处于相对低位。‌ 根据东方财富Choice最新数据显示数据&#xff0c;截至2024年8月23日&#xff0c;全A市盈率为13.06倍&#xff0c;…

(贪心) LeetCode 45. 跳跃游戏 II

原题链接 一. 题目描述 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n …

《加油吧少年》热播 编剧蔡璧鸿:创作需要对幽默保持高度关注

近日&#xff0c;校园剧《加油吧少年》正在热播中&#xff0c;该剧以学渣视角&#xff0c;讲述他在高中校园与学霸&#xff0c;女神&#xff0c;死党一起学习&#xff0c;成长和努力拼搏的故事&#xff0c;《加油吧少年》自播出后&#xff0c;便以轻松幽默&#xff0c;诙谐搞笑…

2024口碑最好的四大游泳耳机大揭秘,游泳教练全方位测评分析!

游泳&#xff0c;作为一种全身性的锻炼方式&#xff0c;越来越受到人们的青睐。在水下&#xff0c;人们渴望能够聆听到美妙的音乐&#xff0c;让游泳变得更加有趣和放松。游泳耳机的出现&#xff0c;正是为了满足这一需求。它们不仅能够提供防水、防汗的功能&#xff0c;还能在…

软件测试——自动化测试selenium常用函数

目录 元素的定位cssSelectorxpathxpath语法&#xff1a; 元素定位函数 操作测试对象窗口切换窗口窗口设置大小窗口切换屏幕截图关闭窗口 等待强制等待隐式等待显示等待 浏览器导航弹窗警告弹窗确认弹窗提示弹窗 文件上传浏览器参数设置 元素的定位 web⾃动化测试的操作核⼼是能…

RFID光触发标签在多行业的应用与效益差异

在当今数字化和智能化的浪潮下&#xff0c;RFID技术已成为众多行业优化运营、提升竞争力的关键手段。RFID光触发标签作为这一技术的创新成果&#xff0c;以其独特的性能特点&#xff0c;正逐渐在各个领域发挥着重要作用。 一、RFID光触发标签的特点与参数 &#xff08;一&…

优惠券秒杀项目

一、添加优惠券的同时&#xff0c;将优惠券信息&#xff0c;以及用户列表放到redis中 Override Transactional public void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher new SeckillVoucher();seckillVou…

linux dig域名DNS 查询与iptables域名ip访问流量限制;PTR 反向解析从 IP 地址到域名的映射

一、域名 dns查询 在 Linux 系统中&#xff0c;你可以使用多种工具和技术来进行 DNS 查询和 IP 限制。以下是一些常用的方法和工具&#xff1a; DNS 查询 dig 命令&#xff1a; dig 是一个强大的命令行工具&#xff0c;用于查询 DNS 信息。 dig example.com你可以指定查询类型…

【TCP】核心机制:滑动窗口、流量控制和拥塞控制

文章目录 滑动窗口窗口滑动滑动窗口丢包 流量控制拥塞控制窗口大小变化过程 滑动窗口 有一类算法题&#xff0c;就是通过滑动窗口的思想来解决的&#xff0c;算法中的“滑动窗口”借鉴自 TCP 的滑动窗口 TCP 是要保证可靠传输的>代价&#xff0c;降低了传输的效率&#xf…