rocketMQ5.0顺序消息golang接入

news2025/2/24 0:49:04

本人理解,顺序消息如果不分消息组,那么会影响并行处理速度,所以尽量消息组分的散一些
首先上要求,官方文档如下:
在这里插入图片描述
总结:
1.必须同一个消息组,消息组和消费组不是一个概念,不要混
2.必须单一生产者,也就是说线上生产只能开一个 pod,感觉局限有点高,无法多 pod 接入
3.必须串行发送,这个点也不太好,限制过高
以上三点
在我的案例中(企业微信回调消息)
那么只能开一个接口服务Pod 来接收微信回调,如果挂了,那完蛋,开多个 pod的话,那只有把请求放队列,通过队列pop进行消费再生产消息到 mq,这样同时也解决了第三点的串行发送。

生产的时候如何确定是顺序消息,只需要生产消息的时候给设定一个消息组

msg := &rmq_client.Message{
Topic: Topic,
	Body:  []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// 这里设置消息组
msg.SetMessageGroup("fifo")

分组分的越细越好

提高消费速度

当拿到消息后,根据消息分组来进行并发处理,每个分组内进行串行处理,关键代码如下


func (m *RocketMq) ConsumerOrderly(funcMap map[string]func([]byte) error) {
	var err error

	m.consumerOnce.Do(func() {
		Log().Info("##############pro consume orderly start#############", m.MqConfig)
		errTmp := m.proSimConsumer.Start()
		if errTmp != nil {
			Log().Panic("MQ启动失败", errTmp)
			return
		}
	})
	defer m.proSimConsumer.GracefulStop()
	// 总体保证有N个在运行
	var ch = make(chan int, m.MaxGoroutine)
	for {
		fmt.Println("start receive message")
		mvs, errReceive := m.proSimConsumer.Receive(context.TODO(), 8, 20*time.Second)
		if errReceive != nil {
			if strings.Contains(errReceive.Error(), "no new message") {
				Log().Info(errReceive)
			} else {
				Log().Error("顺序消息,拉取MQ消息失败:", errReceive)
			}
			time.Sleep(time.Second * 2)
			continue
		}
	
		var msgGroupMap = make(map[string][]*rmq_client.MessageView, 0)
		for _, v := range mvs {
			if _, ok := funcMap[*v.GetTag()]; !ok {
				Log().Error(v.GetTag(), ": action do not exist")
				continue
			} else {
				// 根据msgGroup汇总
				msgGroup := v.GetMessageGroup()
				msgGroupMap[*msgGroup] = append(msgGroupMap[*msgGroup], v)
			}
		}
		// 最大程度多线程消费
		for _, item := range msgGroupMap {
			ch <- 1
			go func(item []*rmq_client.MessageView) {
				for _, v := range item {
					fmt.Println(*v.GetTag(), *v.GetMessageGroup(), string(v.GetBody()))
					action, _ := funcMap[*v.GetTag()]
					if errTmp := action(v.GetBody()); errTmp != nil {
						Log().Error("mq顺序消费失败", errTmp)
						break
					} else {
						m.proSimConsumer.Ack(context.TODO(), v)
					}
				}
				<-ch
			}(item)
		}
	}
}

最后需要注意的点:

同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失
请保证订阅一致性
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1246132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大语言模型概述(二):基于亚马逊云科技的研究分析与实践

上期介绍了大语言模型的定义和发展历史&#xff0c;本期将分析基于亚马逊云科技的大语言模型相关研究方向&#xff0c;以及大语言模型的训练和构建优化。 大语言模型研究方向分析 Amazon Titan 2023 年 4 月&#xff0c;亚马逊云科技宣布推出 Amazon Titan 大语言模型。根据…

Azure Machine Learning - 创建Azure AI搜索服务

目录 准备工作查找 Azure AI 搜索产品/服务选择订阅设置资源组为服务命名选择区域选择层创建服务配置身份验证扩展服务何时添加第二个服务将多个服务添加到订阅 Azure AI 搜索是用于将全文搜索体验添加到自定义应用的 Azure 资源&#xff0c;本文介绍如何创建Azure AI搜索服务 …

ROS知识:卡尔曼滤波

https://en.wikipedia.org/wiki/Kalman_filter 一、提要 在卡尔曼滤波的相关技术文献中,其数学表达看起来都非常晦涩和不透明。这很糟糕,如果您以正确的方式看待卡尔曼滤波器,它实际上非常简单易懂。这里的叙述简单,先决条件也很简单;您所需要的只是对概率和矩阵的基本了解…

数字孪生智慧校园 Web 3D 可视化监测

当今&#xff0c;智慧校园发展阶段亟需推动信息可视化建设与发展&#xff0c;将大数据、云计算、可视化等高新技术相融合&#xff0c;为校园师生创造科学智能的学习环境&#xff0c;并实现教学资源最大化和信息服务智能化。帮助学校更好地应用校园可视化技术&#xff0c;提升校…

java--static修饰成员方法

1.成员方法的分类 ①类方法&#xff1a;有static修饰的成员方法&#xff0c;属于类 ②实例方法&#xff1a;无static修饰的成员方法&#xff0c;属于对象。 2.成员方法的执行原理 解析&#xff1a; 第一行代码&#xff1a;扫描class包名&#xff0c;在方法区生成一个Test.cl…

chatGPT4机器学习数据后最终保留在机器里的是什么? 机器是怎么产生智能的? TensorFlow没有直接开发出类似GPT-4这样的模型

机器学习数据后最终保留在机器里的是机器学习模型。机器学习模型是机器学习系统中的核心&#xff0c;它是机器学习系统能够进行推理和预测的基础。 机器学习模型通常由参数组成。参数是机器学习模型的权重和偏差。机器学习系统通过训练来学习这些参数。训练是指让机器学习系统…

46、Flink 的table api与sql之配项列表及示例

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

opencv-GrabCut 图像分割算法

GrabCut 是一种图像分割算法&#xff0c;通过迭代优化的方式将图像分割为前景和背景。这种算法最初由Carsten Rother、Vladimir Kolmogorov和Andrew Blake于2004年提出。 GrabCut 算法的基本思想是通过用户**提供的一个矩形区域&#xff08;称为"掩模"&#xff09;*…

【C++】:多态

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关多态的知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数据结…

MFC所有控件介绍及基本使用

一、前言 本篇文档介绍了MFC控件的基本使用&#xff0c;同时提供了关于MFC控件使用的工程代码&#xff0c;程序界面如下图&#xff0c;有兴趣的可以到文档最后的链接处进行下载。 二、控件介绍 2.1 Button &#xff08;按钮&#xff09; 2.2 CheckBox&#xff08;复选框&am…

【算法】链表-20231124

这里写目录标题 一、83. 删除排序链表中的重复元素二、206. 反转链表三、234. 回文链表 一、83. 删除排序链表中的重复元素 简单 1.1K 相关企业 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 示例…

Android相机性能提高50%

文章目录 应用举例&#xff08;可以不看这一part&#xff0c;直接跳过看具体怎么做&#xff09;&#xff1a;Snapchat 通过 Camera2 Extensions API 将新相机功能的集成速度提高了 50%**Camera2 扩展 API 可以访问高级功能更多设备上的更多机会 正文&#xff1a;开始使用扩展架…

甄知燕千云ITAM,您的IT资产管理专家

IT 资产是实现企业持续发展的重要资源之一&#xff0c;也是反观企业数字化转型发展的缩影。通常情况下&#xff0c;IT 资产指组织拥有、租赁或使用的任何信息技术设备和资源&#xff0c;这些设备和资源对组织的业务运行起到支持作用。包括硬件设备&#xff08;如服务器、计算机…

Rust语言入门教程(一) - 简介及Cargo使用

Rust编程入门 为什么学习Rust 我本人是一个DevOps工程师&#xff0c;并不是专职的开发人员&#xff0c;但需要了解各种各样的语言的基本知识和特性&#xff0c;以便在不同的项目中帮助开发人员设计软件架构&#xff0c;部署流程以及进行错误排查和调试。但是对任何新生的优秀…

大语言模型概述(三):基于亚马逊云科技的研究分析与实践

上期介绍了基于亚马逊云科技的大语言模型相关研究方向&#xff0c;以及大语言模型的训练和构建优化。本期将介绍大语言模型训练在亚马逊云科技上的最佳实践。 大语言模型训练在亚马逊云科技上的最佳实践 本章节内容&#xff0c;将重点关注大语言模型在亚马逊云科技上的最佳训…

数据结构算法-贪心算法

引言 贪心&#xff1a;人只要有 “需求“ &#xff0c;都会有有点“贪“&#xff0c; 这种“贪“是一种选择&#xff0c;或者“”取舍“ RTS&#xff08;即时战略&#xff09;游戏&#xff1a; 帝国时代里 首先确保拥有足够的人口 足够的粮食&#xff0c;足够的战略资源 足够的…

VMware vShere download

VMware 前言 VMware vSphere 是 VMware 的虚拟化平台,可将数据中心转换为包括 CPU、存储和网络资源的聚合计算基础架构。vSphere 将这些基础架构作为一个统一的运行环境进行管理,并为您提供工具来管理加入该环境的数据中心。 vSphere 的两个核心组件是 ESXi 和 vCenter Ser…

前端必学——实现电商图片放大镜效果(附代码)

放大镜可以说是前端人必须学会的程序之一,今天的案例为大家展示一下怎么实现放大镜的效果&#xff01; 效果图展示 整个效果就是当鼠标放到展示图上的时候&#xff0c;会出现一个遮罩层以及弹出来一个框展示一个详情图&#xff0c;并且鼠标移动的时候详情图跟着移动&#xff0c…

老外对开发信的评价是什么?如何写开发信?

老外对开发信的评价和态度怎么样&#xff1f;国外客户喜欢的开发信类型有哪些&#xff1f; 许多中国公司和个人都在与老外打交道时&#xff0c;不可避免地需要发送开发信。但是&#xff0c;老外对开发信的评价究竟如何呢&#xff1f;在这篇文章中&#xff0c;蜂邮将深入探讨老…