Golang基于DTM的分布式事务SAGA实战

news2025/1/22 20:46:46

SAGA介绍

SAGA是“长时间事务”运作效率的方法,大致思路是把一个大事务分解为可以交错运行的一系列子事务的集合。原本提出 SAGA 的目的,是为了避免大事务长时间锁定数据库的资源,后来才逐渐发展成将一个分布式环境中的大事务,分解为一系列本地事务的设计模式

SAGA事务典型的时序图

SAGA失败的时序图

如图TM事务管理器,DTM是开源的分布式事务管理中间件

DTM的SAGA支持

dtm根据http的不同状态码来代表当前事务的处理结果

dtm事务默认无回滚时间支持,尽最大能力交付

失败重试默认为指数回避算法。需要固定时间重试需要在saga属性配置

dtm默认事务执行顺序为并发执行也是顺序执行,可以设置属性为并行执行

http状态码当前版本不能完全代表业务成功需要结合 返回msg具体看业务代码

实战

代码在宿主机运行 docker network:bridge

docker安装,安装成功后可以访问http://localhost:36789/ 打开dtm事务web-ui

代码github GitHub - Ssummer520/dtm-gin

docker run -itd  --name dtm -p 36789:36789 -p 36790:36790  yedf/dtm:latest
创建tm事务管理器提交全局事务
package main

import (
	"fmt"
	"github.com/dtm-labs/dtmcli"
	"github.com/gin-gonic/gin"
	"github.com/lithammer/shortuuid/v3"
	"log"
)

func main() {
	app := gin.Default()

	app.GET("/test", func(c *gin.Context) {
		QsFireRequest()
		log.Printf("TransOut")
		c.JSON(200, "sss")
	})
	app.Run(":1111")

}

const qsBusiAPI = "/api/busi_start"
const qsBusiPortIN = 8881
const qsBusiPortOUT = 8880
const dtmServer = "http://localhost:36789/api/dtmsvr"

var qsBusiIN = fmt.Sprintf("http://host.docker.internal:%d%s", qsBusiPortIN, qsBusiAPI)
var qsBusiOUT = fmt.Sprintf("http://host.docker.internal:%d%s", qsBusiPortOUT, qsBusiAPI)

func QsFireRequest() string {
	req := &ReqHTTP{Amount: 30} // load of micro-service
	// DtmServer is the url of dtm
	saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).
		// add a TransOut sub-transaction,forward operation with url: qsBusi+"/TransOut", reverse compensation operation with url: qsBusi+"/TransOutCompensate"
		Add(qsBusiOUT+"/TransOut", qsBusiOUT+"/TransOutCompensate", req).
		// add a TransIn sub-transaction, forward operation with url: qsBusi+"/TransIn", reverse compensation operation with url: qsBusi+"/TransInCompensate"
		Add(qsBusiIN+"/TransIn", qsBusiIN+"/TransInCompensate", req)
	// submit the created saga transaction,dtm ensures all sub-transactions either complete or get revoked
	saga.RetryInterval = 1
	//saga.RequestTimeout = 10
	err := saga.Submit()

	if err != nil {
		panic(err)
	}
	return saga.Gid
}

type ReqHTTP struct {
	Amount int `json:"amount"`
}

saga全局事务属性设置

saga属性事务设置
type TransOptions struct {
	WaitResult         bool              `json:"wait_result,omitempty" gorm:"-"`     // 是否等待结果,默认为false
	TimeoutToFail      int64             `json:"timeout_to_fail,omitempty" gorm:"-"` // 事务失败的超时时间,单位:秒
	RequestTimeout     int64             `json:"request_timeout,omitempty" gorm:"-"` // 全局事务的请求超时时间,单位:秒
	RetryInterval      int64             `json:"retry_interval,omitempty" gorm:"-"`  // 重试间隔时间,单位:秒
	PassthroughHeaders []string          `json:"passthrough_headers,omitempty" gorm:"-"` // 需要传递的HTTP头部字段
	BranchHeaders      map[string]string `json:"branch_headers,omitempty" gorm:"-"`  // 自定义的分支头部字段,DTM服务器到服务API
	Concurrent         bool              `json:"concurrent" gorm:"-"`                // 是否并发执行,适用于saga和消息事务类型
}
rm1表示第一个微服务业务
package main

import (
	"fmt"
	"github.com/dtm-labs/dtmcli"
	"github.com/dtm-labs/dtmcli/dtmimp"
	"github.com/dtm-labs/dtmcli/logger"
	"github.com/gin-gonic/gin"
	"log"
	"net/http"
)

func main() {
	QsStartSvr()

}

// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8881

