将Abp默认事件总线改造为分布式事件总线

news2024/9/25 1:14:01

文章目录

  • 原理
    • 创建分布式事件总线
    • 实现自动订阅和事件转发
  • 使用
    • 启动Redis服务
    • 配置
    • 传递Abp默认事件
    • 传递自定义事件
  • 项目地址

原理

本地事件总线是通过Ioc容器来实现的。

IEventBus接口定义了事件总线的基本功能,如注册事件、取消注册事件、触发事件等。

Abp.Events.Bus.EventBus是本地事件总线的实现类,其中私有成员ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时,通过订阅表查找所有事件处理器,通过Ioc容器来获取处理器实例,然后通过反射来调用事件处理器的"HandleEvent"方法。

创建分布式事件总线

首先,我们需要一个分布式事件总线中间件,用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有RabbitMQ、Kafka、Redis等。

开源社区已经有实现好的库,本项目参考了 wuyi6216/Abp.RemoteEventBus

这里已经定义好了一个分布式事件总线接口


public interface IDistributedEventBus : IDisposable
{
    void MessageHandle(string topic, string message);

    void Publish(IDistributedEventData eventData);

    void Subscribe(string topic);

    void Unsubscribe(string topic);

    void UnsubscribeAll();
}

为了兼容本地事件总线,我们需要定义一个分布式事件总线接口,继承自IEventBus接口。


public interface IMultipleEventBus : IDistributedEventBus, IEventBus
{

}


实现自动订阅和事件转发

当注册本地事件时,将订阅分布式事件,事件Topic为类型的字符串表现形式

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{
    GetOrCreateHandlerFactories(eventType);
    List<IEventHandlerFactory> currentLists;
    if (_handlerFactories.TryGetValue(eventType, out currentLists))
    {
        lock (currentLists)
        {
            if (currentLists.Count == 0)
            {
                //Register to distributed event
                this.Subscribe(eventType.ToString());
            }
            currentLists.Add(factory);
        }
    }
    return new FactoryUnregistrar(this, eventType, factory);
}

创建TriggerRemote,此方法用于将本地事件参数打包成为分布式事件消息payload,并发布该消息

public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
{
    var exceptions = new List<Exception>();
    eventData.EventSource = eventSource;
    try
    {
        var payloadDictionary = new Dictionary<string, object>
                {
                    { PayloadKey, eventData }
                };
        var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);
        Publish(distributedeventData);
    }

    catch (Exception ex)
    {
        exceptions.Add(ex);
    }
    if (exceptions.Any())
    {
        if (exceptions.Count == 1)
        {
            exceptions[0].ReThrow();
        }

        throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
    }
}


当触发本地事件时,将消息转发至分布式事件总线。
在Trigger方法中调用TriggerRemote,事件状态回调和事件异常回调将不会被转发。

if (!(typeof(DistributedEventBusEvent) == eventType
   || typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)
   || typeof(DistributedEventMessageHandleExceptionData) == eventType
   || typeof(DistributedEventHandleExceptionData) == eventType
    ))
{
    if (typeof(DistributedEventArgs) != eventType)
    {
        TriggerRemote(eventType, eventSource, eventData);

    }
}

在消费端接收到分布式事件消息时,从Topic中解析类型,转发给本地事件。若此类型在本地事件注册过,则将消息反序列化为本地事件参数,然后触发本地事件。
本地事件处理器将触发最终的处理方法。


