RocketMq基础详解

news2024/9/25 21:30:54

1、RocketMq的架构:

 在RocketMq中有四个部分组成,分别是Producer,Consumer,Broker,以及NameServer,类比于生活中的邮局,分别是发信者,收信者,负责暂存,传输的邮局,以及协调各个地方邮局的管理机构。

1、NameServer:

主要是 Topic 和 Broker 注册中心,支持 Broker 动态注册和发现,主要保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。

2、Broker:

分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。

Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作。

3、Producer:

就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。

4、Consumer:

消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。

5、Topic(主题):

可以看做消息的规类,它是消息的第一级类型,比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic;Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。

rocketMq 允许自动创建Topic和手动创建Topic,自动创建主题那么有可能该主题的消息都只会发往一台 Broker,起不到负载均衡的作用。

因为创建新 Topic 的请求到达 Broker 之后,Broker 创建对应的路由信息,但是心跳是每 30s 发送一次,所以说 NameServer 最长需要 30s 才能得知这个新 Topic 的路由信息。

假设此时发送方还在连续快速的发送消息,那 NameServer 上其实还没有关于这个 Topic 的路由信息,所以有机会让别的允许自动创建的 Broker 也创建对应的 Topic 路由信息,这样集群里的 Broker 就能接受这个 Topic 的信息,达到负载均衡的目的,但也有个别 Broker 可能,没收到,即不创建,或者nameServer还没有收到。

如果发送方这一次发了之后 30s 内一个都不发,之前的那个 Broker 随着心跳把这个路由信息更新到 NameServer 了,那么之后发送该 Topic 消息的 Producer 从 NameServer 只能得知该 Topic 消息只能发往之前的那台 Broker ,这就不均衡了,如果这个新主题消息很多,那台 Broker 负载就很高了。而且达不到高可用,假如这台broker挂掉,也不能负载到另一台,单点故障。所以不建议线上开启允许自动创建主题,即 autoCreateTopicEnable 参数。

6、Tag(标签):

可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

7、Group:

分组,一个组可以订阅多个Topic。分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以做为Group,同一个Group一般来说发送和消费的消息都是一样的。

Producer Group:是一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。相同角色的生产者被分组在一起。同一生产者组的另一个生产者实例可能被broker联系,以提交或回滚事务,以防原始生产者在交易后崩溃。有序消息一定要指定消息组,因为不同发送消息组存储可能是无序的。

Consumer Group:Consumer Group是一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致(使用相同 Group ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点)。消费者群体是一个伟大的概念,它实现了负载平衡和容错的目标,在信息消费方面,是非常容易的。消费时一定要有消费组的ID。

8、Queue:

队列其实就是对Topic的分片,在Kafka里面就是Partition。

将Topic分片再切分为若干等分,其中的一份就是一个Queue。每个Topic分片等分的Queue的数量可以不同,由用户在创建Topic时指定。queue数量指定方式:

1、代码指定:

producer.setDefaultTopicQueueNums(8);

2、配置文件指定:

同时设置broker服务器的配置文件broker.properties:defaultTopicQueueNums=16;

另一种队列,读写队列,如图(rocketmq控制台):

读写队列的个数其实跟存储没有多大关系,它们仅是为消息投递和消费时负载生产与消费。另外一个目的就是为了Broker缩容。正常情况下读队列个数=写队列个数=topic队列的真实个数,否则可能会出问题,比如:读队列个数 < 写队列个数 < topic队列的真实个数,则会导致消息不能消费。

perm属性用于设置对当前创建Topic的操作权限:2表示:只写,4表示:只读,6表示:读写。

9、Offset:

偏移,本质上有两种Offset,一种是写入时末尾的Offset,另一种是同一消费组读的Offset。RocketMq消息内容是分片存储,CommitLog 的大小默认是1G,当超过大小限制的时候需要准备新的文件,CommitLog 采用混合型存储,也就是所有 Topic 都存在一起,顺序追加写入,文件名用起始偏移量命名。其次RocketMq还存储了消息体与偏移的关系,用于快速随机读取和检索。

 2、Producer生产消息:

producer生产消息有三种方式。

1、同步消息(可靠同步发送):

同步发送是指消息发送方发出数据后,会阻塞直到MQ服务方发回响应消息。

2、异步消息(可靠异步发送):

异步发送是指发送方发出数据后,不等Server响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收服务器响应,并对服务器的响应结果进行处理。

3、单向(one-way)消息: 

单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

4、生产重试:

RocketMQ 为使用者封装了,消息重试的处理流程,无需开发人员手动处理。

