MQ最终一致性理论与实践

news2025/1/13 14:27:41

MQ最终一致性理论与实践

原理

分布式事务无论是2PC&3PC还是TCC,基本都遵守XA协议的思想,但全局事务方案并发性较差;

最终一致性方案指的是将最有可能出错的业务以本地事务的方式完成后,采用不断重试的方式(不限于消息系统)来促使同一个分布式事务中的其他关联业务全部完成,不遵从XA协议。相关理论可以参考并学习 分布式事务 | 凤凰架构 | 可靠事件队列

需求

创建订单(order-service), 同时扣减库存(repo-service)

非事务型消息队列

非事务型消息队列

order-service 本地事务

  1. 在t_order表添加订单记录
  2. 在transaction_log 添加对应的扣减库存消息

repo-service 本地事务

  1. 检查本次扣减库存操作是否已经执行过 && 是否可以扣减库存
  2. 执行扣减库存
  3. 写判重表
  4. 向MQ 发送消费完成 ACK

repo-service重复收到消息的原因,一是生产者重复生产,二是中间件重传。为了实现业务的幂等性,repo-service 中维护了一张判重表

  1. order-service后台任务会把消息表中的消息发送给MQ,成功后则删除消息表中的消息。如网络超时则会重新发送消息直到MQ响应成功ACK。这样可能会导致消息的重复,需要repo-service做去重操作。
  2. MQ向repo-service推送消息时,repo-service处理消费完成后会向MQ进行ACK响应,但如果ACK响应发送网络超时则也会出现消费重复消费的情况,需要repo-service做去重操作。

RocketMQ事务型消息队列

RocketMQ事务型消息队列

存在的问题

  1. producer发送失败
  2. producer.send()返回消息异常
  3. 本地事务执行,如果异常,此时如何解决

解决方案

  1. 如果发送失败,那么调用端直接抛出异常,后续不会执行。✅
  2. 如果producer.send没有返回send_ok,则不会执行executeLocal方法,后续会执行check回查,一直查不到信息,最后会回滚消息。✅
  3. rocketMQ的client在事务消息中的bug 下文分析 ❌

