RocketMQ学习笔记(2)—— 集成SpringBoot

news2025/1/23 14:59:03

前置知识:

RocketMQ学习笔记(1)—— 基础使用-CSDN博客

7.集成SpringBoot

以上所述功能均是通过RocketMQ的原生API实现的,除此之外SpringBoot对于一些功能进行了封装,使用更加方便

7.1 producer

依赖

<!-- rocketmq的依赖 -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>2.0.25</version>
</dependency>

配置文件

rocketmq:
    name-server: hadoop104:9876
    producer:
        group: boot-producer-group    

配置了name server的ip以及生产者组的名称

消息发送

常用函数如下:

  1. 发送同步消息:syncSend(topic,msg)
  2. 发送异步消息:asyncSend(topic,msg,callback)
  3. 发送单向消息:sendOneWay(topic,msg)
  4. 发送延迟消息:syncSend(topic,msg,timeout,delayLevel)(这里的timeout与延迟时间无关,是消息发送的超时时间)
  5. 发送顺序消息:syncSendOrderly(topic,msg,hashKey)(将hashKey相同的消息放入同一个队列中去)
  6. 发送带tag的消息:syncSend(topic:tag,msg)
  7. 发送带key的消息:MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.KEYS, key).build();(key是写在消息头中的)

代码:

@Test
void bootProducer() {
    //1.同步
    rocketMQTemplate.syncSend("bootTestTopic","我是一条消息");

    //2.异步
    rocketMQTemplate.asyncSend("bootAsyncTopic", "我是一条异步消息", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("消息发送成功");
        }

        @Override
        public void onException(Throwable e) {
            System.out.println("消息发送异常");
        }
    });

    //3.单向
    rocketMQTemplate.sendOneWay("bootOnewayTopic","我是一条单向消息");

    //4.延迟消息
    Message<String> msgDelay = MessageBuilder.withPayload("我是一条延迟消息").build();
    rocketMQTemplate.syncSend("bootMsTopic",msgDelay,3000,2);

    //5.顺序消息
    List<MsgModel> msgModels = Arrays.asList(
        new MsgModel("qwer", 1, "下单"),
        new MsgModel("qwer", 1, "短信"),
        new MsgModel("qwer", 1, "物流"),
        new MsgModel("zxcv", 2, "下单"),
        new MsgModel("zxcv", 2, "短信"),
        new MsgModel("zxcv", 2, "物流")
    );
    //发送时一般以json格式进行处理
    msgModels.forEach(msgModel -> {
        rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel),msgModel.getOrderSn());
    });

    //6.带tag的消息
    rocketMQTemplate.syncSend("bootTgTopic:tagA","我是tagA的消息");

    //7.带key的消息
    Message<String> msgWithKey = MessageBuilder.withPayload("我是带key的消息").setHeader(
        RocketMQHeaders.KEYS, "thisIsAKey"
    ).build();
    rocketMQTemplate.syncSend("bootKeyTopic",msgWithKey);

}

7.2 consumer

依赖

<!-- rocketmq的依赖 -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>2.0.25</version>
</dependency>

配置文件

server:
    port: 8081
rocketmq:
    name-server: hadoop104:9876

设置springboot的端口号以及name server的ip

代码

基础结构
@Component
@RocketMQMessageListener(
    topic = "bootTestTopic",
    consumerGroup = "boot-test-consumer-group"
)
public class ABootSimpleListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(Arrays.toString(messageExt.getBody()));
    }
}

1.监听器的使用:

通过注册监听器的形式监听topic中的内容并进行消费,主要是通过继承RocketMQListener类,实现onMessage方法来完成的;

RocketMQListener的泛型如果指定了固定的类型,那么onMessage方法中的参数类型与泛型指定的类型一致;MessageExt类型代表了消息的全部内容

如果消息消费过程中没有报错,则签收;如果报错了,则拒收并重试

2.注解的使用

通过@RocketMQMessageListener注解来为监听器设置一些属性:

如topic,设置订阅的主题;consumerGroup,设置消费者组

接下来发送一条简单的消息:rocketMQTemplate.syncSend("bootTestTopic","我是一条消息");

