MASA Framework 事件总线 - 进程内事件总线

news2024/11/13 12:54:26

概述

事件总线是一种事件发布/订阅结构,通过发布订阅模式可以解耦不同架构层级,同样它也可以来解决业务之间的耦合,它有以下优点

  • 松耦合
  • 横切关注点
  • 可测试性
  • 事件驱动

发布订阅模式

通过下图我们可以快速了解发布订阅模式的本质

  1. 订阅者将自己关心的事件在调度中心进行注册
  2. 事件的发布者通过调度中心把事件发布出去
  3. 订阅者收到自己关心的事件变更并执行相对应业务

EventBus.png

其中发布者无需知道订阅者是谁,订阅者彼此之间也互不认识,彼此之间互不干扰

事件总线类型

在Masa Framework中,将事件划分为

  • 进程内事件 (Event)

本地事件,它的发布与订阅需要在同一个进程中,订阅方与发布方需要在同一个项目中

  • 跨进程事件 (IntegrationEvent)

集成事件,它的发布与订阅一定不在同一个进程中,订阅方与发布方可以在同一个项目中,也可以在不同的项目中

下面我们会用一个注册用户的例子来说明如何使用本地事件

入门

  • 安装.NET 6.0
  1. 新建ASP.NET Core 空项目Assignment.InProcessEventBus,并安装Masa.Contrib.Dispatcher.Events
dotnet new web -o Assignment.InProcessEventBus
cd Assignment.InProcessEventBus
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
  1. 注册EventBus (用于发布本地事件), 修改Program.cs
builder.Services.AddEventBus();
  1. 新增RegisterUserEvent类并继承Event,用于发布注册用户事件
public record RegisterEvent : Event
{
    public string Account { get; set; }

    public string Email { get; set; }

    public string Password { get; set; }
}
  1. 新增注册用户处理程序

在指定事件处理程序方法上增加特性 EventHandler,并在方法中增加参数 RegisterUserEvent

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {
        //todo: 根据需要可在构造函数中注入其它服务 (需支持从DI获取)
        _logger = logger;
    }

    [EventHandler]
    public void RegisterUser(RegisterUserEvent @event)
    {
        //todo: 1. 编写注册用户业务
        _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
        
        //todo: 2. 编写发送注册通知等
        _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");
    }
}

注册用户的处理程序可以放到任意一个类中,但其构造函数参数必须支持从DI获取,且处理程序的方法仅支持 TaskVoid 两种, 不支持其它类型

  1. 发送注册用户事件,修改Program.cs
app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>
{
    await eventBus.PublishAsync(@event);
});

进阶

处理流程

EventBus的 请求管道包含一系列请求委托,依次调用。 它们与ASP.NET Core中间件有异曲同工之妙,区别点在于中间件的执行顺序与注册顺序相反,最先注册的最后执行

EventBus.png

每个委托均可在下一个委托前后执行操作,其中TransactionMiddleware是EventBus发布后第一个要进入的中间件 (默认提供),并且它是不支持多次嵌套的。

EventBus 支持嵌套,这意味着我们可以在Handler中重新发布一个新的Event,但TransactionMiddleware仅会在最外层进入时被触发一次

自定义中间件

根据需要我们可以自定义中间件,并注册到EventBus的请求管道中,比如通过增加FluentValidation, 将参数验证从业务代码中剥离开来,从而使得处理程序更专注于业务

  1. 注册FluentValidation, 修改Program.cs
builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
  1. 自定义验证中间件ValidatorMiddleware.cs,用于验证参数
public class ValidatorMiddleware<TEvent> : Middleware<TEvent>
    where TEvent : IEvent
{
    private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;
    private readonly IEnumerable<IValidator<TEvent>> _validators;

    public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null)
    {
        _validators = validators;
        _logger = logger;
    }

    public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
    {
        var typeName = @event.GetType().FullName;

        _logger?.LogDebug("----- Validating command {CommandType}", typeName);

        var failures = _validators
            .Select(v => v.Validate(@event))
            .SelectMany(result => result.Errors)
            .Where(error => error != null)
            .ToList();

        if (failures.Any())
        {
            _logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}",
                typeName,
                @event,
                failures);

            throw new ValidationException("Validation exception", failures);
        }

        await next();
    }
}
  1. 注册EventBus并使用验证中间件ValidatorMiddleware
builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
  1. 添加注册用户验证类RegisterUserEventValidator.cs
