首先我们需要了解到分布式事件总线是什么;
分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。
例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。
然后了解一下RabbitMQ
RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。
RabbitMQ的主要功能包括:
消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。
本文将讲解使用RabbitMQ实现分布式事件
实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现
在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用
<ItemGroup>
<PackageReferenceInclude="Microsoft.Extensions.DependencyInjection.Abstractions"Version="7.0.0" /><PackageReferenceInclude="Microsoft.Extensions.Options"Version="7.0.0" /><PackageReferenceInclude="Microsoft.Extensions.Options.ConfigurationExtensions"Version="7.0.0" /><PackageReferenceInclude="RabbitMQ.Client"Version="6.4.0" /></ItemGroup>
创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义
EventsBusOptions.cs:
namespace EventsBus.Contract;
public classEventsBusOptions
{/// <summary>/// 接收时异常事件/// </summary>
public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}
IEventsBusHandle.cs:
namespace EventsBus.Contract;
public interface IEventsBusHandle<in TEto> where TEto : class
{
Task HandleAsync(TEto eventData);
}
ILoadEventBus.cs:
namespace EventsBus.Contract;
public interface ILoadEventBus
{
/// <summary>/// 发布事件/// </summary>/// <param name="eto"></param>/// <typeparam name="TEto"></typeparam>/// <returns></returns>
Task PushAsync<TEto>(TEto eto) where TEto : class;
}
EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道
namespace EventsBus.RabbitMQ;
[AttributeUsage(AttributeTargets.Class)]
public classEventsBusAttribute : Attribute
{
public readonly string Name;
public EventsBusAttribute(string name)
{
Name = name;
}
}
然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.Contract的RabbitMQ实现
创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs
Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了
using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;
namespace Microsoft.Extensions.DependencyInjection;
public staticclassEventsBusRabbitMQExtensions
{
public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
IConfiguration configuration)
{
services.AddSingleton<RabbitMQFactory>();
services.AddSingleton(typeof(RabbitMQEventsManage<>));
services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
return services;
}
}
Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。
using RabbitMQ.Client;
namespace EventsBus.RabbitMQ.Options;
public classRabbitMQOptions
{/// <summary>/// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>/// 指示应使用的协议的缺省值。/// </summary>
public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;
/// <summary>/// 地址/// </summary>
public string HostName { get; set; }
/// <summary>/// 账号/// </summary>
public string UserName { get; set; }
/// <summary>/// 密码/// </summary>
public string Password { get; set; }
}
RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序
using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace EventsBus.RabbitMQ;
public classRabbitMQEventsManage<TEto> where TEto : class
{
private readonly IServiceProvider _serviceProvider;
private readonly RabbitMQFactory _rabbitMqFactory;
public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
{
_serviceProvider = serviceProvider;
_rabbitMqFactory = rabbitMqFactory;
_ = Task.Run(Start);
}
private voidStart()
{
var channel = _rabbitMqFactory.CreateRabbitMQ();
var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
var name = eventBus?.Name ?? typeof(TEto).Name;
channel.QueueDeclare(name, false, false, false, null);
var consumer = new EventingBasicConsumer(channel); //消费者
channel.BasicConsume(name, true, consumer); //消费消息
consumer.Received += async (model, ea) =>
{
var bytes = ea.Body.ToArray();
try
{
// 这样就可以实现多个订阅
var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
foreach (var handle in events)
{
await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
}
}
catch (Exception e)
{
EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
}
};
}
}
RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace EventsBus.RabbitMQ;
public classRabbitMQFactory : IDisposable
{
private readonly RabbitMQOptions _options;
private readonly ConnectionFactory _factory;
private IConnection? _connection;
public RabbitMQFactory(IOptions<RabbitMQOptions> options)
{
_options = options?.Value;
// 将Options中的参数添加到ConnectionFactory
_factory = new ConnectionFactory
{
HostName = _options.HostName,
UserName = _options.UserName,
Password = _options.Password,
Port = _options.Port
};
}
public IModel CreateRabbitMQ()
{
// 当第一次创建RabbitMQ的时候进行链接
_connection ??= _factory.CreateConnection();
return _connection.CreateModel();
}
public voidDispose()
{
_connection?.Dispose();
}
}
RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现
using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
namespace EventsBus.RabbitMQ;
public classRabbitMQLoadEventBus : ILoadEventBus
{
private readonly IServiceProvider _serviceProvider;
private readonly RabbitMQFactory _rabbitMqFactory;
public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
{
_serviceProvider = serviceProvider;
_rabbitMqFactory = rabbitMqFactory;
}
public async Task PushAsync<TEto>(TEto eto) where TEto : class
{//创建一个通道//这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
using var channel = _rabbitMqFactory.CreateRabbitMQ();
// 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
var name = eventBus?.Name ?? typeof(TEto).Name;
// 使用获取的名称创建一个通道
channel.QueueDeclare(name, false, false, false, null);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
// 将数据序列号,然后发布
channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息// 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
await Task.CompletedTask;
}
}
在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;
然后我们需要使用RabbitMQ分布式事件总线工具包
使用RabbitMQ分布式事件总线的示例
首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件
version:'3.1'services:rabbitmq:restart:always# 开机自启image:rabbitmq:3.11-management# RabbitMQ使用的镜像container_name:rabbitmq# docker名称hostname:rabbitports:-5672:5672# 只是RabbitMQ SDK使用的端口-15672:15672# 这是RabbitMQ管理界面使用的端口environment:TZ:Asia/Shanghai# 设置RabbitMQ时区RABBITMQ_DEFAULT_USER:token# rabbitMQ账号RABBITMQ_DEFAULT_PASS:dd666666# rabbitMQ密码volumes:-./data:/var/lib/rabbitmq
启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用
<ProjectSdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net7.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReferenceInclude="Microsoft.AspNetCore.OpenApi"Version="7.0.0" /><PackageReferenceInclude="Swashbuckle.AspNetCore"Version="6.4.0" /></ItemGroup><ItemGroup><!-- 引用RabbitMQ事件总线项目--><ProjectReferenceInclude="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" /></ItemGroup></Project>
修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件![image-20230211022801105]
在这里注入的时候将配置注入好了
{"Logging":{"LogLevel":{"Default":"Information","Microsoft.AspNetCore":"Warning"}},"AllowedHosts":"*","RabbitMQOptions":{"HostName":"127.0.0.1","UserName":"token","Password":"dd666666"}}
创建DemoEto.cs文件:
using EventsBus.RabbitMQ;
namespace Demo;
[EventsBus("Demo")]
public classDemoEto
{
public int Size { get; set; }
public string Value { get; set; }
}
创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序
using System.Text.Json;
using EventsBus.Contract;
namespace Demo;
/// <summary>/// 事件处理服务,相当于订阅事件/// </summary>
public classDemoEventsBusHandle : IEventsBusHandle<DemoEto>
{
public async Task HandleAsync(DemoEto eventData)
{
Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");
await Task.CompletedTask;
}
}
打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务
using Demo;
using EventsBus.Contract;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// 注入事件处理服务
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));
// 注入RabbitMQ服务
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);
var app = builder.Build();
// 只有在Development显示Swaggerif (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
// 强制Https
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
创建Controllers\EventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;
using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;
namespace Demo.Controllers;
[ApiController]
[Route("[controller]")]
public classEventBusController : ControllerBase
{
private readonly ILoadEventBus _loadEventBus;
public EventBusController(ILoadEventBus loadEventBus)
{
_loadEventBus = loadEventBus;
}
/// <summary>/// 发送信息/// </summary>/// <param name="eto"></param>
[HttpPost]
public async Task Send(DemoEto eto)
{
await _loadEventBus.PushAsync(eto);
}
}
然后我们启动程序会打开Swagger调试界面:
然后我们发送一下事件:
我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现,