然后开启consumer监听器的springboot Application,即可进行消费:

顺序消息接收
@Component
@RocketMQMessageListener(
        topic = "bootOrderlyTopic",
        consumerGroup = "boot-orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY, //顺序消费模式 单线程
        maxReconsumeTimes = 5 //消费重试的次数
)
public class BOrderlyListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);
        System.out.println(msgModel.toString());
    }
}

需要在注解中添加consumeMode = ConsumeMode.ORDERLY,表示以单线程模式进行消费,避免因为并发导致的顺序错误

运行结果如下:

带tag的消息接收
@Component
@RocketMQMessageListener(
        topic = "bootTagTopic",
        consumerGroup = "boot-tag-consumer-group",
        selectorType = SelectorType.TAG,
        selectorExpression = "tagA"
)
public class CTagListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(new String(messageExt.getBody()));
    }
}

需要通过selectorType指定过滤模式,默认使用tag进行过滤,还可以选择SQL92模式,但一般不用;

通过selectorExpression来设置用于过滤的表达式,默认是*,如果需要选取多个tag使用||分割即可,如tagA || tagB

运行结果如下:

7.3 两种消费模式

Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式

  • 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
  • 广播模式表示每个每个消费者都消费一遍订阅的主题的消息

负载均衡模式

需要在注解中设置:messageModel = MessageModel.CLUSTERING

会将消息均匀地发送到各个队列中去

示例代码:

生产者:

@Test
    void modeTest() throws Exception{
        for (int i = 1; i <= 30; i++) {
            rocketMQTemplate.syncSend("bootModeTopic", ("我是第" + i + "个消息").getBytes());
        }
    }

消费者:

@Component
@RocketMQMessageListener(
        topic = "bootModeTopic",
        consumerGroup = "boot-mode-consumer-group-cluster",
        messageModel = MessageModel.CLUSTERING //集群模式 负载均衡
)
public class cluster1 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("我是cluster组的第一个消费者,消息内容是:" + new String(messageExt.getBody()));
    }
}

一共启动3个消费者,处理逻辑完全相同;

运行结果如下:

我是cluster组的第二个消费者,消息内容是:我是第1个消息
我是cluster组的第一个消费者,消息内容是:我是第2个消息
我是cluster组的第一个消费者,消息内容是:我是第3个消息
我是cluster组的第三个消费者,消息内容是:我是第4个消息
我是cluster组的第二个消费者,消息内容是:我是第5个消息
我是cluster组的第一个消费者,消息内容是:我是第6个消息
我是cluster组的第一个消费者,消息内容是:我是第7个消息
我是cluster组的第三个消费者,消息内容是:我是第8个消息
我是cluster组的第二个消费者,消息内容是:我是第9个消息
我是cluster组的第一个消费者,消息内容是:我是第10个消息
我是cluster组的第一个消费者,消息内容是:我是第11个消息
我是cluster组的第三个消费者,消息内容是:我是第12个消息
我是cluster组的第二个消费者,消息内容是:我是第13个消息
我是cluster组的第一个消费者,消息内容是:我是第14个消息
我是cluster组的第一个消费者,消息内容是:我是第15个消息
我是cluster组的第三个消费者,消息内容是:我是第16个消息
我是cluster组的第二个消费者,消息内容是:我是第17个消息
我是cluster组的第一个消费者,消息内容是:我是第18个消息
我是cluster组的第一个消费者,消息内容是:我是第19个消息
我是cluster组的第三个消费者,消息内容是:我是第20个消息
我是cluster组的第二个消费者,消息内容是:我是第21个消息
我是cluster组的第一个消费者,消息内容是:我是第22个消息
我是cluster组的第一个消费者,消息内容是:我是第23个消息
我是cluster组的第一个消费者,消息内容是:我是第26个消息
我是cluster组的第三个消费者,消息内容是:我是第24个消息
我是cluster组的第二个消费者,消息内容是:我是第25个消息
我是cluster组的第一个消费者,消息内容是:我是第27个消息
我是cluster组的第三个消费者,消息内容是:我是第28个消息
我是cluster组的第二个消费者,消息内容是:我是第29个消息
我是cluster组的第一个消费者,消息内容是:我是第30个消息

