RabbitMQ-死信队列(golang)

news2024/11/18 22:12:48

1、概念

      死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。

2、死信产生的原因

文心一言的回答如下:

  1. 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
  2. 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
  3. 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
  4. 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。

总结来说,主要原因就三个,消息被拒绝、消息过期、队列满

在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。

3、死信队列使用实践

3.1 消息过期

设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。

关键点:设置正常队列属性,ttl5s过期:

// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		"x-message-ttl":             int64(5000), // 5秒TTL
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)

全部代码如下: 

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	// 声明死信队列
	dlx, err := ch.QueueDeclare(
		"dead_letter_queue", // name
		true,                // durable
		false,               // delete when unused
		false,               // exclusive
		false,               // no-wait
		nil,                 // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}
	// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		"x-message-ttl":             int64(5000), // 5秒TTL
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
		return
	}
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		q.Name,        // queue name
		q.Name,        // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
	if err != nil {
		fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
		return
	}
}

队列信息包括绑定的死信队列信息、ttl等信息如下:

运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。

3.2 队列满

当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。

设置队列长度属性代码如下:

args := amqp.Table{
		// "x-message-ttl":             int64(5000), // 5秒TTL
		"x-max-length":              2,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)

队列属性如下:

发送两条信息:

 继续发送第三个:

 测试代码:

package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	// 声明死信队列
	dlx, err := ch.QueueDeclare(
		"dead_letter_queue", // name
		true,                // durable
		false,               // delete when unused
		false,               // exclusive
		false,               // no-wait
		nil,                 // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}
	// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列
	args := amqp.Table{
		// "x-message-ttl":             int64(5000), // 5秒TTL
		"x-max-length":              2,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": dlx.Name,
	}
	// 声明正常队列
	q, err := ch.QueueDeclare(
		"normal_queue", // name
		true,           // durable
		false,          // delete when unused
		false,          // exclusive
		false,          // no-wait
		args,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
		return
	}
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		q.Name,        // queue name
		q.Name,        // routing key
		"my_exchange", // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})
	if err != nil {
		fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())
		return
	}
}

3.3 消息被拒绝

       消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。

客户端函数:ch.Nack,函数原型:

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
	ch.m.Lock()
	defer ch.m.Unlock()

	return ch.send(&basicNack{
		DeliveryTag: tag,
		Multiple:    multiple,
		Requeue:     requeue,
	})
}

入参含义如下: 

tag

这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 ch.Ackch.Nack 或 ch.Reject 时,必须提供这个标识符,以便RabbitMQ知道是对哪条消息进行确认或拒绝。

multiple

这是一个布尔值(bool),用于指示是否应该同时确认(或拒绝)多条消息。如果设置为 true,则RabbitMQ将认为从上一个被确认的消息开始(包括该消息),直到当前消息为止的所有未确认消息都被拒绝。这通常用于批量处理消息确认,但在使用 ch.Nack 时,它的作用更多是关于是否应该重新排队当前消息之后的消息(取决于RabbitMQ的配置和消息的属性)。

requeue

这也是一个布尔值(bool),用于指示被拒绝的消息是否应该被重新放入队列的末尾以便稍后重试。如果设置为 true,则消息将被重新排队;如果设置为 false,则消息将被丢弃(或者根据RabbitMQ的配置可能被发送到死信队列,如果配置了的话)。

测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:

之后使用如下代码进行消费:

package main

import (
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")
	if err != nil {
		fmt.Println("Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("Failed to open a channel")
		return
	}
	err = ch.ExchangeDeclare(
		"my_exchange", // name
		"direct",      // type
		true,          // durable
		false,         // auto-deleted
		false,         // internal
		false,         // no-wait
		nil,           // arguments
	)
	if err != nil {
		fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())
		return
	}

	// 声明正常队列
	// q, err := ch.QueueDeclare(
	// 	"normal_queue", // name
	// 	true,           // durable
	// 	false,          // delete when unused
	// 	false,          // exclusive
	// 	false,          // no-wait
	// 	nil,            // arguments
	// )
	// if err != nil {
	// 	fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())
	// 	return
	// }
	// 将正常队列绑定到交换机,并设置死信交换机和路由键
	err = ch.QueueBind(
		"normal_queue", // queue name
		"normal_queue", // routing key
		"my_exchange",  // exchange
		false,
		nil,
	)
	if err != nil {
		fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())
		return
	}
	msgs, _ := ch.Consume(
		"normal_queue", // queue
		"",             // consumer
		false,          // auto-ack
		true,           // exclusive
		false,          // no-local
		false,          // no-wait
		nil,            // args
	)
	go func() {
		for d := range msgs {
			// 模拟处理失败,全部放入死信队列
			ch.Nack(d.DeliveryTag, false, false)
		}
	}()
	time.Sleep(10 * time.Second)
}

