RocketMQ(三):集成SpringBoot

news2025/1/20 18:40:03

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot


目录

  • 一、搭建环境
  • 二、不同类型消息
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、延迟消息
    • 5、顺序消息
    • 6、带tag消息
    • 7、带key消息

一、搭建环境

  • 需要创建两个服务,消息生产服务和消息消费者服务
  • 生产消息存在多个服务,消费则统一由一个服务处理
  • 这样可以做到解耦

pom.xml

  • 生产者和消费者都需要
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

生产者配置文件

  • 设置统一的生产者组,这样发送消息时就不用指定了
rocketmq:
    name-server: 127.0.0.1:9876     # rocketMq的nameServer地址
    producer:
        group: boot-producer-group        # 生产者组别
        send-message-timeout: 3000  # 消息发送的超时时间
        retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数
        max-message-size: 4194304       # 消息的最大长度

生产者配置文件

  • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq:
    name-server: localhost:9876

二、不同类型消息

直接引入即可

@Autowired
private RocketMQTemplate rocketMQTemplate;

1、同步消息

生产消息

  • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
  • 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");

消费消息

  • RocketMQListener的泛型类型即消息类型
    • MessageExt类型是消息的所有内容
    • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
  • onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

2、异步消息

  • 发送异步消息,发送完以后会有一个异步通知
  • 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功");
    }
    @Override
    public void onException(Throwable throwable) {
        System.out.println("失败" + throwable.getMessage());
    }
});

3、单向消息

  • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

4、延迟消息

  • RocketMQ不支持任意时间的延时
  • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
  • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
  • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);

5、顺序消息

生产消息

  • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
        new MsgModel("qwer", 1, "下单"),
        new MsgModel("qwer", 1, "短信"),
        new MsgModel("qwer", 1, "物流"),

        new MsgModel("zxcv", 2, "下单"),
        new MsgModel("zxcv", 2, "短信"),
        new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
    // 发送  一般都是以json的方式进行处理
    // 根据第三个参数计算hash值决定消息放入哪个队列
    rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});

消费消息

  • 默认是并发消费模式,可以设置为单线程顺序模式
  • 设置消费重试次数
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",
        consumerGroup = "boot-orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
        maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
        System.out.println(msgModel);
    }
}

6、带tag消息

  • tag带在主题后面用:来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");

7、带key消息

Message<String> message = MessageBuilder
        .withPayload("我是一个带key的消息")
        .setHeader(RocketMQHeaders.KEYS, "10086")
        .build();
rocketMQTemplate.syncSend("bootKeyTopic", message);

获取带key和tag的消费者

  • 过滤模式有两种:正则表达式和sql92方式
  • keys从MessageExt对象中获取
@Component
@RocketMQMessageListener(topic = "bootTagTopic",
        consumerGroup = "boot-tag-consumer-group",
        selectorType = SelectorType.TAG,// tag过滤模式
        selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        System.out.println("获取keys: " + message.getKeys());
        System.out.println("消息内容: " + new String(message.getBody()));
    }
}

查看源码

  • destination目标 = 主题 : 标签
  • keys从消息头里面获取

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1227935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【IPC】 共享内存

1、概述 共享内存允许两个或者多个进程共享给定的存储区域。 共享内存的特点 1、 共享内存是进程间共享数据的一种最快的方法。 一个进程向共享的内存区域写入了数据&#xff0c;共享这个内存区域的所有进程就可以立刻看到 其中的内容。 2、使用共享内存要注意的是多个进程…

如何定位el-tree中的树节点当父元素滚动时如何定位子元素

使用到的方法 Element 接口的 scrollIntoView() 方法会滚动元素的父容器&#xff0c;使被调用 scrollIntoView() 的元素对用户可见。 参数 alignToTop可选 一个布尔值&#xff1a; 如果为 true&#xff0c;元素的顶端将和其所在滚动区的可视区域的顶端对齐。相应的 scrollIntoV…

C语言入门笔记—static、extern、define、指针、结构体

一、static static修饰局部变量的时候&#xff0c;局部变量出了作用域&#xff0c;不销毁。本质上&#xff0c;static修饰局部变量的时候&#xff0c;改变了变量的存储位置。详见下图&#xff0c;当a不被static修饰和被static修饰的时候。 C/C static关键字详解&#xff…

随机过程-张灏

文章目录 导论随机过程相关 学习视频来源&#xff1a;https://www.bilibili.com/video/BV18p4y1u7NP?p1&vd_source5e8340c2f2373cdec3870278aedd8ca4 将持续更新—— 第一次更新&#xff1a;2023-11-19 导论 教材&#xff1a;《随机过程及其应用》陆大絟.张颢 参考&…

CD36 ; + Lectin;

CD2 LIMP-2&#xff0c; LGP85 SR-BI&#xff0c; CD36&#xff1b; 清道夫受体蛋白CD36超家族的成员是 脂质代谢 和 先天免疫 的重要调节因子。它们识别正常和修饰的脂蛋白&#xff0c;以及与病原体相关的分子模式。 该家族由三个成员组成&#xff1a; SR-BI &am…

ZJU Beamer学习手册(二)

ZJU Beamer学习手册基于 Overleaf 的 ZJU Beamer模板 进行解读&#xff0c;本文则基于该模版进行进一步修改。 参考文献 首先在frame文件夹中增加reference.tex文件&#xff0c;文件内容如下。这段代码对参考文献的引用进行了预处理。 \usepackage[backendbiber]{biblatex} \…

异常语法详解