public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>
{
    public RegisterUserEventValidator()
    {
        RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");
        RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");
        RuleFor(e => e.Password)
            .NotNull().WithMessage("密码不能为空")
            .MinimumLength(6)
            .WithMessage("密码必须大于6位")
            .MaximumLength(20)
            .WithMessage("密码必须小于20位");
    }
}

编排

EventBus 支持事件编排,它们可以用来处理一些对执行顺序有要求的业务,比如: 注册用户必须成功之后才可以发送注册邮件通知,发送奖励等等,那我们可以这样做

将注册用户业务拆分为三个Handler,并通过指定Order的值来对执行事件排序

public class UserHandler
{
    private readonly ILogger<UserHandler>? _logger;

    public UserHandler(ILogger<UserHandler>? logger = null)
    {
        _logger = logger;
    }

    [EventHandler(1)]
    public void RegisterUser(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户");
        //todo: 编写注册用户业务
    }

    [EventHandler(2)]
    public void SendAwardByRegister(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册奖励");
        //todo: 编写发送奖励等
    }

    [EventHandler(3)]
    public void SendNoticeByRegister(RegisterUserEvent @event)
    {
        _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册成功邮件");
        //todo: 编写发送注册通知等
    }
}

Saga

EventBus支持Saga模式

Saga

具体是怎么做呢?

[EventHandler(1, IsCancel = true)]
public void CancelSendAwardByRegister(RegisterUserEvent @event)
{
    _logger?.LogDebug("-----------{Account} 注册成功,发放奖励失败 {Message}-----------", @event.Account, "发放奖励补偿");
}

当发送奖励出现异常时,则执行补偿机制,执行顺序为 (2 - 1) > 0,由于目前仅存在一个Order为1的Handler,则执行奖励补偿后退出

但对于部分不需要执行失败但不需要执行回退的方法,我们可以修改 FailureLevels 确保不会因为当前方法的异常而导致执行补偿机制

[EventHandler(3, FailureLevels = FailureLevels.Ignore)]
public void SendNoticeByRegister(RegisterUserEvent @event)
{
    _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");
    //todo: 编写发送注册通知等
}

源码解读

EventHandler

  • FailureLevels: 失败级别, 默认: Throw
    • Throw:发生异常后,依次执行Order小于当前Handler的Order的取消动作,比如:Handler顺序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异常,则依次执行 2、1
    • ThrowAndCancel:发生异常后,依次执行Order小于等于当前Handler的Order的取消动作,比如:Handler顺序为 1、2、3,CancelHandler为 1、2、3,如果执行 Handler3 异常,则依次执行 3、2、1
    • Ignore:发生异常后,忽略当前异常(不执行取消动作),继续执行其他Handler
  • Order: 执行顺序,默认: int.MaxValue,用于控制当前方法的执行顺序
  • EnableRetry: 当Handler异常后是否启用重试, 默认: false
  • RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
  • IsCancel: 是否是补偿机制,默认: false

Middleware

  • SupportRecursive: 是否支持递归 (嵌套), 默认: true
    • 部分中间件仅在最外层被触发一次,像TransactionMiddleware 就是如此,但也有很多中间件是需要被多次执行的,比如ValidatorMiddleware,每次发布事件时都需要验证参数是否正确
  • HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用 next() 使得请求进入下一个Handler

IEventHandler 与 ISagaEventHandler

  • HandleAsync(TEvent @event): 提供事件的Handler
  • CancelAsync(TEvent @event): 提供事件的补偿Handler

EventHandler功能类似,提供基本的Handler以及补偿Handler,推荐使用EventHandler的方式使用

TransactionMiddleware

提供事务中间件,当EventBusUoW以及Masa提供的Repository来使用时,当存在待提交的数据时,会自动执行保存并提交,当出现异常后,会执行事务回滚,无需担心脏数据入库

性能测试