如实现方案类似@Transactional add(serviceA→producer.sendTransactionMessage()→saveTransaction(中置half)这一流程,如果在本地事务执行过程中,saveTransaction出现异常,当前操作不会成功,但是由于exception会被RocketMQ捕获,且不会继续抛出,因此异常不会被事务方法add感知,导致serivceA执行成功,但是本地事务不成功。然后执行回查时,会回滚消息,后续的serviceB不会消费消息。

RocketMQ 源码解析

try {
	sendResult = this.send(msg); //同步发送消息
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;

//判断sendResult类型
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                //执行本地事务
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }

            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
        		//在executeLocalTransaction执行中,如果抛出异常,会被catch掉,但是没有重新throw,因此不会被调用方感知
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

RocketMQ最终一致性如何正确开发

producer端

发送half后置

当前half后置发送的开发,订单服务+事务消息落库+producer.sendHalf都是在一个事务中,注意去判断发送消息返回是否为send_ok。
在这里插入图片描述

  1. 订单数据或者事务消息有异常,由于在同一个事务中,因此事务rollback ✅
  2. 如果half消息发送异常,外层事务方法可以感知,因此事务rollback ✅
  3. 对sendResult的发送结果判断事务是否发送成功,如果发送结果不是send_ok,那么需要抛出异常,此时执行事务rollback ✅

发送half中置

订单和producer.sendHalf在同一个事务方法中,事务消息持久化在executeLocalTransaction方法中。
在这里插入图片描述

  1. 如果下单异常,那么事务rollback,则send不会执行 ✅
  2. 如果half发送失败,事务rollback ✅
  3. 如果send_ok,那么下单操作完成,但是executeLocalTransaction执行失败
    1. 如果不抛出异常,localTransactionState=RollBack时,订单的事务方法感知不到异常,导致订单落库,事务消息存储失败 ❌
    2. 如果抛出异常,会被rocketmq捕获,也感知不到异常 ❌

发送half前置

把业务方法和事务持久化的操作,统一放在executeLocalTransaction方法中
在这里插入图片描述

  1. producer.sendHalf() 异常或者状态不为send_ok,那么抛出异常,本地事务不执行 ✅
  2. 如果本地事务抛出异常,事务Rollback。抛出的异常被rocketmq捕获,broker不会得到事务状态和启动本地回查。✅

结论

对于half前后置都可以保证事务的最终一致性,但是对于把所有的事务执行放在executeLocalTransaction中执行,略微有些问题

  1. 如果业务方法耗时,执行executeLocalTransaction的执行时间过长,可以会增加不必要的回查;而half消息后置,把业务方法先执行,那么会减少不必要的事务回查。
  2. 会添加将object转化为java-bean的代码。

consumer端

  1. consumer端消费失败而去执行回滚的话,需要付出更多的代价,而且还会引发其他系统回退导致的新问题。
  2. consumer端会返回reconsume_later,并重发消息,且默认重试16次,直到消费成功。如果失败,则人工介入处理。

解决方案

  1. 设置消息重试次数,如果达到指定次数,就发邮件或者短信通知人工介入;
  2. 等待消息重试次数超过默认的16次,进入死信队列,然后程序监听对应的私信队列主题,通知人工介入或者在rocketMQ控制台查看处理。

实战(代码样例)

欢迎star https://github.com/WeiXiao-Hyy/mq-eventual-consistency

参考资料

  • MQ最终一致性事务 - 文章分类 - LBJboy - 博客园
  • SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务
  • 基于RocketMQ分布式事务 - 完整示例 - 掘金

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1459289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Sora内测申请详细教程

Sora内测申请详细教程 Sora 的创作能力,已经让很多人震惊了,自己制作电影的时代来了,以前做个短视频觉得已经够满足了,现在人人都能成为导演。 这几天大家都在等什么时候能用上,我给他分享一个可能提前用上Sora的方法…

Panalog大数据日志审计系统libres_syn_delete.php存在命令执行漏洞

文章目录 前言声明一、Panalog大数据日志审计系统简介二、漏洞描述三、影响版本四、漏洞复现五、整改意见 前言 Panalog大数据日志审计系统定位于将大数据产品应用于高校、 公安、 政企、 医疗、 金融、 能源等行业之中,针对网络流量的信息进行日志留存&#xff0c…

MyBatis学习总结

MyBatis分页如何实现 分页分为 逻辑分页:查询出所有的数据缓存到内存里面,在从内存中筛选出需要的数据进行分页 物理分页:直接用数据库语法进行分页limit mybatis提供四种方法分页: 直接在sql语句中分页,传递分页参数…

多进程(1)

1> 使用多个进程实现文件拷贝 #include<myhead.h> int main(int argc, const char *argv[]) {pid_t pid;pidfork();int fdr;char buf;if((fdropen(argv[1],O_RDONLY))-1){perror("open error");return -1;}int lenlseek(fdr,0,SEEK_END)-lseek(fdr,0,SEEK_…

openGauss学习笔记-224 openGauss性能调优-系统调优-数据库系统参数调优-数据库并发队列参数调优

文章目录 openGauss学习笔记-224 openGauss性能调优-系统调优-数据库系统参数调优-数据库并发队列参数调优224.1 全局并发队列224.2 局部并发队列 openGauss学习笔记-224 openGauss性能调优-系统调优-数据库系统参数调优-数据库并发队列参数调优 数据库提供两种手段进行并发队…

掌握社区店选址技巧,提升商业成功率

对于想开实体店或创业的人来说&#xff0c;选址是决定商业成功的关键因素之一。本人在社区店开鲜奶吧5年时间&#xff0c;我将分享一些实用的社区店选址技巧&#xff0c;帮助你提升商业成功率。 1、人口密度和流量&#xff1a; 选择人口密集、流量大的社区&#xff0c;这样可以…

redis scan命令导致cpu飙升

一.背景 今天下午Redis的cpu占用突然异常升高&#xff0c;一度占用达到了90%&#xff0c;触发了钉钉告警&#xff0c;之后又回到正常水平&#xff0c;跟DBA沟通&#xff0c;他说主要是下面这个语句的问题 SCAN 0 MATCH fastUser:6136* COUNT 10000这个语句的执行时长很短&…

苍穹外卖学习-----2024/02/19

1.开发环境搭建 我的git截图我使用的datagrip 运行sql学习到jwt令牌一种新的配置方式&#xff0c;写配置文件学习到了build属性nginx解决跨域的问题2.导入接口的文档 结果如图所示 3.Swagger /*** 通过knife4j生成接口文档* return*/Beanpublic Docket docket() {ApiInfo api…

论文阅读——ONE-PEACE

ONE-PEACE: EXPLORING ONE GENERAL REPRESENTATION MODEL TOWARD UNLIMITED MODALITIES 适应不同模态并且支持多模态交互。 预训练任务不仅能提取单模态信息&#xff0c;还能模态间对齐。 预训练任务通用且直接&#xff0c;使得他们可以应用到不同模态。 各个模态独立编码&am…

阿里云服务器多少钱一台?61元一年您看行吗?

2024年阿里云服务器租用价格表更新&#xff0c;云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年&#xff0c;轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服…

Python简单小案例之 筷手美女下载保存本地

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 知识点: 动态数据抓包 requests发送请求 开发环境: python 3.8 运行代码 解释器 pycharm 2022.3 辅助敲代码 编辑器 requests pip install requests &#x1f447; &#x1f447; &#x1f447; 更多精彩机密、教程&…

【C++】类与对象(构造函数、析构函数、拷贝构造函数、常引用)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;http://t.csdnimg.cn/eCa5z 目录 类的6个默认成员函数 构造函数 特性 析构函数 特性 析构的顺序 拷贝构造函数 特性 常引用 前言 &…

频谱仿真平台HTZ Communications为私有5G建设铺平道路

韩国的国家监管机构韩国通信委员会&#xff08;KCA&#xff09;计划在德思特频谱仿真平台HTZ Communications的支持下加快扩大无线电接入范围&#xff0c;提升全国电信服务的质量和效率。 韩国通信委员会&#xff08;KCA&#xff09;在韩国的监管环境中扮演着至关重要的角色&am…

Spring6学习技术|IoC+基于xml管理bean

学习材料 尚硅谷Spring零基础入门到进阶&#xff0c;一套搞定spring6全套视频教程&#xff08;源码级讲解&#xff09; IoC 控制反转。是一种设计思想。 1.获取bean对象的方法 通过id&#xff0c;通过class&#xff0c;和双重方式。 ApplicationContext context new Cla…

云呐智能运维硬件包括哪些?智能运维体系包括哪些?

智能运维体系时&#xff0c;能够详细了解该体系包含的各个组成部分。具体来说&#xff0c;我们应该知道智能运维体系中涉及的软件组件有哪些&#xff0c;以及这些组件是如何相互协作以实现高效运维的。此外&#xff0c;智能运维体系中使用的硬件设备感兴趣。列举了智能运维硬件…

测试工具之压测工具JMeter(一)

有时候我们接到的需求是秒杀或者抽奖类的功能开发&#xff0c;这时候可能会在某一时间点大量请求并发&#xff0c;我们手工自测很难发现一些高并发场景下的问题&#xff0c;这时候可以借助一些压测工具帮我们模拟出大量请求来测试我们的接口是否能满足业务要求。JMeter是Apache…

数据分析 — 招聘数据爬取和分析

目录 一、数据获取二、词云图语法1、jieba 分词2、词云图 一、数据获取 需求&#xff1a; 招聘数据获取地址&#xff1a;https://careers.tencent.com/home.html 获取字段&#xff1a;岗位的名称、岗位职责、发布时间 import pandas as pd # 导入 Pandas 库并使用别名 pd im…

深度学习——概念引入

深度学习 深度学习简介深度学习分类根据网络结构划分&#xff1a;循环神经网络卷积神经网络 根据学习方式划分&#xff1a;监督学习无监督学习半监督学习 根据应用领域划分&#xff1a;计算机视觉自然语言处理语音识别生物信息学 深度学习简介 深度学习&#xff08;Deep Learni…

什么是矩阵的秩?如何计算矩阵的秩?(done)

什么是矩阵的秩&#xff1f;https://search.bilibili.com/all?vt21986927&keyword%E4%BB%80%E4%B9%88%E6%98%AF%E7%9F%A9%E9%98%B5%E7%9A%84%E7%A7%A9%EF%BC%9F&from_sourcewebtop_search&spm_id_from333.1007&search_source5 矩阵本质上是线性方程组。但是方…

Rust可以解决的常见问题

文章目录 前言1. 悬垂指针&#xff08;Dangling Pointers&#xff09;修复悬垂指针问题 2. 缓冲区溢出&#xff08;Buffer Overflow&#xff09;那么是什么是缓冲区溢出&#xff1f;rust处理缓冲区溢出问题 3. 数据竞争&#xff08;Data Races&#xff09;4. 空指针&#xff08…