背景
之前写过一篇《Kafka+PostgreSql,构建一个总线服务》,近期在实践过程中又踩了一些坑,有了一些新的体验,拿出来再说道说道。
我们说EventBus 是一种设计模式和编程工具,它简化了应用程序组件之间的通信。通过使用事件总线(Event Bus),开发者可以解耦发布者和订阅者,使得系统更加模块化、易于维护和扩展。
而CAP是EventBus的一个实现库,也是一种处理分布式事务的解决方案,我这里的消息传输层使用的kafka,消息的持久化存储使用的PostgreSQL。
更多关于CAP的信息,可以看官方文档
注意,这里的CAP是库的名称,广义上的CAP还分别代表了
C-一致性
A-可用性
P-分区容错性
一般分布式的解决方案,都没办法同时满足这三点,到了应用层这个级别基本都是优先满足AP,而放弃C,但可以保证最终一致性。
这不是本文讨论的重点,更多关于CAP的内容大家可以自行GPT一下。
场景介绍
还是以近期开发的考试系统为例,它包含了很多非常适合使用总线机制的场景。这里我就举一个比较典型的例子——保存草稿。
这是一个典型的高并发场景,大量的考生用户完成了前置的考试验证环节后,开始在线考试,考试过程中,答题记录除了会在客户端进行临时存储,还会每隔一段时间自动提交一次草稿到服务端,这样用户在考试过程中如果出现了特殊情况,需要更换设备,也能尽可能保证之前的作答记录不丢失。
之所以要引入EventBus的机制,主要还是应对高并发场景,我这里画两个图来展示下引入总线机制之前和之后的两种草稿保存流程。
引入总线机制前
引入总线机制前,业务的处理流程非常简单直接,但数据的保存高度依赖数据库性能,即便使用了线程池的技术,提高了系统的处理能力,但数据库性能上不来依然容易出现性能瓶颈,甚至崩溃卡死的现象,是系统可用性大大降低。所以小规模数据量,采用简单架构没毛病,但当数据规模提高以后,就要重新考虑了
引入总线机制后
改用发布订阅机制后,引入了Kafka,redis等中间件,看起来像是增加了业务的复杂性,但却使草稿的保存变成了一个可线性执行的流程,让数据库也有了喘息之机。而且,从体验层面来说,用户保存草稿的操作变成了一个无感操作,把草稿提交到服务端后,会立刻返回结果,不用等服务端处理完成再进行后续流程,业务解耦,性能,可用性和易操作性,一下子就都提高了。
关键代码
按照上面的思路,首先给出消息发布者的主要代码
发布者部分
[HttpPost, ValidateAntiForgeryToken]
public async Task<IActionResult> SubmitDraft(SubmitMyAnswerDto dto)
{
if (!string.IsNullOrEmpty(dto.submitAnswerStr) && dto.submitAnswerStr == "null")
dto.submitAnswerStr = "";
Assistant.Logger.Warning($"{DateTime.Now}:发布事务--保存草稿");
if (string.IsNullOrEmpty(dto.submitAnswerStr))
dto.submitAnswerStr = "[]";
if (dto.submitAnswerStr.Length > CapConsts.CapMsgMaxLength)
{
Assistant.Logger.Warning($"{DateTime.Now}:发布事务--间接保存答案");
await _redisCachingProvider.StringSetAsync("longAnswer" + dto.recordId, dto.submitAnswerStr, TimeSpan.FromSeconds(60));
dto.submitAnswerStr = "";
}
await _capPublisher.PublishAsync(CapConsts.ClientPrefix + "SaveDraft", dto);
await Task.Run(async () =>
{
if (!await _redisCachingProvider.KeyExistsAsync(dto.recordId.ToString()))
{
var record = await _userAnswerRecordClientRepo.GetMyRecord(dto.recordId);
await _redisCachingProvider.StringSetAsync(dto.recordId.ToString(),
JsonHelper.JsonSerialize(record),
TimeSpan.FromSeconds(120));
}
});
return Json(_resp.success(
await _redisCachingProvider.StringGetAsync(dto.recordId.ToString())));
}
消费者部分
[NonAction]
[CapSubscribe(CapConsts.ClientPrefix + "SaveDraft")]
public async Task SaveDraft(SubmitMyAnswerDto dto, [FromCap] CapHeader header)
{
Logger.Warning($"{DateTime.Now}:消费事务---模拟保存答案");
if(await _redisCachingProvider.HExistsAsync(CapConsts.MsgIdCacheClientName, header["cap-msg-id"]))
{
Logger.Warning("已消费");
return;
}
//Logger.Warning(System.Text.Json.JsonSerializer.Serialize(header));
await Task.Run(async () =>
{
if (await _userAnswerRecordClientRepo.getAnyAsync(u => u.Id == dto.sid))
{
await _userAnswerRecordClientRepo.addItemAsync(new DbServices.Entities.UserRecordLog()
{
Sid = dto.sid,
SubmitAnswer = dto.answer,
CapMsgId = header["cap-msg-id"]??"",
CapInstance = header["cap-exec-instance-id"]??"",
CapSenttime = header["cap-senttime"] ?? ""
});
}
await _userAnswerRecordClientRepo.insertOrUpdateAsync(new DbServices.Entities.UserRecordLog()
{
Id = dto.sid,
SubmitAnswer = dto.answer,
UpdatedAt = DateTime.Now
});
});
}
简单解释下,在发布消息的部分,我是用了Redis来临时存储非常大的草稿内容,目的是避免产生发布消息时出错的情况,我这里持久化队列消息的数据库和我的业务库是分开的,消息库使用的是PostgreSQL,而CAP在初始化的时候,会在PG里创建两个索引,如下
当在队列里传输的参数过长时,就会触发pg的索引机制,像这样
事实上,把保存队列消息的数据库切换为mssql,mysql之类可能就没有这个问题了(mssql是ok的,mysql我没试过),或者手动把索引删掉也行,但这毕竟是一个造成系统不稳定的点,还是规避一下比较好。
所以我引入了redis,检测到提交的草稿字段非常长之后,就把它临时放到redis里中转一下,避免了消息队列里传输的内容过大影响性能和可用性。
然后看一下处理性能👇
服务器配置:windows server 2019,16G内存,Inter Xeon E5-2640 4核2.4GHz
说到这里,我们还有一个场景涉及到了这种大参数在消息队列里传输,就是组卷。
组卷的场景就是,比如数学类考试的组数学的科目,再根据小,初,高等不同级别,设定不同题型,难度等,还可以设定一些特殊的组别,比如尖子生等等,给他们打上一个自定义的标签之类,自由度还是挺高的,这个组卷的界面大概是这样的👇
虽然这个没有并发的问题,但还是想拿出来介绍一下,因为解决的办法还是不一样的,看下代码
Assistant.Logger.Debug("开始抽题喽");
int i = 1;
foreach (var item in paperIds.Chunk(20))
{
Assistant.Logger.Debug($"第{i}把") ;
Assistant.Logger.Debug(string.Join(',', item));
i++;
if (!string.IsNullOrEmpty(model.tags))
{
TagPaperDto dto = new TagPaperDto()
{
PaperIds = item,
Tags = model.tags
};
await _capBus.PublishAsync(CapConsts.PREFIX + "BuildPaperTagRelation", dto, model.adminId);
}
await _capBus.PublishAsync(CapConsts.PREFIX + "GeneratePaper", item, model.adminId);
}
return Json(_resp.success(paperIds,"组卷成功,请回到列表页检查是否符合要求"));
这段代码,是发布者的代码,前置的操作是设置组卷规则,比如用户针对某场考试组了500套试卷,每套卷的题目都非常多,如果都在主线程完成不仅会阻塞页面加载,还会影响系统整体性能,尤其当多个管理员同时组卷时,很容易出现问题。
这时就还是遇到了那个大参数的问题,对应的处理逻辑还是先把试卷创建出来,然后在把组卷规则以参数形式发布到消息队列,有订阅者订阅到之后,线性的创建多套试卷。因为这里传递的参数是试卷id的数组,所以这里用到了linq里的chunk方法,分段发布消息,等于一个大消息,分成了多个小消息,既避免了引入新的中间件,也能显著提高处理性能,是另外一种解决消息队列传递大参数时的解决方案。
我这里生成50套卷,按代码里分割消息的规则,20套发布一条消息,总共会发布3条消息,分布执行完成。
执行效果如下
发布消息是控制台打印的信息
消费完成时打印的信息
总结
好了,基本就是这些啦,可能有人会说,一个简单的保存逻辑,至于搞这么复杂吗,引入了这么多中间件,不会让系统变得更不可控吗,还是直接一把梭代码加数据库更好。
我觉得,合适不合适,一定要是建立在实践过的基础上,如果你试都没试过,仅仅凭着感觉就否定或者肯定某些方案,那肯定是不理性的。
如果你对系统架构相关的知识足够了解,也有实践经验,可以得出简单架构能够满足现在以及将来一定时间内的业务需求,那确实是没必要折腾,毕竟业务第一,系统能快速投入使用,变现盈利是第一要务!
可如果你只会简单架构,而且你的简单架构已经难以应对越来越复杂的使用场景,那就一定要优化调整。放下思想包袱,好好琢磨,好好提升才是第一要务。
稳定,固化的代码只是一个相对的状态,随着技术的更迭,代码的更新和迭代也应该是时时刻刻都在进行的,有时候总想着怎么简单怎么来,这也不想动,那也不想改,那现实可能也就怎么简单怎么回报你了,这也少给点,那也少给点。
在软件行业,如果说业务为王,那技术应该为王后。国不能无君,亦不能无后~