【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

news2025/1/10 20:24:35

文章目录

  • 前言
  • 基本概念
    • 消息和主题相关
    • 发送普通消息
  • 发送顺序消息
  • RocketMQTemplate的API介绍
  • 参考资料:

前言

本文主要有以下内容:

  • 简单消息的发送
  • 顺序消息的发送
  • RocketMQTemplate的API介绍

环境搭建:
RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示:
在这里插入图片描述

在 Spring boot 项目中引入 RocketMQ 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

在application.yml增加相关配置:

server:
  port: 10001
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: springboot_produce_group # 必须指定group
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
  consumer:
    group: springboot_consumer_group

在 Spring Boot 中使用RocketMQ很简单直接注入RocketMQTemplate对象即可:

@Resource
private RocketMQTemplate rocketMQTemplate;

基本概念

消息和主题相关

消息 message:通信交互的载体,分为事务消息,半事务消息,延迟消息,顺序消息等。
主题 topic:一类消息的集合,逻辑概念。
队列 queue:主题由一个队列或者多个队列构成,当消息发送到某一个主题时,需要选择某一个队列。
偏移量 offset:消息追加到主题的队列后会分配一个数值,表示该队列的几条消息。
消费者相关:
消费组 consume group:消费组用于订阅主题消费消息,可以订阅多个主题,一个消费组可以有多个消费者。
广播模式:同一个消费组内的所有消费者都会消费订阅主题的所有消息。即一条消息会被该消费者组的所有消费者消费。
集群模式:同一个消费组内的所有消费者只消费订阅主题的一部分消息,即一条消息只会被改消费组的一个消费者消费。
并发消费:同一个队列的消息由多线程消费且不保证消息的顺序。
顺序消费:保证同一队列的消息按顺序消费。

发送普通消息

创建MsgController,代码如下:

@RestController
@RequestMapping("send/")
@CrossOrigin(allowedHeaders = "*", origins = "*")
@Slf4j
public class MsgController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("normal")
    public void sendNormalMsg() {
        Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ Normal_msg").build();
        rocketMQTemplate.send("normal_msg", msg);
    }
}

创建消息的消费者,只需要实现RocketMQListener接口中的方法即可,代码如下:

@Component
@RocketMQMessageListener(topic = "normal_msg", consumerGroup = "consumer_normal")
@Slf4j
public class NormalMsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("Receive Normal Msg: {}",message);
    }
}

@RocketMQMessageListener注解用在消费者类上,指定当前类消费的主题。

topic:指定消费者的主题 comsumerGroup:指定消费者组(Consumer Group)名称,用于区分不同的消费者。

启动项目,运行结果如下图所示:
在这里插入图片描述

发送顺序消息

顺序消息:保证同一队列的消息按顺序消费。
在MsgController 中添加如下代码:

@GetMapping("order")
public void sendOrderMsg(){
​
    log.info("开始发送顺序消息");
    for (int j = 0; j < 10; j++) {
        Message<String> sendOrderMsg = MessageBuilder.withPayload("Send Order Msg = " + j + " time: "+ LocalDateTime.now()).build();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
    }
    log.info("顺序消息发送结束");
}

创建对应topic消息的消费者,代码如下所示:

@Component
@RocketMQMessageListener(topic = "msg",
        consumerGroup = "consumer_order_group",
        selectorExpression = "order",
        messageModel = MessageModel.CLUSTERING,
        selectorType = SelectorType.TAG)
@Slf4j
public class OrderMsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("Receive Order Msg: {}",message);
    }
}

@RocketMQMessageListener其他属性介绍:

  • selectorExpression: 消息选择表达式,用于过滤消息,只有满足表达式条件的消息才会被消费。默认值为 *,表示订阅所有消息。

全匹配:*,默认值。
属性匹配:指定tag = ‘tagName’,上面的代码就可以改写为"tag = ‘order’"
表达式匹配:需要指定selectType = SelectorType.SQL92,见下面。

  • selectorType:指明了消息选择通过tag的方式,默认值SelectorType.TAG。可选值有SelectorType.SQL92

TAG:支持"tagName"的方式配置,如果有多个标签则用||进行连接
SQL92:关键字有AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL。支持的数据类型有Boolean, String, Decimal, Float number等。使用方式如(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)

  • messageModel:消息模式,可选值为 MessageModel.CLUSTERING(默认)或 MessageModel.BROADCASTING,分别表示集群模式和广播模式。

重新启动项目,运行结果如下图所示:
在这里插入图片描述

RocketMQTemplate的API介绍

在上面的api使用中,都没有去关注是否消息发送的状态,如是否成功,发送到了哪一个队列等。接下来就介绍一下相关API的使用