// QsStartSvr quick start: start server
func QsStartSvr() {
	app := gin.Default()
	qsAddRoute(app)
	log.Printf("quick start examples listening at %d", qsBusiPort)

	app.Run(fmt.Sprintf(":%d", qsBusiPort))

}

func qsAddRoute(app *gin.Engine) {
	app.POST(qsBusiAPI+"/TransIn", func(c *gin.Context) {
		info := infoFromContext(c)
		var req ReqHTTP
		c.ShouldBindJSON(&req)
		log.Printf("TransIn:%v,gid:%v", req.Amount, info.Gid)
		c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess)) // Status 409 for Failure. Won't be retried
	})
	app.POST(qsBusiAPI+"/TransInCompensate", func(c *gin.Context) {
		info := infoFromContext(c)
		var req ReqHTTP
		c.ShouldBindJSON(&req)
		log.Printf("TransInCompensate:%v,gid:%v", req.Amount, info.Gid)
		c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))
	})

}
func string2DtmError(str string) error {
	return map[string]error{
		dtmcli.ResultFailure: dtmcli.ErrFailure,
		dtmcli.ResultOngoing: dtmcli.ErrOngoing,
		dtmcli.ResultSuccess: nil,
		"":                   nil,
	}[str]
}

type mainSwitchType struct {
	TransInResult         AutoEmptyString
	TransOutResult        AutoEmptyString
	TransInConfirmResult  AutoEmptyString
	TransOutConfirmResult AutoEmptyString
	TransInRevertResult   AutoEmptyString
	TransOutRevertResult  AutoEmptyString
	QueryPreparedResult   AutoEmptyString
	NextResult            AutoEmptyString
	JrpcResult            AutoEmptyString
	FailureReason         AutoEmptyString
}

// AutoEmptyString auto reset to empty when used once
type AutoEmptyString struct {
	value string
}

// SetOnce set a value once
func (s *AutoEmptyString) SetOnce(v string) {
	s.value = v
}

// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {
	v := s.value
	s.value = ""
	if v != "" {
		logger.Debugf("fetch obtain not empty value: %s", v)
	}
	return v
}

// MainSwitch controls busi success or fail
var MainSwitch mainSwitchType

func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {
	info := dtmcli.BranchBarrier{
		TransType: c.Query("trans_type"),
		Gid:       c.Query("gid"),
		BranchID:  c.Query("branch_id"),
		Op:        c.Query("op"),
	}
	return &info
}

type ReqHTTP struct {
	Amount int `json:"amount"`
}
rm2表示第二个微服务业务
package main

import (
	"fmt"
	"github.com/dtm-labs/dtmcli"
	"github.com/dtm-labs/dtmcli/dtmimp"
	"github.com/dtm-labs/dtmcli/logger"
	"github.com/gin-gonic/gin"
	"log"
	"net/http"
)

func main() {
	app := gin.Default()
	app.POST(qsBusiAPI+"/TransOut", func(c *gin.Context) {
		info := infoFromContext(c)
		var req ReqHTTP
		c.ShouldBindJSON(&req)
		log.Printf("TransOut:%v,gid:%v", req.Amount, info.Gid)
		c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))
	})
	app.POST(qsBusiAPI+"/TransOutCompensate", func(c *gin.Context) {
		info := infoFromContext(c)
		var req ReqHTTP
		c.ShouldBindJSON(&req)
		log.Printf("TransOutCompensate:%vgid:%v", req.Amount, info.Gid)
		c.JSON(http.StatusOK, dtmimp.OrString(MainSwitch.QueryPreparedResult.Fetch(), dtmcli.ResultSuccess))
	})
	log.Printf("quick start examples listening at %d", qsBusiPort)

	app.Run(fmt.Sprintf(":%d", qsBusiPort))
}

// busi address
const qsBusiAPI = "/api/busi_start"
const qsBusiPort = 8880

// QsStartSvr quick start: start server
func QsStartSvr() {

}

type mainSwitchType struct {
	TransInResult         AutoEmptyString
	TransOutResult        AutoEmptyString
	TransInConfirmResult  AutoEmptyString
	TransOutConfirmResult AutoEmptyString
	TransInRevertResult   AutoEmptyString
	TransOutRevertResult  AutoEmptyString
	QueryPreparedResult   AutoEmptyString
	NextResult            AutoEmptyString
	JrpcResult            AutoEmptyString
	FailureReason         AutoEmptyString
}

// AutoEmptyString auto reset to empty when used once
type AutoEmptyString struct {
	value string
}

// SetOnce set a value once
func (s *AutoEmptyString) SetOnce(v string) {
	s.value = v
}

