RocketMQ不同的类型消息

news2025/1/24 5:08:12

目录

普通消息

可靠同步发送

可靠异步发送

单向发送

三种发送方式的对比

顺序消息

事物消息

 两个概念

 事务消息发送步骤

事务消息回查步骤

消息消费要注意的细节

RocketMQ支持两种消息模式:


普通消息

RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。

可靠同步发送

同步发送是指消息发送放发出数据后,会在收到接收方 响应之后才发下一个数据包的通讯方式。

这种方式应用场景非常广泛,列如重要通知邮件、报名短信通知、营销短信系统等。

//同步消息
   @Test
   public void testSyncSend() {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
      SendResult sendResult =
              rocketMQTemplate.syncSend("test-topic-1", "这是一条同步消息");                         
      System.out.println(sendResult);
   }

可靠异步发送

异步发送是指发送放  发出数据后,不等接收方 发回响应,接着发送下一个数据包的通讯方式。发送方 通过回调接口接收服务器响应,并对响应结果进行处理。

异步发送一般用于 链路耗时较长,对RT响应时间较为敏感的业务场景,列如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

//异步消息
   @Test
   public void testAsyncSend() throws InterruptedException { 
        public void testSyncSendMsg() {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
          rocketMQTemplate.asyncSend(
                "test-topic-1", 
                "这是一条异步消息",
          new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {             
                 System.out.println(sendResult);
              }
              @Override
              public void onException(Throwable throwable) {             
                 System.out.println(throwable);
              }
        });
//让线程不要终止
       Thread.sleep(30000000);
   }

单向发送

单向发送是指发送方 只负责发送消息,不等待服务器回应 没有 回调函数触发,只发送请求不等待应答。

适合用于某些耗时非常短,但对可靠性要求并不高的场景,不如说日志收集。

//单向消息
      @Test
      public void testOneWay() {
         rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
      }

三种发送方式的对比

 

顺序消息

//同步顺序消息[异步顺序 单向顺序写法类似]
        public void testSyncSendOrderly() {
//第三个参数用于队列的选择
            rocketMQTemplate.syncSendOrderly(
                "test-topic-1", 
                "这是一条异步顺序消息",
                "xxxx");
        }

