源码分析RocketMQ之TransactionMQProducer-事物消息

news2024/11/26 10:40:44

Apache RocketMq 在4.3.0版本中已经支持分布式事物消息,采用了2PC的的思想实现提交事物消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。

一、事物消息生产者:TransactionMQProducer
    发送事物消息
    TransactionMQProducer#sendMessageInTransaction  
    1、获取事物监听器,一个消息生产者一个事物监听器
    //事务监听器
    TransactionListener transactionListener = getCheckListener();
    2、忽略延迟级别
    3、在消息属性中添加TRAN_MSG=true 标识事物消息;PGROUP=消息所属发送者组,然后以同步方式发送消息
    4、判断消息发送状态
      1、消息发送成功,回调TransactionListener#executeLocalTransaction方法,执行本地事物,返回本地
      事物状态 localTransactionState
      2、消息发送失败,不执行本地方法,localTransactionState 事物状态设置为回滚
    5、结束事物方法
     this.endTransaction(sendResult, localTransactionState, localException);
     1、获取消息id
     2、获取事物id
     3、获取brokerAddr
     4、构建结束事物消息头
     5、根据事物状态设置回滚或者提交标识
     6、设置生产者组名
     7、设置事物消息队列偏移量
     8、通过同步单向发送结束事物消息
二、事务消息事件监听器:TransactionListener
TransactionListener 的实现
   executeLocalTransaction:方法,记录mq half消息同步发送broker成功后,执行本地事物,
   记录事物状态,
   checkLocalTransaction:本地事物状态回查方法,broker端接收事物消息后定时回查消息,
   根据返回的事物状态,0-UNKNOW,1-COMMIT_MESSAGE,2-ROLLBACK_MESSAGE ,进行事物消息的继续回查,
   提交、或者回滚。
   
三、broker接收处理事物消息
事物消息相对普通消息最大的特点就是一阶段发送的消息对用户不可见,RocketMq 一阶段发送的消息时half消息。

SendMessageProcessor#asyncSendMessage收到处理的消息,通过TRAN_MSG 标识获取事物标识
如果是事物消息通过this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);进行事物消息处理
 1、备份消息的原主题名称与原队列ID
 2、然后取消是事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC
 3、队列ID固定为0

调用org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage 存储替换后的half事物消息。

四、Commit和Rollback操作
   在完成一阶段half消息写入,二阶段如果是commit操作则需要让消息对用户可见;如果是Rollback操作则需要撤销一阶段的消息
  1、EndTransactionProcessor broker结束事物处理器
    1、提交消息 
      1、根据commitLogOffset从commitlog文件中查找消息返回OperationResult
      2、如果成功找到消息则继续处理,否则返回客户端未找到消息错误
      3、检查消息必要字段
      4、设置消息的相关属性,恢复原消息的数量,取消事物消息的相关系统标记
      5、通过MessageStore将消息存储在CommitLog中,此时消息被转发到原消息主题对应的消费队列,消费端可以消费到
      6、删除half预处理消息,其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理提交
    2、回滚消息
      1、根据commitlogOffset查找消息
      2、消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理,与提交事务消息不同的是
        提交事务消息会将消息恢复原主题与队列,再次存储在commitlog文件中
      3、删除half预处理消息,其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理提交

五、事物消息定时回查
   RocketMQ使用TransactionalMessageCheckService线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事物状态
   TransactionalMessageCheckService检测频率默认1分钟,可以通过broker.conf中设置transactionCheckInterval 来改变单位为毫秒
  1、onWaitEnd 事物回查处理逻辑
  2、从broker配置文件中获取transactionTimeOut参数值默认6秒,表示事务的过期时间
  3、消息的存储时间 + 该值 大于系统当前时间,才对该消息执行事务状态会查
  4、消息默认回查次数15,超过消息会默认为丢弃,即rollback消息
  5、this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener())
