.Net Core对于RabbitMQ封装分布式事件总线

news2025/1/10 17:50:42

首先我们需要了解到分布式事件总线是什么;

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:

    1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。

    2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。

    3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。

    4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。

    5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup>
 <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
    <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs

namespace EventsBus.Contract;

public class EventsBusOptions
{
    /// <summary>
    /// 接收时异常事件
    /// </summary>
    public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs

namespace EventsBus.Contract;

public interface IEventsBusHandle<in TEto> where TEto : class
{
    Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs

namespace EventsBus.Contract;

public interface ILoadEventBus
{
    /// <summary>
    /// 发布事件
    /// </summary>
    /// <param name="eto"></param>
    /// <typeparam name="TEto"></typeparam>
    /// <returns></returns>
    Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;

[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{
    public readonly string Name;

    public EventsBusAttribute(string name)
    {
        Name = name;
    }
}

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Extensions.DependencyInjection;

public static class EventsBusRabbitMQExtensions
{
    public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddSingleton<RabbitMQFactory>();
        services.AddSingleton(typeof(RabbitMQEventsManage<>));
        services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
        services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
        
        return services;
    }
}

Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;

namespace EventsBus.RabbitMQ.Options;

public class RabbitMQOptions
{
    /// <summary>
    /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
    /// 指示应使用的协议的缺省值。
    /// </summary>
    public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;

    /// <summary>
    /// 地址
    /// </summary>
    public string HostName { get; set; }

    /// <summary>
    /// 账号
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// 密码
    /// </summary>
    public string Password { get; set; }
}

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace EventsBus.RabbitMQ;

public class RabbitMQEventsManage<TEto> where TEto : class
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
        _ = Task.Run(Start);
    }

    private void Start()
    {
        var channel = _rabbitMqFactory.CreateRabbitMQ();
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        channel.QueueDeclare(name, false, false, false, null);
        var consumer = new EventingBasicConsumer(channel); //消费者
        channel.BasicConsume(name, true, consumer); //消费消息
        consumer.Received += async (model, ea) =>
        {
            var bytes = ea.Body.ToArray();
            try
            {
                // 这样就可以实现多个订阅
                var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
                foreach (var handle in events)
                {
                    await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
                }
            }
            catch (Exception e)
            {
                EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
            }
        };
    }
}

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace EventsBus.RabbitMQ;

public class RabbitMQFactory : IDisposable
{
    private readonly RabbitMQOptions _options;
    private readonly ConnectionFactory _factory;
    private IConnection? _connection;

    public RabbitMQFactory(IOptions<RabbitMQOptions> options)
    {
        _options = options?.Value;
        // 将Options中的参数添加到ConnectionFactory
        _factory = new ConnectionFactory
        {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            Port = _options.Port
        };
    }

    public IModel CreateRabbitMQ()
    {
        // 当第一次创建RabbitMQ的时候进行链接
        _connection ??= _factory.CreateConnection();

        return _connection.CreateModel();
    }

    public void Dispose()
    {
        _connection?.Dispose();
    }
}

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;

namespace EventsBus.RabbitMQ;

public class RabbitMQLoadEventBus : ILoadEventBus
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
    }

    public async Task PushAsync<TEto>(TEto eto) where TEto : class
    {

        //创建一个通道
        //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
        using var channel = _rabbitMqFactory.CreateRabbitMQ();
        
        // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        
        // 使用获取的名称创建一个通道
        channel.QueueDeclare(name, false, false, false, null);
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 1;
        // 将数据序列号,然后发布
        channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息
        // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
        var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
        
        await Task.CompletedTask;
    }
}

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包

使用RabbitMQ分布式事件总线的示例

首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件

version: '3.1'
services:
  rabbitmq:
    restart: always # 开机自启
    image: rabbitmq:3.11-management # RabbitMQ使用的镜像
    container_name: rabbitmq # docker名称
    hostname: rabbit
    ports:
      - 5672:5672 # 只是RabbitMQ SDK使用的端口
      - 15672:15672 # 这是RabbitMQ管理界面使用的端口
    environment:
      TZ: Asia/Shanghai # 设置RabbitMQ时区
      RABBITMQ_DEFAULT_USER: token # rabbitMQ账号
      RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密码
    volumes:
      - ./data:/var/lib/rabbitmq

启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用

<Project Sdk="Microsoft.NET.Sdk.Web">

    <PropertyGroup>
        <TargetFramework>net7.0</TargetFramework>
        <Nullable>enable</Nullable>
        <ImplicitUsings>enable</ImplicitUsings>
    </PropertyGroup>

    <ItemGroup>
        <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" />
        <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
    </ItemGroup>

    <ItemGroup>
        <!-- 引用RabbitMQ事件总线项目-->
        <ProjectReference Include="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" />
    </ItemGroup>

</Project>

修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件