public virtual void MessageHandle(string topic, string message)
{
    Logger.Debug($"Receive message on topic {topic}");
    try
    {
        var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message);
        var eventArgs = new DistributedEventArgs(eventData, topic, message);
        Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));

        if (!string.IsNullOrEmpty(eventData.Type))
        {
            string pattern = @"(.*?)\[(.*?)\]";
            Match match = Regex.Match(eventData.Type, pattern);
            if (match.Success)
            {

                var type = match.Groups[1].Value;
                var type2 = match.Groups[2].Value;

                var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault();
                var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();

                if (localTriggerType != null && genericType != null)
                {

                    if (localTriggerType.GetTypeInfo().IsGenericType
                        && localTriggerType.GetGenericArguments().Length == 1
                        && !genericType.IsAbstract && !genericType.IsInterface
                        )
                    {
                        var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);


                        if (eventData.Data.TryGetValue(PayloadKey, out var payload))
                        {
                            var payloadObject = (payload as JObject).ToObject(localTriggerGenericType);
                            Trigger(localTriggerGenericType, this, (IEventData)payloadObject);

                        }
                    }
                }


            }
            else
            {
                var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault();
                if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface)
                {
                    if (eventData.Data.TryGetValue(PayloadKey, out var payload))
                    {
                        var payloadObject = (payload as JObject).ToObject(localTriggerType);
                        Trigger(localTriggerType, this, (IEventData)payloadObject);

                    }

                }
            }
            Trigger(this, new DistributedEventBusHandledEvent(eventArgs));

        }
    }
    catch (Exception ex)
    {
        Logger.Error("Consume remote message exception", ex);
        Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));
    }
}

使用

DistributedEventBus有不同的实现方式,这里以Redis为例

启动Redis服务

下载Redis并启动服务,使用默认端口6379

配置

生产者和消费者端都需要配置分布式事件总线

首先引用Abp.DistributedEventBus.Redis,并配置Abp模块依赖

[DependsOn(typeof(AbpDistributedEventBusRedisModule))]

在PreInitialize方法中配置Redis连接信息

 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting =>
 {
     setting.Server = "127.0.0.1:6379";
 });

用MultipleEventBus替换Abp默认事件总线

 //todo: 事件总线
 Configuration.ReplaceService(
  typeof(IEventBus),
  () => IocManager.IocContainer.Register(
      Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()
  ));

传递Abp默认事件

我们知道在使用仓储时,Abp会自动触发一些事件,如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。

定义一个实体类,用于传递实体的增删改事件。


public class Person : FullAuditedEntity<int>
{

    public string Name { get; set; }
    public int Age { get; set; }
    public string PhoneNumber { get; set; }

}

在消费者端,定义一个事件处理器,用于处理实体的增删改事件。


public class RemoteEntityChangedEventHandler :
    IEventHandler<EntityUpdatedEventData<Person>>,
    IEventHandler<EntityCreatedEventData<Person>>,
    IEventHandler<EntityDeletedEventData<Person>>,
    ITransientDependency
{

    void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData)
    {
        var person = eventData.Entity;
        Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
    }

    void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData)
    {
        var person = eventData.Entity;
        Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");

    }

    void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData)
    {
        var person = eventData.Entity;
        Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");

    }
}


在生产者端,用IRepository对实体进行增删改操作。


var person = new Person()
{

    Name = "John",
    Age = 36,
    PhoneNumber = "18588888888"

};

personRepository.Insert(person);

var person2 = new Person()
{

    Name = "John2",
    Age = 36,
    PhoneNumber = "18588888889"

};
personRepository.Insert(person2);

var persons = personRepository.GetAllList();
foreach (var p in persons)
{
    p.Age += 1;
    personRepository.Update(p);
    Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");

}
foreach (var p in persons)
{
    personRepository.Delete(p);
    Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");

}


运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了实体的增删改事件。

在这里插入图片描述

注意:

分布式事件总线在两个独立系统间传递事件,所以需要定义一个共同的类型对象,用于事件参数的传递。
因此消费者端需要引用生产者端的模块,以便获取共同的类型对象。

public override Assembly[] GetAdditionalAssemblies()
{
    var clientModuleAssembly = typeof(Person).GetAssembly();
    return [clientModuleAssembly];
}

传递自定义事件

定义NotificationEventData,用于传递自定义事件。


public class NotificationEventData : EventData
{
    public int Id { get; set; }
    
    public string Title { get; set; }

    public string Message { get; set; }

    public bool IsRead { get; set; }
}

在消费者端,定义一个事件处理器,用于处理自定义事件。

public class NotificationEventHandler :
    IEventHandler<NotificationEventData>,      
    ITransientDependency
{
    
    void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData)
    {
        Console.WriteLine($"Id: {eventData.Id}");
        Console.WriteLine($"Title: {eventData.Title}");
        Console.WriteLine($"Message: {eventData.Message}");
        Console.WriteLine($"IsRead: {eventData.IsRead}");

    }
}

