rocketmq的基础概念

news2024/9/24 13:22:12

生产者

生产者生产的过程: producer会在接入nameserver时,获取所有topic和队列的信息,然后在每次发送时,根据负载均衡在topic中选择发送的队列。
生产者的消息是发送给具体的queue,而消费者消费是从具体的queue消费
在 RocketMQ 的集群模式下,生产者发送的一条消息只会发送到一个特定的队列(Queue),但是一个queue可能有多个生产者,但是一定只有一个consumer(集群模式)
producer发送消息是按照topic发送的,通过负载均衡选择queue,然后发送到queue对应的broker
对于生产者来说,RocketMQ 中的队列(Queue)是通过 Broker、Topic 和 Queue 共同来区分的
负载均衡
  • 轮询 (Round Robin):轮询选择队列,依次发送到不同的队列。
  • 随机 (Random):随机选择一个队列进行发送。
  • 自定义策略:可以实现 MessageQueueSelector 接口,定义自己的负载均衡策略。例如根据消息的某个属性(如订单ID)选择队列,确保有序性。

生产者组

创建时需要填写生产组的名称,生产者组是指同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。

Tag

Topic 是一级分类,而 Tag 可以理解为是二级分类。
Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。
在这里插入图片描述

什么时候该用 Topic,什么时候该用 Tag?

可以从以下几个方面进行判断:

消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。

业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。

消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。

消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

Tags的使用

一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。

Keys

每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。

消息的key是消息的一个用户自定义属性,主要用于消息的检索和查询。它是一个字符串,用户可以在发送消息时设置这个key。
Apache RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

   // 订单Id
   String orderId = "20034568923546";
   message.setKeys(orderId);

队列

为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。
在这里插入图片描述
一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。

消息

在不同的场景中,需要使用不同的消息进行发送。比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息;电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息。在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息。在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息.
Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

同步消息

在这里插入图片描述

异步模式:返回结果会调用回调函数

在这里插入图片描述

/ 异步发送消息, 发送结果通过callback返回给客户端
          producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
              System.out.printf("%-10d OK %s %n", index,
                sendResult.getMsgId());
              countDownLatch.countDown();
            }
            @Override
            public void onException(Throwable e) {
              System.out.printf("%-10d Exception %s %n", index, e);
              e.printStackTrace();
              countDownLatch.countDown();
            }
          });

单向模式:单向模式调用sendOneway,不会对返回结果有任何等待和处理。

在这里插入图片描述
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

  // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
  producer.sendOneway(msg);

普通消息

顺序消息

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。即顺序消息使用的前提是单个生产者,串行发送消息。

ShardingKey(分片键)用来消息的负载均衡,rocketmq没有具体的类来实现,但是在producer发送消息时会使用这个概念

当消息的key用于消息路由和负载均衡时,它可以充当Sharding Key的角色。

在 RocketMQ 中,Sharding Key 是用于在消息发送时保证消息有序性的机制。使用 Sharding Key,可以确保具有相同 Sharding Key 的消息会被发送到同一个队列中,从而保证这些消息的顺序性。
RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。
使用shardingkey的例子:

// 发送消息,并使用自定义的MessageQueueSelector来选择队列
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg; // 传递的订单ID
        int index = id % mqs.size(); // 选择队列的索引
        return mqs.get(index); // 返回选择的队列
    }
}, orderId); // 将orderId作为arg传递给MessageQueueSelector

延时消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

   Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    // This message will be delivered to consumer 10 seconds later.
    message.setDelayTimeLevel(3);
    // Send the message
    producer.send(message);

批量消息

这里调用非常简单,将消息打包成 Collection msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。

producer.start();

//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

producer.send(messages);

事务消息

消费者

消费者组订阅一个或多个主题,组内的消费者实例根据订阅的主题和过滤条件来消费消息。
消费者组订阅一个或多个Topic时,实际上是订阅了这些Topic下的所有队列。

集群模式(Clustering Mode)和广播模式(Broadcasting Mode)是针对消费者而言的,和生产者无关

在集群消费模式下,一个消息队列通常会被分配给一个消费者实例进行消费,确保消息的有序性和避免重复消费。
在广播消费模式下,每个消息队列中的消息会被所有订阅该 Topic 的消费者实例消费一次,每个消息被所有消费者实例处理一次。