​    1、事物消息的两个topic
      1、RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
      2、RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题
    2、RMQ_SYS_TRANS_HALF_TOPIC 获取这个topic的所有消费队列,循环遍历消息队列,从单个消息消费队列去获取消息
    3、获取操作队列的消费进度,待操作的消费进度,如果任意一个小于0,忽略该消息队列,继续处理下一个队列
    4、调用fillOpRemoveMap方法填充removeMap、doneOpOffset,主要目的是避免重复调用事物回查接口
    5、getMessageNullCount 获取空消息的次数, newOffset 前处理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新进度
    6、MAX_PROCESS_TIME_LIMIT 限制每次最多处理的时间,一次最多不超过60秒
    7、如果removeMap中包含当前处理的消息,则继续下一条,removeMap中 的值是通过RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时,会添加到removeMap中,表示已处理过
    8、根据消息队列偏移量i从消费队列中获取消息,如果消息为空,则根据允许重复次数进行操作,默认重试一次,目前不可配置。
    9、如果超过重试次数,直接跳出,结束该消息队列的事务状态回查
    10、如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查
    11、其他原因,则将偏移量i设置为: getResult.getPullResult().getNextBeginOffset(),重新拉取
    12、判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
       1、如果该消息回查的次数超过允许的最大回查次数,则该消息将被丢弃,即事务消息提交失败,
       2、不能被消费者消费,其做法,主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次
       3、如果事务消息超过文件的过期时间,默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息
    13、valueOfCurrentMinusBorn 该消息已存储的时间,等于系统当前时间减去消息存储的时间戳
    14、checkImmunityTime 立即检测事务消息的时间,其意义是应用程序发送事物消息后,事物不会马上提交
      该时间是假设事物消息发送成功后,应用提交事物的时间,这个时间内RocketMq任务事物未提交,因此这个时间不应该发送
      事物回查请求。
    15、checkImmunityTimeStr 事物消息过期时间, 如果立即检测事务消息的时间超过已存储时间,就算时间小于checkImmunityTime时间,也发送事务回查指令
    16、如果当前时间还未过(应用程序事务结束时间),则跳出本次回查处理的,等下一次再试
    17、从op队列获取opMsg集合,判断是否需要发送事物消息回查
       1、如果从操作队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)中没有已处理消息并且已经超过(应用程序事务结束时间)
       2、如果操作队列不为空,并且最后一天条消息的存储时间已经超过transactionTimeOut值。 
    18、如果需要发送事务状态回查消息,则先将消息再次发送到RMQ_SYS_TRANS_HALF_TOPIC主题中发送成功则返回true,否则返回false
    19、如果发送成功,会将该消息的queueOffset、commitLogOffset设置为重新存入的偏移量,为什么需要这样呢
       答案在listener.resolveHalfMsg(msgExt)中
       1、AbstractTransactionalMessageCheckListener#resolveHalfMsg 发送具体的事务回查机制,这里用一个线程池来异步发送回查消息
       为了回查进度保存的简化,这里只要发送了回查消息,当前回查进度会向前推动,如果回查失败,上一步骤新增的消息将可以再次发送回查消息
       如果回查消息发送成功,那会不会下一次又重复发送回查消息呢?这个可以根据OP队列中的消息来判断是否重复
       如果回查消息发送成功并且消息服务器完成提交或回滚操作,这条消息会发送到OP队列中,然后首先会通过fillOpRemoveMap根据
       处理进度获取一批已处理的消息,来与消息判断是否重复,由于fillopRemoveMap一次只拉32条消息,那又如何保证一定能拉取到与
       当前消息的处理记录呢?其实就是通过第17步,如果此批消息最后一条未超过事务延迟消息,
       则20步继续拉取更多消息进行判断
       2、通过异步方式发送消息回查的实现过程
         1、构建检查事物状态请求消息头
         2、设置消息offsetId、消息事物id、事务消息队列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)
         3、恢复原消息的主题、队列,并设置storeSize为0
         4、获取生产者组名称
         5、根据生产者组获取任意一个生产者,通过与其连接发送事务回查消息,回查消息的请求者为【Broker服务器】,接收者为(client,具体为消息生产者)
         6、发送回查消息 brokerController.getBroker2Client().checkProducerTransactionState RequestCode.CHECK_TRANSACTION_STATE
       3、org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState 生产者接收回查消息
         1、使用一个匿名类( Runnable )构建一个运行任务
           1、执行TransactionListener#checkLocalTransaction,检测本地事务状态,也就是应用程序需要实现
           2、然后返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一个,然后向Broker发送END_TRANSACTION
         2、然后提交到checkExecutor线程池中执行
    20、 fillOpRemoveMap拉取更多op消息继续进行判断
    21、transactionalMessageBridge.updateConsumeOffset 保存(Prepare)消息队列的回查进度。
    22、transactionalMessageBridge.updateConsumeOffset 保存处理队列(op)的进度  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1097316.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机通过WiFi连接调试UR机器人

