RocketMQ之(一)RocketMQ入门

news2025/1/11 9:54:56

一、RocketMQ入门

  • 一、RocketMQ 介绍
    • 1.1 RocketMQ 是什么?
    • 1.2 RocketMQ 应用场景
      • 01、应用解耦
      • 02、流量削峰
      • 03、数据分发
    • 1.3 RocketMQ 核心组成
      • 01、NameServer
      • 02、Broker
      • 03、Producer
      • 04、Consumer
    • 1.6 运转流程
    • 1.5 RocketMQ 架构
      • 01、NameServer 集群
      • 02、Broker 集群
      • 03、Producer 集群
      • 04、Consumer 集群
      • 07、集群工作流程
      • 06、集群间的交互方式
    • 1.6 RocketMQ 优缺点
      • 01、优点
      • 02、缺点
    • 1.7 各种 MQ 比较
  • 二、RocketMQ 安装(Linux 版本)
    • 2.1 环境要求
    • 2.2 安装步骤
      • 01、上传安装包
      • 02、解压安装包
      • 03、参数配置
    • 2.3 目录介绍
    • 2.4 启动 RocketMQ
    • 2.5 测试 RocketMQ
    • 2.6 关闭 RocketMQ
  • 三、rocketmq-console 集群监控平台搭建
    • 3.1 简介
    • 3.2 搭建集群监控平台
      • 01、下载
      • 02、上传解压
      • 03、修改配置参数
      • 04、打包
      • 05、启动和访问
      • 06、问题点
  • 四、RocketMQ 发送消息基本样例
    • 4.1 普通消息发送
    • 4.2 普通消息消费

一、RocketMQ 介绍

1.1 RocketMQ 是什么?

RocketMQ 是一款纯 java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

1.2 RocketMQ 应用场景

消息队列是一种"先进先出"的数据结构,其应用场景主要包含以下三个方面:

01、应用解耦

系统的耦合性越高,容错性就越低。

以电商应用为例:用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
在这里插入图片描述
使用消息队列解耦,系统的耦合性就会提高了。比如:如果物流系统发生故障,需要几分钟才能修复好,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
在这里插入图片描述

02、流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列后,可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提升系统的稳定性和用户体验。
在这里插入图片描述
一般情况下,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验。而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样的体验应该要好很多。
在这里插入图片描述
处于经济考量的目的:
业务系统正常时段的 QPS 如果是 1000,流量最高峰是 10000,为了应对流量高峰配置高性能的服务器显然不划算,这时就可以考虑使用消息队列对峰值流量削峰。

03、数据分发

通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
在这里插入图片描述
在这里插入图片描述

1.3 RocketMQ 核心组成

RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer、Consumer。

在这里插入图片描述
Topic:区分消息的种类。一个发送者可以发送消息给一个或多个 Topic,一个消息的接收者可以订阅一个或多个 Topic 消息。

Message Queue:相当于是 Topic 的分区,用于并行发送和接收消息。

01、NameServer

NameServer 是一个几乎无状态节点,可集群部署,节点之间没有任何信息同步。所以 RocketMQ 需要先启动 NameServer 再启动 Broker。

  • 作用

    NameServer 是整个 RocketMQ 的 "大脑",它相当于是服务注册中心的角色,用来管理 Broker。举例:各个邮局的管理机构。

    每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。

    每个 Broker 在启动的时候会到 NameServer 中注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,进而和 Broker 取得连接。Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多的,但是据说 RocketMQ 的早期版本确实是使用的ZooKeeper ,后来改为了自己实现 NameServer。

  • 与 ZooKeeper 的区别

    NameServer 和 ZooKeeper 的作用大致是相同的。从宏观上来看,NameServer 做的东西很少,就是保存一些运行数据,NameServer 之间不互相连,这就需要 Broker 端连接所有的 NameServer,运行数据的改动要发送到每一个 NameServer ,从而来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了 NameServer很轻量级,但是 Broker 端就要做更多的东西了。

    但是在 ZooKeeper 中,Broker 只需要连接其中的一台机器,运行数据分发、一致性都交给了 ZooKeeper 来完成。

  • 高可用保障

    Broker 在启动时向所有 NameServer 注册(主要是服务器地址等) ,生产者在发送消息之前先从NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

    NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。

