结合Java代码实现RocketMQ的生产与消费消息

news2025/1/12 18:47:08

前言

在前面的文章中,已经详细介绍并使用到了消息生产者,消息消费者,broker等集群相关的知识,这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点。

希望这篇文章能帮助到正在学习RocketMQ知识点的小伙伴儿们!!!

在这里插入图片描述

RocketMQ其他组件

在RocketMQ中,除了生产者,消费者,还有一些其他的小组件,接下来逐一介绍一下他们。

监听器(Listener)

定义:监听器是消费者用于处理消息的组件。在PushConsumer(推)模式下,消费者客户端必须设置消费监听器,以便在接收到消息时执行相应的处理逻辑。(比如一会儿下面的代码)

偏移量(Offset)

定义:偏移量是指在消费消息时,记录消费者已经消费到的消息位置的值。每个消息都有一个唯一的偏移量值,它代表了消息在消息队列中的位置。

偏移量具有很大的作用:它能够保证消费者在重启或宕机后能够从上次消费的位置继续消费消息,避免重复消费或漏消费。

简而言之就是它能告诉消费者已经消费到哪一条消息!!

  • 集群模式:在集群消费模式下,消息队列的消费进度保存在Broker端。消费者每次消费完消息后,会将最新的消费进度同步到Broker,以便在消费者重启或者故障转移的时候能够从上一次消费的位置继续消费。
  • 广播模式:在广播消费模式下,消息队列的消费进度保存在消费者本地。因为广播模式下每条消息都会被所有消费者消费,所以不需要在Broker端保存消费进度。

所以,偏移量的的实现方式有两种:包括存储在本地文件(OffsetStore)和存储在Broker中这两种方式。(这样一看,清晰了吧)

在这里插入图片描述

命名服务器(NameServer)

这个上几篇文章介绍过并且用到过,这个比较重要再介绍一下!

定义:命名服务器是RocketMQ中的轻量级路由服务,存储生产者和消费者与Broker之间的路由信息。

它的作用:提供Broker的动态注册与发现服务,生产者和消费者通过NameServer查询Broker的路由信息,从而进行消息的投递和消费。

消息组成

  1. Topic:消息主题,对不同的业务消息进行分类。
  2. Tag:消息标签,进一步区分某个Topic下的消息分类。使用Tag可以实现对Topic中的消息进行过滤。消费者可以根据Tag来订阅自己感兴趣的消息,而不是接收Topic下的所有消息。
  3. Message Body:消息体,消息的实际内容。
  4. Keys:消息的键值,标识消息的唯一性。在RocketMQ中,每个消息都可以设置Keys字段,以便在需要的时候根据Keys来查询或者定位消息。
  5. 属性:除了上面滴,RocketMQ的消息还可以包含一系列的属性信息,比如消息的发送时间、生产者信息等等。这些属性信息以键值对的形式存在,随着消息一起被存储和传输。

实现生产与消费消息

按之前的步骤搭建完成RocketMQ集群后…

首先我们创建一个空的maven工程,在pom.xml文件中添加RocketMQ的依赖(RocketMQ的依赖版本需要与虚拟机中的保持一致,这里选择和之前一样的4.7.1版本):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

生产消息

然后编写生产者生产消息的代码:

// 1.创建一个DefaultMQProducer实例,指定生产者组名 
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");  
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");    
// 3.启动生产者实例  
producer.start();   
// 4.使用for循环发送10条消息  
for (int i = 0; i < 10; i++) {  
    // 创建一条消息,指定Topic为"MyTopic1",Tag标签为"TagA",消息体为"hello rocketmq"加上循环变量的值,同时把字符串转换为字节数组  
    Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));       
    // 5.发送消息并接收发送结果  
    SendResult sendResult = producer.send(message);  
    // 打印发送结果,包括消息ID、发送状态等信息  
    System.out.println(sendResult);  
}  
// 6.发送完所有消息后,关闭生产者实例,释放资源  
producer.shutdown();

生产者生产消息和消费者消费消息这块的代码都相对较为简单,已经在代码块中加了注释,这里就不再赘述了。

这个时候就可以访问虚拟机+端口号来搜索到发送的消息详情了!
在这里插入图片描述

在这里插入图片描述

消费消息

