MASA Framework 事件总线 - 跨进程事件总线

news2025/1/10 20:39:02

概述

跨进程事件总线允许发布和订阅跨服务传输的消息, 服务的发布与订阅不在同一个进程中

在Masa Framework中, 跨进程总线事件提供了一个可以被开箱即用的程序

  • IntegrationEvents: 提供了发件箱模式
    • IntegrationEvents.Dapr: 借助Dapr实现了消息的发布
    • EventLogs.EFCore: 基于EFCore实现的集成事件日志的提供者, 提供消息的记录与状态更新、失败日志重试、删除过期的日志记录等

入门

跨进程事件与Dapr并不是强绑定的, Masa Framework使用了Dapr提供的pub/sub的能力, 如果你不想使用它, 你也可以更换为其它实现, 但目前Masa Framwork中仅提供了Dapr的实现

  • 安装 .NET 6.0
  • 安装 Dapr
  1. 新建ASP.NET Core 空项目Assignment.IntegrationEventBus,并安装Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCoreMasa.Contrib.Data.EFCore.SqliteMasa.Contrib.Data.UoW.EFCoreMasa.Contrib.Development.DaprStarter.AspNetCoreMicrosoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBus
cd Assignment.IntegrationEventBus

dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 使用dapr提供的pubsub能力
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore --version 0.7.0-preview.8 //本地消息表
dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 //使用EfCore.Sqlite
dotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 //使用工作单元
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 //开发环境使用DaprStarter协助管理Dapr Sidecar
dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //方便后续通过CodeFirst迁移数据库
  1. 新建用户上下文类UserDbContext,并继承MasaDbContext
public class UserDbContext : MasaDbContext
{
    public UserDbContext(MasaDbContextOptions<UserDbContext> options) : base(options)
    {
    }
}
  1. 注册DaprStarter, 协助管理Dapr Sidecar, 修改Program.cs
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddDaprStarter();
}

通过Dapr发布集成事件需要运行Dapr, 线上环境可通过Kubernetes来运行, 开发环境可借助Dapr Starter运行Dapr, 因此仅需要在开发环境使用它

  1. 注册跨进程事件总线,修改类Program
builder.Services.AddIntegrationEventBus(option =>
{
    option.UseDapr()
        .UseEventLog<UserDbContext>()
        .UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
});
var app = builder.Build();

#region dapr 订阅集成事件使用
app.UseRouting();

app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});
#endregion
  1. 新增用户注册事件的集成事件 RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent
{
    public override string Topic { get; set; } = nameof(RegisterUserEvent);

    public string Account { get; set; }

    public string Mobile { get; set; }
}
  1. 打开Assignment.IntegrationEventBus所在文件夹,打开cmd或Powershell执行
dotnet ef migrations add init //创建迁移
dotnet ef database update //更新数据库
  1. 发送跨进程事件,修改Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
{
    //todo: 模拟注册用户并发布注册用户事件
    await eventBus.PublishAsync(new RegisterUserEvent()
    {
        Account = "Tom",
        Mobile = "19999999999"
    });
});
  1. 订阅事件,修改Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
{
    Console.WriteLine($"注册用户成功: {@event.Account}");
});

订阅事件暂时未抽象,目前使用的是Dapr原生的订阅方式,后续我们会支持Bind,届时不会由于更换pubsub的实现而导致订阅方式的改变

尽管跨进程事件目前仅支持了Dapr,但这不代表你与RabbitMqKafka等无缘,发布/订阅是Dapr抽象出的能力,实现发布订阅的组件有很多种,RabbitMqKafka是其中一种实现,如果你想深入了解他们之间的关系,可以参考:

  1. 手把手教你学Dapr
  2. PubSub代理

源码解读

首先我们先要知道的基础知识点:

  • IIntegrationEvent: 集成事件接口, 继承 IEvent (本地事件接口)、ITopic (订阅接口, 发布订阅的主题)、ITransaction (事务接口)
  • IIntegrationEventBus: 集成事件总线接口、用于提供发送集成事件的功能
  • IIntegrationEventLogService: 集成事件日志服务的接口 (提供保存本地日志、修改状态为进行中、成功、失败、删除过期日志、获取等待重试日志列表的功能)
  • IntegrationEventLog: 集成事件日志, 提供本地消息表的模型
  • IHasConcurrencyStamp: 并发标记接口 (实现此接口的类会自动为RowVersion赋值)