02、Broker

Broker 是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。举例:邮局。

它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 只可以读不可以写。

从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master 多 Slave(异步刷盘)。

  • 单 Master

    这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

  • 多 Master

    所有消息都是 Master,没有 Slave。这种方式优缺点:

    • 优点:配置简单,单个 Master 宕机或重启维护对应用无影响。在磁盘配置为 RAID10 时,即使机器宕机不可恢复的情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
  • 多 Master 多 Slave(异步复制)

    异步:先响应后再存入磁盘。

    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优缺点:

    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受到影响。同时 Master 宕机后,消费者仍然可以从 Salve 消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
    • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
  • 多 Master 多 Slave(同步双写)

    同步:立刻存入磁盘后响应。

    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功。这种模式的优缺点:

    • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
    • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
  • 高可用保障

    每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时(每隔30s)注册 Topic 信息到所有 NameServer。NameServer定时(每隔10s)扫描所有存活 Broker 的连接,如果 NameServer 超过2分钟没有收到心跳,则 NameServer 断开与 Broker 的连接。

03、Producer

Producer 也称为消息发布者,负责生产并发送消息至 Topic。举例:发信者。生产者向 brokers 发送由业务应用程序系统生成的消息。RocketMQ 提供了发送:同步、异步和单向(one-way)的多种范例。

  • 同步发送

    同步发送消息指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,比如重要通知邮件、营销短信等。

  • 异步发送

    异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。假如过一段时间检测到某个信息发送失败,可以选择重新发送。

  • 单向发送

    单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

  • 生产者组

    生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群。

  • 高可用保障

    Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

    Producer 每隔30s(由 ClientConfig 的 pollNameServerInterval )从 Nameserver 获取所有 topic 队列的最新情况,这意味着如果 Broker 不可用,Producer 最多30s能够感知,在此期间内发往 Broker 的所有消息都会失败。

    Producer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,如果 Broker 在2分钟内没有收到心跳数据,则关闭与 Producer 的连接。

04、Consumer

Consumer 也称为消息订阅者,负责从 Topic 接收并消费消息。举例:收信者。消费者从 brokers 里拉取信息并将其输入应用程序中。

  • 消费者组

    消费者组(Consumer Group)是一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起。消费者组与生产者组类似,都是将相同角色的分组在一起并命名。

    RocketMQ 中的消息有个特点:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费。

  • 高可用保障

    Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

    Consumer 每隔30s从 Nameserver 获取 topic 的最新队列情况,这意味着 Broker 不可用时,Consumer 最多最需要30s才能感知。

    Consumer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后继续消费。

    当 Consumer 得到 master 宕机通知后,转向 slave 消费,slave 不能保证 master 的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦 master 恢复,未同步过去的消息会被最终消费掉。

1.6 运转流程

在这里插入图片描述

  1. NameServer 先启动;
  2. Broker 启动时向 NameServer 注册;
  3. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台 Broker 进行消息发送;
  4. NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除;
  5. 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定。

1.5 RocketMQ 架构

在这里插入图片描述
RocketMQ 架构图中展示了四个集群:

01、NameServer 集群

提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。

NameServer 是一个功能齐全的服务器,主要包含两个功能:

  1. Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活;
  2. 路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息。

02、Broker 集群

通过提供轻量级的 Topic 和 Queue 机制处理消息存储。同时支持推(Push)和拉(Pull)两种模型,包含容错机制。提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难恢复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。

Broker 有几个重要的子模块:

  1. 远程处理模块,Broker 入口,处理来自客户端的请求;
  2. 客户端管理(包括消息生产者和消费者),维护消费者的主题订阅;
  3. 存储服务,提供在物理硬盘上存储和查询消息的简单 API;
  4. HA 服务,提供主从 Broker 间数据同步;
  5. 索引服务,通过指定键为消息建立索引并提供快速消息查询。

03、Producer 集群

消息生产者支持分布式部署,分布式生产者通过多种负载均衡模式向 Broker 集群发送消息。

04、Consumer 集群

消息消费者也支持 Push 和 Pull 模型的分布式部署,还支持集群消费和消息广播。提供了实时的消息订阅机制,可以满足大多数消费者的需求。

DefultMQPullConsumer :consumer 定时向 broker 发送请求获取内存数据,避免给 broker 造成巨大的压力。一般会在本地使用定时任务实现。

