RocketMq使用规范(纯技术和实战建议)

news2025/1/12 0:53:17

概述:

  1. 使用规范主要从,生产可靠性、和消费为轴线定义使用规范;
  2. kafka使用核心:削峰解耦、向下游并行广播通知(无可靠性保证)和分布式事务,本规范仅从削峰解耦、向下游并行广播通知论述;

1、可靠性(强制)

可靠性包括Producer发送消息机制的可靠性,RocketMQ Server(Broker)消息持久化刷盘机制和Broker主从节点消息同步机制,Consumer消息的消费机制。

1.1、Producer发送消息的可靠性:

1.1.1、核心参数设置:

生产端(Producer):

  1. sendMsgTimeout:消息发送超时时长,默认:3000,单位毫秒;
  2. retryTimesWhenSendFailed:同步发送重试次数,默认:2;
  3. retryTimesWhenSendAsyncFailed:异步发送重试次数,默认:2;
  4. compressMsgBodyOverHowmuc:消息body需要压缩的阈值,默认:4K;
  5. maxMessageSize:客户端验证,允许发送的最大消息体大小,默认:4M;

注:

  1. rocketmq 的 client 端及 broker 端均有对消息体大小是否超出 maxMessageSize 进行校验;
  2. client 端的 DefaultMQProducer 定义了 maxMessageSize,默认是 4M 大小;
  3. send 方法及 batch 方法都会校验消息的大小;
  4. 服务端 conf/broker.conf 可以指定 maxMessageSize 大小,如果需要修改 maxMessageSize 大小需要跟服务端配合一起修改,否则可能投递失败;

消费端(Consumer):

  1. pullBatchSize:每批次从broker拉取消息的最大个数,默认值是32;
  2. consumeMessageBatchMaxSize:单次消费时一次性消费多少条消息;
  3. consumeFromWhere:指定消息消费读取策略,CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_LAST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半个小时以前,后续再启动着上次消费的进度开始消费;
  4. consumeThreadMin:最小消费线程池数量;

  5. consumeThreadMax:最大消费线程池数量;
  6. messageModel:消费者消费模式,CLUSTERING:集群模式,默认是CLUSTERING;BROADCASTING:广播模式;

注:

  1. pullBatchSize 的大小受制于 Broker 配置文件中 maxTransferCountOnMessageInMemory 参数的设置,该参数默认设置为 32,也即是每次从服务端拉取的最大的数量不能超过 32,因此即使设置 pullBatchSize 超过32,最后也只返回 32。因此,若要每次拉取的消息量超过 32,可以修改 broker 配置文件里该参数的值,并重启 broker 服务;
  2. 当从 Broker 拉取消息的大小超过 consumeMessageBatchMaxSize 的大小时,将会对消息进行拆分,然后提交到线程池进行处理;

1.1.2、刷盘机制:

rocketmq 刷盘机制分为同步刷盘,和异步刷盘。

1.1.2.1、同步刷盘:

同步刷盘数据可靠性更高,主要是防止异常断电消息丢失,但消息发送效率不高。

1.1.2.2、异步刷盘:

异步刷盘数据可靠性不高,异常断电消息可能会丢失,但消息发送效率高。

注: 刷盘方式可以通过Broker配置文件里的flushDiskType参数设置,这个参数有两种值:

  1. SYNC_FLUSH (同步刷盘);
  2. ASYNC_FLUSH (异步刷盘);

这个参数开发人员无法决定,运维人员确定

1.1.3、同步机制:

同步机制主要指,Broker 主从节点之间同步消息,防止单点故障消息丢失,同步机制有同步复制异步复制两种策略。通过 Broker 配置文件里的 brokerRole 参数设置,有三种选项:sync_masterasync_master 和 slavesync_master 和 async_master 用于 Master 角色 Broker 的配制,sync_master 同步复制,async_master 异步复制。slave 则是在 slave 的 Broker 中指定。

  1. 在 SYNC_MASTER 场景下:消息发送到 Master 后,暂时不返回成功/失败,而是等待 slave 拉取,若在规定时间内(默认3s)没有拉取到该消息,则 Master 会返回一个 FLUSH_SLAVE_TIMEOUT 异常给发送方,此时该消息发送即算作失败;
  2. 在 ASYNC_MASTER 场景下:消息发送到 Master 后,不管 slave 有没有拉取到该消息,Master 都会返回成功;

注:不管是哪一种策略,底层同步逻辑是一致的:均是由slave不断轮询master拉取消息,并提交同步offset。

1.1.4、消息生产(producer):

Rocketmq 投递消息有三种方式:单向消息、同步消息、异步消息。

1.1.4.1、单向消息:

单向(Oneway)发送特点只负责发送消息,不等待服务器回应,且没有回调函数触发。即:只发送请求不等待应答。发送效率极高,但极易丢失消息。

 1.1.4.2、同步消息:

