基于事件总线EventBus实现邮件推送功能

news2025/1/23 4:06:24

什么是事件总线

事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。 关于这个概念,网上有很多讲解的,这里我推荐一个讲的比较好的(事件总线知多少)

什么是RabbitMQ

RabbitMQ这个就不用说了,想必到家都知道。

粗糙流程图

简单来解释就是:

1、定义一个事件抽象类

public abstract class EventData
    {
        /// <summary>
        /// 唯一标识
        /// </summary>
        public string Unique { get; set; }
        /// <summary>
        /// 是否成功
        /// </summary>
        public bool Success { get; set; }
        /// <summary>
        /// 结果
        /// </summary>
        public string Result { get; set; }
    }

2、定义一个事件处理抽象类,以及对应的一个队列消息执行的一个记录、

public abstract class EventHandler<T> where T : EventData
    {
        public async Task Handler(T eventData)
        {
            await BeginHandler(eventData.Unique);
            eventData = await ProcessingHandler(eventData);
            if (eventData.Success)
                await FinishHandler(eventData);
        }
        /// <summary>
        ///  开始处理
        /// </summary>
        /// <param name="unique"></param>
        /// <returns></returns>
        protected abstract Task BeginHandler(string unique);
        /// <summary>
        /// 处理中
        /// </summary>
        /// <param name="eventData"></param>
        /// <returns></returns>
        protected abstract Task<T> ProcessingHandler(T eventData);
        /// <summary>
        /// 处理完成
        /// </summary>
        /// <param name="eventData"></param>
        /// <returns></returns>
        protected abstract Task FinishHandler(T eventData);
    }
    
   [Table("Sys_TaskRecord")]
    public class TaskRecord : Entity<long>
    {
        /// <summary>
        /// 任务类型
        /// </summary>
        public TaskRecordType TaskType { get; set; }
        /// <summary>
        /// 任务状态
        /// </summary>
        public int TaskStatu { get; set; }
        /// <summary>
        /// 任务值
        /// </summary>
        public string TaskValue { get; set; }
        /// <summary>
        /// 任务结果
        /// </summary>
        public string TaskResult { get; set; }
        /// <summary>
        /// 任务开始时间
        /// </summary>
        public DateTime TaskStartTime { get; set; }
        /// <summary>
        /// 任务完成时间
        /// </summary>
        public DateTime? TaskFinishTime { get; set; }
        /// <summary>
        /// 任务最后更新时间
        /// </summary>
        public DateTime? LastUpdateTime { get; set; }
        /// <summary>
        /// 任务名称
        /// </summary>
        public string TaskName { get; set; }
        /// <summary>
        /// 附加数据
        /// </summary>
        public string AdditionalData { get; set; }
    }

3、定义一个邮件事件消息类,继承自EventData,以及一个邮件处理的Hanler继承自EventHandler

public class EmailEventData:EventData
   {
       /// <summary>
       /// 邮件内容
       /// </summary>
       public string Body { get; set; }
       /// <summary>
       /// 接收者
       /// </summary>
       public string Reciver { get; set; }
   }
 