相关API:

DefaultMQProducer 可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:

// 设置消息发送失败时的最大重试次数

public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {

this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;

}

// 同步发送消息,并指定超时时间

public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

return this.defaultMQProducerImpl.send(msg, timeout);

}

因此,实现生产端的重试十分简单,例如:下面的代码可以设置Producer如果在5s内没有发送成功,则重试5次:

// 同步发送消息,如果5秒内没有发送成功,则重试5次

DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");

producer.setRetryTimesWhenSendFailed(5);

producer.send(msg,5000L);

3、RocketMq 消息的可靠性:

Rocketmq消息的可靠性,其实是靠,1、Producer的生产机制来实现,2、Broker的刷盘机制和主从同步机制来实现,3、Consumer的消费机制来实现。Producer的生产机制上面已述,Consumer的消费机制下面再述,目前只谈2。

1、刷盘机制:

刷盘机制能解决单点故障,分为同步刷盘和异步刷盘。

同步刷盘:

消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。本质上是内存写入磁盘,如果不同步刷,写入内存就返回。

异步刷盘:

消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功,当 PageCache中的消息积累到一定的量时,触发一次写操作,或者定时等策略将 PageCache中的消息写入到磁盘中。本质上是先写入内存,再写入磁盘。

2、同步机制:

Rocketmq 同步机制有同步复制、异步复制两种策略,但不管是哪一种策略,底层同步逻辑是一致的:均是由slave不断轮询master拉取消息,并提交同步offset。

通过Master中的配置项:brokerRole决定,有三种选项:sync_master、async_master和slave。从字面意思理解, sync_master是同步方式,也就是Master角色Broker中的消息要立刻同步过去;async_master是异步方式,也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。slave则是在Slave的Broker中指定。

在SYNC_MASTER场景下:消息发送到master后,暂时不返回成功/失败,而是等待slave拉取。slave按照顺序从前往后拉取,拉到了该条消息后,master才返回落盘成功。若在规定时间内(默认3s)没有拉取到该消息,则master会返回一个FLUSH_SLAVE_TIMEOUT异常给发送方,此时该消息发送即算作失败。

在ASYNC_MASTER场景下:消息发送到master后,不管slave有没有拉取到该消息,master都会返回成功。

4、RocketMq 消息的消费机制:

RocketMq 消息的消费机制可分为分组消费,广播消费,消费模式,消费可靠性。

分组消费:

分组消费,即多个消费端通过同一消费组Id(Id通常需要手动预先在RocketMq控制台创建。)去消费,此种消费同一组,一对一消费模式,即同一组中同一时刻只能有一个收到最新的消息。分组消费初次订阅Topic时,可以指定Offset从哪消费,即从Topic头消费,还是末端消费(最新),消费以后RocketMq会以消费组维度记录Topic的消费Offset。

广播消费:

广播消费所有Consumer都能收到订阅以后最新的Topic消息,即只消费最新的,Consumer停了以后也不会去记录Consumer消费的Offset。

消费模式:

RocketMq的消费模式分为,push和pull消费两种,pull即主动从消息服务器拉取信息,push即Broker主动推送消息到Counsumer(其实RocketMq没有做到,本质上还是拉取,仅是拉取的频率高,近似推送。)

消费可靠性:

消息的可靠性分为消息消费的提交方式和重试机制。

提交方式即是:

  1. 先提交后消费;
  2. 先消费,消费成功后再提交;

1可以解决重复消费的问题但是会丢失消息(不可靠),2会导制消息重复(可靠),得去从幂等。

重试机制:

消费者消费消息后,需要给Broker返回消费状态,Topic消息队列的Offset才会下移,否则会重试,重试分为:

异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。

超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。

RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。

RocketMQ会有一个针对消费组创建重试队列,当消费失败后会放入重试队列,后续消息周期间隔性消费是通过重试队列实现的。达到最大次数会放入死信队列

5、消息回溯:

 回溯消费是指Consumer已经消费成功的消息,或者之前消费业务逻辑有问题,现在需要重新消费。要支持此功能,则Broker存储端在向Consumer消费端投递成功消息后,消息仍然需要保留。重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据。RocketMQ Broker提供了一种机制,可以按照时间维度来回退消费进度,这样就可以保证只要发送成功的消息,只要消息没有过期,消息始终是可以消费到的。

6、死信队列:

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

死信队列具有以下特性:

  1. 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例;
  2. 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic;
  3. 死信队列是一个特殊的Topic,名称为%DLQ%consumerGroup;