消费者消费的过程:消费者实例启动后会向NameServer注册,获取该Topic的路由信息,包括所有Broker的地址和Queue信息,RocketMQ通过负载均衡算法将Topic下的Queue均衡分配给消费者组内的各个消费者实例,消费者实例根据分配到的Queue,从Broker中主动拉取消息进行消费。

在这里插入图片描述
在这里插入图片描述
默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。
在这里插入图片描述
在这里插入图片描述
在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。

Rocketmq的消费者的消费模式:push,pull和长轮询

push
public class Consumer {
  public static void main(String[] args) throws InterruptedException, MQClientException {
    // 初始化consumer,并设置consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
   
    // 设置NameServer地址 
    consumer.setNamesrvAddr("localhost:9876");
    //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    consumer.subscribe("TopicTest", "*");
    //注册回调接口来处理从Broker中收到的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    // 启动Consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
  }
}

消息重试

若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列

消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
consumer.setMaxReconsumeTimes(10);//最大重试次数
consumer.setSuspendCurrentQueueTimeMillis(5000);//重试间隔

死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。

pull

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1677952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git系列:git add 被忽视的操作技巧

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

C语言——栈和队列

文章目录 一、栈1. 栈的概念2. 栈的基本功能3. 栈的实现 二、 队列1. 队列的概念2. 队列的基本功能3. 队列的实现 一、栈 1. 栈的概念 栈是一种特殊的线性表&#xff0c;限定仅在表尾进行插入和删除的线性表。这一端称之为栈顶&#xff0c;另一端称为栈底。 栈又称为后进先出…

python将两张图片对齐

目录 需要对齐的照片如下&#xff1a; 源码&#xff1a; 结果&#xff1a; 需要对齐的照片如下&#xff1a; 源码&#xff1a; import cv2 import numpy as np from matplotlib import pyplot as plt# 读取两张图片 imgA cv2.imread(./out/out/3.png) imgB cv2.imread(./…

工具:资源包提取

1.提取unity资源包的工具 一定要通过文件夹的方式选择unity文件否则导出来后的资源不完整

锚点组件--支持点击、滚动高亮锚点

实现一个锚点组件&#xff0c;页面滚动时高亮当前位置锚点、点击锚点时跳转到指定冒点位置&#xff0c;同时选中锚点也高亮 效果图 父组件 import ./index.less; import Anchor from ./Anchor; import Content from ./Content;export default function index() {return (<…

rocketmq的流程

生产过程 消费过程 存储 在RocketMQ中&#xff0c;一个Broker的所有Topic的消息都会被写入到同一个CommitLog文件中。 每个队列&#xff08;Queue&#xff09;都有对应的ConsumeQueue文件。 ConsumeQueue每个记录定长&#xff0c;20字节&#xff0c;消息在commitlog中的偏移量…

【软件的安装与基本设置】AD21软件的PCB规则设置

在绘制PCB之前&#xff0c;要进行规则的创建&#xff0c;因为在绘制PCB的过程中&#xff0c;难免会出现很多错误&#xff0c;所以需要先对绘制PCB创建规则&#xff0c;即所有的打孔&#xff0c;走线&#xff0c;铺铜都要基于电气性能规则去设计&#xff0c;等到后期&#xff0c…

[vue] nvm

nvm ls // 看安装的所有node.js的版本nvm list available // 查显示可以安装的所有node.js的版本可以在可选列表里。选择任意版本安装&#xff0c;比如安装16.15.0 执行&#xff1a; nvm install 16.15.0安装好了之后。可以执行&#xff1a; …

云服务器修改端口通常涉及几个步骤

云服务器修改端口通常涉及几个步骤 远程连接并登录到Linux云服务器&#xff1a; 使用SSH工具&#xff08;如PuTTY、SecureCRT等&#xff09;远程连接到云服务器。 输入云服务器的IP地址、用户名和密码&#xff08;或密钥&#xff09;进行登录。 修改SSH配置文件&#xff1a…

智能数据提取:在严格数据治理与安全标准下的实践路径