DefultMQPushConsumer :consumer 向 broker 发送请求,两者保持长链接的状态。broker 会定时(每 5 秒)去查询 consumer 中是否有要订阅的数据,有就将消息推送给 consumer。

无论是 pull 还是 push,其实本质上都是拉取消息。

07、集群工作流程

  1. 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
  2. Broker 启动后,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP + 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 与 Broker 的映射关系。
  3. 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  4. Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  5. Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

06、集群间的交互方式

  1. Broker Master 和 Broker Slave 是主从结构,会执行数据同步 Data Sync
    每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有 NameServer;
  2. Producer 与 NameServer 集群中的其中一个节点(随机)建立长连接,定期从 NameServer 获取 Topic 路由信息,并与提供 Topic 服务的 Broker Master 建立长连接,定时向 Broker 发送心跳;
  3. Producer 只能将消息发送到 Broker Master,但是 Consumer 同时和 Broker Master 和 Broker Slave 建立长连接,既可以从 Master 订阅消息,也可以从 Slave 订阅消息。

1.6 RocketMQ 优缺点

01、优点

  • 单机吞吐量:十万级
  • 可用性:非常高,分布式架构
  • 消息可靠性:经过参数优化配置,消息可以做到 0 丢失
  • 功能支持:MQ 功能较为完善,还是分布式的,扩展性好
  • 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降
  • 源码是 Java,方便结合公司自己的业务进行二次开发
  • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
  • RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验。

02、缺点

  • 支持的客户端语言不多,目前仅支持 Java 及 C++,而且 C++ 还不成熟
  • 没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

1.7 各种 MQ 比较

在这里插入图片描述

二、RocketMQ 安装(Linux 版本)

安装 RocketMQ 版本:4.5.0,我这里使用的是阿里云服务器,也可以在虚拟机上操作。

2.1 环境要求

  1. Linux 64 位操作系统
  2. JDK 1.8
  3. Maven 3.9.0(maven 版本不固定,但是最好不要使用最高版本)

2.2 安装步骤

01、上传安装包

将下载好的 RocketMQ 安装包上传到服务器上:
在这里插入图片描述

02、解压安装包

# 解压
unzip rocketmq-all-4.5.0-bin-release.zip

# 将解压包移动到指定路径下
mv rocketmq-all-4.5.0-bin-release ../software

在这里插入图片描述

03、参数配置

这里需要配置三个文件:

  1. /conf/broker.conf

    指定 broker 的命名空间地址和当前 broker 监听的 IP:
    在这里插入图片描述
    默认情况下,namesrvAddr = 127.0.0.1:9876brokerIP1 = 127.0.0.1

  2. /bin/runserver.sh

    RocketMQ 默认的虚拟机内存比较大,启动 Broker 如果因为内存不足失败,就需要编辑这两个配置文件,修改 JVM 内存大小:

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m  -XX:MaxMetaspaceSize=320m"
    

    在这里插入图片描述

  3. /bin/runbroker.sh

    同上,修改 JVM 内存大小:

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    

    在这里插入图片描述

2.3 目录介绍

  • bin:启动脚本,包括 shell 脚本和 cmd 脚本
  • conf:实例配置文件,包括 broker 配置文件、logback 配置文件等
  • lib:依赖 jar 包,包括 Netty、commons-lang、FastJSON 等

2.4 启动 RocketMQ

启动前先查看进程:
在这里插入图片描述
启动命令(在 bin 目录下执行):

# 启动 nameserver
nohup sh mqnamesrv -n 公网IP:9876 &

# 启动 broker
nohup sh mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &

启动后查看进程,有这两个进程即启动成功:
在这里插入图片描述

2.5 测试 RocketMQ

  • 模拟生产者发送消息

    # 生产者
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    

    输入这个命令后,控制台会输出很多的信息,不报错就说明发送成功了:
    在这里插入图片描述

  • 模拟消费者接收消息

    # 消费者
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    在这里插入图片描述

2.6 关闭 RocketMQ

# 关闭服务
sh mqshutdown namesrv
sh mqshutdown broker

三、rocketmq-console 集群监控平台搭建

3.1 简介