IntegrationEvent

Masa.Contrib.Dispatcher.IntegrationEvents

提供了集成事件接口的实现类, 并支持了发件箱模式, 其中:

  • IPublisher: 集成事件的发送者
  • IProcessingServer: 后台服务接口
  • IProcessor: 处理程序接口 (后台处理程序中会获取所有的程序程序)
    • DeleteLocalQueueExpiresProcessor: 删除过期程序 (从本地队列删除)
    • DeletePublishedExpireEventProcessor: 删除已过期的发布成功的本地消息程序 (从Db删除)
    • RetryByLocalQueueProcessor: 重试本地消息记录 (从本地队列中获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试间隔大于最小重试间隔)
    • RetryByDataProcessor: 重试本地消息记录 (从Db获取, 条件: 发送状态为失败或进行中且重试次数小于最大重试次数且重试间隔大于最小重试间隔, 且不在本地重试队列中)
  • IntegrationEventBus: IIntegrationEvent的实现

Masa.Contrib.Dispatcher.IntegrationEvents中仅提供了发件箱的功能, 但集成事件的发布是由 IPublisher的实现类来提供, 由Db获取本地消息表的功能是由IIntegrationEventLogService的实现类来提供, 它们分别属于Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore的功能, 这也是为什么使用集成事件需要引用包

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

如何快速接入其它实现

那会有小伙伴问了, 我现在没有使用Dapr, 未来一段时间暂时也还不希望接入Dapr, 我想自己接入, 以实现集成事件的发布可以吗?

当然是可以的, 如果你希望自行实现集成事件, 那么这个时候你会遇到两种情况

接入方支持发件箱模式

以社区用的较多的库CAP为例, 由于它本身已经完成了发件箱模式, 我们不需要再处理本地消息表, 也无需考虑本地消息记录的管理, 那我们可以这样做

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.Cap, 添加Masa.BuildingBlocks.Dispatcher.IntegrationEvents的引用, 并安装DotNetCore.CAP
dotnet add package DotNetCore.CAP
  1. 新增类IntegrationEventBus, 并实现IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus
{
    private readonly ICapPublisher _publisher;
    private readonly ICapTransaction _capTransaction;
    private readonly IUnitOfWork? _unitOfWork;
    public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)
    {
        _publisher = publisher;
        _capTransaction = capTransaction;
        _unitOfWork = unitOfWork;
    }
    
    public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        // 如果使用事务
        // _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;
        // _publisher.Publish(@event.Topic, @event);
        throw new NotImplementedException();
    }

    public IEnumerable<Type> GetAllEventTypes()
    {
        throw new NotImplementedException();
    }

    public Task CommitAsync(CancellationToken cancellationToken = default)
    {
        throw new NotImplementedException();
    }
}

CAP已支持本地事务, 使用当前IUnitOfWork提供的事务, 确保数据的原子性

  1. 新建类ServiceCollectionExtensions, 将自定义Publisher注册到服务集合
public static class ServiceCollectionExtensions
{
    public static DispatcherOptions UseRabbitMq(this IServiceCollection services)
    {
         //todo: 注册RabbitMq信息
         services.TryAddScoped<IIntegrationEventBus, IntegrationEventBus>();
         return dispatcherOptions;
    }
}

已经实现发件箱模式的可以直接使用, 而不需要引用

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

以上未经过实际验证, 感兴趣的可以尝试下, 欢迎随时提pr

接入方不支持发件箱模式

我希望直接接入RabbitMq, 但我自己没有做发件箱模式, 那我可以怎么做呢?

由于Masa.Contrib.Dispatcher.IntegrationEvents已提供发件箱模式, 如果仅仅希望更换一个发布事件的实现者, 那我们仅需要实现IPublisher即可

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq, 添加Masa.Contrib.Dispatcher.IntegrationEvents项目引用, 并安装RabbitMQ.Client
dotnet add package RabbitMQ.Client //使用RabbitMq
  1. 新增类Publisher,并实现IPublisher