运行代码后,3条消息全部进入到死信队列中:

 4、总结

      RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第8章利用CSS制作导航菜单

8.1 水平顶部导航栏 8.1.1 简单水平导航栏的设计与实现 8.1.1.1导航栏的创建 <nav>标签是 HIML5 新增的文档结构标签&#xff0c;用于标记导航栏&#xff0c;以便后续与网站的其他内整合&#xff0c;所以常用<nav>标签在页面上创建导航栏菜单区域。 例如,在<na…

「人眼视觉不再是视频消费的唯一形式」丨智能编解码和 AI 视频生成专场回顾@RTE2024

你是否想过&#xff0c;未来你看到的电影预告片、广告&#xff0c;甚至新闻报道&#xff0c;都可能完全由 AI 生成&#xff1f; 在人工智能迅猛发展的今天&#xff0c;视频技术正经历着一场前所未有的变革。从智能编解码到虚拟数字人&#xff0c;再到 AI 驱动的视频生成&#…

C++:哈希拓展-位图

目录 一.问题导入 二.什么是位图? 2.1如何确定目标数在哪个比特位? 2.2如何存放高低位 2.3位图模拟代码实现 2.3.1如何标记一个数 2.3.2如何重置标记 2.3.3如何检查一个数是否被标记 整体代码实现 标准库的Bitset 库中的bitset的缺陷 简单应用 一.问题导入 这道…

nacos-operator在k8s集群上部署nacos-server2.4.3版本踩坑实录

文章目录 操作步骤1. 拉取仓库代码2. 安装nacos-operator3. 安装nacos-server 坑点一坑点二nacos-ui页面访问同一集群环境下微服务连接nacos地址配置待办参考文档 操作步骤 1. 拉取仓库代码 &#xff08;这一步主要用到代码中的相关yml文件&#xff0c;稍加修改用于部署容器&…

Python爬虫----python爬虫基础

一、python爬虫基础-爬虫简介 1、现实生活中实际爬虫有哪些&#xff1f; 2、什么是网络爬虫&#xff1f; 3、什么是通用爬虫和聚焦爬虫&#xff1f; 4、为什么要用python写爬虫程序 5、环境和工具 二、python爬虫基础-http协议和chrome抓包工具 1、什么是http和https协议…

从北美火到中国,大数据洞察品牌“STANLEY”的突围之路

保守直筒大头的“硬汉”外形&#xff0c;以百变颜色踩中时尚命脉&#xff0c;与各路大牌“梦幻联动”&#xff0c;不少时尚弄潮儿没能逃过其“真香”诱惑。 这就是今年以来从北美火到中国的STANLEY&#xff0c;在“巨无霸”水杯中突围出属于自己的一条路。 最近STANLEY又整活…

Java结合ElasticSearch根据查询关键字,高亮显示全文数据。

由于es高亮显示机制的问题。当全文内容过多&#xff0c;且搜索中标又少时&#xff0c;就会出现高亮结果无法覆盖全文。因此需要根据需求手动替换。 1.根据es的ik分词器获取搜索词的分词结果。 es部分&#xff1a; //中文分词解析 post /_analyze {"analyzer":"…

Python绘制雪花

文章目录 系列目录写在前面技术需求完整代码代码分析1. 代码初始化部分分析2. 雪花绘制核心逻辑分析3. 窗口保持部分分析4. 美学与几何特点总结 写在后面 系列目录 序号直达链接爱心系列1Python制作一个无法拒绝的表白界面2Python满屏飘字表白代码3Python无限弹窗满屏表白代码4…

Linux性能优化之火焰图简介