public class CreateEmailHandler<T> : Core.EventBus.EventHandler<T> where T : EventData
   {
       private IEmailService emailService;
       private IUnitOfWork unitOfWork;
       private ITaskRecordService taskRecordService;
       public CreateEmailHandler(IEmailService emailService, IUnitOfWork unitOfWork, ITaskRecordService taskRecordService)
       {
           this.emailService = emailService;
           this.unitOfWork = unitOfWork;
           this.taskRecordService = taskRecordService;
       }
       protected override async Task BeginHandler(string unique)
       {
           await taskRecordService.UpdateRecordStatu(Convert.ToInt64(unique), (int)MqMessageStatu.Processing);
           await unitOfWork.CommitAsync();
       }
 
       protected override async Task<T> ProcessingHandler(T eventData)
       {
           try
           {
               EmailEventData emailEventData = eventData as EmailEventData;
               await emailService.SendEmail(emailEventData.Reciver, emailEventData.Reciver, emailEventData.Body, "[闲蛋]收到一条留言");
               eventData.Success = true;
           }
           catch (Exception ex)
           {
               await taskRecordService.UpdateRecordFailStatu(Convert.ToInt64(eventData.Unique), (int)MqMessageStatu.Fail,ex.Message);
               await unitOfWork.CommitAsync();
               eventData.Success = false;
           }
           return eventData;
 
       }
 
       protected override async Task FinishHandler(T eventData)
       {
           await taskRecordService.UpdateRecordSuccessStatu(Convert.ToInt64(eventData.Unique), (int)MqMessageStatu.Finish,"");
           await unitOfWork.CommitAsync();
       }

 4、接着就是如何把事件消息和事件Hanler关联起来,那么我这里思路就是把EmailEventData的类型和CreateEmailHandler的类型先注册到字典里面,这样我就可以根据EmailEventData找到对应的处理程序了,找类型还不够,如何创建实例呢,这里就还需要把CreateEmailHandler注册到DI容器里面,这样就可以根据容器获取对象了,如下

  public void AddSub<T, TH>()
             where T : EventData
             where TH : EventHandler<T>
        {
            Type eventDataType = typeof(T);
            Type handlerType = typeof(TH);
            if (!eventhandlers.ContainsKey(typeof(T)))
                eventhandlers.TryAdd(eventDataType, handlerType);
            _serviceDescriptors.AddScoped(handlerType);
        }
-------------------------------------------------------------------------------------------------------------------
 public Type FindEventType(string eventName)
        {
            if (!eventTypes.ContainsKey(eventName))
                throw new ArgumentException(string.Format("eventTypes不存在类名{0}的key", eventName));
            return eventTypes[eventName];
        }
------------------------------------------------------------------------------------------------------------------------------------------------------------
  public object FindHandlerType(Type eventDataType)
        {
            if (!eventhandlers.ContainsKey(eventDataType))
                throw new ArgumentException(string.Format("eventhandlers不存在类型{0}的key", eventDataType.FullName));
            var obj = _buildServiceProvider(_serviceDescriptors).GetService(eventhandlers[eventDataType]);
            return obj;
        }
----------------------------------------------------------------------------------------------------------------------------------
 private static IServiceCollection AddEventBusService(this IServiceCollection services)
        {
            string exchangeName = ConfigureProvider.configuration.GetSection("EventBusOption:ExchangeName").Value;
            services.AddEventBus(Assembly.Load("XianDan.Application").GetTypes())
                .AddSubscribe<EmailEventData, CreateEmailHandler<EmailEventData>>(exchangeName, ExchangeType.Direct, BizKey.EmailQueueName);
            return services;
        }

5、发送消息,这里代码简单,就是简单的发送消息,这里用eventData.GetType().Name作为消息的RoutingKey,这样消费这就可以根据这个key调用FindEventType,然后找到对应的处理程序了

using (IModel channel = connection.CreateModel())
{
     string routeKey = eventData.GetType().Name;
     string message = JsonConvert.SerializeObject(eventData);
     byte[] body = Encoding.UTF8.GetBytes(message);
     channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
     channel.QueueDeclare(queueName, true, false, false, null);
     channel.BasicPublish(exchangeName, routeKey, null, body);
}

6、订阅消息,核心的是这一段

  Type eventType = _eventBusManager.FindEventType(eventName);  var eventData = (T)JsonConvert.DeserializeObject(body, eventType);  EventHandler<T> eventHandler = _eventBusManager.FindHandlerType(eventType)  as       EventHandler<T>;

public void Subscribe<T, TH>(string exchangeName, string exchangeType, string queueName)
           where T : EventData
           where TH : EventHandler<T>
       {
           try
           {
               _eventBusManager.AddSub<T, TH>();
               IModel channel = connection.CreateModel();
               channel.QueueDeclare(queueName, true, false, false, null);
               channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);
               channel.QueueBind(queueName, exchangeName, typeof(T).Name, null);
               var consumer = new EventingBasicConsumer(channel);
               consumer.Received += async (model, ea) =>
               {
                   string eventName = ea.RoutingKey;
                   byte[] resp = ea.Body.ToArray();
                   string body = Encoding.UTF8.GetString(resp);
                   try
                   {
                       Type eventType = _eventBusManager.FindEventType(eventName);
                       var eventData = (T)JsonConvert.DeserializeObject(body, eventType);
                       EventHandler<T> eventHandler = _eventBusManager.FindHandlerType(eventType) as EventHandler<T>;
                       await eventHandler.Handler(eventData);
                   }
                   catch (Exception ex)
                   {
                       LogUtils.LogError(ex, "EventBusRabbitMQ", ex.Message);
                   }
                   finally
                   {
                       channel.BasicAck(ea.DeliveryTag, false);
                   }
                  
 
               };
               channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
           }
           catch (Exception ex)
           {
               LogUtils.LogError(ex, "EventBusRabbitMQ.Subscribe", ex.Message);
           }
 
       }