1.测试物料 1.1ur机器人 https://item.taobao.com/item.htm?spma1z10.1-c.w4004-25069442759.18.2ff56d6bmuxX0Z&id740002623764 1.2 路由器(TPLINK) https://detail.tmall.com/item.htm?abbucket7&id548610924784&ns1&spma21n57.1.…

性能超越 Clickhouse | 物联网场景中的毫秒级查询案例

1 物联网应用场景简介 物联网(Internet of Things,简称 IoT)是指通过各种信息传感、通信和 IT 技术来实时连接、采集、监管海量的传感设备,从而实现对现实世界的精确感知和快速响应,继而实现自动化、智能化管理。在查…

DITA-OT 4.0新特性 - PDF themes,定制PDF样式的新方法

随着DITA-OT 4.0的发布,它提供了一种新的定制PDF样式方法,这种方法就是PDF theme。这篇文章来聊一聊这种定制PDF输出的新方法和实验结果。 在进入PDF theme细节之前,为各位读者梳理一下DITA-OT将DITA和Markdown发布成PDF的几种方法。 - 1 …

『Linux升级路』基本指令

🔥博客主页:小王又困了 📚系列专栏:Linux 🌟人之为学,不日近则日退 ❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、认识操作系统 📒1.1什么是操作系统 📒1.2操作系统…

Day15|104.二叉树的最大深度

一、104.二叉树的最大深度 题目链接:https://leetcode.cn/problems/maximum-depth-of-binary-tree/ 文章链接:https://programmercarl.com/0104.%E4%BA%8C%E5%8F%89%E6%A0%91%E7%9A%84%E6%9C%80%E5%A4%A7%E6%B7%B1%E5%BA%A6.html#%E7%9B%B8%E5%85%B3%E9%…

Bootstrap的徽章样式设计,徽章常用作作为显示未读内容或动态计数内容

Bootstrap的徽章样式&#xff0c;通过添加类badge来实现。 目录 01-往标题中添加徽章02-给按钮、链接添加徽章03-设置徽章的颜色04-设置胶囊形徽章 01-往标题中添加徽章 通常在<span>标签添加类badge实现。 示例代码如下&#xff1a; <!DOCTYPE html> <html&g…

二维码智慧门牌管理系统:解决公安标准地址与实际楼栋名称的差异

文章目录 前言一、二维码智慧门牌管理系统的核心功能二、广泛应用领域 前言 在当今信息化社会&#xff0c;精准的地标信息是日常生活中不可或缺的部分。特别是在小区管理中&#xff0c;精准的楼栋名称和地址信息显得尤为重要。但实际上&#xff0c;公安标准地址与实际楼栋名称…

JVM 垃圾回收算法详解

目录 1 垃圾回收算法1.1 标记清除算法1.2 复制算法1.3 标记整理算法1.4 分代回收算法1.4.1 对象进入老年代的条件 1 垃圾回收算法 有四种垃圾回收算法&#xff1a; 标记清除算法复制算法标记整理算法分代回收算法 1.1 标记清除算法 标记&#xff1a;遍历内存区域&#xff0…

AlexNet论文阅读

开始之前的简介:这篇论文是王林蓉师姐推荐给我看的第一篇入门级别的cv领域的论文,也算是我入手研究生阶段的第一篇论文.我是打算先看看这一领域的论文,然后写的自己一点随笔.若有错误欢迎指正. 一. 专有词汇 非饱和神经元 dropout 饱和非线性,非饱和非线性 二. 论文结构 三. 核…