异常语法详解 一&#xff1a;异常的分类&#xff1a;二&#xff1a;异常的处理1:异常的抛出:throw2&#xff1a;异常的声明:throws3&#xff1a;try-catch捕获并处理异常 三&#xff1a;finally关键字四&#xff1a;自定义异常类&#xff1a; 一&#xff1a;异常的分类&#xf…

电路的基本原理

文章目录 一、算数逻辑单元(ALU)1、功能2、组成 二、电路基本知识1、逻辑运算2、复合逻辑 三、加法器实现1、一位加法器2、串行加法器3、并行加法器 一、算数逻辑单元(ALU) 1、功能 算术运算&#xff1a;加、减、乘、除等 逻辑运算&#xff1a;与、或、非、异或等 辅助功能&am…

对vb.net 打印条形码code39、code128A、code128C、code128Auto(picturebox和打印机)封装类一文的补充

在【精选】vb.net 打印条形码code39、code128A、code128C、code128Auto&#xff08;picturebox和打印机&#xff09;封装类_vb.net打印标签_WormJan的博客-CSDN博客 这篇文章中&#xff0c;没有对含有字母的编码进行处理。这里另开一篇帖子&#xff0c;处理这种情况。 在那篇文…

【C++入门】拷贝构造运算符重载

目录 1. 拷贝构造函数 1.1 概念 1.2 特征 1.3 常用场景 2. 赋值运算符重载 2.1 运算符重载 2.2 特征 2.3 赋值运算符 前言 拷贝构造和运算符重载是面向对象编程中至关重要的部分&#xff0c;它们C编程中的一个核心领域&#xff0c;本期我详细的介绍拷贝构造和运算符重载。 1. …

Js中clientX/Y、offsetX/Y和screenX/Y之间区别

Js中client、offset和screen的区别 前言图文解说实例代码解说 前言 本文主要讲解JavaScript中clientX、clientY、offsetX、offsetY、screenX、screenY之间的区别。 图文解说 在上图中&#xff0c;有三个框&#xff0c;第一个为屏幕&#xff0c;第二个为浏览器大小&#xff0c…

约数个数定理

首先在讲这个定理前&#xff0c;首先科普一下前置知识 约数&#xff1a; 何为约数&#xff0c;只要能整除n的整数就是n的约数&#xff0c;举个例子&#xff0c;3的约束是1和3因为1和3能整除3 质数&#xff1a; 除了这个数字本身和1以外没有其他因子的数字就叫质数&#xff…

AVL树和红黑树

AVL树和红黑树 一、AVL树1. 概念2. 原理AVL树节点的定义插入不违反AVL树性质违反AVL树性质左单旋右单旋左右双旋右左双旋总结 删除 3. 验证代码4. AVL树完整实现代码 二、红黑树1. 概念2. 性质3. 原理红黑树节点的定义默认约定插入情况一 &#xff08;u存在且为红&#xff09;情…

论文速览 Arxiv 2023 | DMV3D: 单阶段3D生成方法

注1:本文系“最新论文速览”系列之一,致力于简洁清晰地介绍、解读最新的顶会/顶刊论文 论文速览 Arxiv 2023 | DMV3D: DENOISING MULTI-VIEW DIFFUSION USING 3D LARGE RECONSTRUCTION MODEL 使用3D大重建模型来去噪多视图扩散 论文原文:https://arxiv.org/pdf/2311.09217.pdf…

【2017年数据结构真题】

请设计一个算法&#xff0c;将给定的表达式树&#xff08;二叉树&#xff09;转换成等价的中缀表达式&#xff08;通过括号反映次序&#xff09;&#xff0c;并输出。例如&#xff0c;当下列两棵表达式树作为算法的输入时&#xff1a; 输出的等价中缀表达式分别为(ab)(a(-d)) 和…

OpenAI Assistants-API简明教程

OpenAI在11月6号的开发者大会上&#xff0c;除了公布了gpt4-v、gpt-4-turbo等新模型外&#xff0c;还有一个assistants-api&#xff0c;基于assistants-api开发者可以构建自己的AI助手&#xff0c;目前assistants-api有三类的工具可以用。首先就是之前大火的代码解释器(Code In…

隐式转换导致索引失效的原因

Num1 int Num2 varchar Str1不能为null Str2可null 例子1&#xff1a; 结果&#xff1a;124非常快&#xff0c;0.001~0.005秒出结果。3最慢&#xff0c;4~5秒出结果。 查询执行计划&#xff1a;124索引扫描。3全表扫描。 解释&#xff1a;首先四个23都产生隐式转换&#x…

第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份

第7天&#xff1a;信息打点-资产泄漏&CMS识别&Git监控&SVN&DS_Store&备份 知识点&#xff1a; 一、cms指纹识别获取方式 网上开源的程序&#xff0c;得到名字就可以搜索直接获取到源码。 cms在线识别&#xff1a; CMS识别&#xff1a;https://www.yun…

【Gradle-13】SNAPSHOT版本检查

1、什么是SNAPSHOT SNAPSHOT版本是指尚未发布的版本&#xff0c;是一个「动态版本」&#xff0c;它始终指向最新的发布工件&#xff08;gav&#xff09;&#xff0c;也就是说同一个SNAPSHOT版本可以反复用来发布。 这种情况在大型app多团队的开发中比较常见&#xff0c;比如us…

【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程

个人主页点击直达&#xff1a;小白不是程序媛 Linux专栏&#xff1a;Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R&#xff08;running状态&#xff09;运行状态 S&#xff08;sl…