springcloud rocketmq 新增的消费者组从哪里开始消费

news2024/9/20 14:38:16

如果新建一个新的消费者组,是否会消费历史消息,导致重复消费?

直接在 console 界面新增消费者组,但是没有办法绑定订阅关系,没有找到入口,在 控制台项目源码 rocketmq-externals 也没有找到可以确定订阅关系的接口,在阿里云的生产控制台也没有绑定的入口。
在这里插入图片描述
在这里插入图片描述
所以只能是消费者启动后再注册订阅关系。
消费者从哪里消费的计算:
RebalancePushImpl.java
在这里插入图片描述

默认走的是:CONSUME_FROM_LAST_OFFSET 规则,按照官方说法,是从最后的消费位点开始继续消费。
关键的获取消费位点的逻辑:readOffset方法:
RemoteBrokerOffsetStore.java
在这里插入图片描述
集群模式下,是从远程获取的偏移量,跟据 fetchConsumeOffsetFromBroker 方法:
在这里插入图片描述
在这里插入图片描述
报错,其实就是服务端没有该消费者组的offset,被catch住默认返回 -1.
又不是重试队列,所以拿最大的偏置,broker-a queue-8 的 brokerOffset 是 25
在这里插入图片描述
出来到了 RebalanceImpl.java 的 updateProcessQueueTableInRebalance 方法。
在这里插入图片描述
然后会被添加到 pullRequestList 通过 this.dispatchPullRequest(pullRequestList)

控制台topic消费进度中已经保存了新的消费者组的消费进度,但 consumeOffset都是 0, 还有 759 个消息没有消费。
在这里插入图片描述

消费者消费了一些比较早前的消息:
在这里插入图片描述
在这里插入图片描述

消费进度也随之更新。
在这里插入图片描述
为什么和官方的说法不一致呢?CONSUME_FROM_LAST_OFFSET 为什么没有起到作用?
参考官方的修复:Fix CONSUME_FROM_LAST_OFFSET mode may pull data from 0L #4909

~~
至于这个console怎么看?参考以下:
在rocketmq的控制台中,选择 topic -> consumer manage,就可以查看一个主题下的消费者组、集群、队列的消费情况。
在这里插入图片描述
其中,10.122.24.41 是本人的内网ip,如果我本地线程卡住了(或者debug中),这个在线状态也会下线的。目前我是分配到了其中的8个集群队列,broker-a(8~15)。
在这里插入图片描述
offsetTable 的内容和我所描述的一致。
此外:
我还有对比组,是之前创建的废弃的消费者组,集群位点 brokerOffset 不变,消费位点 consumerOffset 落后了许多,落后的总量 diffTotal 代表此消费者组还有这么多未消费的消息。而且也没有在线的消费者客户端 consumerClient。
在这里插入图片描述
如果这时,我配置启动消费者去消费此消费者组。预计会消费 delay = 294 个消息。
结果也确实如此,将消息消费完,而且分配到了所有集群的所有队列。
在这里插入图片描述

测试结果:默认策略会从offset = 0 开始消费。
在这里插入图片描述
所以该参数没有用,还是从0开始消费了,这时候只能靠消费者组重置位点操作了。

~~
tags过滤,是服务端过滤,为什么会直接将不需要的消息也丢失掉呢?
这就要涉及到订阅关系一致性。
在这里插入图片描述
在这里插入图片描述

参考:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering

tags过滤会将不匹配的直接跳过(丢失)
我的理解是,现在没有为某个tags有单独记录消费进度的地方,所谓的服务端过滤,也只是说用hashcode快速匹配拉取而已,之后也是直接将offset拉到队列尾的。

参考:https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg?spm=a2c6h.12873639.article-detail.18.3ba035175CHVos

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1947572.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows 11预览补丁KB5040527影响火绒驱动加载的解决办法

7 月 11 日,微软更新Windows 11 预览版本补丁 KB5040527,补丁安装后会影响火绒驱动加载导致火绒安全软件服务异常,补丁相关信息如下: https://blogs.windows.com/windows-insider/2024/07/11/releasing-windows-11-builds-22621-…

微信视频号下载又一工具,免费简单易用!

Res-downloader嗅探资源下载器请收好 官方称支持微信视频号、抖音、快手、小红书等网络资源下载 使用方法 第一步,下载软件 文末扫码 或者搜索关注公众号AIshape 回复 RES 获取 或 自行百度搜索下载 第二步,软件设置 打开软件会弹出接受传入网络链…

CSI-RS在信道中传输的过程

简单介绍CSI-RS信号生成,在信道中传输和接收的过程 1.载波配置 首先需要配置载波相关的参数 系统带宽和子载波间隔 5G NR中,系统带宽和子载波间隔是两个关键参数,共同决定无线资源的分配和使用 系统带宽 5G NR支持广泛的系统带宽&…

【SOC 芯片设计 DFT 学习专栏 -- DFT OCC 与 ATPG的介绍】

请阅读【嵌入式及芯片开发学必备专栏】 请阅读【芯片设计 DFT 学习系列 】 如有侵权,请联系删除 转自: 简矽芯学堂 简矽芯学堂 2024年01月18日 09:00 陕西 文章目录 OCC 介绍Fast ScanFull chip ATPGPartition ATPGHierarchical ATPG OCC 介绍 OCC&am…

Ubuntu下载jdk:cannot execute binary file

