分布式事务| 使用 dotnetcore/CAP 的本地消息表模式

news2024/11/15 9:33:45

本地消息表模式

本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆分为多个本地事务,事务之间通过事件消息衔接,事件消息和上个事务共用一个本地事务存储到本地消息表,再通过定时任务轮询本地消息表进行消息投递,下游业务订阅消息进行消费,本质上是依靠消息的重试机制达到最终一致性。其示意图如下所示,主要分为以下三步:c4cb1663270b642c367baa565ff6fb1b.png

  1. 本地业务数据和发布的事件消息共享同一个本地事务,进行数据落库,其中事件消息持久化到单独的事件发件箱表中。

  2. 单独的进程或线程不断查询发件箱表中未发布的事件消息。

  3. 将未发布的事件消息发布到消息代理,然后将消息的状态更新为已发布。

dotnetcore/CAP 简介

在《.NET 微服务:适用于容器化 .NET 应用程序的体系结构》电子书中,提及了如何设计兼具原子性和弹性的事件总线,其中提出了三种思路:使用完整的事件溯源模式,使用事务日志挖掘,使用发件箱模式(The outbox pattern)。其中事件溯源模式实现相对复杂,事务日志挖掘局限于特定类型数据库,而发件箱模式则是一种相对平衡的实现方式,其基于事务数据库表和简化的事件溯源模式。发件箱模式的示意图如下所示:

865b12bb45d445a18b831ecd48fa681c.png

从上图可以看出,其实现原理与上面提及的本地消息表模式十分相似,我们可以理解其也是本地消息表模式的一种实现。作者Savorboard也正是受该电子书启发,实现了.NET版本的本地消息表模式,并命名为dotnetcore/CAP,其架构如下图所示。其同时也兼具EventBus的功能,其支持主流消息代理,如RabbitMQ、Redis、Kafka和Pulsar,同时支持多种持久化存储方式进行消息存储,包括MySQL、PostgreSQL、SQL Server和MongoDB。因此基于dotnetcore/CAP,.NET 开发者也可以快速实现微服务间的异步通信和解决分布式事务问题。

0f61e1382cd433c7a5d29de323656af5.png

基于dotnetcore/CAP 实现分布式事务

那具体如何使用dotnetcore/CAP来解决分布式事务问题呢,基于本地消息表加补偿模式实现。dotnetcore/CAP的补偿模式比较巧妙,其基于发布事件的方法签名中提供了一个回调参数。发布方法的事件签名为:PublishAsync<T>(string name, T? contentObj, string? callbackName=null),第一个参数是事件名称,第二个参数为事件数据包,第三个参数用来指定于接收事件消费结果的回调地址(事件),但是否触发回调,取决于事件订阅方是否定义返回参数,若有则触发。如果基于CAP实现下单流程,则其流程如下所示:

b0cd52d80377b35ca52aaca545c84e47.png

接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,订单服务、库存服务和支付服务均依赖共享类库项目,其中共享类库添加DotNetCore.CapDotNetCore.Cap.MySqlDotNetCore.Cap.RabbitMQNuGet包。

项目项目名项目类型
订单服务CapDemo.OrderServiceASP.NET Core Web API
库存服务CapDemo.InventoryServiceWorker Service
支付服务CapDemo.PaymentServiceWorker Service
共享类库CapDemo.SharedClass Library

订单服务

订单服务首先需要暴露WebApi用于订单的创建,为了方便数据的持久化,首先添加Pomelo.EntityFrameworkCore.MySqlNuget包,然后创建OrderDbContext

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using CapDemo.OrderService.Domains;

namespace CapDemo.OrderService.Data
{
    public class OrderDbContext : DbContext
    {
        public OrderDbContext (DbContextOptions<OrderDbContext> options)
            : base(options) {}

        public DbSet<CapDemo.OrderService.Domains.Order> Order { get; set; } = default!;
    }
}

然后创建OrdersController并添加PostOrder方法如下所示:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using CapDemo.OrderService.Data;
using CapDemo.OrderService.Domains;
using DotNetCore.CAP;
using CapDemo.Shared;
using CapDemo.Shared.Models;

