RocketMq 事务消息原理

news2024/11/25 7:31:10

Rocketmq 事务消息API使用

使用TransactionMQProducer类。 实现TransactionListener 接口覆盖其方法executeLocalTransaction和checkLocalTransaction 即可。

其中executeLocalTransaction 执行本地方法和checkLocalTransaction 事务状态回查。

玩法

  1. 简历一张本地事务表,字段大概有(rocketmq事务id,业务事务id),
  2. 于executeLoalTransaction,利用数据库事务特性,和业务数据同时持久化到数据库。
  3. checkLocalTransaction. 按rocketmq事务id查询数据库,是否有对应的数据。

为什么需要本地事务表

保证可靠性。当业务事务提交后节点宕机。rocketmq同样也能回查到数据。

流程分析

事务消息源码分析 

实现原理是基于二阶段提交和定时事务状态回查实现的。

二阶段提交分析

涉及相关类

Producer

TransactionMQProducer

DefaultMQProducerImpl

TransactionListener

Broker

SendMessageProcessor

EndTransactionProcessor

分析流程

  1. 入口方法TransactionMQProducer.sendMessageInTransaction 投递事务消息
  2. 调用DefaultMQProducerImpl.sendMessageInTransaction
  3. 为消息头部增加事务消息标志,发送消息。
  4. Broker 入口方法 SendMessageProcessor#sendMessage检查消息头部是否有事务标记,有投递半消息。响应Producer 结果包括事务id
  5. Producer收到消息成功发送结果后,执行本地事务。并通知Broker 本地事务执行结果。
  6. Broker 入口方法EndTransactionProcessor#processRequest 。按收到结果做决定。若是事务提交则投递普通消息,删除半消息。若是事务回滚则删除半消息。

事务消息回查

RocketMQ 通过TramsactionalMessageCheckService 线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。TransactionalMessageCheckService 的检测频率默认为1分钟,可通过broker.conf文件中设置transactionCheckInterval 来改变默认值,单位为毫秒

public class TransactionalMessageCheckService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);

    private BrokerController brokerController;

    public TransactionalMessageCheckService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

  //.... 省略代码
    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/785300.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Centos报错:[Errno 12] Cannot allocate memory

执行一个脚本刚开始正常,后面就报[Errno 12] Cannot allocate memory 如果内存不足,可能需要增加交换内存。或者可能根本没有启用交换。可以通过以下方式检查您的交换: sudo swapon -s如果它为空,则表示您没有启用任何交换。添加 1GB 交换…

构建自己的ChatGPT:从零开始构建个性化语言模型

🌷🍁 博主 libin9iOak带您 Go to New World.✨🍁 🦄 个人主页——libin9iOak的博客🎐 🐳 《面试题大全》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~&#x1f33…

FPGA XDMA 中断模式实现 PCIE3.0 QT上位机视频传输 提供工程源码和QT上位机源码

目录 1、前言2、我已有的PCIE方案3、PCIE理论4、总体设计思路和方案图像产生、发送、缓存XDMA简介XDMA中断模式图像读取、输出、显示QT上位机及其源码 5、vivado工程详解6、上板调试验证7、福利:工程代码的获取 1、前言 PCIE(PCI Express)采…

【微服务系统设计】系统设计基础:速率限制器

什么是速率限制器? 速率限制是指防止操作的频率超过定义的限制。在大型系统中,速率限制通常用于保护底层服务和资源。速率限制一般在分布式系统中作为一种防御机制,使共享资源能够保持可用性。 速率限制通过限制在给定时间段内可以到达您的 A…

java学习路程之篇三、进阶知识、面向对象高级、接口新特性、代码块、内部类、Lambda表达式、窗体、组件、事件

文章目录 1、接口新特性2、代码块3、内部类4、Lambda表达式5、窗体、组件、事件 1、接口新特性 2、代码块 3、内部类 4、Lambda表达式 5、窗体、组件、事件

python 压测 +paramiko 远程监下载日志 +js 测试报告

目录 前言: 关于压测客户端 netty nio 压测端 python tornado 异步框架压测 python 协程压测端 远程监控 js 解析日志 前言: 在软件开发中,压测和测试是非常重要的一个环节,它可以帮助我们更加全面地检测软件中的安全漏洞…

SR04 超声波测距模块

文章目录 前言一、SR04 模块介绍二、设备树设置三、驱动程序四、测试程序五、上级测试及效果总结 前言 超声波测距模块 是利用超声波来测距。模块先发送超声波,然后接收反射回来的超声波,由反射经历的时间和声音的传播速度 340m/s,计算得出距…

