文章目录
- 背景
- 尝试
- 方案
- 使用IHubContext上下文进行处理
- 第一步 创建一个类用于处理上下文(WarningBroadcast)
- 第二步:如何实例化这个对象呢
- 下面是我实现的方式
- 第三步:使用扩展类
- 调用通知
- 关于MessageBus的使用
- demo
- 发布消息
- 订阅消息
背景
当服务端进行数据处理时(数据可能来源于非人工操作,比如各种控制器和信号源),需要将处理结果推送给客户端,进行实时数据显示。
尝试
在上一篇文章中我们创建过服务端,代码吗如下
using Microsoft.AspNetCore.SignalR;
namespace Demo_WarningMonitor.Api.SignalRHubs
{
/// <summary>
/// 报警广播
/// </summary>
public class WarningBroadcastHub : Hub
{
private ILogger<WarningBroadcastHub> logger;
public WarningBroadcastHub(ILogger<WarningBroadcastHub> logger, IConfiguration configuration)
{
this.logger = logger;
}
public async Task SendMessage(string user, string message)
{
try
{
await Clients.All.SendAsync("ReceiveMessage", user, message);
}
catch (Exception ex)
{
logger.LogError(ex,"发送消息出现异常");
}
}
}
}
我想这直接在数据处理服务中i性能调用SendMessage方法就能实现发送消息了,
但是事与愿违,因为这里涉及到一个对象声明周期的问题,在调用Clients.All的时候会提示调用了一个已经释放的资源对象,具体错误如下:System.ObjectDisposedException:"Cannot access a disposed object.
方案
使用IHubContext上下文进行处理
https://learn.microsoft.com/zh-cn/aspnet/core/signalr/hubcontext?view=aspnetcore-6.0
第一步 创建一个类用于处理上下文(WarningBroadcast)
我这里使用了MessageBus的一个中间件,这样就实现了解耦,Messagebus的nuget包
using Demo_WarningMonitor.Api.Models;
using iml6yu.MessageBus;
using iml6yu.MessageBus.Constand;
using Microsoft.AspNetCore.SignalR;
namespace Demo_WarningMonitor.Api.SignalRHubs
{
public class WarningBroadcast
{
private ISubscriber<WarningMessage> warningSubscriber;
private ISubscriber<string> realtimeDataSubscriber;
private IConfiguration configuration;
private string warningChannel;
private ILogger<WarningBroadcast> logger;
private IHubContext<WarningBroadcastHub> hubContext;//
public WarningBroadcast(IConfiguration configuration, IHubContext<WarningBroadcastHub> context, ILogger<WarningBroadcast> logger)//<WarningBroadcastHub>
{
hubContext = context;
this.logger = logger;
this.configuration = configuration;
warningChannel = configuration.GetSection("WarningChannel").Get<string>();
warningSubscriber = MessageBuses.Subscrib<WarningMessage>(warningChannel);
warningSubscriber.OnNoticed += Subscriber_OnNoticed;
realtimeDataSubscriber = MessageBuses.Subscrib<string>(configuration.GetSection("RealtimeDataChannel").Get<string>());
realtimeDataSubscriber.OnNoticed += RealtimeDataSubscriber_OnNoticed;
}
private async void RealtimeDataSubscriber_OnNoticed(MessageChannel channel, MessageBusArges<string> message)
{
try
{
await hubContext.Clients.All.SendAsync("ReceiveMessage", "HubServiceCenter", message.Data);
}
catch (Exception ex)
{
logger.LogError(ex, "WarningBroadcast发送消息出现异常");
}
}
/// <summary>
/// 接收到消息的通知后进行分发处理
/// </summary>
/// <param name="channel"></param>
/// <param name="message"></param>
/// <exception cref="NotImplementedException"></exception>
private async void Subscriber_OnNoticed(MessageChannel channel, MessageBusArges<WarningMessage> message)
{
try
{
Console.ForegroundColor = ConsoleColor.Red;
var msg = System.Text.Json.JsonSerializer.Serialize(message.Data);
await hubContext.Clients.All.SendAsync("WarningMessage", "HubServiceCenter", msg);
Console.WriteLine("发送一次报警信息\r\n" + msg);
Console.ForegroundColor = ConsoleColor.White;
}
catch (Exception ex)
{
logger.LogError(ex, "WarningBroadcast发送消息出现异常");
}
}
}
}
这里的重点就是构造函数
第二步:如何实例化这个对象呢
这里涉及到如何获取IHubContext
实例,可以查看文章
https://learn.microsoft.com/zh-cn/aspnet/core/signalr/hubcontext?view=aspnetcore-6.0
下面是我实现的方式
var hubContext = app.Services.GetService<IHubContext<WarningBroadcastHub>>();
if (hubContext != null)
{
new WarningBroadcast(app.Services.GetService<IConfiguration>(), hubContext,app.Services.GetService<ILogger<WarningBroadcast>>());
}
对此我新建了一个扩展类,完整代码如下
using Microsoft.AspNetCore.SignalR;
namespace Demo_WarningMonitor.Api.SignalRHubs
{
/// <summary>
/// 扩展类
/// </summary>
public static class WarningBroadcastExt
{
public static WebApplication UseWarningBroadcast(this WebApplication app)
{
var hubContext = app.Services.GetService<IHubContext<WarningBroadcastHub>>();
if (hubContext != null)
{
new WarningBroadcast(app.Services.GetService<IConfiguration>(), hubContext,app.Services.GetService<ILogger<WarningBroadcast>>());
}
return app;
}
}
}
第三步:使用扩展类
在main函数中使用
app.UseWarningBroadcast();
调用通知
在需要发送给客户端消息的时候使用
MessageBuses.Publish 推送一个消息,订阅者直接收到消息后就会调用await hubContext.Clients.All.SendAsync(“WarningMessage”, “HubServiceCenter”, msg); 给客户端发送消息了
关于MessageBus的使用
demo
发布消息
MessageBuses.Publish<MyClass1>(new MyClass1());
订阅消息
ISubscriber<MyClass1> subscriber;
subscriber = MessageBuses.Subscrib<MyClass1>("newWin");
subscriber.OnNoticed += (channel,e)=>{
Debug.WriteLine(e.Data.ToString())
};