带返回值的发送普通消息SendResult syncSend(String destination, Message<?> message);

在MsgController添加如下代码:

@GetMapping("normal_result")
public void sendNormalResultMsg() {
    Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();
    SendResult normalMsg = rocketMQTemplate.syncSend("normal_msg", msg);
    log.info("normalMsg = {}",normalMsg);
}

在这里插入图片描述

如log所示,可以看到发送状态等信息。

发送异步消息,在MsgController中添加如下代码:

@GetMapping("callback")
public void sendNormalResultMsgWithCallback(){
    Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();
    rocketMQTemplate.asyncSend("normal_msg", msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("success");
        }
        @Override
        public void onException(Throwable throwable) {
            log.info("error");
        }
    });
}

运行结果如下所示:
在这里插入图片描述

发送顺序消息:在第二部分以及展示过了也可以用如下代码替换

rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
// 替换为
rocketMQTemplate.syncSendOrderly("msg:order", sendOrderMsg,String.valueOf(j));

发送单向消息

@GetMapping("oneway")
public void  sendOneWay(){
    Message<String> oneWay = MessageBuilder.withPayload("Send Order Msg = " + " time: "+ LocalDateTime.now()).build();
    rocketMQTemplate.sendOneWay("normal_msg",oneWay);
}

运行结果如下图所示:
在这里插入图片描述

发送事务消息:暂不举例,后续补充
发送事务消息带回调:和syncSend()类似,后续补充相关用法。

参考资料:

  • 《RocketMQ 实战》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/842195.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JNI之Java实现蓝牙交互

蓝牙概述 蓝牙&#xff0c;是一种支持设备短距离通信&#xff08;一般10m内&#xff09;的无线电技术&#xff0c;能在包括移动电话、PDA、无线耳机、笔记本电脑、相关外设等众多设备之间&#xff0c;通过蓝牙设备之间的无线通信实现数据传输&#xff0c;实现数据传输&#xf…

golang代码热加载,热更新库air库实践

windows下先生成air.exe文件&#xff0c;然后移动到golang的执行目录&#xff1a; 2.简介 air是一款基于golang开发的实时热加载工具&#xff0c;通过使用该工具&#xff0c;使得开发人员能专注于coding&#xff0c;而不会被编译过程打断。 项目地址: https://github.com/cos…

【工作中问题解决实践 九】Spring中事务传播的问题排查

最近在工作中遇到了两个关于事务操作的问题&#xff0c;顺便就着这两个问题又回顾了一遍Spring的事务相关的操作&#xff0c;想着一次性把这个问题研究明白了&#xff0c;后续使用事务的时候也能踏实点&#xff0c;让事务发挥真实的作用 什么是事务&#xff1f;什么是事务管理…

【探索Linux】—— 强大的命令行工具 P.2(Linux下基本指令)

前言 前面我们讲了C语言的基础知识&#xff0c;也了解了一些数据结构&#xff0c;并且讲了有关C的一些知识&#xff0c;也相信大家都掌握的不错&#xff0c;今天博主将会新开一个Linux专题&#xff0c;带领大家继续学习有关Linux的内容。今天第一篇文章博主首先带领大家了解一下…

客服型电话呼叫中心系统,助力企业提升客户服务质量

客服型电话呼叫中心系统是企业客户服务的重要工具之一&#xff0c;它通过电话和网络等方式&#xff0c;为客户提供快速、便捷、高效的服务。客服型电话呼叫中心系统具备自动接听来电、自动路由、管理知识库、录音和监控、生成报表分析等多种功能&#xff0c;有利于企业提高客户…

IP提取器对比器

需求&#xff1a; 一个html 页面 &#xff0c;有两个输入框 第一个输入框输入文本中包含多个ip&#xff0c;输入的ip是不规则的&#xff0c;需要使用正则表达式提取出 输入文本的ip地址 &#xff0c; 然后在第二个输入框中输入内容&#xff0c;并提取出内容的ip &#xff0c;如…

实时渲染与传统渲染有啥区别?实时渲染器有哪些

您是否曾经玩过 3D 视频游戏&#xff0c;或观看过让您感觉身处真实的建筑环境&#xff1f;如果是&#xff0c;那么您已经体验过实时渲染。和传统的渲染有什么不同吗&#xff1f;在本文中了解有关实时渲染的所有信息。 什么是实时渲染&#xff1f; 为了更容易理解什么是实时渲…

jupyter文档转换成markdown

背景 上一篇文章**《如何优雅地用python生成模拟数据》**我就使用jupyter写的&#xff0c;这个真的是万能的&#xff0c;可以插入markdown格式的内容&#xff0c;也可写代码&#xff0c;关键是像ipython一样&#xff0c;可以分步执行。 我可以这样自由的写我的博客内容&#x…

