rocketmq-client-go注册消费者组的问题

news2025/1/3 2:01:49

一、前言

      test环境服务启动,通过代码新注册一个customer group进行消费,服务一直报错如下:

level=error msg="fetch offset of mq from broker error" 
MessageQueue="MessageQueue [topic=xxx, brokerName=broker-a, queueId=1]"
 consumerGroup=xxx underlayError="broker response code: 22, 
 remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"

      奇怪的点在于,dev环境相同的代码,是可以注册成功并且消费的,而且运维也登上去通过命令行发送和消费过消息,确认开发环境是ok的。

二、来自chatgpt的回答

Broker 返回状态码 "22",并且附加了错误信息 "Not found,
 V3_0_6_SNAPSHOT maybe this group consumer boot first"。
这意味着该 Consumer Group 可能是第一次启动,并且 Broker 端
还没有关于它的相关记录

      也就是说,这个broker 22 错误是新注册customer group后会经常出现的,因为新的customer group要和broker同步offset,同步之后就会成功。然而实际上服务起了一夜,还是一直报这个错误。猜测是一直轮询,但是每次同步offset就报错了。

三、go客户端为什么一直报错

1、rocketmq客户端问题

参考:
https://github.com/apache/rocketmq-client-go/pull/886
https://github.com/apache/rocketmq-client-go/issues/993

      经过一顿搜索,终于在官方的issue上发现有这个提问,具体链接如上。可以看到,gorocketmq-client-go在新customer group的情况下,同步offset会返回error
在这里插入图片描述
error会导致我们收到上面的错误打印。

		off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
		if err != nil {
			rlog.Error("fetch offset of mq from broker error", map[string]interface{}{
				rlog.LogKeyConsumerGroup: r.group,
				rlog.LogKeyMessageQueue:  mq.String(),
				rlog.LogKeyUnderlayError: err,
			})
			r.mutex.RUnlock()
			return -1, err
		}

2、rocketmq客户端社区的修复

      rocketmq-client-go社区已经修复了这个问题,在找不到consumer group的时候,返回-1nil,这样就不会报出来错误,阻塞下面的流程。

if res.Code == internal.ResQueryNotFound {
		return -1, nil
	}

// 函数外处理,把offset注册到OffsetTable这个map中
// OffsetTable map[primitive.MessageQueue]int64
// key是mq的消费队列,value是offset值。把offset=-1挂到我们新group对应的mq queue实例上,
// 接下来就可以同步offset了
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	localOffset, exist := r.OffsetTable[*mq]
	if !exist {
		r.OffsetTable[*mq] = offset
		return
	}
	if increaseOnly {
		if localOffset < offset {
			r.OffsetTable[*mq] = offset
		}
	} else {
		r.OffsetTable[*mq] = offset
	}
}

      有兴趣的同学可以看看java的实现,还是挺清晰的,代码地址:https://github.com/apache/rocketmq/blob/develop/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java#L228

四、为什么dev环境是可以正常消费

      百思不得其解,明明是同一套代码。。后来问了运维,原来dev环境服务刚启动也是报错的状态,然后运维手动去发送了命令,并通过新customer group消费了一次,后来服务就可以成功消费了。

RocketMQ 的 Consumer 连接上 Broker 之后,会自动拉取消息消费进度。Consumer 向 Broker 
发送拉取消息的请求时会携带该 Consumer Group 对应的每个 Message Queue 消费进度,Broker 
会根据这些消费进度返回还未被消费的消息给 Consumer,并将实际消费进度保存到 Consumer 所在的
Consumer Group 的消费进度表中。

      也就是说,命令行之行的时候,自动进行了consumeroffset同步。。。

五、最终的解决方案

1、升级rocket-rocket-go为master版本

master版本已经修复了这个问题

go get github.com/apache/rocketmq-client-go/v2@master
// 下面是我现在的go.mod
//github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230518020902-2a8172bb9174

      升级master毕竟不是稳定版本,可能会有问题,更多是临时方案,后续还是要安装社区稳定版本的。

2、登录rocketmq实例,手动发送消息

手动发送并消费消息,相当于使用rocketmq官方的client来同步offset,这样就绕开了rocketmq-client-go的这个bug。消费者注册完成后,接下来就可以顺利的消费了。