一、引言 随着信息技术的飞速发展&#xff0c;数据已成为企业最宝贵的资产之一。然而&#xff0c;数据量的爆炸式增长和数据格式的多样化&#xff0c;使得传统的数据提取方法变得效率低下且难以满足业务需求。智能数据提取技术应运而生&#xff0c;它通过应用人工智能和机器学…

Unity里的Time

Time and frame rate management Time类&#xff1a; Time script reference page. 一些常见的属性有&#xff1a; Time.time 返回从游戏开始经历的时间.Time.deltaTime 返回从上帧结束到现在经历的时间&#xff0c;和帧率成反比Time.timeScale 控制时间流逝的因子Time.fixe…

一个制剂生产人眼中的精益管理

精益管理&#xff08;Lean Management&#xff09;是一种通过减少浪费和提高价值创造的方法&#xff0c;广泛应用于各个行业中&#xff0c;包括药品制剂生产领域。 本文&#xff0c;以一个多年从事药品制剂生产的人的角度&#xff0c;从优点、功能以及与其他管理方法的比较等方…

交通灯-设计说明书

设计摘要&#xff1a; 本设计基于单片机技术&#xff0c;旨在实现智能化交通信号控制&#xff0c;并具备夜间模式、禁止通行模式、同行模式切换以及车流量监测功能。通过按键S1和S2实现夜间模式和禁止通行模式的切换&#xff0c;确保夜间交通安全和禁止通行的需要。按键S3和S4…

阿里云OSS如果指定某个文件夹给子账户

** 第一步创建子账号 ** 创建完用户不要给任何权限&#xff01; 当前页面切换到认证管理获取AccessKey即可 第二步目录授权 找到对应桶文件目录 上面授权按钮操作 选择添加的子账号账号保存即可&#xff01;

springmvc核心流程

核心流程及配置 核心流程 执行流程 用户发送请求到DispatcherServlet前端控制器&#xff0c;前端控制器收到请求后自己不进行处理&#xff0c;而是委托给其他的解析器进行处理&#xff0c;作为统一访问点&#xff0c;进行全局的流程控制 DispatcherServlet调用HandlerMapping映…

电机完美控制的感觉如何【应用案例】

当电机控制技术成为人体的一部分时&#xff0c;对控制系统的组件尺寸和可靠性要求将极大提高。得益于集成式FOC控制系统组件&#xff0c;第一款具有两个活动关节的义肢得以在短时间内完成—— 赶上在苏黎世举办的全球半机械人奥运会(Cybathlon)。 失去肢体显然会对一个人的生活…

社交媒体的探索者:探寻Facebook的发展历程

在当今数字化时代&#xff0c;社交媒体已经成为了人们日常生活中不可或缺的一部分&#xff0c;而Facebook作为最具影响力的社交平台之一&#xff0c;其发展历程承载着无数的探索与创新。本文将深入探讨Facebook的发展历程&#xff0c;从其创立初期到如今的全球化影响&#xff0…

MySQL深入理解MVCC机制(详解)

深入理解MVCC 1、MVCC定义 MVCC:Multi-Version Concurrency Control&#xff0c;多版本并发控制机制。 在mysql中&#xff0c;为了满足事务的四大特性之一的隔离性&#xff0c;就是当前事务中的查询的数据不受其他事务的增删改操作的影响&#xff0c;因此mysql主要是通过这个…

智能鱼缸-设计说明书

设计摘要&#xff1a; 本论文以STC89C52单片机为核心控制器&#xff0c;构建了一套智能鱼缸系统。该系统由中控部分、输入部分和输出部分组成。中控部分采用STC89C52单片机&#xff0c;负责获取输入部分数据并进行处理&#xff0c;控制输出部分。输入部分包括TDS水质水温检测模…

PyCharm2023 社区版安装 +中文语言包+配置教程+Python环境搭建

一、Python 安装 我们在安装Pycharm之前&#xff0c;首先要先安装Python环境也就是安装Python解释器 因为PyCharm是一个用于编写和调试Python代码的开发工具&#xff0c;而Python解释器是用于解释执行Python代码PyCharm需要依赖Python解释器来执行Python代码&#xff0c;因此…