RocketMQ基本概念与入门

news2025/1/10 17:16:36

文章目录

  • MQ基本结构
  • 依赖
    • 案例:
      • product
      • Consumer
    • 核心概念
      • 1.nameserver
      • 2.broker
      • 3.主题队列
      • 4.queue队列
      • 5. 生产者
      • 6.消费者分组和生产者分组
      • 7.消费点位

MQ基本结构

在这里插入图片描述

  • message: 消息数据对象
  • product: 程序代码,生成消息,发送消息到队列
  • consumer: 程序代码,监听(绑定)队列,获取消息,执行消费代码
  • queue: Rocketmq rabbitmq kafka这些消息队列中间件软件.

依赖

<dependency>
    <!--2.2.2底层rocketmq客户端4.9.1-->
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

案例:

product

public class MyProducer {
    /**
     * 向rocketmq发送第一条消息
     */
    @Test
    public void sendTest01() throws Exception {
    //1.准备一个生产者对象,开启长链接
        DefaultMQProducer producer=new DefaultMQProducer();
        //对当前producer设置分组
        producer.setProducerGroup("first-producer-group");
        //连接nameserver localhost:9876
        producer.setNamesrvAddr("localhost:9876");
        //开启长链接
        producer.start();
    //2.封装一个消息对象,我们想要发送的内容,只是消息的一部分
        //创建一个消息对象
        Message message=new Message();
        //消息携带的内容 body
        String msg="当前发送的第一条消息";
        message.setBody(msg.getBytes(StandardCharsets.UTF_8));
        //设置消息主题,分类,按业务分类
        message.setTopic("first-topic-a");
        //主题标签 和key标识 
    //3.调用api方法将消息发送,接收返回结果,查看发送的信息比如状态
        //分为异步发送,同步发送,异步发送性能速度更高,但是无法保证成功.
        //同步发送,性能速度没有异步快,但是可以接收反馈结果
        SendResult send = producer.send(message);
        //result解析获取发送相关的信息
        System.out.println("发送状态:"+send.getSendStatus());
        System.out.println("消息到达主题,队列,broker信息:"+send.getMessageQueue());
    }
}

Consumer

public class MyConsumer1 {
    @Test
    public void consumerTest01() throws Exception {
    //1.构建一个消费者对象,连接nameserver创建长链接
        // push pull的区别 push消费端,消费的消息是队列推送给他的
        // pull 消费端代码执行一次pull 拉取过来一条消息
        // 收邮件 推的, 抢红包 拉取的
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer();
        //设置nameserver地址
        consumer.setNamesrvAddr("localhost:9876");
        //消费者分组
        consumer.setConsumerGroup("first-consumer-group-a");
        //定义监听的主题,消费端代码会根据定义的主题寻找nameserver路由信息,找到主题的队列进行绑定
        //topic 主题名称,subExpression 定义过滤逻辑 *表示匹配所有
        consumer.subscribe("first-topic-a","*");
    //2.执行api开始监听主题,实现队列的消费
        //提供给consumer一个监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /**
             * 推送过来的消息,都会调用consumerMessage执行消费逻辑
             * @param list 消息数据 list表示可以批量处理的消息,不是批量消息,list元素只有1个
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> list,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //获取消息 由于不是批量发送只有list一个元素
                MessageExt messageExt = list.get(0);
                messageExt.getMsgId();//唯一的一个标识,每次消息组装的对象都会在发送时,生成一个msgId
                byte[] body = messageExt.getBody();
                //将消息转化
                String message=new String(body, StandardCharsets.UTF_8);
                System.out.println("消费端获取到消息:"+message);
                //context 控制返回确认信息 ackIndex顺序
                //返回消费状态 success 队列会将消息对应当前消费组,移动偏移量,记录消费完成
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //开启长连接
        consumer.start();
        while (true);
    }
}

核心概念

1.nameserver

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息。

  • 总之: nameserver作为协调器, 谁的信息被用到,就要到nameserver注册,谁要用注册信息,就要到nameserver同步抓取.
    broker要作为rocketmq容器被生产者和消费者代码使用.

2.broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

  • NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
  • 在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

3.主题队列

  • 简单理解,主题是一类消息的集合,每次我们发送消息必须指定消息绑定某一个主题.

  • 生产者发送的某一条消息,只能指向一个主题,多条消息可以指向同一个主题,同一个主题中有多个消息队列保存消息,消费端可以根据订阅的主题消费不同主题的消息.这样可以实现业务隔离.

  • 比如电商主题可以是order,主题也可以是cart,还可以是product相关的…

  • 一类消息,从数据的格式,携带body格式,都是完全一致的. 不会出现"第一条消息的"body是普通字符串,第二条消息是个对象Json,不可能第一条消息延迟消息(支付订单倒计时取消),第二条消息普通同步消息.
    在这里插入图片描述