在生产者端,触发自定义事件。

var eventBus = IocManager.Instance.Resolve<IEventBus>();


eventBus.Trigger<NotificationEventData>(new NotificationEventData()
{
    Title = "Hi",
    Message = "Customized definition event test!",
    Id = 100,
    IsRead = true,
});

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了自定义事件。

在这里插入图片描述

项目地址

Github:DistributedEventBus

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1324368.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关键字:void关键字

在编程中&#xff0c;void 是一个关键字&#xff0c;用于表示函数没有返回值。具体来说&#xff0c;void 关键字的作用如下&#xff1a; 函数声明&#xff1a;在函数声明中使用 void 关键字可以指定函数没有返回值。例如&#xff1a; 这表示 func() 函数不返回任何值。 函数…

英国版咸鱼「Depop」,小众二手跨境电商平台如何入驻?

对标美国二手闲鱼平台Mercia,PoshMark、东南亚Etsy&#xff0c;Depop是英国的一个面向创意人群的二手时尚市场&#xff0c;类似于Instagram&#xff0c;但更专注于买卖二手服装、配饰和艺术品。 近一年来,受通胀和高利率影响,英国的经济几乎一直处于停滞状态&#xff0c;零售市…

亚信安慧AntDB数据库——助力5G计费核心替换,全面自主可控

数字经济时代&#xff0c;5G以更快、更丰富、更智能的连接方式服务于各行各业。AntDB数据库&#xff0c;源于亚信科技&#xff0c;自2008年起成功落地全国24个省份的中国移动、中国电信、中国联通和中国广电等运营商项目&#xff0c;为数字化服务和信息化基础建设提供支持。 在…

【开源软件】最好的开源软件-2023-第四名 vaadin

自我介绍 做一个简单介绍&#xff0c;酒架年近48 &#xff0c;有20多年IT工作经历&#xff0c;目前在一家500强做企业架构&#xff0e;因为工作需要&#xff0c;另外也因为兴趣涉猎比较广&#xff0c;为了自己学习建立了三个博客&#xff0c;分别是【全球IT瞭望】&#xff0c;【…

DAPLink源码固件编译与制作

DAPLink源码固件编译与制作 ✨这里以Air/stm32f103cbt6固件编译为例。&#x1f4cc;DAPLink源码地址&#xff1a;https://github.com/ARMmbed/DAPLink&#x1f516; 如果不想自己生成&#xff0c;可以使用合宙提供的现成的工程以及固件&#xff1b;https://gitee.com/openLuat/…

以太网的数据速率、互连介质和物理层规范

以太网协议连接已经广泛应用于我们周围的大量事物或设备中。过去&#xff0c;以太网用在局域网 (LAN) 和城域网 (MAN) 中&#xff0c;而如今&#xff0c;由于以太网的普及和多种优势&#xff0c;例如巨大的生态体系和日益增长的规模经济&#xff0c;它越来越多地用在存储和汽车…

手拉手全栈EasyExcel实现web上传下载

环境介绍 技术栈 springbootmybatis-plusmysqleasyexcel 软件 版本 mysql 8 IDEA IntelliJ IDEA 2022.2.1 JDK 1.8 Spring Boot 2.7.13 mybatis-plus 3.5.3.2 EasyExcel是一个基于Java的、快速、简洁、解决大文件内存溢出的Excel处理工具。 他能让你在不用考虑性…

思码逸签约 | 与致景科技达成战略合作,共同推动研发效能提升

此次合作旨在利用思码逸的先进度量和分析能力&#xff0c;帮助致景科技进一步完善其研发度量指标&#xff0c;从而提升整体研发效能。 本次合作的核心&#xff0c;是致景科技对思码逸在代码当量价值评估方面的高度认可。合作将专注于两个主要方面&#xff1a;一方面&#xff0…

(数据结构)单链表的插入删除

代码实现 #include<stdio.h> #include<stdlib.h> typedef struct LNode {int data;struct LNode* next; }LNode, * LinkList; //创建头结点 LNode* InitList(LinkList L) {L (LNode*)malloc(sizeof(LNode));if (L NULL){printf("申请头结点失败\n");…

鸿道(Intewell)工业操作系统推动新型工业化时代下的产教融合