namespace CapDemo.OrderService.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly OrderDbContext _context;
        private readonly ICapPublisher _capPublisher;
        private readonly ILogger<OrdersController> _logger;

        public OrdersController(OrderDbContext context, ICapPublisher capPublisher,ILogger<OrdersController> logger)
        {
            _context = context;
            _capPublisher = capPublisher;
            _logger = logger;
        }
        [HttpPost]
        public async Task<ActionResult<Order>> PostOrder(CreateOrderDto orderDto)
        {
            var shoppingItems =
                orderDto.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty));
            var order = new Order(orderDto.CustomerId).NewOrder(shoppingItems.ToArray());
            
            using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false))
            {
                _context.Order.Add(order);

                var deduceDto = new DeduceInventoryDto()
                {
                    OrderId = order.OrderId,
                    DeduceStockItems = order.OrderItems.Select(
                        item => new DeduceStockItem(item.SkuId, item.Qty, item.Price)).ToList()
                };
                await _capPublisher.PublishAsync(TopicConsts.DeduceInventoryCommand,deduceDto,
                    callbackName: TopicConsts.CancelOrderCommand);
                await _context.SaveChangesAsync();
                await trans.CommitAsync();
            }
                
            _logger.LogInformation($"Order [{order.OrderId}] created successfully!");

            return CreatedAtAction("GetOrder", new { id = order.OrderId }, order);
        }
    }
}

从代码中可以看出,在订单持久化和事件发布之前先行使用事务包裹:using (var trans = _context.Database.BeginTransaction(_capPublisher, autoCommit: false)) {},以确保订单和事件的持久化共享同一个事务,这一步是使用CAP的重中之重。订单服务通过注入了ICapPublisher服务,并通过PublishAsync方法发布扣减库存事件,并指定了callbackName: TopicConsts.CancelOrderCommand。订单服务还需要订阅取消订单和订单支付结果的事件,进行订单状态的更新,添加OrderConsumers如下所示,其中通过实现ICapSubscribe接口来显式标记为消费者,然后定义方法并在方法体上通过[CapSubscribe]特性指定订阅的事件名称来完成事件的消费。

using CapDemo.OrderService.Data;
using CapDemo.Shared;
using DotNetCore.CAP;

namespace CapDemo.OrderService.Consumers;

public class OrderConsumers:ICapSubscribe
{
    private readonly OrderDbContext _orderDbContext;
    private readonly ILogger<OrderConsumers> _logger;

    public OrderConsumers(OrderDbContext orderDbContext,ILogger<OrderConsumers> logger)
    {
        _orderDbContext = orderDbContext;
        _logger = logger;
    }
    [CapSubscribe(TopicConsts.CancelOrderCommand)]
    public async Task CancelOrder(string orderId)
    {
        if(string.IsNullOrEmpty(orderId)) return;
        var order = await _orderDbContext.Order.FindAsync(orderId);
        order?.CancelOrder();
        _logger.LogWarning($"Order [{orderId}] has been canceled!");
        await _orderDbContext.SaveChangesAsync();
    }

    [CapSubscribe(TopicConsts.PayOrderSucceedTopic)]
    public async  Task MarkToPaid(string orderId)
    {
        var order = await _orderDbContext.Order.FindAsync(orderId);
        
        order?.UpdateToPaid();

        await _orderDbContext.SaveChangesAsync();
    }
}

最后修改Program.cs添加CAP服务和消费者的注册。

using CapDemo.OrderService.Consumers;
using CapDemo.OrderService.Data;
using Microsoft.EntityFrameworkCore;
using DotNetCore.CAP;

var builder = WebApplication.CreateBuilder(args);

// 注册 DbContext
var connectionStr = builder.Configuration.GetConnectionString("Default");
builder.Services.AddDbContext<OrderDbContext>(options =>
    options.UseMySql(connectionStr ?? throw new InvalidOperationException("Connection string 'OrderDbContext' not found."), ServerVersion.AutoDetect(connectionStr)));
// 注册CAP
builder.Services.AddCap(x =>
{
    x.UseEntityFramework<OrderDbContext>();
    x.UseRabbitMQ("localhost");
});
// 注册消费者
builder.Services.AddTransient<OrderConsumers>();

库存服务

库存服务在整个下单流程的职责主要是库存的扣减和返还,添加InventoryConsumer来消费库存扣减和返还事件即可。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using CapDemo.Shared;
using CapDemo.Shared.Models;
using DotNetCore.CAP;

namespace CapDemo.InventoryService.Consumers
{
    public class InventoryConsumer : ICapSubscribe
    {
        private readonly ILogger<InventoryConsumer> _logger;
        private readonly ICapPublisher _capPublisher;

        public InventoryConsumer(ILogger<InventoryConsumer> logger, ICapPublisher capPublisher)
        {
            _logger = logger;
            _capPublisher = capPublisher;
        }