Linux 火焰图&#xff08;Flame Graph&#xff09;是一种可视化工具&#xff0c;用于分析程序性能问题&#xff0c;尤其是 CPU 使用情况。它展示了程序中函数调用的层次结构和各个调用栈占用的时间比例。 以下是详细介绍&#xff0c;包括火焰图的工作原理、生成步骤和实际使用中…

Axure设计之文本编辑器制作教程

文本编辑器是一个功能强大的工具&#xff0c;允许用户在图形界面中创建和编辑文本的格式和布局&#xff0c;如字体样式、大小、颜色、对齐方式等&#xff0c;在Web端实际项目中&#xff0c;文本编辑器的使用非常频繁。以下是在Axure中模拟web端富文本编辑器&#xff0c;来制作文…

Python中的正则表达式教程

一、 正则表达式基础 1。1。概念介绍 正则表达式是用于处理字符串的强大工具,它并不是Python的一部分。 其他编程语言中也有正则表达式的概念,区别只在于不同的编程语言实现支持的语法数量不同。 它拥有自己独特的语法以及一个独立的处理引擎&#xff0c;在提供了正则表达式…

脑机接口、嵌入式 AI 、工业级 MR、空间视频和下一代 XR 浏览器丨RTE2024 空间计算和新硬件专场回顾

这一轮硬件创新由 AI 引爆&#xff0c;或许最大受益者仍是 AI&#xff0c;因为只有硬件才能为 AI 直接获取最真实世界的数据。 在人工智能与硬件融合的新时代&#xff0c;实时互动技术正迎来前所未有的创新浪潮。从嵌入式系统到混合现实&#xff0c;从空间视频到脑机接口&…

Python爬虫下载新闻,Flask展现新闻(2)

上篇讲了用Python从新闻网站上下载新闻&#xff0c;本篇讲用Flask展现新闻。关于Flask安装网上好多教程&#xff0c;不赘述。下面主要讲 HTML-Flask-数据 的关系。 简洁版 如图&#xff0c;页面简单&#xff0c;主要显示新闻标题。 分页&#xff0c;使用最简单的分页技术&…

Linux下编译MFEM

本文记录在Linux下编译MFEM的过程。 零、环境 操作系统Ubuntu 22.04.4 LTSVS Code1.92.1Git2.34.1GCC11.4.0CMake3.22.1Boost1.74.0oneAPI2024.2.1 一、安装依赖 二、编译代码 附录I: CMakeUserPresets.json {"version": 4,"configurePresets": [{&quo…

Win10/11 安装使用 Neo4j Community Edition

如果你下载的是 Neo4j Community Edition 的压缩包&#xff0c;意味着你需要手动解压并配置 Neo4j。以下是详细的使用步骤&#xff1a; 0. 下载压缩包 访问Neo4j官网&#xff0c;找到 Community Edition 版本并选择 4.x 或者 5.x 下载&#xff1a;https://neo4j.com/deployme…

Spring Boot教程之Spring Boot简介

Spring Boot 简介 接下来一段时间&#xff0c;我会持续发布并完成Spring Boot教程 Spring 被广泛用于创建可扩展的应用程序。对于 Web 应用程序&#xff0c;Spring 提供了 Spring MVC&#xff0c;它是 Spring 的一个广泛使用的模块&#xff0c;用于创建可扩展的 Web 应用程序。…

基于java+SpringBoot+Vue的智能物流管理系统设计与实现

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis Maven mysql5.7或8.0等等组成&#x…

智能零售柜商品识别

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于CNN-RNN的影像报告生成】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…

【Mysql】Mysql函数(上)

1、概述 在Mysql中&#xff0c;为了提高代码重用性和隐藏实现细节&#xff0c;Mysql提供了很多函数。函数可以理解为封装好的模块代码。 2、分类 在Mysql中&#xff0c;函数非常多&#xff0c;主要可以分为以下几类&#xff1a; &#xff08;1&#xff09;聚合函数 &#xf…

sql数据库-分页查询-DQL

目录 语法 注意 举例 语法 select 字段列表 from 表名 limit 起始索引,查询记录数; 注意 起始索引&#xff1a;即从第几条数据开始分页&#xff0c;简单理解为起始索引&#xff08;查询页码-1&#xff09;* 每页显示数据 分页查询在不同的数据库中有不同的方法。 查询第一页…