再来聊聊总线机制

news2024/12/5 2:24:09

背景

之前写过一篇《Kafka+PostgreSql,构建一个总线服务》,近期在实践过程中又踩了一些坑,有了一些新的体验,拿出来再说道说道。

我们说EventBus 是一种设计模式和编程工具,它简化了应用程序组件之间的通信。通过使用事件总线(Event Bus),开发者可以解耦发布者和订阅者,使得系统更加模块化、易于维护和扩展。

而CAP是EventBus的一个实现库,也是一种处理分布式事务的解决方案,我这里的消息传输层使用的kafka,消息的持久化存储使用的PostgreSQL。

更多关于CAP的信息,可以看官方文档

注意,这里的CAP是库的名称,广义上的CAP还分别代表了

C-一致性

A-可用性

P-分区容错性

一般分布式的解决方案,都没办法同时满足这三点,到了应用层这个级别基本都是优先满足AP,而放弃C,但可以保证最终一致性。

这不是本文讨论的重点,更多关于CAP的内容大家可以自行GPT一下。

场景介绍

还是以近期开发的考试系统为例,它包含了很多非常适合使用总线机制的场景。这里我就举一个比较典型的例子——保存草稿。

这是一个典型的高并发场景,大量的考生用户完成了前置的考试验证环节后,开始在线考试,考试过程中,答题记录除了会在客户端进行临时存储,还会每隔一段时间自动提交一次草稿到服务端,这样用户在考试过程中如果出现了特殊情况,需要更换设备,也能尽可能保证之前的作答记录不丢失。

之所以要引入EventBus的机制,主要还是应对高并发场景,我这里画两个图来展示下引入总线机制之前和之后的两种草稿保存流程。

引入总线机制前

画板

引入总线机制前,业务的处理流程非常简单直接,但数据的保存高度依赖数据库性能,即便使用了线程池的技术,提高了系统的处理能力,但数据库性能上不来依然容易出现性能瓶颈,甚至崩溃卡死的现象,是系统可用性大大降低。所以小规模数据量,采用简单架构没毛病,但当数据规模提高以后,就要重新考虑了

引入总线机制后

画板

改用发布订阅机制后,引入了Kafka,redis等中间件,看起来像是增加了业务的复杂性,但却使草稿的保存变成了一个可线性执行的流程,让数据库也有了喘息之机。而且,从体验层面来说,用户保存草稿的操作变成了一个无感操作,把草稿提交到服务端后,会立刻返回结果,不用等服务端处理完成再进行后续流程,业务解耦,性能,可用性和易操作性,一下子就都提高了。

关键代码

按照上面的思路,首先给出消息发布者的主要代码

发布者部分

[HttpPost, ValidateAntiForgeryToken]
public async Task<IActionResult> SubmitDraft(SubmitMyAnswerDto dto)
{
    if (!string.IsNullOrEmpty(dto.submitAnswerStr) && dto.submitAnswerStr == "null")
        dto.submitAnswerStr = "";

    Assistant.Logger.Warning($"{DateTime.Now}:发布事务--保存草稿");
    if (string.IsNullOrEmpty(dto.submitAnswerStr))
        dto.submitAnswerStr = "[]";
    if (dto.submitAnswerStr.Length > CapConsts.CapMsgMaxLength)
    {
        Assistant.Logger.Warning($"{DateTime.Now}:发布事务--间接保存答案");
        await _redisCachingProvider.StringSetAsync("longAnswer" + dto.recordId, dto.submitAnswerStr, TimeSpan.FromSeconds(60));
        dto.submitAnswerStr = "";               
    }
    await _capPublisher.PublishAsync(CapConsts.ClientPrefix + "SaveDraft", dto);

    await Task.Run(async () =>
    {
        if (!await _redisCachingProvider.KeyExistsAsync(dto.recordId.ToString()))
        {
            var record = await _userAnswerRecordClientRepo.GetMyRecord(dto.recordId);
            await _redisCachingProvider.StringSetAsync(dto.recordId.ToString(),
                JsonHelper.JsonSerialize(record),
                TimeSpan.FromSeconds(120));
        }
    });    

    return Json(_resp.success(
            await _redisCachingProvider.StringGetAsync(dto.recordId.ToString())));
}