public class Publisher : IPublisher
{
    public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
    {
        //todo: 通过 RabbitMQ.Client 发送消息到RabbitMq
        throw new NotImplementedException();
    }
}
  1. 新建类DispatcherOptionsExtensions, 将自定义Publisher注册到服务集合
public static class DispatcherOptionsExtensions
{
    public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
    {
         //todo: 注册RabbitMq信息
         dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();
         return dispatcherOptions;
    }
}
  1. 如何使用自定义实现RabbitMq
builder.Services.AddIntegrationEventBus(option =>
{
    option.UseRabbitMq();//修改为使用RabbitMq
    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
    option.UseEventLog<UserDbContext>();
});

本章源码

Assignment12

https://github.com/zhenlei520/MasaFramework.Practice

开源地址

MASA.Framework:https://github.com/masastack/MASA.Framework

如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们

  • WeChat:MasaStackTechOps
  • QQ:7424099

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/50600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

企业想要做好数据分析,可以试试瓴羊Quick BI

企业的数字化发展已经成为了一个发展的方向&#xff0c;可是各个企业如何才可以实现数字化发展确实成为了难题。从很多企业的内部发展中来讲&#xff0c;每一个部分的数据化的分析都已经成为了行业的难点&#xff0c;如何做好这些分析工作确实是很关键的内容。 而在具体数字化…

艾美捷细胞低密度脂肪酸(LDL)摄取试剂盒的功能应用

胆固醇是一种重要的细胞成分&#xff0c;维持胆固醇稳态对正常生理功能至关重要。血浆胆固醇水平升高与各种病理状况有关&#xff0c;最明显的是冠心病&#xff0c;高胆固醇水平导致动脉泡沫细胞形成和斑块堆积&#xff0c;可能导致心脏病发作或中风。细胞胆固醇代谢和血浆胆固…

教育在线学习系统,教育培训都能用,支持多个终端

随着居家隔离不断反复&#xff0c;在线教育也成为了居家学习的必备工具之一。常用的方式就是在线教育培训。教育在线学习系统为教育行业的发展提供了有效工具&#xff0c;推动着教育行业逐步转型到线上线下的教育模式。学生在家通过手机就能随时随地学习&#xff0c;非常的方便…

基于矩阵分解模型的协同过滤理论概述(涉及到SVD,SVD++,TimeSVD++)

前言 本篇文章是对博客&#xff1a;从item-base到svd再到rbm&#xff0c;多种Collaborative Filtering(协同过滤算法)从原理到实现的补全&#xff0c;感谢该作者的分享 本文补全的内容为&#xff1a; SVD中 yjy_jyj​的实际含义理解&#xff0c;以及对应的梯度下降公式TimeS…

[附源码]JAVA毕业设计高校校园社交网络(系统+LW)

[附源码]JAVA毕业设计高校校园社交网络&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术…

高通导航器软件开发包使用指南(15)

高通导航器软件开发包使用指南&#xff08;15&#xff09;9 基本参数调整9.1关键调谐参数9.2调整过程9.2.1传感器方向9.2.2电子速度控制器&#xff08;ESC&#xff09;9.2.3螺旋桨和电机特性9.2.4指定车辆总质量的参数9.2.5最小和最大推力命令9.2.6推进器配置9.2.7姿态控制增益…

魔兽世界服务端源码各个重要文件详细情况说明开服一条龙

魔兽服务端开服源文件各文件翻译 很多文件在服务器中我们知道是跟什么有关&#xff0c;但就是不知道其作用是什么。就算我们知道在这些地方中的文件都是有着不小的作用。但是由于不知道各个文件代表的是什么意思所以在面对这些文件的时候都会有无从下手的感觉&#xff0c;所以…

入耳式无线蓝牙耳机哪款好?无线入耳蓝牙耳机推荐

随着近几年蓝牙耳机的快速发展&#xff0c;使用蓝牙耳机的人也越来越多&#xff0c;可供人们选择的蓝牙耳机也有很多。那么&#xff0c;在现如今的蓝牙耳机市场中&#xff0c;哪款无线蓝牙耳机好&#xff1f;下面&#xff0c;我来给大家推荐几款无线入耳蓝牙耳机&#xff0c;可…