为进一步落实推进粤港澳大湾区建设的国家战略&#xff0c;加速新一代信息技术与制造业深度融合&#xff0c;提升控制科学与工程学科建设水平&#xff0c;华南理工大学自动化科学与工程学院召开粤港澳大湾区机器智能产教融合论坛暨控制学科建设研讨会。作为国内新型工业操作系统…

SpringSecurity深度解析与实践(1)

目录 引言1. SpringSecurity1.1 SpringSecurity简介1.2 SpringSecurity工作原理1.3.特点 2. SpringSecurity的快速使用总结 引言 SpringSecurity作为Spring框架中的一个重要组成部分&#xff0c;扮演着保护应用程序安全的重要角色。本文将深入探讨SpringSecurity的原理、使用方…

比例导引(PNG)-Matlab 程序

本文提供比例导引的matlab程序&#xff0c;想要看理论的可以看书《导弹飞行力学》或者我的博客 比例导引详解 代码 %% 三维比例导引末制导clc;clear; close all;%% 设置导弹初始参数和目标参数% 总步长 length 1000000; x_m zeros(length,1); y_m zeros(length,1); z_m z…

【UML】第9篇 类图

目录 一、类图的概念 二、类图的主要作用 三、类图的构成 3.1 类的名称 3.2 抽象类&#xff08;Abstract Class&#xff09; 一、类图的概念 类图是UML模型中静态视图。它用来描述系统中的有意义的概念&#xff0c;包括具体的概念、抽象的概念、实现方面的概念等。静态视…

Halcon识别瓶盖字体,极坐标转换

Halcon识别瓶盖字体&#xff0c;极坐标转换 read_image (Image, D:/image/bilibili/photo/检测字符.png) ***转为灰度图 rgb1_to_gray (Image, GrayImage) threshold (GrayImage, Regions, 115, 255) get_image_size (GrayImage, Width, Height) *****填充 fill_up (Regions, …

算法基础之二分图的最大匹配

二分图的最大匹配 核心思想&#xff1a;匈牙利算法 : 寻找有没有可重新连接的路 #include<iostream>#include<cstring>#include<algorithm>using namespace std;const int N 510 , M 100010;int h[N],e[M],ne[M],idx;int match[N]; //记录与j匹配的iint n…

《大观》期刊杂志发表投稿方式

《大观》杂志刊登文化、文学、艺术、民俗、影视等领域的理论研究文章&#xff0c;杂志内容丰富&#xff0c;雅俗共赏&#xff0c;集权威性、实用性、前瞻性与专业性于一体&#xff0c;具有很高的学术价值和社会影响力。是广大专家、学者、教师 、学子发表论文、交流信息的重要平…

HarmonyOS:Neural Network Runtime对接AI推理框架开发指导

场景介绍 Neural Network Runtime 作为 AI 推理引擎和加速芯片的桥梁&#xff0c;为 AI 推理引擎提供精简的 Native 接口&#xff0c;满足推理引擎通过加速芯片执行端到端推理的需求。 本文以图 1 展示的 Add 单算子模型为例&#xff0c;介绍 Neural Network Runtime 的开发流…

Linux线程——死锁

什么是死锁 死锁是一组相互竞争资源的线程因为他们之间得到互相等待导致“永久“阻塞的现象&#xff1b;&#xff08;你等我 我等你 你不放我也不放 就导致“永久“阻塞的现象&#xff09; 死锁是指两个或两个以上的进程在执行过程中&#xff0c;由于竞争资源或者由于彼此通信…

【AntDesign】如何设置Form表单初始值以及会出现的问题

方法一&#xff1a;使用 setFieldsValue() 方法&#xff08;推荐&#xff09; 首先&#xff0c;解构出form实例 const [form] Form.useForm()然后&#xff0c;将该实例与Form绑定 <Form form{form} ...>...</Form>恰当时机&#xff0c;调用setFieldsValue()方法…

Python (十二) NumPy操作

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一波电子书籍资料&#xff0c;包含《Effective Java中文版 第2版》《深入JAVA虚拟机》&#xff0c;《重构改善既有代码设计》&#xff0c;《MySQL高性能-第3版》&…