// Fetch fetch the stored value, then reset the value to empty
func (s *AutoEmptyString) Fetch() string {
	v := s.value
	s.value = ""
	if v != "" {
		logger.Debugf("fetch obtain not empty value: %s", v)
	}
	return v
}

// MainSwitch controls busi success or fail
var MainSwitch mainSwitchType

type ReqHTTP struct {
	Amount int `json:"amount"`
}

func infoFromContext(c *gin.Context) *dtmcli.BranchBarrier {
	info := dtmcli.BranchBarrier{
		TransType: c.Query("trans_type"),
		Gid:       c.Query("gid"),
		BranchID:  c.Query("branch_id"),
		Op:        c.Query("op"),
	}
	return &info
}
结果

运行tm提交一个全局事务

rm1返回

rm2返回

dtm webui管理页面

当前业务已经消费成功

我们把这块修改为rm1 提交失败,看到rm2事务回滚

const (
	// StatusPrepared 表示全局/分支事务的状态。
	// 第一步,事务准备阶段
	StatusPrepared = "prepared"
	// StatusSubmitted 表示全局事务的状态。
	StatusSubmitted = "submitted"
	// StatusSucceed 表示全局/分支事务的状态。
	StatusSucceed = "succeed"
	// StatusFailed 表示全局/分支事务的状态。
	// 注意:将全局状态更改为失败可以停止触发(在生产环境中不推荐)
	StatusFailed = "failed"
	// StatusAborting 表示全局事务的状态。
	StatusAborting = "aborting"

	// ResultSuccess 事务/事务分支的结果成功
	ResultSuccess = dtmimp.ResultSuccess
	// ResultFailure 事务/事务分支的结果失败
	ResultFailure = dtmimp.ResultFailure
	// ResultOngoing 事务/事务分支的结果进行中
	ResultOngoing = dtmimp.ResultOngoing

	// DBTypeMysql 数据库驱动类型:MySQL
	DBTypeMysql = dtmimp.DBTypeMysql
	// DBTypePostgres 数据库驱动类型:PostgreSQL
	DBTypePostgres = dtmimp.DBTypePostgres
)

参考资料SAGA事务模式 | DTM开源项目文档

https://zhuanlan.zhihu.com/p/688088173

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2044190.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于tresos Studio(EB)的MCAL配置之DIO

General Dio Development Error Detect开发者错误检测 Dio Flip Channel Api翻转通道电平接口Dio_FlipChannel是否启用 Dio Version Info Api决定Dio_GetVersionInfo接口是否启用,一般打开就行。 Dio Reverse Port Bits让端口的位(通道)进…

最新号卡推广单页源码/仿制手机卡流量卡号卡代理推广源码/简洁实用/带弹窗公告+后台管理

源码简介: 最新号卡推广单页源码,它是手机卡流量卡号卡代理推广源码量身打造的,不仅设计得简洁实用,而且还有炫酷的弹窗公告功能和强大的后台管理系统哦! 一款号卡推广单页源码,自己仿制来的,…

arcgis-坡度坡向分析

坡向的描述有定性和定量两种方式,定量是以东为0,顺时针递增,南为90,西为180,北为270等,范围在0~35959′59″之间。 定性描述有8方向法和4方向法. 8 方向为东、东南、南、西南、西、西北、北、东…

Linux安装jdk8,tomcat和mysql

目录 Linux安装jdk8 第一步:下载jdk8 第二步:把下载好的压缩包通过finalShell发送到linux虚拟机上 ​编辑 第三步:解压缩 第四步:配置环境变量 第五步:重新加载profile配置文件 第六步:检查是否安装成…

C++ -- 负载均衡式在线OJ (三)

文章目录 四、oj_server模块1. oj_server的功能路由2. 建立文件版的题库3. model模块4.controller模块5.judge模块(负载均衡)6.view模块整体代码结构(前端的东西,不是重点) 五、最终效果项目源码 前面部分请看这里C –…

Unite Shanghai 2024 团结引擎专场 | 团结引擎实时全局光照

在 2024 年 7 月 24 日的 Unite Shanghai 2024 团结引擎专场演讲中,Unity 中国高级技术经理周赫带大家深入解析了团结引擎的实时全局光照系统。该系统支持完全动态的场景、动态材质和动态灯光的 GI 渲染,包括无限次弹射的漫反射和镜面反射 GI。 周赫&…

2024年职场常备!3款高效数据恢复软件免费版,让打工人工作无忧

嘿,职场的朋友们!咱们现在工作,数据就跟空气一样重要,对吧?但有时候,手一滑,文件没了,硬盘突然就挂了,系统也闹点小情绪,那心情,比股市大跌还难受…