在这里注入的时候将配置注入好了

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "RabbitMQOptions": {
    "HostName": "127.0.0.1",
    "UserName": "token",
    "Password": "dd666666"
  }
}

创建DemoEto.cs文件:

using EventsBus.RabbitMQ;

namespace Demo;

[EventsBus("Demo")]
public class DemoEto
{
    public int Size { get; set; }
    
    public string Value { get; set; }
}

创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序

using System.Text.Json;
using EventsBus.Contract;

namespace Demo;

/// <summary>
/// 事件处理服务,相当于订阅事件
/// </summary>
public class DemoEventsBusHandle : IEventsBusHandle<DemoEto>
{
    public async Task HandleAsync(DemoEto eventData)
    {
        Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");
        await Task.CompletedTask;
    }
}

打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务

using Demo;
using EventsBus.Contract;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// 注入事件处理服务
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));

// 注入RabbitMQ服务
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);

var app = builder.Build();

// 只有在Development显示Swagger
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// 强制Https
app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

创建Controllers\EventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;

using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;

namespace Demo.Controllers;

[ApiController]
[Route("[controller]")]
public class EventBusController : ControllerBase
{
    private readonly ILoadEventBus _loadEventBus;

    public EventBusController(ILoadEventBus loadEventBus)
    {
        _loadEventBus = loadEventBus;
    }

    /// <summary>
    /// 发送信息
    /// </summary>
    /// <param name="eto"></param>
    [HttpPost]
    public async Task Send(DemoEto eto)
    {
        await _loadEventBus.PushAsync(eto);
    }
}

然后我们启动程序会打开Swagger调试界面:

然后我们发送一下事件:

我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现, 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/343545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入浅出C语言——数据在内存中的存储

文章目录一、数据类型详细介绍1. C语言中的内置类型2. 类型的基本归类&#xff1a;二. 整形在内存中的存储1. 原码、反码、补码2. 大小端三.浮点数存储规则一、数据类型详细介绍 1. C语言中的内置类型 C语言的内置类型有char、short、int、long、long long、float、double&…

第四章.误差反向传播法—误差反向传播法实现手写数字识别神经网络

第四章.误差反向传播法 4.3 误差反向传播法实现手写数字识别神经网络 通过像组装乐高积木一样组装第四章中实现的层&#xff0c;来构建神经网络。 1.神经网络学习全貌图 1).前提&#xff1a; 神经网络存在合适的权重和偏置&#xff0c;调整权重和偏置以便拟合训练数据的过程称…

2023年1月洗衣机品牌销量排行:总销售额近30亿,海尔品牌领跑

鲸参谋电商大数据2023年1月京东平台“洗衣机”品类完整销售数据出炉&#xff01; 根据鲸参谋电商数据显示&#xff0c;2023年1月在京东平台上&#xff0c;洗衣机的销量为174.5万&#xff0c;相较于2022年12月&#xff0c;环比上涨了5.1%&#xff0c;但相较于去年同期&#xff0…

C语言学习笔记(七): 指针的使用

指针变量 指针是一种特殊的变量&#xff0c;它存储的是某个变量的内存地址。指针变量可以存储内存地址&#xff0c;并且通过指针变量可以间接操作内存中的数据 include <stdio.h> int main() {int a1, * p; //定义指针变量,*是指针运算符p &a; //把a的地…

线段树--RMQ问题

线段树由来算法讲解分析树的数据结构结点四个基本操作例题天才的记忆最大数由来 线段树是RMQ区间最值问题的一种解题方法&#xff0c;在给出的区间是静态不变的时候&#xff0c;可以使用ST算法进行离线查询某个区间的最值&#xff0c;先预处理后进行m次查询&#xff0c;时间复…

9. QML_OpenGL--2. 在QQuick中搭建加载OpenGL框架

1. 说明&#xff1a; OPenGL一般在 QtWidget 中使用&#xff0c;但目前使用 QML 做界面开发是一种趋势&#xff0c;同时在QML中使用OPenGL进行渲染也是十分必要&#xff0c;文章简单介绍如何在QML中使用 OPenGL&#xff0c;搭建了一种基本的框架。整体思路和在 QtWidget 中类似…

RabbitMQ学习(四):消息应答

一、消息应答的概念消费者完成一个任务可能需要一段时间&#xff0c;如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了&#xff0c;会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息&#xff0c;便立即将该消 息标记为删除。在这种情况下&#xff0c;突然…

C++中引用的本质以及与指针的区别(c++数据在内存中的分配)

1、引用的意义 引用作为变量别名而存在&#xff0c;因此在一些场合可以替代指针&#xff0c;引用相对于指针来说具有更好的可读性和实用性 // swap函数的实现对比 #include <iostream> using namespace std;void swap1(int a, int b); void swap2(int *p1, int *p2); v…