与市面上使用较多的MeidatR作了对比,结果如下图所示:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)
11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100-preview.4.22252.9
[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUG
Job-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT

Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart

MethodMeanErrorStdDevMedianMinMax
AddShoppingCartByEventBusAsync124.80 us346.93 us1,022.94 us8.650 us6.500 us10,202.4 us
AddShoppingCartByMediatRAsync110.57 us306.47 us903.64 us7.500 us5.300 us9,000.1 us

根据性能测试我们发现,EventBus与MediatR性能差距很小,但EventBus提供的功能却要强大的多

常见问题

  1. 按照文档操作,通过EventBus发布事件后,对应的Handler并没有执行,也没有发现错误?

①. EventBus.PublishAsync(@event) 是异步方法,确保等待方法调用成功,检查是否出现同步方法调用异步方法的情况
②. 注册EventBus时指定程序集集合, Assembly被用于注册时获取并保存事件与Handler的对应关系

var assemblies = new[]
{
    typeof(UserHandler).Assembly
};
builder.Services.AddEventBus(assemblies);

程序集: 手动指定Assembly集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()

但由于NetCore按需加载,未使用的程序集在当前域中不存在,因此可能会导致部分事件以及Handler的对应关系未正确保存,因此可通过手动指定Assembly集合或者修改全局配置中的Assembly集合来修复这个问题

  1. 通过EventBus发布事件,Handler出错,但数据依然保存到数据库中

①. 检查是否禁用事务

  1. DisableRollbackOnFailure是否为true (是否失败时禁止回滚)
  2. UseTransaction是否为false (禁止使用事务)

②. 检查当前数据库是否支持回滚。例如: 使用的是Mysql数据库,但回滚数据失败,请查看

本章源码

Assignment11

https://github.com/zhenlei520/MasaFramework.Practice

开源地址

MASA.Framework:https://github.com/masastack/MASA.Framework

如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

  • WeChat:MasaStackTechOps
  • QQ:7424099

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/31033.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nginx简单学习笔记

目录什么是nginx&#xff1f;反向代理负载均衡动静分离安装nginxNginx常用命令nginx.conf配置文件1、位置2 nginx配置文件组成Nginx 反向代理实例Nginx 配置实例-负载均衡nginx 分配服务器策略Nginx配置实例-动静分离nginx原理什么是nginx&#xff1f; Nginx是一个高性能的 HT…

【附源码】计算机毕业设计JAVA医院住院综合服务管理系统

【附源码】计算机毕业设计JAVA医院住院综合服务管理系统 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; …

第五站:操作符(第一幕)

操作符相关的知识&#xff0c;在我们初识C语言&#xff08;第三幕&#xff09;这篇文章中其实已经讲到过了。但是那一次讲解仅仅只是一些粗略的知识讲解&#xff0c;我们在那里面已经提到过&#xff0c;后续会专门超级详细的讲解操作符的知识&#xff0c;我们现在就来攻下操作符…

2022亚太数学杯数学建模竞赛B题(思路、程序......)

&#x1f352;&#x1f352;&#x1f352;欢迎关注&#x1f308;&#x1f308;&#x1f308; &#x1f4dd;个人主页&#xff1a;我爱Matlab &#x1f44d;点赞➕评论➕收藏 养成习惯&#xff08;一键三连&#xff09;&#x1f33b;&#x1f33b;&#x1f33b; &#x1f34c;希…

C妈妈加密返回值逆向分析

内容仅供参考学习 目标 网址&#xff1a;https://www.chanmama.com/promotionDetail/0IHXaJ1y7lRNaZyiTsKa6vHtvaZDe6zy/live 直播记录接口返回值加密 分析 打开调试工具&#xff0c;先截获一个请求&#xff1a; 进入Initiator 一般情况通过这边进入函数中挨着…

HTML CSS游戏官网网页模板 大学生游戏介绍网站毕业设计 DW游戏主题网页模板下载 游戏娱乐网页成品代码

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

#边学边记 必修5 高项:对人管理 第2章 项目沟通管理和干系人管理 2-5 项目干系人管理

导学 重要知识点&#xff1a;干系人分析&#xff0c;干系人分类、分析技术、项目干系人管理过程的输入、工具与技术、输出等。下午的案例题可能会涉及与项目干系人管理知识点相关的简答题。 干系人管理的过程 1. 识别干系人 识别能够影响项目决策、活动或结果的个人、群体或…

Lysozyme C (46-61) (chicken),62982-31-4

AMPs是由相对较小的分子组成的异质基团&#xff0c;通常含有不到100个氨基酸。 它们最初是在20世纪60年代由Zeya和Spitznagel 在多形核白细胞溶酶体中描述的。 迄今为止&#xff0c;已在数据库&#xff08;如数据库&#xff09;中 确定和登记了2600多个AMP。 它们是由几乎所有的…

Mysql语法三:表的约束和表与表之间的关系以及高级查询

目录 1.表的约束 1.1:约束类型 1.2&#xff1a;NULL约束 1.3&#xff1a;UNIQUE&#xff1a;唯一约束 1.4&#xff1a;DEFAULT &#xff1a;默认值约束 1.5&#xff1a;PRIMARY KEY&#xff1a;主键约束 1.5.1:联合主键 1.5.2&#xff1a;自增主键 1.6&#xff1a;FORE…

HOOPS 3DGS技术概述

1.什么是HOOPS 3D图像系统 HOOPS 3D图形系统&#xff08;HOOPS/3DGS&#xff09;是一款高性能3D图形工具包&#xff0c;适用于开发人员构建Windows和UNIX操作系统以及Internet应用程序。HOOPS/3DGS高度优化的数据结构和算法大大简化了基于CAD/CAM/CAE、科学可视化和地理信息系…

Flutter的三棵树

一、Flutter常见的家族成员 Widget常见的家族成员 Element常见的家族成员 Render常见的家族成员 二、示例代码对应的Flutter Inspector树 示例代码&#xff1a;MyApp->MyHomePage->ErrorWidget&#xff0c;包含了StatelessWidget、StatefulWidget、LeafRenderObjectWid…

远程医疗解决方案-最新全套文件

远程医疗解决方案-最新全套文件一、建设背景二、建设思路三、建设方案四、获取 - 远程医疗全套最新解决方案合集一、建设背景 针对当今社会医疗资源分布不均、看病难看病贵、医学单位间学术交流和研讨开展困难&#xff0c;华为公司推出了远程医疗解决方案&#xff0c;实现远程…

springboot(13):spring 过滤器和拦截器的区别

目录过滤器和拦截器的区别过滤器的使用1.使用spring boot提供的FilterRegistrationBean2.使用原生servlet注解定义Filter拦截器的使用在前面我们讲过拦截器怎么使用&#xff0c;参考&#xff1a;拦截器 和拦截器有个差不多的叫过滤器。 过滤器和拦截器的区别 首先看一下下面…

1535. 找出数组游戏的赢家

给你一个由 不同 整数组成的整数数组 arr 和一个整数 k 。 每回合游戏都在数组的前两个元素&#xff08;即 arr[0] 和 arr[1] &#xff09;之间进行。比较 arr[0] 与 arr[1] 的大小&#xff0c;较大的整数将会取得这一回合的胜利并保留在位置 0 &#xff0c;较小的整数移至数组…

[附源码]计算机毕业设计JAVA竞价拍卖系统

[附源码]计算机毕业设计JAVA竞价拍卖系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis M…

Spring 源码阅读 04:BeanFactory 初始化

本篇要阅读的是 BeanFactory 初始化的部分&#xff0c;也就是 refresh 方法中的这一行方法调用&#xff1a; // Tell the subclass to refresh the internal bean factory. // 这里会调用模版方法&#xff0c;通过子类的实现&#xff0c;初始化 BeanFactory 并解析 XML 配置 C…

企业快速构建可落地的IT服务管理体系的五大关键点

随着数字化转型的发展&#xff0c;IT运维管理环境日益复杂&#xff0c;对管理的要求也随之增高如何提升运维效率&#xff0c;快速落地做好运维管理&#xff0c;搭建一套IT服务管理必不可少&#xff0c;以往我们也对IT服务管理框架进行过总结&#xff0c;当下&#xff0c;面对很…

Linux ALSA 之三:简单的 ALSA Driver 实现

简单的 ALSA Driver 实现一、概述二、Linux ALSA 音频设备驱动实例1、注册 Platform Device & Platform Driver2、创建 card3、PCM 设备相关设定3.1 创建 PCM Device3.2 设置 PCM 操作3.2 PCM HW 初始化4、Control 设备相关设定4.1 定义 snd_kcontrol_new4.2 构造 control5…

C语言实现冒泡排序(图解)

目录 一、冒泡排序是什么&#xff1f; 二、图解冒泡排序过程 三、代码实现 3.1易错点&#xff08;切记切记&#xff09; 四、优化 4.1优化代码 一、冒泡排序是什么&#xff1f; int arr[]{9,8,7,6,5,4,3,2,1,0} &#xff0c;像这样的数组&#xff0c;升序排序。 冒泡排序…

Dynamic Potential-Based Reward Shaping将势能塑形奖励函数拓展为F(s,t,s‘,t‘)

摘要 基于势能的奖励塑形可以显著降低学习最优策略所需的时间&#xff0c;并且在多agent系统中&#xff0c;可以显著提高最终联合策略的性能。已经证明&#xff0c;它不会改变一个agent单独学习的最优策略或多个agent一起学习的纳什均衡。 ------然而&#xff0c;现有证明的一…