3、提前创建好consumer group

不要通过代码创建,一了百了

end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/602733.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字节跳动面试挂在2面,复盘后,决定二战.....

先说下我基本情况&#xff0c;本科不是计算机专业&#xff0c;现在是学通信&#xff0c;然后做图像处理&#xff0c;可能面试官看我不是科班出身没有问太多计算机相关的问题&#xff0c;因为第一次找工作&#xff0c;字节的游戏专场又是最早开始的&#xff0c;就投递了&#xf…

人体关键点检测

title: 人体关键点检测 date: 2023-06-02 21:28:46 tags: [MMPose,cv] 人体关键点检测 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9x4nhwLf-1685714024668)(https://fastly.jsdelivr.net/gh/weijia99/blog_imagemain/1685712470039%E4%BA%BA%E4…

【stm32开发】stm32+oled最小系统板资料(原理图、PCB、示例代码)【六一】

&#x1f389;欢迎来到stm32专栏~stm32oled最小系统板 ☆* o(≧▽≦)o *☆嗨~我是小夏与酒&#x1f379; ✨博客主页&#xff1a;小夏与酒的博客 &#x1f388;该系列文章专栏&#xff1a;stm32专栏 文章作者技术和水平有限&#xff0c;如果文中出现错误&#xff0c;希望大家能…

深度学习训练营之J5周DenseNet+SE-Net实战

深度学习训练营之J5周DenseNetSE-Net实战 原文链接方法介绍SE模块应用分析SE模块的效果对比SE模块代码实现SE模块在DenseNet当中的应用参考内容 原文链接 &#x1f4cc;第J5周&#xff1a;DenseNetSE-Net实战&#x1f4cc; &#x1f368; 本文为&#x1f517;365天深度学习训练…

UART帧格式介绍

UART及通信方式简介 UART Universal Asynchronous Receiver Transmitter 即通用异步收发器&#xff0c;是一种通用的串行、异步通信总线 &#xff0c;该总线有两条数据线&#xff0c;可以实现全双工的发送和接收&#xff0c;在嵌入式系统中常用于主机与辅助设备之间的通信…

Web3 游戏团队如何在项目发布前奠定成功基础——以真实用户支持为核心的运营策略

作者&#xff1a;lesleyfootprint.network 运营 Web3 游戏项目是一项令人兴奋且具有挑战性的任务。无论是对于 NFT 游戏还是链上多人游戏&#xff0c;建立强大且高度参与的用户群体都是游戏成功的关键因素之一。 在本文中&#xff0c;我们将探讨游戏中真实参与和机器人刷量之…

class文件中,常量池之后的相关数据解析!【class二进制文件分析】

前言&#xff1a;前段时间读《深入java虚拟机》介绍到class文件的时候&#xff0c;由于理论知识较多&#xff0c;人总感觉疲惫不堪&#xff0c;就泛泛阅读了一下。在工作中使用起来知识点知道&#xff0c;但是总是需要查阅各种资料。今天有时间&#xff0c;继续整理常量池后面的…

msvcp140.dll丢失的4个解决方法,msvcp140.dll丢失的常见原因

msvcp140.dll是Windows操作系统中的一个动态链接库文件&#xff0c;由Microsoft Visual C程序库所提供。它包含了许多C函数和类的定义&#xff0c;可以为应用程序提供一些基本服务&#xff0c;比如内存管理、文件输入/输出和网络连接等功能。我们在打开游戏或者软件的时候&…

Goby 漏洞更新 |海康威视部分iVMS系统存在文件上传漏洞

漏洞名称&#xff1a;海康威视部分iVMS系统存在文件上传漏洞 English Name&#xff1a;Some Hikvision iVMS file upload vulnerabilitie CVSS core: 9.8 影响资产数&#xff1a;15294 漏洞描述&#xff1a; 海康威视-iVMS综合安防管理平台是一套“集成化”、“数字化”、…

Redis系列----redis网络模型2

一、redis单双线程判断 一、redis单线程定义 主要是指Redis的网络IO和键值对读写是由一个线程来完成的&#xff0c;Redis在处理客户端的请求时包括获取 (socket 读)、解析、执行、内容返回 (socket 写) 等都由一个顺序串行的主线程处理&#xff0c;这就是所谓的“单线程”。这…

VH6501干扰仪的使用

目录 1.VH6501接口介绍 2.使用场景 2.1当VH6501作为硬件接口卡作通信/监测使用时&#xff0c;使用CH1的任意接口与总线连接即可 2.2若将 VH6501 用于干扰总线&#xff0c;针对单节点干扰时则通常情况下接任意一个接口即可 2.3针对多节点干扰时&#xff0c;需要通过CH1的两个…

YCbcr to rgb 笔记

YCbCr 色彩空间的一种 YCbCr或Y’CbCr有的时候会被写作&#xff1a;YCBCR或是Y’CBCR&#xff0c;是色彩空间的一种&#xff0c;通常会用于影片中的影像连续处理&#xff0c;或是数字摄影系统中。Y’为颜色的亮度(luma)成分、而CB和CR则为蓝色和红色的浓度偏移量成份。Y’和Y…

3.4 最大字段和

博主简介&#xff1a;一个爱打游戏的计算机专业学生博主主页&#xff1a; 夏驰和徐策所属专栏&#xff1a;算法设计与分析 1.什么是最大子段和? 我的理解&#xff1a; 最大子段和是一个经典的问题&#xff0c;也称为最大子数组和问题。给定一个整数数组&#xff0c;要求找到…

口撕raft面试100问

1&#xff0c;Raft 协议什么作用 2&#xff0c;详细介绍 Raft 流程 我觉得以下这个流程是比较详细的了&#xff0c; 以下是带上了持久化和日志压缩的细节&#xff1a; 持久化&#xff1a;节点会定期将自己的信息&#xff0c;比如当前任期号、投票信息、日志条目和快照&#…

项目改造操作(图书管理系统为例)

目录 后端 概述 获取所有的读者的借阅卡号 获取所有的未被借阅的图书编号 进行借阅 前端 后端 概述 本模块主要完成对图书的借阅处理。需要实现三个接口&#xff0c;第一个是获取所有的读者的借阅卡号&#xff0c;第二个是获取所有的未被借阅的图书编号&#xff0c;第三…

Spring Boot 启动注解分析

文章目录 1. SpringBootApplication2. EnableAutoConfiguration3. AutoConfigurationImportSelector3.1 isEnabled3.2 getCandidateConfigurations 3.3 removeDuplicates3.4 getExclusions3.5 checkExcludedClasses3.6 removeAll3.7 filter 虽然我们在日常开发中&#xff0c;S…

【Golang】golang中http请求的context传递到异步任务的坑

文章目录 前言一、HTTP请求的Context传递到异步任务的坑 前言 在golang中&#xff0c;context.Context可以用来用来设置截止日期、同步信号&#xff0c;传递请求相关值的结构体。 与 goroutine 有比较密切的关系。 在web程序中&#xff0c;每个Request都需要开启一个goroutin…

使用docker部署nginx并支持https

配置nginx支持https&#xff0c;其实也简单&#xff0c;搞个证书&#xff0c;然后修改下配置文件就好了。我以前一篇文章&#xff08;使用docker部署多个nginx站点并配置负载均衡&#xff09;为例&#xff0c;做个记录。 如前所述&#xff0c;我使用docker&#xff0c;部署了3…

一文带你看懂软件测试(功能、接口、性能、自动化)详解

全文2000字&#xff0c;预计阅读时间10分钟&#xff0c;建议先点赞收藏慢慢看 一、软件测试功能测试 测试用例编写是软件测试的基本技能&#xff1b;也有很多人认为测试用例是软件测试的核心&#xff1b;软件测试中最重要的是设计和生成有效的测试用例&#xff1b;测试用例是测…

面了个京东拿30k出来的,牛逼到家了。。。

今天上班开早会就是新人见面仪式&#xff0c;听说来了个很厉害的大佬&#xff0c;年纪还不大&#xff0c;是上家公司离职过来的&#xff0c;薪资已经达到中高等水平&#xff0c;很多人都好奇不已&#xff0c;能拿到这个薪资应该人不简单&#xff0c;果然&#xff0c;自我介绍的…