同步发送指消息发出后,会阻塞工作线程,直致成功,或者失败返回。发送效率极低,但数据可靠性极高。

 1.1.4.3、异步消息:

异步发送指消息发出后,不会阻塞当前工作线程。异步发送实现发送回调接口,异步处理响应结果,成功、失败、或异常。

 1.1.5、消息消费(consumer):

RocketMq 消息的消费机制可分为分组消费广播消费消费模式消费可靠性和死信队列

1.1.5.1、分组消费:

分组消费,指多个消费端通过同一消费组ID去消费,此种消费同一组,一对一消费模式。分组消费初次订阅Topic时,可以指定Offset从哪消费,即从Topic头开始消费,还是末端消费(最新),消费以后会以消费组维度记录Topic的消费Offset

1.1.5.2、广播消费:

广播消费所有 Consumer 都能收到订阅以后最新的 Topic 消息,即:只消费最新的,Consumer 停了以后也不会去记录 Consumer 消费的 Offset。而且失败不会进入重试和死信队列。

1.1.5.3、消费模式:

RocketMq 的消费模式分为,push 和 pull 消费两种,pull 即:主动从消息服务器拉取信息,push  Broker 主动推送消息到 Counsumer (其实 RocketMq 没有做到,本质上还是拉取,仅是拉取的频率高,近似推送。)

1.1.5.4、消费可靠性:

消息的可靠性分为消息消费的提交方式重试机制
提交方式指

  1. 先提交后消费;
  2. 先消费,消费成功后再提交;

1可以解决重复消费的问题但是会丢失消息(不可靠),2会导制消息重复(可靠),得去从幂等。

重试机制
消费者消费消息后,需要给 Broker 返回消费状态,Topic 消息队列的 Offset 才会下移,否则会重试,重试分为:

  1. 异常重试:由于 Consumer 端逻辑出现了异常,导致返回了 RECONSUME_LATER 状态,那么 Broker 就会在一段时间后尝试重试;
  2. 超时重试:如果 Consumer 端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker 就会认为 Consumer 消费超时,此时会发起超时重试;

RocketMQ 可在 broker.conf 文件中配置 Consumer 端的重试次数和重试时间间隔,如下:
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
但是在大部分情况下,如果 Consumer 端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。
RocketMQ 会有一个针对消费组创建重试队列,当消费失败后会放入重试队列,后续消息周期间隔性消费是通过重试队列实现的,达到最大次数会放入死信队列。

1.1.5.5、死信队列:

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,即死信队列,具有以下特性:

  1. 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例;
  2. 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic
  3. 死信队列是一个特殊的 Topic,名称为%DLQ%consumerGroup

 死信队列中的消息需要人工干预,在 RocketMQ 中,可以通过使用 console 控制台对死信队列的权限更改为读写,然后对消息进行重发,或者订阅对应的 Topic 使得消费者实例再次进行消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/353331.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OceanBase 4.0解读:兼顾高效与透明,我们对DDL的设计与思考

关于作者 谢振江,OceanBase 高级技术专家。 2015年加入 OceanBase, 从事存储引擎相关工作,目前在存储-索引与 DDL 组,负责索引,DDL 和 IO 资源调度相关工作。 回顾关系型数据库大规模应用以来的发展,从单机到分布式无…

什么是BOM?与焊盘不匹配,怎么办?

什么是BOM? 简单的理解就是:电子元器件的清单,一个产品由很多零部件组成,包括:电路板、电容、电阻、二三极管、晶振、电感、驱动芯片、单片机、电源芯片、升压降压芯片、LDO芯片、存储芯片、连接器座子、插针、排母、…

成为IT服务台经理需要什么技能

要给员工带来愉快的体验,就必须对你的服务台进行有效的管理。为此,了解为什么服务台经理的角色对于绘制企业组织良好的服务台至关重要。在本指南中,我们将深入探讨他们的角色、能力和贡献,以了解如何顺利处理服务台操作。 IT 服务…

【面试题】前端 移动端自适应?

移动端 h5 开发中有一个绕不开的话题:移动端自适应方案。移动端的设备尺寸不尽相同,要把 UI 设计图较好地展示在移动端上,需要让 h5 页面能自适应设备尺寸。接下来将对移动端自适应的相关概念、方案和其他一些常见问题做个介绍。概念简介大厂…

什么是 Web3?解读未来的去中心化网络:The Decentralized Internet of the Future Explained

目录 互联网的演化 什么是 Web 1.0? 什么是 Web 2.0? Web 2.0 变现与安全性 什么是 Web 3.0? 原生支付 创立公司的新方式 Web3 中的身份 如果你读到这篇文章,那么你已经是当代互联网世界的一员了。我们现在使用的网络和10年前大不相同。所以,互联网是怎么演化的,…

Centos7 安装 Mysql 8.0.32,详细完整教程(好文章!!)

