【RocketMQ】RocketMQ快速入门

news2025/1/21 15:27:15

🎯 导读:该文档介绍了Apache RocketMQ消息队列的基础应用,包括消息发送与接收的基本流程。首先通过创建生产者实例,并指定名称服务器地址,启动后即可发送消息至指定主题。然后创建消费者实例订阅相应主题,并设置监听器处理接收到的消息。文档中还提供了代码示例,展示了如何实现简单的生产和消费逻辑。此外,文档解释了消息队列在不同场景下的分发策略,如负载均衡与广播模式,并强调了队列数量与消费者数量之间的关系以确保消息的合理分配。

文章目录

    • 消息发送和监听的流程
      • 消息生产者
      • 消息消费者
    • 搭建RocketMQ入门案例
      • 创建项目
      • 加入依赖
      • 编写生产者
      • 编写消费者
    • 说明
      • 一个消费者组消费一个topic
      • 两个消费者组消费一个topic
      • 生产者的消息发送给主题的哪个队列
      • 消费者如何从队列中拉取消息
        • 只有一个消费者,要拉取所有队列的消息
        • 两个消费者,每个消费者要负责两个队列
        • 三个消费者(要求尽量平衡)
        • 四个消费者,一人一个
        • 五个消费者,第五个消费者永远不接收消息

RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息

消息发送和监听的流程

消息生产者

1、创建消息生产者 producer ,并指定生产者组名

2、指定 Nameserver 地址

3、启动 producer

4、创建消息对象,指定主题 Topic、Tag 和消息体等

5、发送消息

6、关闭 producer

消息消费者

1、创建消费者 consumer ,指定消费者组名

2、指定 Nameserver 地址

3、创建监听订阅主题 Topic和Tag 等

4、处理消息

5、启动消费者 consumer

搭建RocketMQ入门案例

创建项目

在这里插入图片描述

在这里插入图片描述

加入依赖

引入原生API,先不用spring-boot-starter版本

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.2</version>
        <!--docker的用下面这个版本-->
        <version>4.4.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>

编写生产者

/**
 * 测试生产者
 *
 * @throws Exception
 */
@Test
public void testProducer() throws Exception {
    // 创建默认的生产者(指定生产者组名)
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    for (int i = 0; i < 1; i++) {
        // 创建消息
        // 第一个参数:主题的名字
        // 第二个参数:消息内容(要转化为字节数组)
        Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
        // 发送结果
        SendResult send = producer.send(msg);
        // 打印发送状态
        System.out.println(send.getSendStatus());
    }
    // 关闭实例
    producer.shutdown();
}

为了连接方便,可以使用一个常量NAME_SRV_ADDR来存储localhost:9876

【运行】

在这里插入图片描述

在控制台中可以看到创建了一个主题 testTopic

在这里插入图片描述

点击状态,一个主题默认4个队列

在这里插入图片描述

点击路由,可以查看 broker 的 ip 地址

在这里插入图片描述

在CONSUMER管理中,可以查看消费者

编写消费者

@Test
public void simpleConsumer() throws Exception {
    // 创建一个消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
    // 连接 namesrv
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅一个主题  * 表示订阅这个主题中所有的消息,后面会有消息过滤的教程
    consumer.subscribe("testTopic", "*");
    // 设置一个监听器 (一直监听的,异步回调方式,消费者线程和监听线程不是一个线程)
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 这个就是消费的方法 (业务处理)
            System.out.println("我是消费者");
            // msgs 虽然是List,但是只有一条消息,所以get(0)就行
            System.out.println(msgs.get(0).toString());
            // 消息内容从字节数组转化为String
            System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
            System.out.println("消费上下文:" + context);
            // 返回值  CONSUME_SUCCESS成功,消息会从mq出队
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            // RECONSUME_LATER(报错/null)失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    // 启动
    consumer.start();
    // 挂起当前的jvm,让监听一直存在
    System.in.read();
}

【运行】

在这里插入图片描述

说明

  • 一个生产者组可以投递到多个主题
  • 一个消费者组只能订阅一个主题

在这里插入图片描述

一个消费者组消费一个topic

在这里插入图片描述

【负载均衡模式】消息1给 C1 消费,消息2给 C2 消费,以此类推