【数据结构】---顺序表的实现

最近学校开始学习数据结构了&#xff0c;没事就手搓一个顺序表。&#x1f308;线性表线性表是n个具有相同特性的数据元素的有限序列&#xff0c;是一种实际中广泛使用的数据结构&#xff0c;常见的线性表有顺序表、链表、栈、队列、字符串。线性表在逻辑上是线性结构&#xff0…

【C语言学习笔记】:静态库

一、什么是库 库是写好的现有的&#xff0c;成熟的&#xff0c;可以复用的代码。现实中每个程序都要依赖很多基础的底层库&#xff0c;不可能每个人的代码都从零开始&#xff0c;因此库的存在意义非同寻常。 本质上来说库是一种可执行代码的二进制形式&#xff0c;可以被操作…

基于”PLUS模型+“生态系统服务多情景模拟预测实践技术应用

生态系统服务是人类直接或间接从生态系统中获得的惠益&#xff0c;在应对城市挑战和实施可持续发展方面发挥着至关重要的作用。随着全球城市化的快速发展, 频繁的人类活动导致了土地利用的快速变化&#xff0c;导致生态系统结构和功能的变化&#xff0c;影响生态系统服务的供应…

【Nginx】Docker配置ngnix,实现同服务器ip多站点多域名

Docker配置ngnix&#xff0c;实现同服务器ip&#xff0c;多域名映射多站点 本文首发于 慕雪的寒舍 1.说明 一般情况下&#xff0c;我们的域名映射到ip后&#xff0c;默认访问的是80端口。如果你的服务器只部署了一个服务&#xff0c;这样也是够用的。 但是很多项目对性能的占…

CAN总线详细介绍

1.1 CAN是什么&#xff1f; CAN 最终成为国际标准 &#xff08; ISO11898(高速应用)和 ISO11519&#xff08;低速应用&#xff09;&#xff09;&#xff0c;是国际上应用最广泛的现场总线之一。 1.2 CAN总线特点 多主方式: 可以多主方式工作&#xff0c;网络上任意一个节点…

前端学习第一阶段——第五章(上)

5-1 CSS基本选择器 01-CSS层叠样式表导读 02-CSS简介 03-体验CSS语法规范 04-CSS代码风格 05-CSS选择器的作用 06-标签选择器 07-类选择器 08-使用类选择器画盒子 09-类选择器特殊使用-多类名 10-id选择器 11-通配符选择器 5-2 CSS样式 12-font-family设置字体系列 13-font-s…

商场技术点-3

1.后端服务校验 1.1JSR-303介绍 JSR是Java Specification Requests的缩写&#xff0c;意思是Java 规范提案。是指向JCP(Java Community Process)提出新增一个标准化技术规范的正式请求。任何人都可以提交JSR&#xff0c;以向Java平台增添新的API和服务。JSR已成为Java界的一个…

springboot项目配置文件加密

1背景&#xff1a; springboot项目中要求不能采用明文密码&#xff0c;故采用配置文件加密. 目前采用有密码的有redis nacos rabbitmq mysql 这些配置文件 2技术 2.1 redis nacos rabbitmq 配置文件加密 采用加密方式是jasypt 加密 2.1.1 加密步骤 2.1.2 引入maven依赖 …

Android进阶之路 - StringUtils、NumberUtils 场景源码

忘记是在去年还是前年的时候遇到一个需要检测所传字符串是否为数字的场景&#xff0c;开始使用 NumberUtils.isNumber() 提示错误 &#xff0c;没有解决问题&#xff08;可能是因为依赖版本导致&#xff09;&#xff0c;最后使用的是StringUtils.isNumeric()&#xff0c;当时关…

剑指 Offer 43. 1~n 整数中 1 出现的次数

题目 输入一个整数 n &#xff0c;求1&#xff5e;n这n个整数的十进制表示中1出现的次数。 例如&#xff0c;输入12&#xff0c;1&#xff5e;12这些整数中包含1 的数字有1、10、11和12&#xff0c;1一共出现了5次。 思路 要求出小于等于 n 的非负整数中数字 1 出现的个数…

Prometheus系列(五)grafana web 配置邮件告警

目录 1. contact points&#xff08;创建告警渠道&#xff09; 2. Notification policies&#xff08;创建告警通道匹配规则&#xff09; 3. Alert rules&#xff08;配置告警策略&#xff09; 告警配置 告警页面名词解释&#xff1a; 1. contact points&#xff08;创建告…

玩转数据结构之Java实现线段树

前言 线段树是一种二叉搜索树&#xff0c;线段树的每个结点都存储了一个区间&#xff0c;也可以理解成一个线段&#xff0c;在这些线段上进行搜索操作得到你想要的答案。 线段树的适用范围很广&#xff0c;可以在线维护修改以及查询区间上的最值&#xff0c;求和。更可以扩充到…