mysql5.7的安装方式参考之前的文章: centos7 安装 Mysql 5.7.27,详细完整教程(好文章!!)_HD243608836的博客-CSDN博客 一、检查mysql版本冲突 先检查是否已经存在mysql,若存在卸载&#xff0…

大数据第一轮复习笔记(2)

Spark ./spark-submit --class com.kgc.myspark01.WordCount --master yarn --deploy-mode cluster /opt/myspark01-1.0-SNAPSHOT.jar 1.Client向YARN的ResourceManager申请启动Application Master。Client中创建SparkContext同时初始化中将创建DAGScheduler和TASKScheduler…

固态继电器的五大优势

固态继电器的优点和五个关键优势,现代电气控制系统因二极管、晶体管和晶闸管等固态器件的发明而得到极大的增强。对于加热器和电机等大负载设备,固态继电器可能比传统的机械继电器具有巨大的优势。 虽然并非适用于所有情况,但它们具有许多吸引…

前端——周总结系列五

JS的Map对象 概述 ES6新增的一种数据结构Map,对操作键值对很友好,键值对集合,提供属性和方法供开发者使用。存有键值对,键可以是任何数据类型;按照原始插入顺序存储(FIFO)原则;具有…

关于ChatGPT,我们到底在担心什么?

“ChatGPT已对教育产生了巨大冲击” “ChatGPT对程序员造成了哪些影响” “ChatGPT会取代人类的哪些工作?” “谷歌宣布推出类ChatGPT产品Bard” “Bing新版本引入ChatGPT” …… 显然,在这段时间内,ChatGPT这个词已经触发了“全民焦虑”。 …

低代码平台调研

一、什么是低代码 首先,我们来看一下低代码的概念。在维基百科上,低代码是这样定义的,它的全称叫做低代码开发平台,它为开发者提供了一种创建应用软件的开发环境,可以通过图形化界面和参数配置的方式来代替传统的纯手…

协方差以及PCA

概念:协方差(Covariance)在概率论和统计学中用于衡量两个变量的总体误差。而方差是协方差的一种特殊情况,即当两个变量是相同的情况。协方差就是衡量两个变量相关性的变量。当协方差为正时,两个变量呈正相关关系&#…

我用vue开发了一个动态网站--百宝阁 万字长文(spa电商,首页没有做动态,搜索页是动态)

一、前言 学习前端已有大半年了,虽然其中备考软件设计师考试花了两个月,但我还是收获颇丰,从最开始的html,到css,js,在到es6,promise,ajax,node.js、vue、webpack我已经有较为靠谱的编码习惯,亲…

Vue实战第4章:主页设计之中部内容设计

前言 本篇在讲什么 接上篇文章,我们制作了一个自定义的网页导航栏,本篇文章我们简单制作一个内容页 仅介绍简单的应用,仅供参考 本篇适合什么 适合初学Vue的小白 适合想要自己搭建网站的新手 适合没有接触过vue-router的前端程序 本篇…

Vue常用指令及声明周期

文章目录知识点前端开发环境配置v-text && v-htmlv-if、v-else && v-showv-forv-onv-modelv-bind、v-cloak、v-pre&&v-once全局 API 是什么Vue.directive 自定义组件Vue.directive 是什么自定义组件回调函数参数自定义组件的生命周期Vue.set 全局操作为…

【Kafka】一.认识Kafka

kafka是一个分布式消息队列。由 Scala 开发的高性能跨语言分布式消息队列,单机吞吐量可以到达 10w 级,消息延迟在 ms 级。具有高性能、持久化、多副本备份、横向扩展能力。 生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。 一般在…

Unity对接接口丨简单教学丨UnityWebRequest

新手制作接口对接前言使用过程Postman测试第一次测试第二次测试第三次测试第四次测试第五次测试第六次测试总结前言 提示:大体介绍今日功能介绍 介绍大概UnityWebRequest对接接口方式,博主也是作为刚开始对接口的使用,相当详细。 使用过程 这里为内容…

你知道IP属地是怎么来的?

在互联网高速发展的时代,登录网络使用网络的过程当中,会存在非常独特的IP属地,这个独特的概念就是在互联网不断发展过程当中,对于大家来说非常熟悉而又陌生的一个必要设备,在使用各种电子设备上网的时候,都…

Flutter For Web实践

1 什么是Flutter Flutter是Google开源的一套UI工具包,帮助开发者通过一套代码库高效构建多平台精美应用,支持移动APP、web、桌面和嵌入式平台。Flutter和其他的跨平台解决方案的实现方式上有比较大的差异。 我们以React Native(下文简称RN&…

kubernetes教程 --组件详细介绍

组件详细介绍 NameSpace 在 Kubernetes 中,名字空间(Namespace) 提供一种机制,将同一集群中的资源划分为相互隔离的组。 同一名字空间内的资源名称要唯一,但跨名字空间时没有这个要求。 名字空间作用域仅针对带有名字…