Docker入门——保姆级

Docker概述 ​ —— Notes from WAX through KuangShen 准确来说&#xff0c;这是一篇学习笔记&#xff01;&#xff01;&#xff01; Docker为什么出现 一款产品&#xff1a;开发—上线 两套环境&#xff01;应用环境如何铜鼓&#xff1f; 开发 – 运维。避免“在我的电脑…

【Groups】50 Matplotlib Visualizations, Python实现,源码可复现

详情请参考博客: Top 50 matplotlib Visualizations 因编译更新问题&#xff0c;本文将稍作更改&#xff0c;以便能够顺利运行。 1 Dendrogram 树状图根据给定的距离度量将相似的点组合在一起&#xff0c;并根据点的相似性将它们组织成树状的链接。 新建文件Dendrogram.py: …

怎样在pdf上直接修改?看看这几种修改方法

怎样在pdf上直接修改&#xff1f;PDF是一种非常流行的文件格式&#xff0c;它在保持文档格式不变的同时也可以压缩文件大小&#xff0c;便于分享。尽管 PDF 文件很便捷&#xff0c;但是在 PDF 上进行修改却是一件比较困难的事情。幸运的是&#xff0c;有很多工具可以帮助你在 P…

AUTOSAR笔记2:AP主要模块

1 CM CM&#xff08;Communication Management&#xff09;组件提供独立于网络和协议的应用间通信服 务&#xff0c;支持如下功能&#xff1a; 服务发现&#xff0c;包括服务注册、服务查找等&#xff1b;应用间通信&#xff0c;支持单向数据收发&#xff08;Event&#xff0…

STM32入门——定时器

内容为江科大STM32标准库学习记录 TIM简介 TIM&#xff08;Timer&#xff09;定时器定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断16位计数器、预分频器、自动重装寄存器的时基单元&#xff0c;在72MHz计数时钟下可以实现最大59.65s的定时&…

TFTP 的使用操作指南(轻松入门版)

(꒪ꇴ꒪ ),hello我是祐言博客主页&#xff1a;C语言基础,Linux基础,软件配置领域博主&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff01;送给读者的一句鸡汤&#x1f914;&#xff1a;集中起来的意志可以击穿顽石!作者水平很有限&#xff0c;如果发现错误&#x…

springCache-缓存

SpringCache 简介&#xff1a;是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;底层可以切换不同的cache的实现&#xff0c;具体是通过CacheManager接口实现 使用springcache,根据实现的缓存技术&#xff0c;如使用的redis,需要导入redis的依赖包 基于map缓存 …

一招让你的Python爬虫事半功倍

在Python爬虫的世界里&#xff0c;你是否也被网站的IP封锁问题困扰过&#xff1f;别担心&#xff0c;我来教你一个简单而又有效的爬虫ip设置方法&#xff0c;让你的爬虫畅行无阻&#xff01;快来跟我学&#xff0c;让你的Python爬虫事半功倍&#xff0c;轻松搞定IP封锁问题&…

【室内定位】UWB TDOA定位,PDOA定位介绍

当前室内应用场景&#xff0c;最大的难点是没有基础设施&#xff0c;目前应用的场景中&#xff0c;都是基于用户的需求&#xff0c;或采用 UWB 技术&#xff0c;或采用蓝牙技术&#xff0c;并根据不同的室内环境来定制化的定制化的布设定位网络&#xff0c;并借助同技术的UWB定…

[C++项目] Boost文档 站内搜索引擎(4): 搜索的相关接口的实现、线程安全的单例index接口、cppjieba分词库的使用、综合调试...

有关Boost文档搜索引擎的项目的前三篇文章, 已经分别介绍分析了: 项目背景: &#x1fae6;[C项目] Boost文档 站内搜索引擎(1): 项目背景介绍、相关技术栈、相关概念介绍…文档解析、处理模块parser的实现: &#x1fae6;[C项目] Boost文档 站内搜索引擎(2): 文档文本解析模块…

百模大战,谁是赢家?文心3.5稳坐国内第一,综合评分超ChatGPT!

近日&#xff0c;清华大学新闻与传播学院沈阳团队发布《大语言模型综合性能评估报告》&#xff08;下文简称“报告”&#xff09;&#xff0c;报告显示百度文心一言在三大维度20项指标中综合评分国内第一&#xff0c;超越ChatGPT&#xff0c;其中中文语义理解排名第一&#xff…

取多个元素的整数部分 numpy.fix()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 取多个元素的整数部分 numpy.fix() [太阳]选择题 请问关于以下代码最后的输出结果的是&#xff1f; import numpy as np a [1.6, 2.3, -3.8, -4.2] print("【显示】a",a) print(&…