注意,这里我使用的时候有个小坑,就是最开始是用using包裹这个IModel channel = connection.CreateModel();导致最后程序启动后无法收到消息,然后去rabbitmq的管理界面发现没有channel连接,队列也没有消费者,最后发现可能是using执行完后就释放掉了,把using去掉就好了。

好了,到此,我的思路大概讲完了,现在我的网站留言也可以收到邮件了,那么多测试邮件,哈哈哈哈哈

文章转载自:灬丶

原文链接:https://www.cnblogs.com/MrHanBlog/p/18381572

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2080151.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

光伏设计时要画出哪些模型?

在光伏系统的设计中&#xff0c;为了确保项目的顺利实施与高效运行&#xff0c;设计师需要绘制多种模型来综合考虑各种因素&#xff0c;包括参照物、障碍物以及楼顶配房等。这些模型不仅有助于预测光伏系统的发电效率&#xff0c;还能帮助规划最佳的安装布局&#xff0c;减少阴…

碎片笔记|Computer Journal 期刊投稿注意事项

前言&#xff1a;3月份把之前做的一篇工作转投到了computer journal&#xff0c;8月7号来信说我投稿的工作之前因为挂在arXiv上&#xff0c;因此和正常的投稿要求不太一致&#xff0c;需要更换投稿方式&#xff0c;编辑提供了两种选择如下。 The first choice is to keep your …

python 接口自动化测试中的高阶函数!

高阶函数简介 高阶函数是指接受函数作为参数或者返回函数作为结果的函数。在 Python 中&#xff0c;有许多内置的高阶函数&#xff0c;如 map, filter, reduce 等&#xff0c;它们可以极大地简化代码并提高代码的可维护性。 summer camp map 函数 map 函数接收一个函数和一个…

Jmeter下载、配置环境变量

Jmeter下载 下载地址&#xff1a;Apache JMeter - Download Apache JMeter 下载后无需安装&#xff0c;解压后即可使用。解压后目录如下 配置环境变量 JMETER_HOME 环境变量Path %JMETER_HOME%\bin 环境变量CLASSPATH %JMETER_HOME%\lib 验证是否配置成功 在cmd命令窗中 输入…

玄机又成国漫首创!IP与AI融合,凭实力火出圈

现在国漫越来越卷了&#xff0c;不仅卷制作质量&#xff0c;还卷各种花式联动。最近玄机科技和百度文库联合举办的AI漫画大赛圆满结束&#xff0c;这还是国内的IP第一次和AI技术融合&#xff0c;而且产出了不少好作品。下面就一起来看看吧&#xff01; 提到玄机科技&#xff0c…

机器人笛卡尔空间轨迹规划-直线差补和圆弧差补

上一文&#xff0c;我们讨论了三次多项式和五次多项式的差补算法&#xff0c;那么这边文章具体讨论一下笛卡尔空间轨迹规划的直线差补和圆弧差补。 步骤 &#xff08;1&#xff09;知道起始点和终止点的位姿&#xff0c;和速度信息。 &#xff08;2&#xff09;根据两点能确定一…

湖仓一体大数据平台:开启企业数据管理新时代(附Hudi案例)

湖仓一体大数据平台&#xff1a;开启企业数据管理新时代&#xff08;附Hudi案例&#xff09; 前言湖仓一体大数据平台 前言 在当今数字化浪潮汹涌澎湃的时代&#xff0c;数据如同企业发展的珍贵宝藏&#xff0c;而如何高效地挖掘、管理和利用这些宝藏&#xff0c;成为了企业在…

详细的Anaconda安装jupyter notebook与使用

jupyter notebook概念 Jupyter Notebook 是一种交互式计算环境&#xff0c;广泛用于数据分析、机器学习和编程学习等领域。 一、pip安装 打开 Anaconda Prompt 输入&#xff1a;pip install jupyter notebook pip install jupyter notebook 安装成功画面 输入命令&#xff1…

Git实战精粹

一、快速入门 1. 什么是Git Git是一个分布式的版本控制软件。 软件&#xff0c;类似于QQ、office、dota等安装到电脑上才能使用的工具版本控制&#xff0c;类似于毕业论文、写文案、视频剪辑等&#xff0c;需要反复修改和保留原历史数据分布式 文件夹拷贝本地版本控制集中式…