消费者部分

 [NonAction]
 [CapSubscribe(CapConsts.ClientPrefix + "SaveDraft")]
 public async Task SaveDraft(SubmitMyAnswerDto dto, [FromCap] CapHeader header)
 {
     Logger.Warning($"{DateTime.Now}:消费事务---模拟保存答案");
     if(await _redisCachingProvider.HExistsAsync(CapConsts.MsgIdCacheClientName, header["cap-msg-id"]))
     {
         Logger.Warning("已消费");
         return;
     }
     //Logger.Warning(System.Text.Json.JsonSerializer.Serialize(header));
     await Task.Run(async () =>
     {
         if (await _userAnswerRecordClientRepo.getAnyAsync(u => u.Id == dto.sid))
         {
             await _userAnswerRecordClientRepo.addItemAsync(new DbServices.Entities.UserRecordLog()
             {
                 Sid = dto.sid,
                 SubmitAnswer = dto.answer,
                 CapMsgId = header["cap-msg-id"]??"",
                 CapInstance = header["cap-exec-instance-id"]??"",
                 CapSenttime = header["cap-senttime"] ?? ""
             });

         }
         await _userAnswerRecordClientRepo.insertOrUpdateAsync(new DbServices.Entities.UserRecordLog()
         {
             Id = dto.sid,
             SubmitAnswer = dto.answer,
             UpdatedAt = DateTime.Now
         });
     });
 }

简单解释下,在发布消息的部分,我是用了Redis来临时存储非常大的草稿内容,目的是避免产生发布消息时出错的情况,我这里持久化队列消息的数据库和我的业务库是分开的,消息库使用的是PostgreSQL,而CAP在初始化的时候,会在PG里创建两个索引,如下

当在队列里传输的参数过长时,就会触发pg的索引机制,像这样

事实上,把保存队列消息的数据库切换为mssql,mysql之类可能就没有这个问题了(mssql是ok的,mysql我没试过),或者手动把索引删掉也行,但这毕竟是一个造成系统不稳定的点,还是规避一下比较好。

所以我引入了redis,检测到提交的草稿字段非常长之后,就把它临时放到redis里中转一下,避免了消息队列里传输的内容过大影响性能和可用性。

然后看一下处理性能👇

windows server 2019,16G内存,Inter Xeon E5-2640 4核2.4GHz
服务器配置:windows server 2019,16G内存,Inter Xeon E5-2640 4核2.4GHz

说到这里,我们还有一个场景涉及到了这种大参数在消息队列里传输,就是组卷。

组卷的场景就是,比如数学类考试的组数学的科目,再根据小,初,高等不同级别,设定不同题型,难度等,还可以设定一些特殊的组别,比如尖子生等等,给他们打上一个自定义的标签之类,自由度还是挺高的,这个组卷的界面大概是这样的👇

虽然这个没有并发的问题,但还是想拿出来介绍一下,因为解决的办法还是不一样的,看下代码

Assistant.Logger.Debug("开始抽题喽");
int i = 1;
foreach (var item in paperIds.Chunk(20))
{
    Assistant.Logger.Debug($"第{i}把") ;
    Assistant.Logger.Debug(string.Join(',', item));
    i++;
    if (!string.IsNullOrEmpty(model.tags))
    {
        TagPaperDto dto = new TagPaperDto()
        {
            PaperIds = item,
            Tags = model.tags
        };
        await _capBus.PublishAsync(CapConsts.PREFIX + "BuildPaperTagRelation", dto, model.adminId);
    }
    await _capBus.PublishAsync(CapConsts.PREFIX + "GeneratePaper", item, model.adminId);

}
return Json(_resp.success(paperIds,"组卷成功,请回到列表页检查是否符合要求"));

这段代码,是发布者的代码,前置的操作是设置组卷规则,比如用户针对某场考试组了500套试卷,每套卷的题目都非常多,如果都在主线程完成不仅会阻塞页面加载,还会影响系统整体性能,尤其当多个管理员同时组卷时,很容易出现问题。

这时就还是遇到了那个大参数的问题,对应的处理逻辑还是先把试卷创建出来,然后在把组卷规则以参数形式发布到消息队列,有订阅者订阅到之后,线性的创建多套试卷。因为这里传递的参数是试卷id的数组,所以这里用到了linq里的chunk方法,分段发布消息,等于一个大消息,分成了多个小消息,既避免了引入新的中间件,也能显著提高处理性能,是另外一种解决消息队列传递大参数时的解决方案。

我这里生成50套卷,按代码里分割消息的规则,20套发布一条消息,总共会发布3条消息,分布执行完成。

执行效果如下

发布消息是控制台打印的信息

消费完成时打印的信息

总结