事物消息

 RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。

 两个概念

        半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到 RocketMQ服务,但是服务端 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息 为半事务消息。

         消息回查:由于网络闪断、生产者应用重启等原因,导致的某条事务消息的二次确认丢失,

 RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或者 Rollback),该询问过程 消息回查。

 事务消息发送步骤

   1.发送方 将半事务消息发送至PocketMQ服务端。

    2.RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。

    3.发送方开始执行本地事务逻辑

    4.发送方根据本地事务执行结果向服务端提交二次确认(Commit 或者 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息。服务端说到R0llback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤

     1.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该数据发起消息回查

     2.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

     3.发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍然按照步骤4对半事务消息  进行操作。

//事物日志
@Entity(name = "shop_txlog")

@Data
public class TxLog {

        @Id
        private String txLogId;

        private String content;

        private Date date;
}


@Service
public class OrderServiceImpl4 {
    @Autowired

    private OrderDao orderDao;


    @Autowired
    private TxLogDao txLogDao;


    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    public void createOrderBefore(Order order) {

         String txId =  UUID.randomUUID().toString();

        //发送半事务消息

        rocketMQTemplate.sendMessageInTransaction(
                                        "tx_producer_group",

                                        "tx_topic",
                                        MessageBuilder.withPayload(order).setHeader(

                                                                        "txId",
                                                                        txId).build(),

                                        order
                                        );
    }


     //本地事物

@Transactional

public void createOrder(String txId, Order order) {
        //本地事物代码

        orderDao.save(order);

        //记录日志到数据库,回查使用

        TxLog txLog = new TxLog();
        txLog.setTxLogId(txId);

        txLog.setContent("事物测试");

        txLog.setDate(new Date());
        txLogDao.save(txLog);
    }
}

@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")

public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {

        @Autowired
         private TxLogDao txLogDao;


        @Autowired
        private OrderServiceImpl4 orderServiceImpl4;


        //执行本地事物

    @Override

    public RocketMQLocalTransactionState executeLocalTransaction(

        Message msg, Object arg) {
        try {
        //本地事物
            orderServiceImpl4.createOrder((String) msg.getHeaders().get("txId"), (Order) arg);
           

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    //消息回查

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        //查询日志记录
        TxLog txLog = txLogDao.findById((String) msg.getHeaders().get("txId")).get();

        if (txLog == null) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

消息消费要注意的细节

@RocketMQMessageListener(
        consumerGroup = "shop",//消费者分组

        topic = "order-topic",//要消费的主题
        consumeMode = ConsumeMode.CONCURRENTLY, //消费模式:无序和有序                messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群
)
public class SmsService implements RocketMQListener<Order> {

}

RocketMQ支持两种消息模式:

  • 广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;

  • 集群消费: 一条消息只能被一个消费者实例消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/499988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剑指Offer题集(力扣)

文章目录 剑指Offer题集&#xff08;[力扣题单](https://leetcode.cn/problemset/all/?listIdlcof&page1)&#xff09;[剑指 Offer 03. 数组中重复的数字](https://leetcode.cn/problems/shu-zu-zhong-zhong-fu-de-shu-zi-lcof/)[剑指 Offer 04. 二维数组中的查找](https:…

SSM框架练习一(登录后关联数据表的业务模型)

需要实现的整体功能&#xff1a; 登录反馈信息列表展示查询反馈信息发表反馈 1.数据库设计 创建数据库 创建表结构及其约束 添加测试数据 工具&#xff1a;PHP、Navicat create table tab_user(id int primary key auto_increment,uname varchar(30) not null,pwd varc…

Weblogic XMLDecoder 反序列化漏洞(CVE-2017-10271复现)

文章目录 前言影响版本环境搭建漏洞复现深度利用 前言 CVE-2017-10271漏洞产生的原因大致是Weblogic的WLS Security组件对外提供webservice服务&#xff0c;其中使用了XMLDecoder来解析用户传入的XML数据&#xff0c;在解析的过程中出现反序列化漏洞&#xff0c;导致可执行任意…

从搬砖工到架构师,Java全栈学习路线总结

&#x1f307;文章目录 前言一、前置知识二、 Web前端基础示例&#xff1a;1.文本域2.密码字段 三、后端基础一. Java基础二. 数据库技术三. Web开发技术四. 框架技术五. 服务器部署 四、其他技术五、全栈开发六、综合实践七、学习教程一、前端开发二、后端开发三、数据库开发四…

springboot+jsp乡村中小学校园网站建设

随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;乡村小学校园网当然也不能排除在外&#xff0c;从校园概况、学校风采、招生信息的统计和分析&#xff0c;在过程中会产生大量的…

Maven依赖原则及如何解决Maven依赖冲突

前言 在大数据应用中&#xff0c;现在发现依赖关系非常复杂&#xff0c;在上线之前很长测试&#xff0c;前一段时间在部署udf 出现了导致生产Hiveserver2 宕机问题&#xff0c;出现严重事故。现在就咨询研究一下。Maven虽然已经诞生多年&#xff0c;但仍然是当前最流行的Java系…

Arrays:点燃你的数组操作技巧的隐秘武器。

前言 数组在 Java 中是一种常用的数据结构&#xff0c;用于存储和操作大量数据。但是在处理数组中的数据&#xff0c;可能会变得复杂和繁琐。Arrays 是我们在处理数组时的一把利器。它提供了丰富的方法和功能&#xff0c;使得数组操作变得更加简单、高效和可靠。无论是排序、搜…

【c语言】字符串类型转换 | itoa函数的使用

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 给大家跳段街舞感谢支持&#xff01;ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ …

MySQL innodb介绍

InnoDB引擎的优点是支持兼容ACID的事务&#xff0c;以及参数完整性&#xff08;即对外键的支持&#xff09;。 Oracle公司2005年10月收购了Innovase&#xff1b;Innobase采用双认证授权。它使用GNU发行&#xff0c;也允许其他想将InnoDB结合到商业软件的团体好的授权 mysql5.…

Java 动态原理详解

Java 动态代理是一种非常重要的编程技术&#xff0c;它在很多场景下都有着广泛的应用。本文将介绍 Java 动态代理的实现原理&#xff0c;并附上相应的源码&#xff0c;以帮助读者更好地理解和应用这一技术。 一、什么是 Java 动态代理&#xff1f; Java 动态代理是一种在运行时…

【并发基础】Happens-Before模型详解

目录 一、Happens-Before模型简介 二、组成Happens-Before模型的八种规则 2.1 程序顺序规则&#xff08;as-if-serial语义&#xff09; 2.2 传递性规则 2.3 volatile变量规则 2.4 监视器锁规则 2.5 start规则 2.6 Join规则 一、Happens-Before模型简介 除了显示引用vo…

双目测距--5 双目相机 联合 YOLOv8

目录 效果&#xff1a; 1、立体矫正不改变图像尺寸 2、视差图尺寸与原图尺寸一致 3、视差图、深度信息图 4、几个重要的函数 createTracker() 5、代码 main.cpp utils.cpp 效果&#xff1a; 1、立体矫正不改变图像尺寸 左右相机图像立体矫正后&#xff0c;图像尺寸为变化…

freeRTOS中使用看门狗的一点思考

关于看门狗想必各位嵌入式软件开发的朋友应该都不会陌生的。在嵌入式软件开发中&#xff0c;看门狗常被用于监测cpu的程序是否正常在运行&#xff0c;如果cpu程序运行异常会由看门狗在达到设定的阈值时触发复位&#xff0c;从而让整个cpu复位重新开始运行。 看门狗的本质是一个…

Qt QQueue 安全的多线程队列、阻塞队列

文章目录 1. C queue 队列基本用法2. Qt QQueue 队列基本用法3. Qt QQueue 多线程队列4. Qt BlockingQueue 自定义线程安全的阻塞队列 1. C queue 队列基本用法 在C中&#xff0c;queue是一个模板类&#xff0c;用于实现队列数据结构&#xff0c;遵循先进先出的原则。 ♦ 常用…

测试3:用例

目录 1.测试用例的基本要素 2.测试用例的设计方法 1.基于需求的设计方法 2.等价类 1.概念 2.步骤: 3.例子 3.边界值 1.概念 2.步骤 3.例子 4.判定表 1.概念 2.设计测试用例 3.例子 5.正交排列 1.什么是正交表 2.测试用例 3.如何通过正交表设计测试用例 6.场景…

(3)Qt——信号槽

目录 1.信号槽的概念** 2.信号槽的连接*** 2.1自带信号 → 自带槽 2.2 自带信号 → 自定义槽 2.3 自定义信号 3. 参数传递** 3.1 全局变量 3.2 信号槽传参 4. 对应关系** 4.1 一对多 4.2 多对一 1.信号槽的概念** 信号槽指的是信号函数与槽函数的连接&#xff0c;可…

AI绘图入门 安装 stable-diffusion-webui

下面介绍了N卡&#xff0c;A卡&#xff0c;或CPU跑 stable-diffusion-webui的方法。 1.安装python 3.10.x https://www.python.org/downloads/ 2.安装Git https://git-scm.com/downloads 【非必要】打开代理工具&#xff08;比如clash&#xff09;然后在cmd配置git的http和…

软件测试相关概念

✏️作者&#xff1a;银河罐头 &#x1f4cb;系列专栏&#xff1a;JavaEE &#x1f332;“种一棵树最好的时间是十年前&#xff0c;其次是现在” 目录 需求需求的定义测试人员眼中的需求为什么需求对测试人员如此重要如何深入理解需求 测试用例定义为什么要有测试用例 软件错误…

IT服务规划设计笔记

规划设计处于整个IT服务生命周期中的前端&#xff0c;其主要目的在于&#xff1a; &#xff08;1&#xff09;设计满足业务需求的IT服务 &#xff08;2&#xff09;设计SLA、测量方法和指标 &#xff08;3&#xff09;设计服务过程及其控制方法 &#xff08;4&#xff09;规…

learn_C_deep_9 (汇编角度理解return的含义、const 的各种应用场景)

return 关键字 不知道我们大家是否有一个疑惑&#xff1a;我们下载一个大型游戏软件&#xff08;王者荣耀&#xff09;&#xff0c;都要花几个小时去下载&#xff0c;但是一旦我们游戏连输&#xff0c;想要删除这个软件的时候&#xff0c;它仅仅只需要十几秒&#xff0c;这是为…