介绍
github地址
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
新建项目
新建.net7web项目
安装依赖包
安装软件
安装redis和Sql Server
修改代码
新建RedisConfigModel
namespace CAPStu01.Models;
public class RedisConfigModel
{
/// <summary>
/// 服务器地址
/// </summary>
public string Host { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int Port { get; set; }
/// <summary>
/// 密码
/// </summary>
public string Pwd { get; set; }
}
修改appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"ConnectionStrings": {
"SQlServer": "server=127.0.0.1;User ID=sa;Password=xxxx;database=capstu;Encrypt=True;TrustServerCertificate=True;connection timeout=600;"
},
"RedisConfig": {
"Host": "127.0.0.1",
"Port": 6379,
"Pwd": ""
}
}
修改Program.cs
using CAPStu01.Models;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
var redisConfig = builder.Configuration.GetSection("RedisConfig").Get<RedisConfigModel>();
var connectionStr = builder.Configuration.GetConnectionString("SQlServer") ?? "";
builder.Services.AddCap(x =>
{
x.UseRedis(options =>
{
if (options.Configuration != null && redisConfig != null)
{
options.Configuration.EndPoints.Add(redisConfig.Host, redisConfig.Port);
options.Configuration.Password = redisConfig?.Pwd ?? "";
}
});
x.UseSqlServer(sqlServerOptions =>
{
sqlServerOptions.Schema = "dbo";
sqlServerOptions.ConnectionString = connectionStr;
});
//开启面板
x.UseDashboard(d =>
{
//允许匿名访问
d.AllowAnonymousExplicit = true;
});
});
var app = builder.Build();
app.UseRouting();
app.MapControllers();
app.Run();
新建HomeController
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace CAPStu01.Controllers;
[ApiController]
public class HomeController:ControllerBase
{
public HomeController()
{
}
/// <summary>
/// 发送消息
/// </summary>
/// <returns></returns>
[HttpGet("/")]
public IActionResult Index([FromServices]ICapPublisher capBus)
{
capBus.Publish("test.show.time","你好,CAP");
return Content("发送消息成功");
}
/// <summary>
/// 接受消息
/// </summary>
/// <param name="data"></param>
[NonAction]
[CapSubscribe("test.show.time")]
public void ReceiveMessage(string data)
{
Console.WriteLine("message data is:" + data);
}
}
结果
如果使用redis需要定期清理streams内容
安装freeredis,修改Program.cs
builder.Services.AddSingleton<IRedisClient>(new RedisClient($"{redisConfig.Host}:{redisConfig.Port},password={redisConfig.Pwd},defaultDatabase=0"));
新增清除方法
private readonly IRedisClient _redisClient;
public HomeController(IRedisClient redisClient)
{
_redisClient = redisClient;
}
/// <summary>
/// 清除已处理的redis数据
/// </summary>
/// <returns></returns>
[HttpGet("/clear")]
public IActionResult ClearAckStream()
{
var groups = _redisClient.XInfoGroups("test.show.time");
var unreandMsgs = new List<string>();
//获取所有的未读消息
foreach (var group in groups)
{
if (group.pending > 0)
{
//有未读消息
var unReadList = _redisClient.XPending("test.show.time", group.name);
if (unReadList.count > 0)
{
var groupInfo = _redisClient.XPending("test.show.time", group.name);
var unreandList = _redisClient.XPending("test.show.time", group.name, groupInfo.minId, groupInfo.maxId,
groupInfo.count);
foreach (var unre in unreandList)
{
unreandMsgs.Add(unre.id);
}
}
}
}
//获取全部的消息
var allMsgs = _redisClient.XRange("test.show.time", "-", "+");
foreach (var msg in allMsgs)
{
if (unreandMsgs.Contains(msg.id))
{
//这个消息未读则跳过
continue;
}
//删除已处理的消息
_redisClient.XDel("test.show.time", msg.id);
}
return Content($"共处理未读消息:{unreandMsgs.Count}个,已读消息{allMsgs.Length}个");
}