剑指offer40.最小的k个数

简直不要太简单了这道题,先给数组排个序,然后输出前k个数就好了。我用的是快排,这是我的代码: class Solution {public int[] getLeastNumbers(int[] arr, int k) {int n arr.length;quickSort(arr, 0, n-1);int[] res new int…

Mysql 简介

Mysql 简介 学习目的 MySQL作为目前最流行的关系型数据库管理系统之一,因其开源免费的特性,成为小型Web应用的重点关注对象。几乎所有的动态Web应用基本都在使用MySQL作为数据管理系统。学习MySQL的目的也是为了更好地理解数据库相关的SQL注入漏洞&…

【性能优化】MySQL百万数据深度分页优化思路分析

业务场景 一般在项目开发中会有很多的统计数据需要进行上报分析,一般在分析过后会在后台展示出来给运营和产品进行分页查看,最常见的一种就是根据日期进行筛选。这种统计数据随着时间的推移数据量会慢慢的变大,达到百万、千万条数据只是时间问…

关于脑电睡眠分期,你应该知道的还有这些

导读 基于电生理信号(EEG,EOG和EMG)对睡眠阶段进行识别的建议源自Rechtschaffen和Kales手册,由美国睡眠医学学会于2007年发布,并定期更新多年。这些建议对于评估不同类型的睡眠/觉醒主观评定中的客观标志物非常重要。凭借研究的简单、可重复…

windows/linux/mac上编译open3d 0.17.0

目录 写在前面准备编译windows:linux/mac:注: 参考完 写在前面 1、本文内容 windows/linux/mac上编译open3d 0.17.0 2、平台 通过cmake构建项目,跨平台通用 3、转载请注明出处: https://blog.csdn.net/qq_41102371/article/details/1318918…

基于C++的QT基础教程学习笔记

文章目录: 来源 教程社区 一:QT下载安装 二:注意事项 1.在哪里写程序 2.如何看手册 3.技巧 三:常用函数 1.窗口 2.相关 3.按钮 4.信号与槽函数 5.常用栏 菜单栏 工具栏 状态栏 6.铆接部件 7.文本编辑 8…

[ELK安装篇]:基于Docker虚拟容器化(主要LogStash)

文章目录 一:前置准备-(参考之前博客):1.1:准备Elasticsearch和Kibana环境:1.1.1:地址:https://blog.csdn.net/Abraxs/article/details/128517777 二:Docker安装LogStash(数据收集引擎&#xff…

SH-FAPI-4,新型tumor显像剂,其中FAPI通过与FAP结合

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ SH-FAPI-4 其中FAPI通过与FAP结合,可在PET-CT扫描中可视化tumor的位置和大小,从而帮助确定tumor的类型和位置,并指导tumor treatment的选择。FAPI被认为是一种具有潜在应用前景的新型tu…

vue检测数据变化的原理

vue监测数据变化的原理 vue会监视data中所有层次的数据。 监测对象类型的数据 原理 vue监测对象类型的数据通过setter实现,且要在new Vue时就传入要监测的数据。 对象中后追加的属性,Vue默认不做响应式处理;如需后续添加的属性做响应式&am…

吉林大学计算机软件考研经验贴

文章目录 简介政治英语数学专业课 简介 本人23考研,一战上岸吉林大学软件工程专硕,政治72分,英一71分,数二144分,专业课967综合146分,总分433分,上图: 如果学弟学妹需要专业课资料…

STM32MP157驱动开发——按键驱动(定时器)

“定时器 ”机制: 内核函数 定时器涉及函数参考内核源码:include\linux\timer.h 给定时器的各个参数赋值: setup_timer(struct timer_list * timer, void (*function)(unsigned long),unsigned long data):设置定时器&#xf…

HALCON error #5504 Image too large for this HALCON version in operator问题解决

目录: 一,问题概述:二,解决方法 一,问题概述: 🌀当你直接或间接使用Halcon来做图像读取的时候,你可能遇到5504错误:HalconDotNet.HOperatorException:HALCON error #5504…

传奇开区网站打开跳转到别的网站处理教程

打开跳转被劫持到其他网站如何处理教程。 在解决劫持之前,需要先确定一下身份,如果是网站被劫持了,或者是访客访问自己的网站被劫持到其他的网站上,解决起来的方法不一样,下面一休分类分享给大家 1、访客身份处理方法…