【广播模式】同一条消息既给 C1 消费,又给 C2 消费

两个消费者组消费一个topic

同一消息,两个消费者组都获取到,但是组内要分配给哪个消费者,就看是负载【均衡模式】还是【广播模式】了

在这里插入图片描述

生产者的消息发送给主题的哪个队列

生产者会将消息轮询发送到主题的4个队列

在这里插入图片描述

消费者如何从队列中拉取消息

只有一个消费者,要拉取所有队列的消息

在这里插入图片描述

  • 代理者:MQ
  • 消费者:我们的程序

测试,生产者生产12个消息

在这里插入图片描述

在这里插入图片描述

差值:代理者位点-消费者位点。如果差值太大,说明消息堆积

两个消费者,每个消费者要负责两个队列

在这里插入图片描述

三个消费者(要求尽量平衡)

在这里插入图片描述

四个消费者,一人一个

在这里插入图片描述

五个消费者,第五个消费者永远不接收消息

队列数量最好大于等于消费者组内的消费者数量!!!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2175396.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

js逆向——webpack实战案例(一)

今日受害者网站&#xff1a;https://www.iciba.com/translate?typetext 首先通过跟栈的方法找到加密位置 我们跟进u函数&#xff0c;发现是通过webpack加载的 向上寻找u的加载位置&#xff0c;然后打上断点&#xff0c;刷新网页&#xff0c;让程序断在加载函数的位置 u r.n…

Mamba模型初步解析 — Mamba : Linear-Time Sequence Modeling with Selective State Spaces

Mamba模型初步接触 — Mamba : Linear-Time Sequence Modeling with Selective State Spaces "Mamba"是一种序列建模架构&#xff0c;它采用了称为选择性状态空间模型&#xff08;SSMs&#xff09;的结构来优化处理长序列数据的效率和性能&#xff0c;这在语言处理、…

如果只能保留一个复制粘贴软件,那一定是它pastemate

下载地址&#xff1a;Pastemate 在日常的工作和生活中&#xff0c;使用电脑必离不开的功能中&#xff0c;一定有复制粘贴。传统的复制粘贴方式效率不那么高&#xff0c;Windows内置的剪切板功能感觉又差那么些意思。 &#x1f9d0;对于功能和颜值都有要求的你&#xff0c;一定…

端口隔离配置的实验

端口隔离配置是一种网络安全技术&#xff0c;用于在网络设备中实现不同端口之间的流量隔离和控制。以下是对端口隔离配置的详细解析&#xff1a; 基本概念&#xff1a;端口隔离技术允许用户将不同的端口加入到隔离组中&#xff0c;从而实现这些端口之间的二层数据隔离。这种技…

Linux入门2——初识Linux权限

目录 0. Linux下的用户 1.文件访问者的分类 2.文件类型和访问权限 3. 文件权限值的表示方法 4.文件访问权限的相关设置方法 4.1 修改文件的访问权限 4.2修改文件的拥有者和所属组 0. Linux下的用户 在学习Linux权限之前&#xff0c;我们要先来了解Linux下的用户&#x…

(十七)、Mac 安装k8s

文章目录 1、Enable Kubernetes2、查看k8s运行状态3、启用 kubernetes-dashboard3.1、如果启动成功&#xff0c;可以在浏览器访问3.2、如果没有跳转&#xff0c;需要单独安装 kubernetes-dashboard3.2.1、方式一&#xff1a;一步到位3.2.2、方式二&#xff1a;逐步进行 1、Enab…

杭州网站设计中的常见误区及解决方案

在杭州网站设计领域&#xff0c;随着数字经济的快速发展&#xff0c;越来越多的企业意识到互联网的重要性。然而&#xff0c;在实际的网站设计过程中&#xff0c;仍然存在一些常见的误区&#xff0c;这些误区可能会影响用户体验和网站的整体效果。以下是几种普遍存在的误区及其…

国产动漫论坛系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;动漫分类管理&#xff0c;动漫视频管理&#xff0c;动漫图片管理&#xff0c;动漫文章管理&#xff0c;交流论坛&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&a…

汇编语言 访问CMOS RAM并打印时间(未完)