// 1.和生产者一样,创建一个DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");  
// 2.设置NameServer的地址,消费者通过这个地址与NameServer进行通信,来获取Broker的地址信息  
consumer.setNamesrvAddr("192.168.220.135:9876");  
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息。我们订阅了"MyTopic1",使用"*"来匹配此Topic下的所有Tag  
consumer.subscribe("MyTopic1", "*");  
// 3.注册消息监听器,用于处理从Broker接收到的消息。使用MessageListenerConcurrently接口的实现,表示并行消费  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    // 4.当收到消息时,方法会被调用。
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
        // 5.遍历消息列表,并打印每条消息的内容(注意:这里直接打印msg对象不会得到预期的消息内容字符串)  
        for (MessageExt msg : msgs) {  
            // 所以我们打印msg.getBody()的内容,为了保留消息原样  
            System.out.println("已收到消息" + msg);  
        }  
        // 6.返回消费状态,这里表示消息已成功消费  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
});  
// 7.启动消费者实例  
consumer.start();  
// 8.打印日志消费者已启动  
System.out.println("消费者已启动");

这里需要注意,msg包含了消息的详细信息,包括消息体、标签、属性等等。如果想打印消息内容,应该使用msg.getBody()方法获取消息体的字节数组,并且把它转换为字符串(如果消息体是文本的话)。
在这里插入图片描述
本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1961961.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【信创】samba的命令行使用 _ 统信 _ 麒麟 _ 中科方德

原文链接&#xff1a;【信创】samba的命令行使用 | 统信 | 麒麟 | 中科方德 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇关于在信创终端操作系统上使用Samba命令操作的文章。Samba是一种用于实现文件和打印共享的免费软件&#xff0c;它允许不同操作系统&#xf…

《昇思25天学习打卡营第27天》

今天我们继续Diffusion扩散模型的后半部分学习 条件U-Net 网络构建过程如下&#xff1a; 首先&#xff0c;将卷积层应用于噪声图像批上&#xff0c;并计算噪声水平的位置 接下来&#xff0c;应用一系列下采样级。每个下采样阶段由2个ResNet/ConvNeXT块 groupnorm attentio…

JAVA(IO流-字符流)day 7.30

ok了家人们今天继续学习IO流&#xff0c; 一.字符集 使用字节流输出中文可能有乱码。 因为每次读取的字节没有完全读取一个字的字节。 二.字符流 2.1 字符输出流【Writer】&#xff08;抽象类&#xff09; Writer是所有字符流的超类&#xff08;父类&#xff09; 字符输出…

蚓链数字化营销系统:“爆省”!“爆赚”!“爆值”!“爆快”!“爆增”!“爆享”!

随着信息技术的飞速发展和消费者行为的深刻变化&#xff0c;数字化营销已成为企业在市场竞争中取得优势的关键手段。蚓链数字化营销系统凭借其创新的功能和策略&#xff0c;为企业带来了一系列“爆”优势&#xff01; “按效果付费--信息化建设费用爆省”&#xff01; “按效果…

Win11没有记事本怎么办?更新至win11无法右键新建txt文件?

博主更新至Win11系统后目前用了不到一个月时间&#xff0c;今天突然发现 鼠标右键无法新建txt文件 了&#xff0c;一开始还以为Win11系统不支持txt类型文件&#xff0c;遂查找各种网上恢复教程。本文综合了多篇教程的方法&#xff0c;力求一文解决所有可能出现的情况&#xff0…

网络安全是什么?怎么入门网络安全?

一、网络安全的定义 网络安全&#xff0c;简单来说&#xff0c;就是保护网络系统中的硬件、软件以及其中的数据不因偶然或恶意的原因而遭到破坏、更改、泄露&#xff0c;保障系统连续可靠正常地运行&#xff0c;网络服务不中断。 随着信息技术的飞速发展&#xff0c;网络安全的…

JAVA基础 - 网络编程

目录 一. 网络基础 BS&#xff08;Browser/Server&#xff0c;浏览器/服务器架构&#xff09; CS&#xff08;Client/Server&#xff0c;客户端/服务器架构&#xff09; 二. TCP Socket通信 三. Socket类 四. 聊天实例 五. UDP Socket 六. 数据交换格式 一. 网络基础 网…

力反馈设备在远程机器人遥操作中的应用实例