4.queue队列

存储消息的物理实体(最小单位)。一个Topic中可以包含多个Queue(分布式体现的关键),每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区**(Partition**)。

注意:一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费(消费点位逻辑)。一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
在这里插入图片描述

5. 生产者

问题:通过上述概念的了解,生产者,nameserver,broker之间是如何交互的.

  • 启动 nameserver 保存broker路由信息
  • 主题一旦创建,保存broker里,同时生成队列,这些数据,作为路由信息保存nameserver
  • 生产者从nameserver拿到当前集群所注册信息(路由)
  • 发送消息的时候,连接具体的那个broker找具体的topic的具体queue实现消息发送,使用的具体信息,在返回的SendResult中体现

6.消费者分组和生产者分组

消息生产者,负责生产消息。本质上是程序中的一段代码.Producer投递消息到broker代理中.找到主题,负载均衡存放到队列中.
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,产生同一类型的消息,这类Producer发送相同Topic类型的消息。一个生产者组可以同时向多个主题发送消息。
在这里插入图片描述
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,对应同一类消息数据。消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着执行消费逻辑.

由于主题中有多个队列,一组消费者,最多有和队列一样数量的消费成员,再多,无法绑定队列消费消息了.

7.消费点位

在队列中记录了所有和偏移量有关的数据比如:

  • 最小偏移量:都是0
  • 最大偏移量:当前消息的个数

在消费者中也在记录偏移量

  • 当前组对应主题队列的消费最小偏移量,和队列的最大偏移量(通过这两个值,能够知道当前消费者消费到哪个消息,还有多少没消费)
  • 在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/787430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MPU6050简介

文章目录 简介参数IIC通信的从机地址硬件电路框图 简介 MPU6050是一个6轴姿态传感器&#xff0c;可以测量芯片自身X,Y,Z轴的加速度&#xff0c;角速度&#xff0c;从而得到姿态角&#xff0c;用于平衡车&#xff0c;飞行器等。 内部结构&#xff1a; 3轴加速度计&#xff08;A…

数据库运维作业2

1.理解MySQL主从复制原理。 主要基于MySQL二进制日志 主要包括三个线程&#xff08;2个I/O线程&#xff0c;1个SQL线程&#xff09; 1&#xff09;MySQL将数据变化记录到二进制日志中&#xff1b; 2&#xff09;Slave将MySQL的二进制日志拷贝到Slave的中继日志中&#xff1b; 3…

vscode插件unocss无法正常使用

官网文档&#xff1a;https://alfred-skyblue.github.io/unocss-docs-cn/integrations/vscode#VPSidebarNav 无法使用的原因 很多人在使用的时候会配置uno.config.ts文件&#xff0c;但这个文件不一定放置在根目录下。扩展程序将尝试在您的项目中查找 UnoCSS 配置。如果未找到…

selenium 获取请求响应信息,包括请求的响应头和响应体

在我们使用selenium请求网页时&#xff0c;有时不想从浏览器解析后的html标签获取数据&#xff0c;如果能直接获取url返回的json格式数据会更容易解析。就像request和scrapy爬虫返回的响应数据一样。那么&#xff0c;我们用selenium应该怎么做呢&#xff1f; selenium并不支持…

pytest——断言后继续执行

前言 在编写测试用例的时候&#xff0c;一条用例可能会有多条断言结果&#xff0c;当然在自动化测试用例中也会遇到这种问题&#xff0c;我们普通的断言结果一旦失败后&#xff0c;就会出现报错&#xff0c;哪么如何进行多个断言呢&#xff1f;pytest-assume这个pytest的插件就…

Git的核心概念:探索Git中的提交、分支、合并、标签等核心概念,深入理解其作用和使用方法

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

学生管理系统-02项目案例(2)

一、表单的验证 完成表单验证的步骤 在el-form表单元素上添加一个rules属性&#xff0c;rules中配置相关的验证规则 在el-form表单元素上添加:model将data中验证的响应式数据关联起来 在el-form-item中添加prop属性&#xff0c;该属性一定和rules中的key值对应来 <el-fo…

区块链实验室(12) - 网络拓扑对PBFT共识流量的影响

区块链实验室(10) - 实例说明PBFT的共识过程说明了1个简单又极端的网络&#xff0c;在这个网络中完成1个交易的共识&#xff0c;需要26次通信&#xff0c;见下图所示。 换1个网络&#xff0c;这个网络是强连通图&#xff0c;见下图。 在这个网络中完成1次交易&#xff0c;流量见…

vue3 +ts 报错 index.vue 不是模块

那是因为index.vue中创建了一个空的script标签&#xff0c;而且语法使用的是ts语法。vue-cli会用ts语法解析和校验 如果是无状态组件&#xff0c;删掉 如果是有状态组件&#xff0c;导出该组件的实例 去掉null的script后&#xff1a;