死信队列中的消息需要人工干预,在RocketMQ中,可以通过使用console控制台对死信队列的权限更改为读写,然后对消息进行重发来或者订阅对应的Topic使得消费者实例再次进行消费。

7、顺序消息:

消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费(FIFO)。

顺序消息的原理:

在默认的情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。其实要实现顺序消息的生产与消费,需满足以下三点:

  1. 待需有序的一批消息,落在同一Topic的队列上;
  2. Producer生产消息时,逻辑上顺序串行发送;
  3. Producer需要指定消息组,因为相同消息组的消息按照先后顺序被存储在同一个队列;

注:顺序消息可靠性其实不那么高,1、消息可能丢失,2、消费失败会进入重试队列,和死信队列。种种情况消息未必都有序。

 8、事务消息:

RocketMQ提供类似X/Open XA的分布式事务功能来确保业务发送方和MQ消息的最终一致性,其本质是通过半消息(prepare消息和commit消息)的方式把分布式事务放在MQ端来处理。

 

其中:

  1. 发送方向消息队列 RocketMQ 服务端发送消息;
  2. 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息;
  3. 发送方开始执行本地事务逻辑;
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息;

补偿流程:

  1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查;
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果;
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/187277.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

找到二叉树中的最大搜索二叉树

题目 给定一棵二叉树的头节点 head&#xff0c;一致其中所有节点的值都不一样&#xff0c;找到含有节点最多的搜索二叉树&#xff0c;并返回这棵子树的头节点。 示例 分析 树形dp套路&#xff1a;如果题目求解目标是S规则&#xff0c;则求解流程可以定成以每一个节点为头节点…

【前端】如何判断是页面滚动还是窗口滚动

在写项目的时候遇到这个问题&#xff0c;现在举两个例子来记录这个问题。 页面滚动 html: <div class"temp"><template v-for"item in 100"><div>{{ item }}</div></template> </div>css: .temp {height: 100px;o…

老马闲评数字化「3」业务说了算还是技术说了算?

原文作者&#xff1a;行云创新CEO 马洪喜 导语 前两集和大伙聊了一下“数字化不转型行不行”以及“你的企业急不急着转”这两个话题。后面收到不少朋友的消息&#xff0c;说写的挺好&#xff0c;但“急着转、不敢转”的情况非常的普遍&#xff0c;有没有啥好主意给说一说。 麦…

冬去春来,ToB行业压缩的弹簧就要迸发了

目前来看&#xff0c;认知、实践、技术、服务这四方面的新变化&#xff0c;都将成为2023年企业数智化业务需求“井喷”的重要原因。 作者|周羽 出品|产业家 2023&#xff0c;冬去春来。 不止于字面。新的一年&#xff0c;中国的ToB厂商即将迎来“拨云见日”的朗朗晴空。 …

[文件上传工具类] MultipartFile 统一校验

目录​​​​​​​ 1. 创建上传文件的统一校验类 包含功能: -> 1. 多文件上传校验 -> 2. 文件名字校验(特殊符号) -> 3. 文件后缀校验 2. 使用方式 建议: 在文件上传开始的位置添加 -> 两个重载方法, 单文件 多文件都支持 -> 示例: 直接可以用, 任意位…

C++ 包装器function

目录 1、为什么需要包装器&#xff1f; 2、包装器的声明和使用 (1) 声明 (2) 实际应用 (3) 包装器接收类成员函数 3、包装器的绑定&#xff1a;bind函数 (1) 调整参数顺序 (2) 调整参数个数 1、为什么需要包装器&#xff1f; 函数模板可以接收各种不同类型的参数&…

光流正负值的含义以及如何利用光流进行warping

本文主要介绍光流的形式&#xff0c;光流值的正负代表什么含义&#xff0c;以及如何利用光流进行warping。 1. 光流正负值的含义 光流的概念&#xff1a;光流表示的是从reference frame到target frame&#xff0c;物体的移动。光流的形式&#xff1a;光流的表示也是数字化的。…

镭速-跨国车企数据高速、安全跨境传输解决方案

一、背景及趋势 在新一代信息技术驱动的数字经济时代&#xff0c;数据已然成为新型生产要素&#xff0c;是国家基础性资源和战略性资源。在汽车市场全球化背景下&#xff0c;产品、数据双跨境将成为车企未来常态。数据的价值核心在于流通和应用&#xff0c;但数据也牵涉着竞争…

23.1.30 将TF-A源码移植的过程,整理成自己的笔记