从面板中可以看到,消息均衡地发送到各个消息队列中去:

根据下图所示消费方式:

显然有一个消费者订阅了两个队列中地数据,剩下的两个消费者各订阅一个队列中的数据;

广播模式

需要在注解中设置:messageModel = MessageModel.BROADCASTING

示例代码如下:

生产者:

    //广播模式
    @Test
    void modeTest2() throws Exception{
        for (int i = 1; i <= 5; i++) {
            rocketMQTemplate.syncSend("bootModeTopic", ("我是第" + i + "个消息").getBytes());
        }
    }

消费者:

@Component
@RocketMQMessageListener(
        topic = "bootModeTopic",
        consumerGroup = "boot-mode-consumer-group-broadcast",
        messageModel = MessageModel.BROADCASTING //集群模式 负载均衡
)
public class broadcast1 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println("我是broadcast组的第一个消费者,消息内容是:" + new String(messageExt.getBody()));
    }
}

一共启动3个消费者,处理逻辑完全相同;

7.4 消息堆积问题

一般认为单条队列消息差值>=10w时 出现消息堆积问题

问题出现原因

1.生产太快了 ,解决方法:

  • 生产方可以做业务限流
  • 增加消费者数量,但是消费者数量<=队列数量,也可以适当的设置最大的消费线程数量(根据IO密集型(2n)/CPU密集型(n+1))
  • 动态扩容队列数量,从而增加消费者数量

2.消费者消费出现问题:排查消费者程序的问题

示例代码

可以通过consumeThreadNumber = 40来设置消费者线程的数量

上述的n即为逻辑处理器的数量:

消费者代码如下:

@Component
@RocketMQMessageListener(topic = "jyTopic",
        consumerGroup = "jy-consumer-group",
        consumeThreadNumber = 40,
        consumeMode = ConsumeMode.CONCURRENTLY
)
public class EJyListener2 implements RocketMQListener<String> {


    @Override
    public void onMessage(String message) {
        System.out.println("我是第二个消费者:" + message);
    }
}

7.5 消息丢失问题

消息在消费过程中可能会丢失,解决方案如下:

①记录消息状态

具体思路:

  1. 生产者使用同步发送模式,收到mq的返回值之后向MySQL数据库中写入一条记录,包括:key(唯一标识),createTime(消息生成时间),status=1(消息状态)
  2. 消费者消费之后,根据相应的key找到这条记录,更新消息的状态,设置status=2
  3. 设置定时任务,每天执行一次,查询数据库中createrTime > 1 day and status = 1的数据,进行补发即可

②设置mq的刷盘机制为同步刷盘

采用同步刷盘可以避免消息在缓冲区(buffer)丢失

③开启trace机制(消息追踪)

  1. 停止运行broker
  2. 修改broker.conf配置文件,添加配置:traceTopicEnable=true
  3. 启动broker即可

代码

生产者:

    //轨迹追踪
    @Test
    void traceTest() throws Exception{
        rocketMQTemplate.syncSend("bootTraceTest","我是一条可追踪轨迹的消息");
    }

需要在application.yml文件中配置开启消息轨迹:

 

消费者:需要设置:enableMsgTrace = true,开启消费者方的轨迹

@Component
@RocketMQMessageListener(
        topic = "bootTraceTest",
        consumerGroup = "boot-trace-consumer-group",
        consumeMode = ConsumeMode.CONCURRENTLY,
        enableMsgTrace = true // 开启消费者方的轨迹
)
public class DTraceTest implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(new String(messageExt.getBody()));
    }
}
消息轨迹详情

可以通过topic和messageId来查看消息轨迹:

默认会将消息轨迹的数据存在RMQ_SYS_TRACE_TOPIC主题里面:

7.6 安全机制

配置文件

1.开启acl的控制 在broker.conf中开启aclEnable=true

2.配置账号密码 修改plain_acl.yml

并修改远程连接ip为:whiteRemoteAddress: 192.168.*.*

3.修改控制面板的配置文件,放开52/53行并把49行改为true:

然后上传到服务器的jar包平级目录下即可:

之后重启broker和dashboard即可

注意:启动dashboard时一定要在该jar包目录下,这样才能读取到平级目录下的application.properties配置文件:nohup java -jar rocketmq-dashboard-1.0.0.jar > /opt/module/rocketmq-4.9.2/logs/dashboard.log &

否则会报错:

具体使用

代码中

需要在application.yml配置文件中设置:

生产者:

消费者:

控制面板

需要输入用户密码才能登录

默认是admin/admin

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1474630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新火种AI|微软扶持下一个OpenAI?Mistral AI新模型对标GPT-4,上线即挤爆

作者&#xff1a;一号 编辑&#xff1a;美美 OpenAI的大金主微软&#xff0c;还想缔造“下一个OpenAI”。 周一晚间&#xff0c;成立仅9个月的Mistral AI正式发布了最强力的旗舰模型Mistral Large。和此前他们所推出的一系列模型不同&#xff0c;Mistral AI本次发布的版本性…

TikTok矩阵系统的功能展示:深入解析与源代码分享!

今天我来和大家说说TikTok矩阵系统&#xff0c;在当今数字化时代&#xff0c;社交媒体平台已成为人们获取信息、交流思想和娱乐放松的重要渠道&#xff0c;其中&#xff0c;TikTok作为一款全球知名的短视频社交平台&#xff0c;凭借其独特的创意内容和强大的算法推荐系统&#…

有效防止CDN网站被溯源ip的教程

如何反溯源隐藏自己的源IP防止溯源&#xff1f; 还有些大牛会进行渗透攻击、CC攻击&#xff0c;溯源打服务器&#xff0c;各式各样的&#xff0c;防不胜防。所以很多站长套起了cdn&#xff0c;比起cdn提供的加速效果&#xff0c;更多的站长可能还是为了保护那可怜弱小的源站ip…

Docker(运维工具)—— 学习笔记

快速构建、运行、管理应用的工具 一、安装docker 参考Install Docker Engine on Ubuntu | Docker Docs 二、快速入门 1、镜像和容器 docker镜像可以做到忽略操作系统的差异&#xff0c;跨平台运行&#xff0c;忽略安装的差异 当我们利用Docker安装应用时&#xff0c;Dock…

关于机器学习梯度下降法以及牛顿法公式符号的解释

如下图&#xff0c;是公式 如上图红线画出的部分&#xff0c;就是梯度下降法的符号&#xff0c;或者说&#xff0c;是 J(theta) 损失函数的一阶导数 整个公式看起来&#xff0c;就是 theta_new theta_old - (一阶导数/二阶导数)

算法day01_ 27. 移除元素、977.有序数组的平方

推荐阅读 从零开始学数组&#xff1a;深入浅出&#xff0c;带你掌握核心要点 初探二分法 再探二分法 系统的纪录一下刷算法的过程&#xff0c;之前一直断断续续的刷题&#xff0c;半途而废&#xff0c;现在重新开始。话不多说&#xff0c;开冲&#xff01; 27.移除元素 题目 给…

Maven编译报processing instruction can not have PITarget with reserveld xml name

在java项目中&#xff0c;平时我们会执行mvn clean package命令来编译我们的java项目&#xff0c;可是博主今天执行编译时突然报了 processing instruction can not have PITarget with reserveld xml name 这个错&#xff0c;网上也说法不一&#xff0c;但是绝大绝大部分是因…

(二十)devops持续集成开发——使用jenkins的docker插件完成docker项目的流水线发布

前言 本节内容主要介绍jenkins如何集成docker插件&#xff0c;完成docker项目的流水线发布&#xff0c;在前面的章节中我们也介绍过docker项目的发布&#xff0c;可直接通过shell命令调用本地的docker服务完成docker项目的发布&#xff0c;本节内容我们使用docker插件来完成do…

LeetCode--代码详解 43.字符串相乘

43.字符串相乘 题目 给定两个以字符串形式表示的非负整数 num1 和 num2&#xff0c;返回 num1 和 num2 的乘积&#xff0c;它们的乘积也表示为字符串形式。 注意&#xff1a;不能使用任何内置的 BigInteger 库或直接将输入转换为整数。 示例 1: 输入: num1 "2",…