RocketMQ 有个可视化的管理界面,通过可视化界面,我们可以方便地监控 RocketMQ 集群,并实现很多操作。比如:创建管理 Topic,查看和发送 ,essage等等。

3.2 搭建集群监控平台

01、下载

RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console。

这个是 rocketmq 的扩展,里面不仅包含控制台的扩展,也包含对大数据 flume、hbase 等组件的对接和扩展。

在这里插入图片描述

02、上传解压

  • 上传
    在这里插入图片描述
  • 解压
    # 解压
    unzip rocketmq-console.zip
    
    # 移动到 software 目录下
    mv rocketmq-console ../software
    
    在这里插入图片描述

03、修改配置参数

修改 rocketmq-console\src\main\resources\application.properties 配置文件:
在这里插入图片描述

04、打包

进入 rocketmq-console 目录下执行命令:

# 打包
mvn clean package ‐Dmaven.test.skip=true

打包完成后,会在 /software/rocketMQ/rocketmq-console 目录下生成一个 target 文件夹

05、启动和访问

  • 启动

    进入 /rocketmq-console/target 目录下执行:

    # 指定端口号和命名空间地址
    java -jar rocketmq-console-ng-1.0.1.jar --server.port=8088 --rocketmq.config.namesrvAddr=公网IP:9876
    

    在这里插入图片描述

  • 访问

    http://106.15.0.30:8088
    在这里插入图片描述

06、问题点

  • 防火墙

    防火墙需要开放访问 RocketMQ 的一系列端口号:

    # 查看防火墙状态
    systemctl status firewalld
    
    # 关闭防火墙
    systemctl stop firewalld
    
    # 启动防火墙
    systemctl start firewalld
    
    # 永久开放指定端口号【把用到的端口号都开放】
    firewall-cmd --zone=public --add-port=10909/tcp --permanent
    firewall-cmd --zone=public --add-port=10911/tcp --permanent
    firewall-cmd --zone=public --add-port=9876/tcp --permanent
    firewall-cmd --zone=public --add-port=9877/tcp --permanent
    
    # 重新加载防火墙
    firewall-cmd --reload
    
    # 或者重启防火墙
    systemctl restart firewalld.service
    
    # 查看防火墙信息列表
    firewall-cmd --list-all
    
    # 只查看防火墙开放端口号列表
    firewall-cmd --list-ports
    

    除了这一层防火墙之外,阿里云服务器自己还有一层防火墙 iptables,是默认配置的,我们也需要关闭这层防火墙或者开放对应的端口号:

    # 查看防火墙状态出现的问题
    service iptables status
    
    # 关闭防火墙
    service iptables stop
    
  • 安全组

    防火墙端口号开放之后,同时也需要在 ECS 服务器安全组中添加端口规则:
    在这里插入图片描述

四、RocketMQ 发送消息基本样例

引入 jar 包:

<dependency>
   <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>

4.1 普通消息发送