10G SDH传输分析仪该如何选择

TFN D450S 传输分析仪 功能全面 使用方便 是 通信人的不二选择

深入了解企业税收违法信息API:实现智能风险评估

引言 企业税收违法是一项严重的经济犯罪&#xff0c;可能导致严重的法律后果和金融损失。为了帮助企业和金融机构识别并预防潜在的税收违法行为&#xff0c;智能风险评估变得至关重要。在这一领域&#xff0c;企业税收违法信息API发挥着重要的作用&#xff0c;提供了关键的数据…

一图读懂「五度情报站」全盘视野,情报智取,先知先行,决策有道!

「五度情报站」是一款集企业情报监测、管理、分析等多功能于一体微信小程序&#xff0c;其依托全体量产业大数据及强大的数据治理能力&#xff0c;收录了商业、市场、竞争、企业、技术、金融等全类别情报信息&#xff0c;构建了面向用户的业务型标签体系&#xff0c;设计了实用…

PyTorch深度学习实战(21)——从零开始实现Faster R-CNN目标检测

PyTorch深度学习实战&#xff08;21&#xff09;——从零开始实现Faster R-CNN目标检测 0. 前言1. Fast R-CNN 目标检测模型组成1.1 锚框1.2 区域提议网络1.3 分类和回归 2. 实现 R-CNN 目标检测2.1 数据处理2.2 模型构建2.3 模型训练与测试 小结系列链接 0. 前言 Faster R-CN…

手机抬手亮屏解锁,用到了哪些硬件?

随着时代发展&#xff0c;智能手机以丰富的功能及便利性&#xff0c;成为了人们必不可少的物品&#xff0c;其中人脸解锁功能是非常有用的功能&#xff0c;广受年轻人的喜爱&#xff0c;那么你知道她是如何实现吗&#xff1f;今天凡小亿带你们探索&#xff01; 手机抬手亮屏解锁…

谨以此篇,纪念我2023年曲折的计算机保研之路

目录 阶段一&#xff1a;迷茫阶段二&#xff1a;准备个人意愿保研材料准备套磁老师5.1日 浙大线上编程测试5.8日 浙大线上面试 —— 一面5.17日 浙大线上面试——二面5.29日 实验室面试结果5.27日 南开线上面试6.20日 华师电话面试 阶段三&#xff1a;旅途北航CS&#xff08;6.…

ebpf的快速开发工具--libbpf-bootstrap

基于ubuntu22.04-深入浅出 eBPF 基于ebpf的性能工具-bpftrace 基于ebpf的性能工具-bpftrace脚本语法 基于ebpf的性能工具-bpftrace实战(内存泄漏) 什么是libbpf-bootstrap libbpf-bootstrap是一个开源项目&#xff0c;旨在帮助开发者快速启动和开发使用eBPF(Extended Berk…

微服务拆分的思考

一、前言 前面几篇文章介绍了微服务核心的两个组件&#xff1a;注册中心和网关&#xff0c;今天我们来思考一下微服务如何拆分&#xff0c;微服务拆分难度在于粒度和层次&#xff0c;粒度太大拆分的意义不大&#xff0c;粒度太小开发、调试、运维会有很多坑。 二、微服务划分…

【初识Linux】:常见指令(2)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…

为健康护航的小帮手,dido E55S Pro智能手表体验

现在很多年轻人每天都要长时间工作&#xff0c;没有时间锻炼身体&#xff0c;很容易导致各种健康隐患&#xff0c;工作效率也容易下降&#xff0c;非常有必要通过智能手表等工具&#xff0c;随时监测自己的健康状态。现在支持健康监测的智能手表非常多&#xff0c;用起来也简单…

Java反射使用实例

Java反射&#xff1a;解析类的秘密 Java反射是一项强大的功能&#xff0c;允许开发人员在运行时检查、操作和实例化类、方法、字段以及其他Java程序中的元素。这种能力赋予了Java语言更大的灵活性和动态性&#xff0c;但也需要慎重使用&#xff0c;因为它可能会导致性能问题和…