        [CapSubscribe(TopicConsts.DeduceInventoryCommand)]
        public async Task DeduceInventory(DeduceInventoryDto deduceStockDto)
        {
            // 省略扣减库存逻辑,直接成功
            _logger.LogInformation($"Inventory has been deducted for order [{deduceStockDto.OrderId}]!");
            var amount = deduceStockDto.DeduceStockItems.Sum(t => t.Price * t.Qty);
            await _capPublisher.PublishAsync(TopicConsts.PayOrderCommand, new PayDto(deduceStockDto.OrderId, amount),
                callbackName: TopicConsts.ReturnInventoryTopic);
        }

        [CapSubscribe(TopicConsts.ReturnInventoryTopic)]
        public void ReturnInventory(PayResult payResult)
        {  
         // 若支付失败,则执行库存返还并发布取消订单命令
            if (!payResult.IsSucceed)
            {
             // 省略返还库存逻辑
                _logger.LogWarning($"Inventory has been returned for order [{payResult.OrderId}]");
                _capPublisher.PublishAsync(TopicConsts.CancelOrderCommand, payResult.OrderId);
            }
        }
    }
}

以上的库存扣减实现中省略了扣减库存逻辑,直接模拟成功扣减,也就无需触发回调,那就可以通过将方法签名定义为public async Task DeduceInventory(DeduceInventoryDto deduceStockDto),这样就不会触发订单服务发布扣减库存事件时指定的回调。库存扣减成功随即发布支付订单的命令,由于不涉及其他数据持久化,因此无需手动开启事务。发布支付订单命令时指定了callbackName: TopicConsts.ReturnInventoryTopic,其将根据订单支付结果也就是ReturnInventory(PayResult payResult)中指定的入参决定是否返还库存。最后同样需要在Program.cs中注入CAP服务和消费者:

using CapDemo.InventoryService;
using CapDemo.InventoryService.Consumers;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        var connStr = context.Configuration.GetConnectionString("Default");
        services.AddCap(x =>
        {
            x.UseMySql(connStr);
            x.UseRabbitMQ("localhost");
        });

        services.AddTransient<InventoryConsumer>();
    })
    .Build();

await host.RunAsync();

支付服务

对于下单流程的支付用例来说,要么成功要么失败,并不需要像以上两个服务一样定义补偿逻辑,因此仅需要订阅支付订单命令即可,定义PaymentConsumers如下所示,因为库存服务发布支付订单命令时指定的回调依赖支付结果,因此该方法必须指定与回调匹配的返回参数类型,也就是PayResult

using CapDemo.Shared;
using CapDemo.Shared.Models;
using DotNetCore.CAP;

namespace CapDemo.PaymentService.Consumers;

public class PaymentConsumers:ICapSubscribe
{
    private readonly ICapPublisher _capPublisher;
    private readonly ILogger<PaymentConsumers> _logger;

    public PaymentConsumers(ICapPublisher capPublisher,ILogger<PaymentConsumers> logger)
    {
        _capPublisher = capPublisher;
        _logger = logger;
    }
    [CapSubscribe(TopicConsts.PayOrderCommand)]
    public async Task<PayResult> Pay(PayDto payDto)
    {
        bool isSucceed = false;
        if (payDto.Amount % 2 == 0)
        {
            isSucceed = true;
            _logger.LogInformation($"Order [{payDto.OrderId}] paid successfully!");
            await _capPublisher.PublishAsync(TopicConsts.PayOrderSucceedTopic, payDto.OrderId);
        }
        else
        {
            isSucceed = false;
            _logger.LogWarning($"Order [{payDto.OrderId}] payment failed!");
        }

        return new PayResult(payDto.OrderId, isSucceed);
    }
}

最后同样需要在Program.cs中注入CAP服务和消费者:

using CapDemo.PaymentService;
using CapDemo.PaymentService.Consumers;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        var connStr = context.Configuration.GetConnectionString("Default");
        services.AddCap(x =>
        {
            x.UseMySql(connStr);
            x.UseRabbitMQ("localhost");
        });
        services.AddTransient<PaymentConsumers>();
    })
    .Build();

await host.RunAsync();

运行结果

使用docker启动MySQL和RabbitMQ,然后再启动三个服务,并在订单服务的Swagger中发起订单创建请求,如下图所示:

a4ab83dbca28f37fdf99518a21ef02ef.png

最终执行结果如下图所示:

c97a95626614650891bca98c403f80f7.png