如何防止word转pdf乱码?这几个方法你试过了吗?

在工作中&#xff0c;我们常常需要处理各种类型的文件&#xff0c;并且会根据不同需求进行PDF文件与其他格式文件的相互转换&#xff0c;然而有时候在将Word文件转换成PDF时&#xff0c;可能会遇到乱码等问题&#xff0c;让人感到困扰。今天我将介绍两种方法&#xff0c;让您在…

除了Midjourney,这5个绘画网站同样好用

如今随着科技的发展&#xff0c;AI绘画网站走进了人们的视线。今天本文会为大家介绍5个同Midjourney一样好用的AI绘画王章&#xff0c;带大家体验AI绘画带来的乐趣&#xff0c;也帮助设计师更快地实现绘画创作&#xff0c;一起来看看吧&#xff01; 1、即时灵感 即时灵感是一…

世界头号黑客,渗透成功率100%,他为什么这么厉害?

这个世界有一些人&#xff0c;他们特别擅长计算机编程&#xff0c;比如Linux之父Linus&#xff0c;id software创始人John Carmack、QEMU, FFMPEG作者Fabrice Bellard。 这个世界也有一些人&#xff0c;他们特别擅长和人打交道&#xff0c;三言两语之间就能让你敞开心扉&#…

抖斗音直播间评论引流助手,支持直播间喊话+视频评论区喊话=到指定直播间引流精准粉丝【永久脚本+详细教程】

如果你觉得直播间发言手动太麻烦了&#xff0c;或许这个自动工具能帮到你&#xff01; 1.开始运行前&#xff0c;需要手动去打开打开直播间或者视频评论区&#xff0c;再运行脚本。 2.脚本就是模拟人工操作&#xff0c;在相应的APP里进行评论&#xff0c;无突破APP限制功能。…

【【51单片机的DS18B20温度传感器】】

学习温度传感器&#xff0c;探索单片机的奥秘 DS18B20是一种常见的数字温度传感器 测温范围-55度 ~125度 通信接口 1-wire 其他特征 可形成总线结构 内置温度报警功能 可寄生供电 其中的64位bit ROM 作为器件地址&#xff0c;用于总线通信的寻址 SCRATCHPAD暂存器 用于总线…

科技资讯|苹果计划本月推出Vision Pro头显开发套件,电池有重大更新

根据消息源 aaronp613 分享的信息&#xff0c;苹果计划本月底面向开发者&#xff0c;发布 Vision Pro 头显开发套件。消息源还指出苹果更新了 Vision Pro 头显电池组的代号&#xff0c;共有 A2781&#xff0c;A2988 和 A2697 三种不同的型号&#xff0c;目前尚不清楚三者之间的…

从零构建深度学习推理框架-1 简介和Tensor

源代码作者&#xff1a;https://github.com/zjhellofss 本文仅作为个人学习心得领悟 &#xff0c;将原作品提炼&#xff0c;更加适合新手 什么是推理框架&#xff1f; 深度学习推理框架用于对已训练完成的神经网络进行预测&#xff0c;也就是说&#xff0c;能够将深度训练框…

openGauss学习笔记-19 openGauss 简单数据管理-ORDER BY子句

文章目录 openGauss学习笔记-19 openGauss 简单数据管理-ORDER BY子句19.1 语法格式19.2 参数说明19.3 示例 openGauss学习笔记-19 openGauss 简单数据管理-ORDER BY子句 ORDER BY对SELECT语句检索得到的一列或者多列数据进行升序&#xff08;ASC&#xff09;或者降序&#xf…

Linux 两种GPIO控制方式

1、采用sysfs的方式控制&#xff0c;这是内核标准的sysfs接口 如&#xff1a; echo 25 > /sys/class/gpio/export echo "out" > /sys/class/gpio/gpio25/direction echo 1 > /sys/class/gpio/gpio25/value 2、采用libgpiod 控制内核生成的节点来控制/d…

Qt:强大API、简化框架、多语言支持,构建全面应用程序“

强大的API&#xff1a;Qt提供了丰富的API&#xff0c;包括250多个C类&#xff0c;基于模板的集合、序列化、文件操作、IO设备、目录管理、日期/时间等功能。还包括正则表达式处理和支持2D/3D图形渲染&#xff0c;以及OpenGL和XML支持。此外&#xff0c;Qt还允许导入第三方图形设…

Spring Boot 框架中的配置文件

一、配置文件作用 整个项目中所有重要的数据都是在配置文件中配置的&#xff0c;比如&#xff1a; 数据库的连接信息&#xff08;包含用户名和密码的设置&#xff09;&#xff1b; 项目的启动端口&#xff1b; 第三方系统的调用秘钥等信息&#xff1b; 用于发现和定位问题的普…