题目:以"年/月/日 时:分:秒"的格式,显示当前的日期,时间 提示:在此代码的基础上加以改造 assume cs:code code segment start:mov al,9 ;年out 70h,al ;传入9号单元的地址in al,71h ;取9号单元的内容&#xff0c;高4位为十位、低4位为各位mov ah,almov cl,4shr ah,…

1-仙灵之谜(区块链游戏详情介绍)

1-仙灵之谜&#xff08;区块链游戏详情介绍&#xff09; 前言&#xff08;该游戏仅供娱乐&#xff09;正文 前言&#xff08;该游戏仅供娱乐&#xff09; 依稀记得本科那会儿参加了一个区块链实验室&#xff0c;那时每周末大家都会爬山或者抽出一下午讨论区块链以及未来&#x…

< 初等物理 >

SI国际单位制 常见的公制单位 为什么需要单位&#xff0c;是统一衡量的标准 通过国际单位&#xff0c;以及单位的拓展&#xff0c;以及单位的组合&#xff0c;形成一系列新的测量单位 面积 m^2 速率 m/s 米每二次方秒&#xff0c;m / s, delta表示增量, 每秒移动多少米 加…

pdf怎么转变成jpg图片?值得推荐的几种PDF转jpg方法

pdf怎么转变成jpg图片&#xff1f;jpg格式的图像在电子邮件、社交媒体等在线平台上分享非常方便&#xff0c;用户无需担心软件兼容性问题。将PDF内容转换为jpg后&#xff0c;能够有效保留原始文档的视觉布局&#xff0c;使信息更加生动易懂&#xff0c;适合用于演示和展示。同时…

【小沐学GIS】基于ubuntu+three.js的OSM建筑模型显示(node.js、Python)

文章目录 1、简介1.1 ubuntu1.2 node1.3 python1.4 osm1.5 three.js 2、安装ubuntu3、安装node4、安装python结语 1、简介 1.1 ubuntu https://cn.ubuntu.com/download https://ubuntu.com/download Ubuntu是一个以桌面应用为主的Linux发行版操作系统&#xff0c;其名称来自非…

萝卜大杂烩 | 快速掌控seaborn(画图必备)

本文来源公众号“萝卜大杂烩”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;快速掌控seaborn Matplotlib绘制一张美图需要很多参数调整&#xff0c;于是就出现了high-level版的Seaborn&#xff0c;几行代码即可输出美美的图形&am…

超详细的 GitHub 个人主页美化教程

Guthub 个人主页 &#xff08;官方称呼是 profile&#xff09;可以展示很多有用的信息&#xff0c;例如添加一个首页被访问次数的计数器&#xff0c;一个被 Star 与 Commit 的概览信息&#xff0c;以及各种技能标签&#xff0c;设备标签等&#xff0c;还可以利用 wakatime 显示…

一文上手SpringSecurity【七】

之前我们在测试的时候,都是使用的字符串充当用户名称和密码,本篇将其换成MySQL数据库. 一、替换为真实的MySQL 1.1 引入依赖 <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</v…

一文理解mysql 联合索引和各种SQL语句分析

文章目录 索引图示主键索引二级索引表SQL总结索引图示 主键索引 二级索引 这里如果是联合索引的话,那里面的key就是多个colume的值 表 -- demo.`order` definitionCREATE TABLE `order` (

请求转发和响应重定位

一、请求转发 二、响应重定位 302&#xff1a;服务器的收到请求&#xff0c;但所需要的行为和资源要重定位到其他地方&#xff08;可以是外部和服务器的其他位置&#xff09;时就会像请求者发送302状态码 location响应头&#xff1a;告诉请求者重定位的URL路径

【前端】35道JavaScript进阶问题(1)

来源&#xff1a; javascript-questions/zh-CN/README-zh_CN.md at master lydiahallie/javascript-questions GitHub 记录一些有趣的题。 1 输出是&#xff1f; const shape {radius: 10,diameter() {return this.radius * 2},perimeter: () > 2 * Math.PI * this.rad…

如何通过python+sqlalchemy获得MSsql视图的结构

话不多说 目的:为了对接第三方表视图,需要知道表视图的字段结构,如名称,对应的表字段类型 实现结果如图: 直接上代码: from sqlalchemy import create_engine, MetaData, select, text from web import urlquoteDRIVER "ODBC Driver 18 for SQL Server" INSTANCE…