HTML+CSS+JavaScript仿京东购物网站制作 html静态网页设计制作 dw静态网页成品模板素材网页 web前端网页设计与制作 div静态网页设计

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 ⚽精彩专栏推荐&#x1…

JavaWeb中的VUE快速入门

目录 概述: Vue的安装 Vue的常用指令 通过VUE高效提交表单调用接口请求 Vue的生命周期 概述: Vue是一套前端框架&#xff0c;免除原生JavaScript中的DOM操作&#xff0c;简化书写。Vue为当前的国内前端主流框架&#xff0c;基于MVVM&#xff08;Model-View-ViewModel&…

kubernetes之pod详解

pod详解 文章目录pod详解Pod生命周期一、创建和终止二、 初始化容器Pod调度定向调度亲和性调度污点和容忍Pod生命周期 我们一般将pod对象从创建至终的这段时间范围称为pod的生命周期&#xff0c;它主要包含下面的过程&#xff1a; pod创建过程运行初始化容器&#xff08;init …

ByteX-shrink_r源码解析

背景 为什么要对R文件内联处理&#xff1f; 这里首先说一下Android R文件的产生&#xff0c;对于Android开发者我们都知道&#xff0c;当我们要使用要使用一些布局文件&#xff0c;drawable等其他资源时&#xff0c;可以直接用 R.id. R.drawble.等直接使用&#xff0c;而这个…

Redis 内存管理

前言 Redis 的同学应该都知道&#xff0c;它基于键值对&#xff08;key-value&#xff09;的内存数据库&#xff0c;所有数据存放在内存中&#xff0c;内存在 Redis 中扮演一个核心角色&#xff0c;所有的操作都是围绕它进行。我们在实际维护过程中经常会被问到如下问题&#x…

详解设计模式:组合模式

组合模式&#xff08;Composite Pattern&#xff09;&#xff0c;又叫部分整体模式&#xff0c;是 GoF 的 23 种设计模式中的一种结构型设计模式。 组合模式 是用于把一组相似的对象当作一个单一的对象。组合模式依据树形结构来组合对象&#xff0c;用来表示部分以及整体层次。…

Codeforces Round #726 (Div. 2) E1. Erase and Extend (Easy Version)

翻译&#xff1a; 这是这个问题的简单版本。唯一的区别是&#x1d45b;和&#x1d458;上的约束。只有当所有版本的问题都解决了&#xff0c;你才能进行hack。 你有一个字符串&#x1d460;&#xff0c;你可以对它做两种类型的操作: 删除字符串的最后一个字符。 复制字符串:…

UI 智能化的原理和未来

本文将从 GUI 中用户体验的构建开始&#xff0c;用高质量、可调控、交互体验创新三个部分&#xff0c;分别介绍如何从传统 UI 一步步迈向 UI 智能化。最后&#xff0c;用如何实现 UI 智能化的一些思考收尾。 本文仅代表作者个人观点。前言&#xff1a;「UI 智能化才是用户体验的…

第十七章《MySQL数据库及SQL语言简介》第3节:数据库管理

17.2小节主要讲解的是MySQL数据库的下载、配置和安装。从严格意义来讲,17.2小节所做的工作是对“数据库管理系统”进行下载、安装和配置。本小节所要讲解的数据库管理是指如何用数据库管理系统新建、重命名和删除一个数据库。 程序员操作数据库管理系统主要有两种方式:1、通…

[附源码]Python计算机毕业设计SSM课程教学质量综合分析平台(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

企业里使用最广泛的技术之一SparkSQL

声明&#xff1a; 文章中代码及相关语句为自己根据相应理解编写&#xff0c;文章中出现的相关图片为自己实践中的截图和相关技术对应的图片&#xff0c;若有相关异议&#xff0c;请联系删除。感谢。转载请注明出处&#xff0c;感谢。 By luoyepiaoxue2014 B站&#xff…

mybatis实战:一、mybatis入门(配置、一些问题的解决)

出自《MyBatis从入门到精通》刘增辉&#xff0c;精简 1.pom.xml 1.设置源码编码方式为 UTF -8 2.设置编译源代码的 JDK 版本 3.添加mybatis依赖 4.还需要添加会用到的 Log4j JUnit ySql 驱动的依赖。 <?xml version"1.0" encoding"UTF-8"?> <pr…