/**
 * 同步发送消息
 *
 * @author qiaohaojie
 * @date 2023/2/20  17:00
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 1. 实例化消息生产者 producer
        DefaultMQProducer producer = new DefaultMQProducer("group_producer_demo1");
        // 2. 设置 nameServer 的地址
        producer.setNamesrvAddr("公网IP:9876");
        // 关闭 VIP 通道
//        producer.setVipChannelEnabled(false);
        producer.setSendMsgTimeout(3000);
        // 3. 启动 Producer 实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 4. 创建消息 message
            Message message = new Message("Topic_Demo", "Tags", "Hello RocketMQ" + i, "hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 5. 发送消息
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        // 6. 关闭 producer
        producer.shutdown();
    }
}

这里的 message 实例中有几个参数:
在这里插入图片描述

  1. topic:代表消息的主题
  2. tags:主要用于消息过滤
  3. keys:消息的唯一值
  4. body:消息体,代表消息的内容
    在这里插入图片描述

4.2 普通消息消费

/**
 * 同步发送消息-消费者
 *
 * @author qiaohaojie
 * @date 2023/2/20  21:39
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 创建DefaultMQPushConsumer实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer_demo1");
        // 2. 设置nameServer地址
        consumer.setNamesrvAddr("公网IP:9876");
        // 设置消息拉取最大数
        consumer.setConsumeMessageBatchMaxSize(2);
        // 3. 设置subscribe,这里是要读取的主题信息
        consumer.subscribe("Topic_Demo", "*");
        // 4. 设置消息监听
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 5. 获取消息信息
                // 迭代消息信息
                for (MessageExt msg : msgs) {
                    try {
                        // 获取主题
                        String topic = msg.getTopic();
                        // 获取标签
                        String tags = msg.getTags();
                        // 获取信息
                        String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费消息--topic:" + topic + ",targs:" + tags + ",result:" + result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // 消息重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                // 6. 返回消息读取状态
                // 消息消费完成
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 开启Consumer
        consumer.start();
    }
}

消费消息时有两个参数:
在这里插入图片描述

  1. topic:指定要消费的主题
  2. subExpression:消息过滤规则
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NetApp Cloud Volumes ONTAP 将数据复制到云或从云中复制

NetApp Cloud Volumes ONTAP 将数据复制到云或从云中复制&#xff0c;为开发运营和基于云的灾难恢复提供支持。 无论应用位于何处&#xff0c;都可以使用企业级存储,让云存储基础架构更经济、更智能、更合规且更安全。 为什么选择 NetApp Cloud Volumes ONTAP NetApp Cloud …

RocketMQ 第二章

RocketMQ 第二章 7、SpringBoot整合RocketMQ SpringBoot 提供了快捷操作 RocketMQ 的 RocketMQTemplate 对象。 7.1、引入依赖 注意依赖的版本需要和 RocketMQ 的版本相同。 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rock…

本地部署element-plus文档

由于一直使用的前端组件element-plus&#xff0c;所以需要经常看文档&#xff0c;但无奈官网实在不给力&#xff0c;经常报503或者404&#xff0c;大大影响效率和心情&#xff0c;忍无可忍就本地化部署一套解决此问题。 百度了一下大多数都是使用 vscode的live server, 或者放…

JAVA保姆式JDBC数据库免费教程之02-连接池技术

连接池 连接池概念 ​ 概念&#xff1a;其实就是一个容器(集合)&#xff0c;存放数据库连接的容器。 当系统初始化好后&#xff0c;容器被创建&#xff0c;容器中会申请一些连接对象&#xff0c;当用户来访问数据库时&#xff0c;从容器中获取连接对象&#xff0c;用户访问完…

【MySQL】MySQL 架构

一、MySQL 架构 C/S 架构&#xff0c;即客户端/服务器架构。服务器程序直接和我们存储的数据打交道&#xff0c;多个客户端连接这个服务器程序。客户端发送请求&#xff0c;服务器响应请求。 MySQL 数据库实例 &#xff1a;即 MySQL 服务器的进程 &#xff08;我们使用任务管理…

Vue组件间通信的四种方式(函数回调,自定义事件,事件总线,消息订阅与发布)

目录 概述 props配置项-回调函数实现 自定义事件实现 事件总线实现 消息订阅与发布实现 概述 在组件化编程中&#xff0c;组件间的通信是重要的&#xff0c;我们可以有四种方式实现组件间的通信。 分别是&#xff1a;函数回调&#xff0c;自定义事件&#xff0c;事件总…

可调恒流驱动LED电路分析

https://www.icxbk.com/article/detail?aid884 常规使用的pwm调亮度不仅会导致频闪&#xff0c;而且在长时间使用的时候&#xff0c;有损坏led的风险&#xff0c;所以这次设计了一个恒流调亮度电路&#xff0c;其电路图如下所示 电路原理的解读&#xff1a; 左侧的电位计起着…

【JavaScript】js实现深拷贝的方法

前言 在js中我们想要实现深拷贝&#xff0c;首先要了解深浅拷贝的区别。 浅拷贝&#xff1a;只是拷贝数据的内存地址&#xff0c;而不是在内存中重新创建一个一模一样的对象&#xff08;数组&#xff09; 深拷贝&#xff1a;在内存中开辟一个新的存储空间&#xff0c;完完全全…

Java语言常用哪些运算符?

之前有个大家讨论过java的数据类型&#xff0c;总体来说类型和其他几种语言也相差无几&#xff0c;我为什么会这样说&#xff1f;我们应该都要知道Python可还有个复数类型。 这里主要给大家讲解Java运算符的分类和使用。 一、运算符分类 说到运算符&#xff0c;我们可以先了…

硬件系统工程师宝典(9)-----如何正确使用去耦电容

各位同学大家好&#xff0c;欢迎继续做客电子工程学习圈&#xff0c;今天我们继续来讲这本书&#xff0c;硬件系统工程师宝典。上篇我们说到在电源完整性分析时&#xff0c;明确噪声来源可以有效的避免、解决噪声问题。今天我们来看看电源完整性分析中重要的一环&#xff0c;去…

【自动化测试】web自动化测试验证码如何测?如何处理验证码问题?解决方案......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 在对安全性有要求的…

线程池ThreadPoolExecutor源码剖析

一、Java构建线程的方式 继承Thread &#xff08;也实现了Runnable&#xff09; 实现Runnable 实现Callable &#xff08;与Runnable区别…&#xff09; 线程池方式 &#xff08;Java提供了构建线程池的方式&#xff09;[可以实现Runnable 和 Callable 功能] Java提供了Exe…

使用Vue3实现一个可复制的表格

前言 表格是前端非常常用的一个控件&#xff0c;但是每次都使用v-for指令手动绘制tr/th/td这些元素是非常麻烦的。同时&#xff0c;基础的 table 样式通常也是不满足需求的&#xff0c;因此一个好的表格封装就显得比较重要了。 最基础的表格封装 最基础基础的表格封装所要做…

【并发编程十七】c++实现一个线程池

【并发编程十七】c实现一个线程池一、线程池原理二、实现重点三、个人理解四、实验简介&#xff1a; 大多数系统上&#xff0c;若因某些任务可以与其他任务并行处理&#xff0c;就分别给他们配备专属的线程&#xff0c;则这种做法不切实际。但是只要有可能&#xff0c;我们还是…

C语言进阶——动态内存管理(上)

&#x1f307;个人主页&#xff1a;_麦麦_ &#x1f4da;今日名言&#xff1a;“你若爱&#xff0c;生活哪里都可爱。你若恨&#xff0c;生活哪里都可恨。你若感恩&#xff0c;处处可感恩。你若成长&#xff0c;事事可成长。不是世界选择了你&#xff0c;是你选择了这个世界。既…

mdio协议

1. 简介 MDIO接口中有特定的术语定义总线上的各种设备&#xff0c;驱动MDIO总线的设备被定义为站管理实体&#xff08;STA&#xff09;&#xff0c;而被MDC管理的目标设备称为可被MDIO管理的设备&#xff08;MMD&#xff09;。 STA初始化MDIO所有的通信&#xff0c;同时负责驱动…

【数据结构与算法】哈希表1:字母异位词 两数交集 快乐数 两数之和

文章目录今日任务1.哈希表理论基础&#xff08;1&#xff09;哈希表&#xff08;2&#xff09;哈希函数&#xff08;3&#xff09;哈希碰撞&#xff08;4&#xff09;链地址法&#xff08;拉链法&#xff09;&#xff08;5&#xff09;线性探测法&#xff08;6&#xff09;常见…

Python采集双色球数据,做数据分析,让我自己实现自己的富豪梦

来唠点嗑&#xff1f; 咳咳&#xff0c;最近是咋的了&#xff0c;某站掀起了一股双色球热潮&#xff1f;一般我自己的账号上&#xff0c;是很少看到关于python这些内容的&#xff0c;都是小姐姐和热梗&#xff0c;或者其他搞笑视频 由于&#x1f4b4;的吸引力…手不自觉的就点…

《系统架构设计》-03-软件结构体系和架构风格

文章目录1. 软件结构体系1.1 抽象&#xff08;Abstract&#xff09;1.1.1 抽象的应用1.1.2 不同层次的抽象1.2 组件&#xff08;Component&#xff09;1.2.1 定义1.2.2 切入点1.3 组织过程资产&#xff08;Organizational Process Assets&#xff09;1.3.1 定义1.3.2 作用1.4 体…

springboot整合Chat Generative Pre-trained Transformer

什么是Chat Generative Pre-trained Transformer Chat Generative Pre-trained Transformer&#xff0c;是以人工智能驱动的聊天机器人程序 &#xff0c;已经更新多个版本&#xff0c;很多大厂也都在接入其API。 整合难度 难度一颗星&#xff0c;基本上就是给官方API发请求&am…