好了,基本就是这些啦,可能有人会说,一个简单的保存逻辑,至于搞这么复杂吗,引入了这么多中间件,不会让系统变得更不可控吗,还是直接一把梭代码加数据库更好。

我觉得,合适不合适,一定要是建立在实践过的基础上,如果你试都没试过,仅仅凭着感觉就否定或者肯定某些方案,那肯定是不理性的。

如果你对系统架构相关的知识足够了解,也有实践经验,可以得出简单架构能够满足现在以及将来一定时间内的业务需求,那确实是没必要折腾,毕竟业务第一,系统能快速投入使用,变现盈利是第一要务!

可如果你只会简单架构,而且你的简单架构已经难以应对越来越复杂的使用场景,那就一定要优化调整。放下思想包袱,好好琢磨,好好提升才是第一要务。

稳定,固化的代码只是一个相对的状态,随着技术的更迭,代码的更新和迭代也应该是时时刻刻都在进行的,有时候总想着怎么简单怎么来,这也不想动,那也不想改,那现实可能也就怎么简单怎么回报你了,这也少给点,那也少给点。

在软件行业,如果说业务为王,那技术应该为王后。国不能无君,亦不能无后~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253356.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么做DNS污染检测

DNS污染是指通过恶意手段篡改DNS解析结果&#xff0c;导致用户访问错误或恶意网站的行为。这种行为不仅影响用户体验&#xff0c;还可能带来安全风险。以下是几种检测DNS污染的方法&#xff1a; 1. 使用在线DNS检查工具 可以使用在线工具如帝恩思旗下的拨测在线DNS检测工具等…

视频融合×室内定位×数字孪生

随着物联网技术的迅猛发展&#xff0c;室内定位与视频融合技术在各行各业中得到了广泛应用。不仅能够提供精确的位置信息&#xff0c;还能通过实时视频监控实现全方位数据的可视化。 与此同时&#xff0c;数字孪生等技术的兴起为智慧城市、智慧工厂等应用提供了强大支持&#…

合规性要求对漏洞管理策略的影响

讨论漏洞管理中持续面临的挑战&#xff0c;包括确定漏洞的优先级和解决修补延迟问题。 介绍合规性要求以及自动化如何简化漏洞管理流程。 您认为为什么尽管技术不断进步&#xff0c;但优先考虑漏洞和修补延迟等挑战仍然存在&#xff1f; 企业基础设施日益复杂&#xff0c;攻…

基于Java Springboot诗词学习APP且微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse微信开…

常见问题QA的前端代码

这个的后端代码参见此文 使用语言向量建立常见问题的模糊搜索-CSDN博客https://blog.csdn.net/chenchihwen/article/details/144207262?spm1001.2014.3001.5501 这段代码实现了一个简单的问答页面&#xff0c;页面分为左右两部分&#xff0c;左侧用于展示对话记录&#xff0c…

CSS学习记录03

CSS背景 CSS 背景属性用于定义元素的背景效果。 CSS background-color background-color属性指定元素的背景色。 页面的背景色设置如下&#xff1a; body {background-color: lightblue; } 通过CSS&#xff0c;颜色通常由以下方式指定&#xff1a; 有效的颜色名称-比如“…

【k8s】kubelet 的相关证书

在 Kubernetes 集群中&#xff0c;kubelet 使用的证书通常存放在节点上的特定目录。这些证书用于 kubelet 与 API 服务器之间的安全通信。具体的位置可能会根据你的 Kubernetes 安装方式和配置有所不同&#xff0c;下图是我自己环境【通过 kubeadm 安装的集群】中的kubelet的证…

JavaWeb:Servlet (学习笔记)【1】

目录 一&#xff0c;Servlet介绍 1&#xff0c;简介 2&#xff0c;Servlet技术特点 3&#xff0c;Servlet在应用程序中的位置 4&#xff0c;Servlet在程序中到底处于一个什么地位? 二&#xff0c;servlet运行过程&#xff1a; 三&#xff0c;servlet路径配置 四&#x…

STM32-C语言基础知识

C语言基础知识 stdint.h简介 给寄存器某个位赋值 给位6赋值为1流程&#xff1a;先清0&#xff0c;再赋值 带参数的宏定义 建议使用do {…}while(0)来构造宏定义 条件编译 条件编译后面必须跟宏语句&#xff0c;如#if _LED_H 指针使用常见的2大问题 1、未初始化 2、越界使…

Android 应用单元测试涉及 Telephony 环境初始化问题