打开RabbitMQ后台,可以看见CAP为每个服务创建了一个唯一队列接收消息,并通过创建的名为cap.default.router的Exchange根据事件名称作为RoutingKey进行消息路由。

e65a59443d7d654a9cedcec716b6a8a8.png

其中通过dotnetcore/CAP发布的消息结构如下图所示,该图是订单服务发布的扣减库存的消息。

e9e7c226df6be0a1ff4a053a965a7c23.png

打开MySQL,可以发现dotnetcore/CAP 根据配置的连接字符串,分别为各个服务创建了cap.publishedcap.received消息表,如下图所示:

072f4aa9840a1dd90c81570cfdf22dbc.png

小结

通过以上示例,可以发现dotnetcore/CAP无疑是一个出色的事件总线,简单易用且能确保事件的有效送达。同时基于dotnetcore/CAP的本地消息表模式和补偿模式,也可以有效的实现分布式事务。但相较而言,补偿仅限于直接上下游服务之间,不能链式反向补偿,控制逻辑比较分散,属于协同式事务,各个服务需要订阅自己关注的事件并实现,适用于小中型项目,对于大型项目而言尤其需要注意事件的流转,以避免陷入事件漩涡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/185171.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

运放电路中输入失调电压Vos及温漂-运算放大器

实际运放与理想运放具有很多差别&#xff0c;要理解这些差别&#xff0c;就必须认识实际运放的参数。下图是用于描述实际运放几个关键参数的等效模型。模型中&#xff0c;第一个黄色运放是一个近似的理想运放&#xff0c;只有Auo不是无穷大&#xff0c;其余都是理想的。第二个运…

【GD32F427开发板试用】 CAN总线收发测试

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;meijing 这篇测试下CAN通信的收发测试&#xff0c;代码使用库例程中修改。 硬件部分 测试用到了CAN0、串口0和定时器1。 1> CAN0使用的接…

ccflow代码

ccflow代码目录概述需求&#xff1a;设计思路实现思路分析1.什么是流程版本管理&#xff1f;流程讲义&#xff1a;参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip hardness,make a better r…

企业如何利用制造业ERP管理系统做好仓库管理?

仓库管理&#xff0c;对于生产制造型企业来说是重中之重&#xff0c;很多制造企业的大部分”身家“&#xff0c;都在仓库里了。众多的原材料和堆积如山的成品、半成品&#xff0c;往往占用了企业大部分的流动资金。来料是否及时&#xff0c;物料是否齐备&#xff0c;库存是否安…

流程引擎与应用系统分布式部署架构

一、为什么应用系统和流程引擎需要分开部署 有句话讲&#xff1a;存在即合理。在实际的企业应用需求里有如下几种场景&#xff0c;需要把业务系统和流程引擎分开部署。 企业流程治理需求。即整个企业只部署一套流程平台BPM&#xff0c;也叫企业级流程中心BPM、或者跨系统端到…

canal数据同步安装、使用

canal源码仓库&#xff1a;https://github.com/alibaba/canal博主使用的是canal 1.5.5版本 MySQL 5.7.32 JDK:1.8 canal各个版本&#xff1a;https://github.com/alibaba/canal/releasescanal-adapter下载 canal-admin 下载 canal-deployer 下载上传到目标服务器对应目录下解压…

初识ros-Navigation

最近一直在看京天Turtlebot3 waffle pi的导航部分&#xff0c;这篇文章就介绍一下相关内容。导航模块是一个独立完整的模块&#xff0c;内容比较多也很深入。因为笔者没有看过源码&#xff0c;只是一些概念上的了解&#xff0c;做个整理&#xff0c;为后续的源码阅读做准备。本…

苏嵌实训——day16

文章目录一、进程间通信&#xff1a;1.传统通信方式&#xff1a;2. IPC通信方式&#xff08;第五代操作系统&#xff09;&#xff1a;&#xff08;1&#xff09;传统通信之无名管道&#xff08;2&#xff09;传统通信方式之有名管道&#xff08;3&#xff09;使用有名管道来实现…

Python实现清除文件夹中重复视频

目录一、二进制文件二、摘要算法(MD5)三、shutil模块四、视频清除视频全在一个文件夹里视频在不同的文件夹里一、二进制文件 二进制文件是以文本的二进制形式存储在计算机中。 用户一般不能直接读取它们&#xff0c;需要通过相应的软件才能将其显示出来。 二进制文件一般是可…

jspssm大学生宿舍管理系统-宿管带前端