虚拟机上Ubuntu系统安装jdk且配置环境之后,java -version显示cannot execute binary file,多番查阅推测是由于系统和jdk版本不兼容的原因。 uname -m查看系统版本位i686,是32位的,和64位的jdk版本不兼容。因此,下载32位…

QT--进程

一、进程QProcess QProcess 用于启动和控制外部进程,管理其输入输出流。 使用方法 start():启动一个新进程。setStandardInputFile():将文件作为标准输入。将进程的标准输入(stdin)重定向到指定的文件。换句话说&am…

【Linux】从零开始认识多线程 --- 线程互斥

人生有许多事情 正如船后的波纹 总要过后才觉得美的 -- 余光中 线程互斥 1 线程类的封装1.1 框架搭建1.2 线程启动1.3 线程终止1.4 线程等待1.5 运行测试 2 线程互斥2.1 多线程访问的问题2.2 解决办法 --- 锁2.3 从原理角度理解锁 Thanks♪(・ω・)&am…

QT信号和信号槽

信号和信号槽 一.信号与槽1.信号和槽的概述1.2.信号的本质1.3.信号的本质 二.信号和槽的使用2.1 连接信号和槽connect()函数原型:参数的说明 三.自定义信号和槽3.1基本语法1.自定义信号槽的书写规范2、自定义槽函数书写规范3.发送信号 3.2带参数的信号和槽 四.信号与…

Eclipse 搭建 C/C++ 开发环境以及eclipse的使用

一、下载、安装 MinGW 1、下载: 下载地址:MinGW - Minimalist GNU for Windows - Browse Files at SourceForge.net 点击“Download Latest Version”即可 下载完成后,得到一个名为 mingw-get-setup.exe 的安装文件。双击运行,安装即可。 …

一套功能齐全、二开友好的即时通讯IM工具,提供能力库和UI库,支持单聊、频道和机器人(附源码)

前言 在当今数字化时代,即时通讯(IM)和实时音视频(RTC)功能已成为众多应用的标配。然而,现有的解-决方案往往存在一些痛点,如架构落后、成本高昂、数据安全性和隐私保护不足,以及二次开发和部署的复杂性。 为了解决这些问题&…

超薄超小单独北斗定位的4G工牌记录仪、4G胸卡记录仪

AIoT万物智联,智能安全帽生产厂家,执法记录仪生产厂家,智能安全帽、智能头盔、头盔记录仪、执法记录仪、智能视频分析/边缘计算AI盒子、车载DVR/NVR、布控球、智能眼镜、智能手电、智能电子工牌、无人机4G补传系统等统一接入大型融合通信可视…

无法继续安装 计算机正忙于安装一个非 Visual Studio 的程序。

解决办法 以管理员身份打开cmd运行窗口,执行以下命令: taskkill -f -im msiexec*

网络三剑客之sed

目录 一、sed是什么 二、sed为什么这样工作(原理) 三、sed命令该怎么操作 3.1 基本的操作选项 3.2 怎么去使用sed 3.3 操作实例 3.3.1 打印输出(-n、-p、-r) 3.3.2 增加内容(i、a) 3.3.3 删除&…

网络安全入门教程(非常详细)从零基础入门到精通_网路安全 教程

前言 1.入行网络安全这是一条坚持的道路,三分钟的热情可以放弃往下看了。2.多练多想,不要离开了教程什么都不会了,最好看完教程自己独立完成技术方面的开发。3.有时多百度,我们往往都遇不到好心的大神,谁会无聊天天给…

机器学习 | 回归算法原理——最速下降法(梯度下降法)

Hi,大家好,我是半亩花海。接着上次的最小二乘法继续更新《白话机器学习的数学》这本书的学习笔记,在此分享最速下降法(梯度下降法)这一回归算法原理。本章的回归算法原理基于《基于广告费预测点击量》项目,…

大文件分片上传(前端TS实现)

大文件分片上传 内容 一般情况下,前端上传文件就是new FormData,然后把文件 append 进去,然后post发送给后端就完事了,但是文件越大,上传的文件也就越长,如果在上传过程中,突然网络故障,又或者…

opencascade AIS_InteractiveContext源码学习9 obsolete methods

AIS_InteractiveContext 前言 交互上下文(Interactive Context)允许您在一个或多个视图器中管理交互对象的图形行为和选择。类方法使这一操作非常透明。需要记住的是,对于已经被交互上下文识别的交互对象,必须使用上下文方法进行…

鸿蒙SDK开发能力

什么是鸿蒙SDK:HarmonyOS(Software Development Kit)是面向应用和服务开发的开放能力合集,本质就是工具集,与JDK、AndroidSDK在逻辑上有相似之处 18N:1指的是手机,8指的是车机、音箱、耳机、手表/手环、平板、大屏、PC、AR/VR&am…

Python——使用Seaborn钻石数据可视化分析(2)

续 Python——使用Seaborn钻石数据可视化分析(1) 目录 📈 4、非数值变量描述性统计分析 1️⃣ 柱状图——分析钻石切工的情况 📍 sns.countplot —— 绘制柱状图、条形图 2️⃣ 箱线图——分析不同切工的钻石的价格情况 📍 sns.barplot —— 不同分类变量之间的数…

用这些宝藏AI工具打造副业!实现被动收入!

前言 大家好,我是月月!今天我们来梳理一下在目前的形势下,如何用AI工具打造一个躺赚的副业,实现被动收入?有哪些方法和途径?在本篇文章我主要提供一些已有的AI工具,后面我们再根据具体的AI工具…