随着科技的飞速发展&#xff0c;力反馈设备在远程机器人遥操作中的应用日益广泛&#xff0c;极大地提升了操作的精确性和安全性。其中&#xff0c;Haption Virtuose 6D力反馈设备以其卓越的性能成为该领域的佼佼者。 Haption Virtuose 6D力反馈设备医疗遥操作应用 在医疗领域&a…

公布一批脸书爬虫(facebook)IP地址,真实采集数据

一、数据来源&#xff1a; 1、这批脸书爬虫&#xff08;facebook&#xff09;IP来源于尚贤达猎头公司网站采集数据&#xff1b; ​ 2、数据采集时间段&#xff1a;2023年10月-2024年7月&#xff1b; 3、判断标准&#xff1a;主要根据用户代理是否包含“facebook”和IP核实。…

谷粒商城实战笔记-92~96-商品发布和查询

文章目录 Spu列表检索接口。Sku列表检索接口。仓库列表接口。问题记录 这一篇包含如下内容&#xff1a; 92-商品服务-API-新增商品-商品保存其他问题处理93-商品服务-API-商品管理-SPU检索94-商品服务-API-商品管理-SKU检索95-仓储服务-API-仓库管理-整合ware服务&获取仓库…

【云原生】Kubernetes中的定时任务CronJob的详细用法与企业级应用案例分享

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

皮尔逊(Person)相关系数

目录 一、总体和样本 二、总体皮尔逊相关系数 三、样本皮尔逊相关系数 四、皮尔逊相关系数的理解误区 五、总结 六、相关系数大小的解释&#xff08;并不严格&#xff09; 七、皮尔逊相关系数的计算 一、总体和样本 二、总体皮尔逊相关系数 三、样本皮尔逊相关系数 四、…

Java并发之ThreadLocal

1. 简介 ThreadLocal是 Java 中提供的一种用于实现线程局部变量的工具类。它允许每个线程都拥有自己的独立副本&#xff0c;从而实现线程隔离&#xff0c;用于解决多线程中共享对象的线程安全问题。 通常&#xff0c;我们会使用 synchronzed 关键字 或者 lock 来控制线程对临…

iPhone 手机如何查看自己的电话号码?这两种方法都可以

设置应用程序查看 第一种查看自己的电话号码的方法是在设置应用程序中的电话选项中查看当前手机的电话号码&#xff0c;下面是具体的操作步骤&#xff1a; 首先我们先打开设置应用程序&#xff0c;然后往下滑动找到电话选项&#xff0c;点击进入。 然后就可以看见界面中有“本…

【100个深度学习实战项目】目标检测、语义分割、目标追踪、图像分类等应有尽有,持续更新~~~

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 引言 本文主要介绍深度学习在各…

智能城市管理系统设计思路详解:集成InfluxDB、Grafana和MQTTx协议(代码示例)

引言 随着城市化进程的加快&#xff0c;城市管理面临越来越多的挑战。智能城市管理系统的出现&#xff0c;为城市的基础设施管理、资源优化和数据分析提供了现代化的解决方案。本文将详细介绍一个基于开源技术的智能城市管理系统&#xff0c;涵盖系统功能、技术实现、环境搭建…

【扒模块】DFF

图 医学图像分割任务 代码 import torch import torch.nn as nnfrom timm.models.layers import DropPath # 论文&#xff1a;D-Net&#xff1a;具有动态特征融合的动态大核&#xff0c;用于体积医学图像分割&#xff08;3D图像任务&#xff09; # https://arxiv.org/abs/2403…

[C#]基于wpf实现的一百多种音色的Midi键盘软件

键盘 音色库 源码地址&#xff1a;https://download.csdn.net/download/FL1623863129/89599322

前端必知必会-html表单的基本使用

文章目录 HTML 表单<form> 元素<input> 元素文本字段<label> 元素单选按钮复选框提交按钮<input> 的 Name 属性总结 HTML 表单 HTML 表单用于收集用户输入。用户输入通常发送到服务器进行处理。 <form> 元素 HTML <form> 元素用于创建 H…

无人机环保行业解决方案-河道自动巡检

搭配大疆机场&#xff0c;智能化巡检 轻量一体化设计 相较于其他市面产品&#xff0c;大疆机场更加小巧&#xff0c;占地面积不足1平方米。 展开状态&#xff1a;长1675 mm&#xff0c;宽895 mm&#xff0c;高530mm&#xff08;不含气象站&#xff09; 闭合状态&#xff1a;…