目录 摘 要 II Abstract III 1 绪论 1 1.1 课题背景 1 1.2 课题研究现状 1 1.3 初步设计方法与实施方案 2 1.4 本文研究内容 2 2 系统开发环境 4 2.1 JSP技术 4 2.2 B/S架构 5 2.3 Eclipse环境配置 5 2.4 MySQL数据库 6 3 系统分析 7 3…

【微服务】Docker容器化

&#x1f6a9;本文已收录至专栏&#xff1a;微服务探索之旅 &#x1f44d;希望您能有所收获 一.引入 (1) 为什么需要Docker 微服务虽然具备各种各样的优势&#xff0c;但服务的拆分的非常多给部署带来了很大的麻烦。 分布式系统中&#xff0c;依赖的组件非常多&#xff0c;不同…

【1】Python基础语法

字面量 字面量&#xff1a;在程序中&#xff0c;被写下来的固定值&#xff0c;称之为字面量。Python中常用的6种数据类型&#xff1a; 字符串&#xff08;string&#xff09;&#xff0c;又称文本&#xff0c;是由任意数量的字符如中文、英文、各类符号、数字等组成&#xff0…

虚拟化技术学习笔记10

虚拟机镜像管理 学习目标&#xff1a; 能够了解KVM虚拟机支持的镜像格式 能够使用qemu-img实现镜像创建 能够使用qemu-img实现镜像查看 能够使用qemu-img实现镜像格式转换 能够了解后备镜像的作用 能够了解差量镜像的作用 能够基于后备镜像制作差量镜像 能够使用差量镜…

[网鼎杯 2020 朱雀组]Nmap(双解详细分析)

目录 Nmap 相关参数 信息收集 思路 方法一 方法二 nmap常见操作 Nmap 相关参数 -iL 读取文件内容&#xff0c;以文件内容作为搜索目标 -o 输出到文件 -oN 标准保存 -oX XML保存 -oG Grep保存 -oA 保存到所有格式 信息收集 可以对ip进行扫描 思路 方法一 将一句话木马…

【高并发】- 生产级系统搭建 - 3

前言 本章讲解高并发系统动静分离方案设计、热点数据处理、管控等思想。 1. 动静分离方案设计 动静分离实质&#xff0c;将静态页面与动态页面&#xff08;或者静态数据与动态数据&#xff09;解耦分离&#xff0c;用不同系统承载对应流量。这样可以提升整个服务的访问性能和可…

MySql性能优化(五)优化细节

优化细节 当使用数据库列进行查询的时候尽量不要使用表达式&#xff0c;把计算结果放到业务层而不是数据层尽量使用主键索引&#xff0c;而不是其他索引&#xff0c;因此主键索引不会触发回表查询使用前缀索引 有的时候需要索引很长的字符串&#xff0c;这会让索引变的大且慢&…

Ethercat系列(1)COE非周期性数据通信

Ethercat主站通过读写邮箱数据SM通道实现非周期性数据通信。邮箱数据定义邮箱数据单元结构邮箱数据头各字段含义如下表非周期性邮箱数据通信EtherCAT协议中非周期性数据通信称为邮箱数据通信&#xff0c;它可以双向进行---主站到从站和从站到主站。它支持全双工&#xff0c;两个…

存储介质还是存储载体,这不是个问题

在档案领域中&#xff0c;“介质”和“载体”到底有什么区别&#xff1f;能不能混用&#xff1f;这个问题曾经困扰了笔者10几年&#xff0c;直到最近才发觉原来根本不是一个问题。我们先来看两句话&#xff1a; 1、磁盘、光盘、固态硬盘等常见数据存储载体的寿命无法满足电子档…

RabbitMQ快速入门和使用

文章目录1. 基础理论1.1. 同步调用与异步调用1.2. RabbitMQ 安装与运行1.2.1. 常见消息模型2. 基本消息队列的应用2.1. 消息发送流程2.2. 消息接收流程3. SpringAMQP的基础理论与应用(想快速应用看这里)3.1. 基础理论3.2. 【案例一】实现HelloWorld中的基础消息队列功能3.3. 【…

Neo4j数据库模糊查询

1、Neo4j单个查询条件模糊查询1.1使用 ~’.模糊匹配对象.’ 进行表示1.1.1 查询节点MATCH(n:Author) WHERE n.name ~.*梦.* RETURN n1.1.2 查询关系MATCH p({title:锆石U-Pb和Lu-Hf同位素研究内蒙乌努格吐山斑岩型铜钼矿岩浆岩特征})-[r:has_illustration]->(i:Illustration…