基于Django的boss直聘数据分析可视化系统的设计与实现

研究背景 随着互联网的发展,在线招聘平台逐渐成为求职者与企业之间的重要桥梁。Boss直聘作为国内领先的招聘平台,以其独特的直聊模式和高效的匹配算法,吸引了大量的用户。然而,随着平台用户的增长,海量的招聘数据带来…

基于Faster-RCNN的停车场空位检测,支持图像和视频检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示: 基于Faster-RCNN的停车场空位检测系统,支持图像检测和视频检测(pytorch框架)_哔哩哔哩_bilibili (一)简介 基于Faster-RCNN的停车场空位检测系统是在pytorch框架下实现的,这是一个…

YB5214B 同步开关型降压锂电池充电管理芯片

概述: 是一款支持 4.5-16V 输入电压范围,最大输出为 2A 电流的同步降压锂电池充电管理芯片。芯片内部集成了低阻功率 MOSFETS,采用 500kHz的开关频率以实现较小的元件尺寸和较高的充电效率。 内部还集成了多重保护功能,能够最大程…

基于NXP IMX6Q+FPGA全自动血液分析仪解决方案

全自动血细胞分析仪 ,临床又称血常规检测仪、血液分析仪、血球分析仪、血液细胞分析仪、血球计数仪,是指对一定体积全血内血细胞异质性进行自动分析的临床检验常规仪器。 NXP IMX6Q核心板采用四核Cortex-A9架构,主频1GHz,12层PCB…

知识竞赛中风险题环节竞赛规则有哪些设计方案

风险题环节是知识竞赛活动中一个高潮环节,很多时候都是放到最后压轴,选手会根据之前的成绩进行最后一博。那么,常用的风险题环节规则应怎么设计呢?下面列出的这些大家可以参考一下。 1.设置不同分值的题,由选手根据自…

CSS——字体背景(Font Background)

一、字体族 1、字体的相关样式: ① color 用来设置字体颜色(前景颜色) ② font-size 字体的大小 和font-size相关的单位: em 相对于当前元素的一个font-size rem 相对于根元素的一个font-size ③ font-family 字体族&#x…

软件测试第4章 白盒测试方法(逻辑覆盖测试)

一、白盒测试方法 二、白盒测试 VS 静态测试 【在不运行程序的情况下(即静态测试,程序审查)】 三、白盒测试方法 1、程序控制流图 2、逻辑覆盖测试 测试覆盖率 用于确定测试所执行到的覆盖项的百分比,其中覆盖项是指作为测试基础的一个入口或属性&am…

异常信息转储笔记-demangle函数名字符

前情 上一篇笔记留下了两个待解决问题,其中之一是输出的函数名被奇怪字符覆盖了一部分,本篇笔记即将解决这个问题(下图问题1)。 问题描述 如上,使用libunwind输出core堆栈信息时,有部分字符被覆盖&#x…

fetch_20newsgroups报错403的两种解决办法

在使用sklearn机器学习库使用fetch_20newsgroups调用数据集时候(如下方代码所示),报错403怎么办,本人亲测两种方法,分享大家: data fetch_20newsgroup(subset"train")一、第一种方法 1.下载压…

十大护眼落地灯品牌中护眼效果最好的是哪款?落地灯十大知名品牌

根据不完全统计,我国儿童青少年的近视率达到了52.7%,也就是说10个孩子中有5个以上的视力都处于一个亚健康的状态,这和户外运动少及室内灯光差有很大关系,在面临这种现状下,很多家长对日常用眼时的光线质量越来越重视&a…

智慧叉车监管系统,司机权限启动车辆,杜绝无证驾驶!

叉车广泛应用于各种生产场所,是常见的一种作业工具。叉车作业具有较大的危险性,司机的不安全操作行为是导致叉车事故发生的主要原因之一。近年来,由于操作人员无证驾驶、违章作业等原因,国内发生了多起叉车安全事故,造…

【爬虫新手村】零基础入门到实战:解锁互联网数据收集的密钥,爬虫技术全攻略

文章目录 前言一、爬虫1.基本概念2.常用库3.基本步骤4.注意事项 二、爬虫示例代码1.案例一:requests 的基本使用2.案例二:爬取古诗文(requestsBeautifulSoup)3.案例三:爬取美食(requestsBeautifulSoup&…

智能语音识别技术在无人驾驶领域的应用案例

随着无人驾驶技术的进步与发展,越来越多的企业、创业者注意到无人驾驶领域潜藏的巨大市场经济价值,越来越多的企业和创业者进入无人驾驶领域,以近期业内关注的萝卜快跑为例,其在武汉地区的成功推广与落地预示着无人驾驶在网约车领…