ARM系列 -- 虚拟化(四)

今天来看看虚拟中断。 在一个非虚拟化的系统中&#xff0c;操作系统可以直接访问GIC的寄存器&#xff0c;并且处理GIC的物理中断接口&#xff08;physical interrupt interface&#xff09;。 但是在一个虚拟化的系统中&#xff0c;不是这样。Guest OS并不知道它运行在虚拟系…

ETH网络中的账户

ETH网络中的账户 Externally owned accounts (EOA) - 外部账户 由用户控制&#xff0c;我们导入助记词创建的账户就属于此类账户。 Contract accounts (smart contracts) - 合约账户 合约账户由以太坊虚拟机执行的代码控制。它也被称为智能合约。合约帐户有相关的代码和数据存…

防火墙的内容安全

目录 1. 内容安全 1.1 IAE引擎 DPI---深度包检测技术 DFI---深度流检测技术 结论(优缺点)&#xff1a; 1.2 入侵防御&#xff08;检测&#xff09;(IPS) IPS的优势: 入侵检测的方法: 入侵检测的流程 签名 查看预定义签名的内容 新建自定义签名 入侵防御的检测…

uniapp android 原生插件开发-测试流程

前言 最近公司要求研究一下 uniapp 的 android 原生插件的开发&#xff0c;为以后的工作做准备。这篇文章记录一下自己的学习过程&#xff0c;也帮助一下有同样需求的同学们 : ) 一、下载安装Hbuilder X , Android studio&#xff08;相关的安装配置过程网上有很多&#xff0c;…

width:100%和width:auto有啥区别

项目中使用了with属性&#xff0c;突然好奇auto 和 100% 的区别&#xff0c;特地搜索实践总结了一下观点 一、 width属性介绍二、 代码带入三、 分析比较四、 总结 一、 width属性介绍 width 属性用于设置元素的宽度。width 默认设置内容区域的宽度&#xff0c;但如果 box-siz…

XXE 漏洞简单研究

近期在做个基础的 web 常见漏洞的 ppt&#xff0c;主要参考 OWASP TOP 10 2017RC2&#xff0c;此版本中增加了 XXE 攻击&#xff0c;所以自己简单的研究下 XXE 攻击。XXE&#xff08;XML External Entity&#xff09;XML 外部实体&#xff0c;当前端和后端通信数据采用 xml&…

2. Kubernetes 核心数据结构

1. Group、Version、Resource 核心数据结构 理解 Kubernetes 核心数据结构&#xff0c;在阅读源码时可以事半功倍并能够深刻理解 Kubernetes 核心设计。在整个 Kubernetes 体系架构中&#xff0c;资源是 Kubernetes 最重要的概念&#xff0c;可以说 Kubernetes 的生态系统都围…

28. 找出字符串中第一个匹配项的下标(力扣LeetCode)

文章目录 28. 找出字符串中第一个匹配项的下标题目描述暴力KMP算法 28. 找出字符串中第一个匹配项的下标 题目描述 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。…

【PHP】Workerman开源应用容器的GatewayWorker 与 iOS-OC对接

Workerman 开源高性能PHP应用容器 workerman是一款开源高性能PHP应用容器,它大大突破了传统PHP应用范围,被广泛的用于互联网、即时通讯、APP开发、硬件通讯、智能家居、物联网等领域的开发。 PHPSocket.io PHP版本的socket.io,具有良好的客户端兼容性,常用于即时通讯领域…

在CentOS上使用Docker搭建Halo博客并实现远程访问的详细指南

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. Docker部署Halo1.1 检查Docker版本1.2 在Docker中部署Halo 二. Linux安装Cpol…

亚信安慧AntDB数据库与流式处理的有机融合

流式处理的概念 2001年9月11日&#xff0c;美国世贸大楼被袭击&#xff0c;美国国防部第一次将“主动预警”纳入国防的宏观战略规划。而IBM作为当时全球最大的IT公司&#xff0c;承担了大量基础支撑软件研发的任务。其中2009年正式发布的IBM InfoSphere Streams&#xff0c;就是…