SpringMVC基于注解的使用

SpringMVC基于注解的使用 首先导入spring-mvc的依赖文件 然后配置上篇文章的web.xml文件 在配置上篇文章的spring-mvc.xml文件 创建一个ParamsContrller类写个方法方法里面的参数名可以用到客户端请求&#xff0c;且可以为参数写任意类型 如果想改参数名可以用RequestParam为…

3D模型OBJ格式详解

竹杖芒鞋轻胜马,谁怕?一蓑烟雨任平生~ 个人主页&#xff1a; rainInSunny | 个人专栏&#xff1a; C那些事儿、 Qt那些事儿 文章目录 写在前面OBJ格式解析OBJ文件示例MTL文件OBJ渲染简述 写在前面 OBJ格式是一种简单的文本格式&#xff0c;用于描述三维模型的几何形状和材质…

golang本地缓存fastcache高性能实现原理

1. git仓库 https://github.com/abbothzhang/fastcache 2. 整体原理 initCache时不会申请内存&#xff0c;只有第一次set时候才会申请&#xff0c;且会一次性申请64MB&#xff0c;后面不够了又一次性申请1024*64MB大小内存 2.1. 时序图 3. 高性能原因 将cache分为512个buc…

Unity(2022.3.41LTS) - 网格,纹理,材质

目录 零.简介 一、网格&#xff08;Mesh&#xff09; 二、材质&#xff08;Material&#xff09; 三、纹理&#xff08;Texture&#xff09; 四、三者之间的关系 零.简介 在 Unity 中&#xff0c;网格&#xff08;Mesh&#xff09;、纹理&#xff08;Texture&#xff09;和…

软考评测知识点

常见的存储单位&#xff1a; 1B8bit 1TB1024GB 1GBMBKBB 机器数&#xff1a;将符号数字化的数&#xff0c;是数字在计算机中的二进制表示形式。&#xff08;最高位0表示正数&#xff0c;1表示负数&#xff09; 二进制正数的原码、反码、补码不变&#xff0c;移码等于补码符号位…

外包干了两年,快要废了。。。

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 简单的说下&#xff0c;我大学的一个同学&#xff0c;毕业后我自己去了自研的公司&#xff0c;他去了外包&#xff0c;快两年了我薪资、技术各个方面都有了很大的…

Ubuntu下部署Hadoop集群+Hive(三)

Hive部署 准备环境 apache-hive-4.0.0-bin.tar.gz、mysql-connector-j-8.1.0.jar 如果是离线安装的话&#xff0c;使用mysql-8.0.34-1.el7.x86_64.rpm-bundle.tar&#xff0c;在线安装的话则不用&#xff1b; hive下载地址&#xff1a;Index of /hive (apache.org) mysql …

面试中的SEO优化:从基本概念到实用策略

前言 为什么要学习SEO SEO对于Web站点很重要&#xff0c;有助于优化网页在搜索引擎中的排名&#xff0c;提升网站可见性和流量。掌握SEO技术可以确保网页结构和内容对搜索引擎友好&#xff0c;从而提高用户访问量和用户体验。而且SEO被面试问的很多 SEO是什么&#xff1f; …

day02-面向对象-多态抽象类接口

一、⭐多态⭐ 1.1 概述 1.多态是在继承/实现情况下的一种现象, 表现为对象多态和行为多态 ​ 2.⭐对象多态写法&#xff1a; ​继承&#xff1a;父类 变量 new 子类1()&#xff1b; ​父类 变量 new 子类2()&#xff1b;实现&#xff1a;接口 变量 new 实现类(); ​ 3.多态…

Comsol 微穿孔板吸声性能优化、提升吸声系数

微穿孔板吸声体是由穿孔直径在1毫米以下的薄板和板后空腔组成的共振吸声结构。与传统的吸声材料及普通穿孔板吸声体相比,微穿孔板吸声体清洁,可收回重复利用,不燃,坚固,重量轻,由于不需另加纤维等多孔性吸声材料即可获得良好的吸声性能,且制造不受材料限制,不污染环境,已成功应…

【Python 千题 —— 基础篇】简易银行

= Python 千题持续更新中 …… 脑图地址 👉:⭐https://twilight-fanyi.gitee.io/mind-map/Python千题.html⭐ 题目描述 题目描述 编写一个面向对象的程序,模拟一个简化的银行系统。要求定义一个 BankAccount 类,具有基本的存款、取款和查询余额的功能。然后,创建一个 S…