Telephony 相关类注入问题 SubscriptionManager Cannot invoke "android.telephony.SubscriptionManager.getActiveSubscriptionInfoList()" because "this.mSubscriptionManager" is nulljava.lang.NullPointerException: Cannot invoke "android.t…

mysql 存储结构的进化之路

文章目录 前言一、线性结构二、二叉树&#xff08;BST&#xff09;三、平衡二叉树&#xff08;AVL&#xff09;四、多路平衡查找树&#xff08;B Tree&#xff09;五、加强版多路平衡查找树&#xff08;B Tree&#xff09;总结 前言 树形结构是一种具有层次关系的数据结构&…

高速定向广播声光预警系统赋能高速安全管控

近年来&#xff0c;高速重大交通事故屡见不鲜&#xff0c;安全管控一直是高速运营的重中之重。如何利用现代化技术和信息化手段&#xff0c;创新、智能、高效的压降交通事故的发生概率&#xff0c;优化交通安全管控质量&#xff0c;是近年来交管部门的主要工作&#xff0c;也是…

【机器学习】CatBoost 模型实践:回归与分类的全流程解析

一. 引言 本篇博客首发于掘金 https://juejin.cn/post/7441027173430018067。 PS&#xff1a;转载自己的文章也算原创吧。 在机器学习领域&#xff0c;CatBoost 是一款强大的梯度提升框架&#xff0c;特别适合处理带有类别特征的数据。本篇博客以脱敏后的保险数据集为例&#x…

游戏引擎学习第25天

Git: https://gitee.com/mrxiao_com/2d_game 今天的计划 总结和复述&#xff1a; 这段时间的工作已经接近尾声&#xff0c;虽然每次编程的时间只有一个小时&#xff0c;但每一天的进展都带来不少收获。尽管看起来似乎花费了很多时间&#xff0c;实际上这些日积月累的时间并未…

AI开发:生成式对抗网络入门 模型训练和图像生成 -Python 机器学习

阶段1&#xff1a;GAN是个啥&#xff1f; 生成式对抗网络&#xff08;Generative Adversarial Networks, GAN&#xff09;&#xff0c;名字听着就有点“对抗”的意思&#xff0c;没错&#xff01;它其实是两个神经网络互相斗智斗勇的游戏&#xff1a; 生成器&#xff08;Gene…

040集——CAD中放烟花(CAD—C#二次开发入门)

效果如下&#xff1a; 单一颜色的烟花&#xff1a; 渐变色的火花&#xff1a; namespace AcTools {public class HH{public static TransientManager tm TransientManager.CurrentTransientManager;public static Random rand new Random();public static Vector3D G new V…

JavaScript实现tab栏切换

JavaScript实现tab栏切换 代码功能概述 这段代码实现了一个简单的选项卡&#xff08;Tab&#xff09;切换功能。它通过操作 HTML 元素的类名&#xff08;class&#xff09;来控制哪些选项卡&#xff08;Tab&#xff09;和对应的内容板块显示&#xff0c;哪些隐藏。基本思路是先…

【天地图】HTML页面实现车辆轨迹、起始点标记和轨迹打点的完整功能

目录 一、功能演示 二、完整代码 三、参考文档 一、功能演示 运行以后完整的效果如下&#xff1a; 点击开始&#xff0c;小车会沿着轨迹进行移动&#xff0c;点击轨迹点会显示经纬度和时间&#xff1a; 二、完整代码 废话不多说&#xff0c;直接给完整代码&#xff0c;替换…

HCIA笔记6--路由基础与静态路由:浮动路由、缺省路由、迭代查找

文章目录 0. 概念1.路由器工作原理2. 跨网访问流程3. 静态路由配置4. 静态路由的应用场景4.1 路由备份4.2 浮动路由4.3 缺省路由 5. 迭代路由6 问题6.1 为什么路由表中有的下一跳的地址有接口&#xff1f;6.2 个人电脑的网关本质是什么&#xff1f; 0. 概念 自治系统&#xff…

20241129解决在Ubuntu20.04下编译中科创达的CM6125的Android10出现找不到库文件libncurses.so.5的问题

20241129解决在Ubuntu20.04下编译中科创达的CM6125的Android10出现找不到库文件libncurses.so.5的问题 2024/11/29 21:11 缘起&#xff1a;中科创达的高通CM6125开发板的Android10的编译环境需要。 vendor/qcom/proprietary/commonsys/securemsm/seccamera/service/jni/jni_if.…