将TF-A源码移植的过程&#xff0c;整理成自己的笔记&#xff0c;上传到CSDN 一、配置TF-A源码 自动探测 自动换行 1.对tf-a源码进行解压 tar xfz tf-a-stm32mp-2.2.r2-r0.tar.gz 打补丁 3.进入tf-a源码目录 $> cd tf-a-stm32mp-2.2.r2 4.打补丁命令 $> for p in ls -1 .…

JavaScript中的Array对象~

初识Array&#xff1a; Array 对象用于在单个的变量中存储多个值 定义&#xff1a; 方式1 //返回的数组为空&#xff0c;length字段为0 var 变量名new Array(); //size是期望的数组元素个数&#xff0c;返回的length字段将被设置为size的值--返回具有指定个数&#xff0c;元…

vue2面试题持续更新。。。

文章目录1、vue 修改数据页面不重新渲染数组/对象的响应式 &#xff0c;vue 里面是怎么处理的&#xff1f;2、生命周期Vue 生命周期都有哪些&#xff1f;父子组件生命周期执行顺序3、watch 和 computed 的区别4、组件通信&#xff08;组件间传值&#xff09;5、$nextTick6、修饰…

postman常用变量总结

一、变量分类环境变量&#xff1a;只在所属环境内使用&#xff1b;全局变量&#xff1a;整个postman中全部接口皆可使用该变量&#xff1b;集合变量&#xff1a;只在设置的集合中可使用&#xff0c;且与环境无关&#xff1b;局部变量数据变量二、环境变量设置方式方式一方式二方…

网络知识详解之:HTTP协议基础

网络知识详解之&#xff1a;HTTP协议基础 计算机网络相关知识体系详解 网络知识详解之&#xff1a;TCP连接原理详解网络知识详解之&#xff1a;HTTP协议基础网络知识详解之&#xff1a;HTTPS通信原理剖析&#xff08;对称、非对称加密、数字签名、数字证书&#xff09;网络知…

第三章.逻辑回归—逻辑回归

第三章.逻辑回归 3.1 逻辑回归&#xff08;Logistic Regression&#xff09; 线性回归以及非线性回归是用来处理回归问题的&#xff0c;而逻辑回归是用来处理分类问题的。 1.应用场景&#xff1a; 1).分类&#xff1a; 垃圾邮件分类预测肿瘤是良性还是恶行预测某人的信用是好…

ITIL知识管理分析及如何实施

什么是知识管理 知识管理是在 IT 服务台内收集、分析、存储和共享知识的过程。它旨在帮助服务台团队在整个使用寿命期间做出正确的决策 通过有效控制和处理信息流来循环和事件解决过程。 ITIL 4将知识管理定义为负责向以下机构提供知识的一个中央流程 所有其他IT 服务管理 &a…

linux / Generic Netlink

一、概述 Generic Netlink 是内核专门为了扩展 netlink 协议簇而设计的“通用netlink协议簇”。由于 netlink 协议最多支持 32 个协议簇&#xff0c;目前 Linux4.1 的内核中已经使用其中 21 个&#xff0c;对于用户需要定制特殊的协议类型略显不够&#xff0c;而且用户还需自行…

SHELL基本知识超级详解

目录 shell基本知识 1&#xff0c;为什么学习和使用Shell编程 2&#xff0c; shell的起源 3&#xff0c;shell的功能 4&#xff0c;shell的分类 5&#xff0c; shell脚本的基本元素 6&#xff0c; shell脚本编写规范 7&#xff0c;shell脚本的执行方式 8&#xff0c; 执…

JavaScript 类的继承

通过原型链的方式继承 通过实例化一个构造函数&#xff0c;使字类的原型指向父类的实例&#xff0c;字类就可以调用到父类的属性和方法 function Parent() {this.parentName 父亲;this.getParentName function () {console.log("parent name is: %s", this.paren…

剑指 Offer 第13天 第14天

目录 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 剑指 Offer 57. 和为s的两个数字 剑指 Offer 58 - I. 翻转单词顺序 剑指 Offer 12. 矩阵中的路径 面试题13. 机器人的运动范围 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 输入一个整数数组&#xff0c;实现一个函…

maven基础-关于什么是maven、如何安装以及在Eclipse中的使用等等

本文是向大家介绍项目管理工具maven的基础使用&#xff0c;它能够实现项目构建打包共享&#xff0c;能够实现自动清理、编译、测试、报告等提高开发效率。一、为什么使用Maven这样的构建工具【why&#xff1f;】二、maven是什么【what&#xff1f;】三、安装maven四、第一个mav…