手拉手整合Springboot3+RocketMQ2.3

news2024/11/22 17:00:12

RocketMQ 基本概念

消息模型Message Model

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址, Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息生产者Producer

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

消息消费者Consumer

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。提供了两种消费形式:拉取式消费、推动式消费。

主题Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。

代理服务器Broker Server

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务Name Server

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

Pom.xml加入依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>

application.yml配置

rocketmq:
name-server: 192.168.68.133:9876
producer:
#生产者组名,一个应用里面必须唯一
group: test-producer
#消息发送的超时时间 默认3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认 4096
compress-message-body-threshold: 4096
#最大的消息限制,默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 2
#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 2

消费者监听器

报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队,根据application.yml的配置

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
/不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队
System.out.println("接收到消息:"+new String(messages.getBody()));

}
}


 

发送同步消息

生产者

* 发送同步消息
* destination 目的地-主题
* payload 消息

@Test
void sendMsg() {
rocketMQTemplate.syncSend("TopicTest", "同步消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送异步消息

生产者


* 发送异步消息
* destination 目的地-主题
* payload 消息

@Test
void asyncTest() {

rocketMQTemplate.asyncSend("TopicTest", "异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}

@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送延时消息

生产者

* 发送延时消息
* destination 目的地-主题
* payload 消息
* timestamp 连接超时
* delayLevel 延时级别

@Test
void delayTest() {

Message<String> msg = MessageBuilder.withPayload("延时消息").build();
rocketMQTemplate.syncSend("TopicTest", msg, 3000, 3);
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送单向消息

生产者

* 发送单向消息
* destination 目的地-主题
* payload 消息

@Test
void OneWayTest() {

rocketMQTemplate.sendOneWay("TopicTest", "单向消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
}
}

发送顺序消息

顺序消息 生产者需要将一组消息都发送到同一个队列 ,消费者需要单线程消费

生产者

生产者需要将一组消息都发送到同一个队列

List<MessageM> messageMs = Arrays.asList(
new MessageM("sn0001", 1, "下单"),
new MessageM("sn0001", 1, "付款"),
new MessageM("sn0001", 1, "配送"),
new MessageM("sn0002", 2, "下单"),
new MessageM("sn0002", 2, "付款"),
new MessageM("sn0002", 2, "配送")
);


@Test
void orderlyTest() {
/**

* destination 目的地-主题
* payload 消息
*/
for (MessageM messageM : messageMs) {
rocketMQTemplate.syncSendOrderly("orderlyTest", JSON.toJSON(messageM), messageM.getSn());
}
}

消费者

CONCURRENTLY 同时

ORDERLY有序

消费者需要单线程消费

@Component
@RocketMQMessageListener(topic = "orderlyTest",consumerGroup = "orderly",consumeMode = ConsumeMode.ORDERLY)
public class orderlyListener implements RocketMQListener<MessageExt> {

@Override
public void onMessage(MessageExt messageExt) {
MessageM messageM = JSON.parseObject(new String(messageExt.getBody()), MessageM.class);
System.out.println(messageM);
}
}

发送带标签tag

生产者

@Test
void ProducerTagTest(){
rocketMQTemplate.syncSend("TagMQ:tagA","带tagA的消息");
rocketMQTemplate.syncSend("TagMQ:tagB","带tagB的消息");
}

消费者

@Component
@RocketMQMessageListener(topic = "TagMQ",
consumerGroup = "TagMQGroup",
selectorType = SelectorType.TAG, //tag过滤模式
selectorExpression = "tagA || tagB"

)
public class MsgListenerTag implements RocketMQListener<MessageExt> {

@Override
public void onMessage(MessageExt messageExt) {
System.out.println(new String( messageExt.getBody()));
}
}

发送带Key消息

Key带在消息头中

生产者

@Test
void keyTest(){
String Key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder
.withPayload("带key消息").
setHeader(RocketMQHeaders.KEYS, Key)
.build();
/**
* 带Key消息
*/
rocketMQTemplate.syncSend("ketTopic",msg);
}

消费者

@Component
@RocketMQMessageListener(topic = "ketTopic",consumerGroup = "ketConsumerGroup-springboot")
public class keyMQListener implements RocketMQListener<MessageExt> {

/**
* onMessage 消费者方法
* @param messages 消息内容
*/
@Override
public void onMessage(MessageExt messages) {
//不报错就是签收信息,
//报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
System.out.println("接收到消息:"+new String(messages.getBody()));
System.out.println("key:"+messages.getKeys());
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1532101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【报错】使用gradio渲染html页面无法加载本地图片

【报错】使用gradio渲染html页面无法加载本地图片 【报错】使用gradio渲染html页面无法加载本地图片[HTML] how to load local image by html output #884成功解决 【报错】使用gradio渲染html页面无法加载本地图片 在使用gradio框架渲染html页面&#xff0c;使用绝对路径&quo…

获取cookie

在Servlet9里设置cookie 在Servlet10里进行获取 访问Servlet9.do&#xff0c;再访问Servlet10.do

图书馆RFID(射频识别)数据模型压缩/解压缩算法实现小工具

1. 前言 最近闲来无聊&#xff0c;看了一下《图书馆射频识别数据模型第1部分&#xff1a;数据元素的设置及应用规则》以及《图书馆射频识别数据模型第2部分&#xff1a;基于ISO/IEC 15962的数据元素编码方案》&#xff0c;决定根据上面的编码方法实现一下该算法&#xff0c;于…

PyTorch 深度学习(GPT 重译)(五)

十二、通过指标和增强改进训练 本章涵盖 定义和计算精确率、召回率以及真/假阳性/阴性 使用 F1 分数与其他质量指标 平衡和增强数据以减少过拟合 使用 TensorBoard 绘制质量指标图 上一章的结束让我们陷入了困境。虽然我们能够将深度学习项目的机制放置好&#xff0c;但实…

聚类算法之层次聚类(Hierarchical Clustering)

注意&#xff1a;本文引用自专业人工智能社区Venus AI 更多AI知识请参考原站 &#xff08;[www.aideeplearning.cn]&#xff09; 层次聚类是一种非常独特和强大的聚类方法&#xff0c;与众多其他的聚类技术相比&#xff0c;它不仅为数据集提供了一个划分&#xff0c;还给出了…

3d模型文件导入时没颜色---模大狮模型网

导入3D模型文件时没有颜色显示通常是由于软件在显示模型时未正确解释颜色信息。这可能是由于模型文件本身没有包含颜色信息&#xff0c;或者是软件在导入过程中未正确处理颜色数据所致。 以下是一些可能的解决方法&#xff1a; 检查3D模型文件&#xff1a;首先&#xff0c;确保…

网络简略总结

目录 一、三次握手 四次挥手 1、三次握手:为了建立长链接进行交互即建立一个会话,使用http/https协议 2、四次挥手是一个断开连接释放服务器资源的过程 3、如果已经建立了连接,但是客户端突然出现故障了怎么办? 4、谁可以中断连接?客户端还是服务端还是都可以? 5、…

Linux环境开发工具之vim

前言 上一期我们已经介绍了软件包管理器yum&#xff0c; 已经可以在linux上查找、安装、卸载软件了&#xff0c;本期我们来介绍一下文本编辑器vim。 本期内容介绍 什么是vim vim的常见的模式以及切换 vim命令模式常见的操作 vim底行模式常见的操作 解决普通用户无法执行sudo问…

5G智能网关助力工业铸造设备监测升级

随着物联网技术的迅猛发展和工业4.0浪潮的推进&#xff0c;传统工业正面临着严峻的转型升级压力。在这一背景下&#xff0c;铸造行业——这一典型的传统重工业领域&#xff0c;也必须积极探索借助5G、物联网、边缘计算等技术提升生产经营效率的新路径。 本文就基于佰马合作伙伴…

【晴问算法】提高篇—动态规划专题—最大连续子序列和

题目描述 输入描述 输出描述 输出一个整数&#xff0c;表示最大连续子序列和。 样例1 输入 6 -2 11 -4 13 -5 -2 输出 20 解释 连续子序列和的最大值为&#xff1a; #include<bits/stdc.h> using namespace std; const int MAXN 100; int dp[MAXN];//dp[i]表示以a[i]元…

酷开科技聚焦大屏端数据研究,构建酷开系统深度挖掘大屏商业价值

中国所有的彩色大屏中&#xff0c;智能电视规模已经过半&#xff0c;OTT平台的数据价值越发引起人们关注。作为OTT行业的头部代表&#xff0c;酷开科技一直聚焦大屏端数据研究&#xff0c;目前已经形成一套基于大屏指数的智慧营销体系&#xff0c;让OTT大屏的数字营销化水平实现…

无人机/飞控--ArduPilot、PX4学习记录(2)

这是一篇碎碎念&#xff0c;零零碎碎的记录了环境配置过程&#xff0c;仅供本人记录学习历程和参考。(记录的挺乱的&#xff0c;但是文章链接里的博客写的是真好) 本章主要完成的目标&#xff1a; 安装PX4 并 成功运行出3D无人机界面。 参考文章&#xff1a; 搭建PX4环境&…

【LeetCode-114.二叉树展开为链表】

题目详情&#xff1a; 给你二叉树的根结点 root &#xff0c;请你将它展开为一个单链表&#xff1a; 展开后的单链表应该同样使用 TreeNode &#xff0c;其中 right 子指针指向链表中下一个结点&#xff0c;而左子指针始终为 null 。展开后的单链表应该与二叉树 先序遍历 顺序…

水果软件FL Studio 21 for mac 21.2.3.3586破解版的最新版本2024介绍安装

音乐是人类最美好的语言&#xff0c;它能够跨越国界、文化和语言&#xff0c;将人们紧密地联系在一起。在当今数字化时代&#xff0c;音乐创作已经不再是专业人士的专利&#xff0c;越来越多的音乐爱好者开始尝试自己动手制作音乐。而FL Studio21中文版编曲软件正是这样一个为你…

软件架构对于项目质量的影响

时间&#xff1a;2024年03月20日 作者&#xff1a;小蒋聊技术 邮箱&#xff1a;wei_wei10163.com 微信&#xff1a;wei_wei10 音频地址&#xff1a; 软件架构对于项目质量的影响在线收听-喜马拉雅 前言 大家好&#xff0c;欢迎来到小蒋聊技术&#xff0c;小蒋准备和大家一…

优先级队列(堆)(2)

目录 一. PriorityQueue的特性 二. PriorityQueue常用接口介绍 1. 优先级队列的构造 2. 转成大根堆存储方法: 3. 插入/删除/获取优先级最高的元素 三. Top-k问题 一. PriorityQueue的特性 Java 集合框架中提供了 PriorityQueue 和 PriorityBlockingQueue 两种类型的优先…

PowerShell 无法保留窗口和字体设置

那么&#xff0c;首先&#xff0c;得亮一下版本&#xff0c;默认软件&#xff1a; PS C:\Windows> $PSVersionTableName Value ---- ----- PSVersion 5.1.19041.4170 PSEdition …

Win11初始化系统遇一文解决

这个是目录 一、设置内的初始化无法使用时&#xff0c;使用以下工具二、将桌面移动到D盘三、解决win11桌面右键创建只有一个带盾牌的文件夹问题四、win11 系统停止更新五、office安装1、使用的是 Office Tool plus2、使用WPS 六、D盘有感叹号七、打开组策略编辑器(gpedit.msc)失…

安卓转鸿蒙能有多适配?简直了……

到现在为止&#xff0c;想必很多开发者都或多或少 了解过鸿蒙。许多企业也都已经加入了鸿蒙业务&#xff0c;半推半就的开始学习鸿蒙开发。那么鸿蒙到底好不好搞呢&#xff1f; 首先可以肯定的一点&#xff0c;对于做安卓的来说鸿蒙非常搞&#xff0c;究竟有多好搞呢&#xff…

SQL语句之SELECT语句

一般格式 SELECT DISTINCT/ALL 目标列表达式 //要显示的属性列 FROM 表名/视图名 //查询的对象 WHERE 条件表达式 //查询条件 GROUP BY 列名 HAVING 条件表达式 //查询结果分组 ORDER BY 列名 次序; //最终查询结果排序 文章目录 一、基本查询 1、SELECT 目标列表达…