Golang实现RabbitMQ中死信队列各个情况

news2024/9/25 21:29:21

下面这段教程针对是你已经有一些基本的MQ的知识,比如说能够很清楚的理解queue、exchange等概念,如果你还不是很理解,我建议你先访问官网查看基本的教程。

文章目录

    • 1、造成死信队列的主要原因
    • 2、操作逻辑图
    • 3、代码实战
      • 3.1 针对原因1:消费者超出时间未应答
      • 3.3 针对原因2:限制一定的长度
      • 3.3 针对原因3:消费者拒绝的消息回到死信队列中

1、造成死信队列的主要原因

  • 消费者超时未应答
  • 队列的容量有限
  • 消费者拒绝了的消息

2、操作逻辑图

请添加图片描述

3、代码实战

其实整体的思路就是分别创建一个normal_exchange、dead_exchange、normal_queue、dead_queue,然后将normal_exchange与normal_queue进行绑定,将dead_exchange与dead_queue进行绑定,这里比较关键的一个点在于说如何将normal_queue与dead_exchange进行绑定,这样才能将错误的消息传递过来。下面就是这段代码的关键。

// 声明一个normal队列
	_, err = ch.QueueDeclare(
		constant.NormalQueue,
		true,
		false,
		false,
		false,
		amqp.Table{
			//"x-message-ttl":             5000,                    // 指定过期时间
			//"x-max-length":              6,						// 指定长度。超过这个长度的消息会发送到dead_exchange中
			"x-dead-letter-exchange":    constant.DeadExchange,    // 指定死信交换机
			"x-dead-letter-routing-key": constant.DeadRoutingKey,  // 指定死信routing-key
		})

3.1 针对原因1:消费者超出时间未应答

consumer1.go

package day07

import (
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"v1/utils"
)

type Constant struct {
	NormalExchange   string
	DeadExchange     string
	NormalQueue      string
	DeadQueue        string
	NormalRoutingKey string
	DeadRoutingKey   string
}

func Consumer1() {
	// 获取连接
	ch := utils.GetChannel()
	// 创建一个变量常量
	constant := Constant{
		NormalExchange:   "normal_exchange",
		DeadExchange:     "dead_exchange",
		NormalQueue:      "normal_queue",
		DeadQueue:        "dead_queue",
		NormalRoutingKey: "normal_key",
		DeadRoutingKey:   "dead_key",
	}
	// 声明normal交换机
	err := ch.ExchangeDeclare(
		constant.NormalExchange,
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	utils.FailOnError(err, "Failed to declare a normal exchange")
	// 声明一个dead交换机
	err = ch.ExchangeDeclare(
		constant.DeadExchange,
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	utils.FailOnError(err, "Failed to declare a dead exchange")

	// 声明一个normal队列
	_, err = ch.QueueDeclare(
		constant.NormalQueue,
		true,
		false,
		false,
		false,
		amqp.Table{
			"x-message-ttl": 5000, // 指定过期时间
			//"x-max-length":              6,
			"x-dead-letter-exchange":    constant.DeadExchange,   // 指定死信交换机
			"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
		})
	utils.FailOnError(err, "Failed to declare a normal queue")
	// 声明一个dead队列:注意不要给死信队列设置消息时间,否者死信队列里面的信息会再次过期
	_, err = ch.QueueDeclare(
		constant.DeadQueue,
		true,
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to declare a dead queue")

	// 将normal_exchange与normal_queue进行绑定
	err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)
	utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")
	// 将dead_exchange与dead_queue进行绑定
	err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)
	utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")

	// 消费消息
	msgs, err := ch.Consume(constant.NormalQueue,
		"",
		false, // 这个地方一定要关闭自动应答
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to consume in Consumer1")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			if err := d.Reject(false); err != nil {
				utils.FailOnError(err, "Failed to Reject a message")
			}
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

	<-forever
}

consumer2.go

package day07

import (
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"v1/utils"
)

func Consumer2() {
	// 拿取信道
	ch := utils.GetChannel()

	// 声明一个交换机
	err := ch.ExchangeDeclare(
		"dead_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to Declare a exchange")

	// 接收消息的应答
	msgs, err := ch.Consume("dead_queue",
		"",
		false,
		false,
		false,
		false,
		nil,
	)

	var forever chan struct{}
	go func() {
		for d := range msgs {
			log.Printf("[x] %s", d.Body)
			// 开启手动应答ß
			d.Ack(false)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever

}

produce.go

package day07

import (
	"context"
	amqp "github.com/rabbitmq/amqp091-go"
	"strconv"
	"time"
	"v1/utils"
)

func Produce() {
	// 获取信道
	ch := utils.GetChannel()
	// 声明一个交换机
	err := ch.ExchangeDeclare(
		"normal_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to declare a exchange")
	ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancer()

	// 发送了10条消息
	for i := 0; i < 10; i++ {
		msg := "Info:" + strconv.Itoa(i)
		ch.PublishWithContext(ctx,
			"normal_exchange",
			"normal_key",
			false,
			false,
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(msg),
			})
	}
}

3.3 针对原因2:限制一定的长度

只需要改变consumer1.go中的对normal_queue的声明

// 声明一个normal队列
	_, err = ch.QueueDeclare(
		constant.NormalQueue,
		true,
		false,
		false,
		false,
		amqp.Table{
			//"x-message-ttl": 5000, // 指定过期时间
			"x-max-length":              6,
			"x-dead-letter-exchange":    constant.DeadExchange,   // 指定死信交换机
			"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
		})

3.3 针对原因3:消费者拒绝的消息回到死信队列中

这里需要完成两点工作
工作1:需要在consumer1中作出拒绝的操作

go func() {
		for d := range msgs {
			if err := d.Reject(false); err != nil {
				utils.FailOnError(err, "Failed to Reject a message")
			}
		}
	}()

工作2:如果你consume的时候开启了自动应答一定要关闭

// 消费消息
	msgs, err := ch.Consume(constant.NormalQueue,
		"",
		false, // 这个地方一定要关闭自动应答
		false,
		false,
		false,
		nil)

其他的部分不需要改变,按照问题1中的设计即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/376346.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

win10下Vue环境搭建(脚手架初始化+项目启动)教程(详解多图)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、下载安装node.js二、node.js环境配置三、下载安装vue脚手架前言 初学Vue 搭环境快把我整死了QAQ 差点入门即入土 一、下载安装node.js ①下载地址&#x…

bug的创建和等级

1.如何合理的创建一个bug 创建bug的要素 &#xff1a;问题的版本&#xff0c;发现问题的环境&#xff0c;发现问题的步骤&#xff0c;预取结果&#xff0c;实际结果。 eg&#xff1a; 1.问题的版本&#xff1a;谷歌浏览器108版本 2.发现问题的环境&#xff1a;windows11家庭版…

算法训练营 day56 动态规划 最长递增子序列 最长连续递增序列 最长重复子数组

算法训练营 day56 动态规划 最长递增子序列 最长连续递增序列 最长重复子数组 最长递增子序列 300. 最长递增子序列 - 力扣&#xff08;LeetCode&#xff09; 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#x…

如何理解API?API 是如何工作的?(5分钟诠释)

大家可能最近经常听到 API 这个概念&#xff0c;那什么是API&#xff0c;它又有什么特点和好处呢&#xff1f; wiki 百科镇楼 …[APIs are] a set of subroutine definitions, protocols, and tools for building application software. In general terms, it’s a set of cle…

Linux文件属性--软连接和硬链接

文章目录软链接硬链接软链接和硬链接的区别软链接 软链接&#xff08;Soft Link&#xff09;又叫符号链接&#xff08;Symbolic Link &#xff09;&#xff0c;是linux特殊文件的一种&#xff0c;文件类型为l,它的数据是它所链接的文件或目录的路径。软链接可以跨磁盘和 分区…

《数据库系统概论》学习笔记——第三章 关系数据库标准语言SQL

教材为数据库系统概论第五版&#xff08;王珊&#xff09; 最重量级的一章。从后续的学习&#xff0c;基本所有实验&#xff0c;大作业和考试都会涉及SQL&#xff0c;SQL实际上是有很多变化的&#xff0c;书上讲的只是最基本的&#xff08;做了大作业才知道SQL能有这么多变化&a…

移动端自动化测试(一)appium环境搭建

自动化测试有主要有两个分类&#xff0c;接口自动化和ui自动化&#xff0c;ui自动化呢又分移动端的和web端的&#xff0c;当然还有c/s架构的&#xff0c;这种桌面程序应用的自动化&#xff0c;使用QTP&#xff0c;只不过现在没人做了。 web自动化呢&#xff0c;现在基本上都是…

k8s 强制删除 namespace

k8s 强制删除 namespace1、命名空间查看资源2、删除k8s指定命名空间3、强制删除3.1、导出命名空间jsno3.2、使用kubectl代理3.3、代理测试3.4、使用http接口进行删除1、命名空间查看资源 kubectl api-resources -o name --verbslist --namespaced | xargs -n 1 kubectl get --…

jsp运动会管理系统论文

技术&#xff1a;Java、JSP等摘要&#xff1a;运动会作为各项体育运动的基础&#xff0c;具有广泛的群众性。每年一度的学校运动会声势浩 大&#xff0c;是提高、检验学生的健康水平&#xff0c;开展全民健身运动的有效途径。在规模上虽然不如大型运动会&#xff0c;但由于比赛…

【Python数据挖掘入门】2.2文本分析-中文分词(jieba库cut方法/自定义词典load_userdict/语料库分词)

中文分词就是将一个汉字序列切分成一个一个单独的词。例如&#xff1a; 另外还有停用词的概念&#xff0c;停用词是指在数据处理时&#xff0c;需要过滤掉的某些字或词。 一、jieba库 安装过程见&#xff1a;https://blog.csdn.net/momomuabc/article/details/128198306 ji…

Docker -- Docker底层原理深度剖析

概论 谈到Docker原理&#xff0c;我们先来三板斧。 Linux命名空间&#xff08;namespace&#xff09;控制组&#xff08;cgroups&#xff09;联合文件系统&#xff08;UnionFS&#xff09; 然后我们心中要明白一件事情&#xff1a; 容器是一种特殊的进程。容器技术的核心功能…

vue2 diff算法

diff是什么 diff 算法是一种通过同层的树节点进行比较的高效算法 其有两个特点&#xff1a; ♥比较只会在同层级进行, 不会跨层级比较 ♥在diff比较的过程中&#xff0c;循环从两边向中间比较 diff 算法的在很多场景下都有应用&#xff0c;在 vue 中&#xff0c;作用于虚拟 dom…

HTML#4超链接标签,列表标签,表格标签和布局标签

一. 超链接标签介绍<a> 定义超链接,用于连接到另一个资源herf: 指定访问资源的URLtarget: 指定打开资源的方式代码<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>超链接标签</title> <…

ES6-ES13教程笔记(一)

课程目录 1、走入ES6 1.1、初识ES6 1.2 let声明变量与const声明常量 1、let声明 1、块级作用域&#xff0c;不会成为全局变量影响别人&#xff1b; 2、不允许重复声明&#xff1b; 3、变量不会提升&#xff1b;&#xff08;会存在暂时性死区&#xff09; 4、不与顶层对象挂…

模糊神经网络(FNN)的实现(Python,附源码及数据集)

文章目录一、理论基础1、模糊神经网络结构2、前向传播过程3、反向传播过程4、建模步骤二、模糊神经网络的实现1、训练过程&#xff08;FNN.py&#xff09;2、测试过程&#xff08;test.py&#xff09;3、测试结果4、参考源码及实验数据集一、理论基础 模糊神经网络&#xff08…

阿里大数据之路总结

一、数据采集 二、数据同步 2.1、数据同步方式&#xff1a; 数据同步的三种方式&#xff1a;直连方式、数据文件同步、数据库日志解析方式 关系型数据库的结构化数据&#xff1a;MYSQL、Oracle、DB2、SQL Server非关系型数据库的非结构化数据&#xff08;数据库表形式存储&am…

力扣Top100题之两数相加(Java解法)

0 题目描述 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff0c;这两个数…

Unity IOS 通过命令行导出IPA

新建一个文件没有后缀然后输入如下内容 #!/usr/bin/env sh /Applications/Unity/Hub/Editor/2020.1.5f1c1/Unity.app/Contents/MacOS/Unity -quit -batchmode -projectPath /Users/zyt/Test -executeMethod Test.BuildEditor.BuildApp cd /Users/zyt/Test/Xcode/unity-xcode x…

Redis:SETNX解决分布式锁误删问题

Redis&#xff1a;SETNX解决分布式锁误删问题一.概述二. 分布式锁&#xff08;初级&#xff09;&#xff08;1&#xff09;锁接口&#xff08;2&#xff09;锁实现类上锁&#xff08;3&#xff09;释放锁&#xff08;4&#xff09;存在的问题三. 改进释放锁&#xff08;1&#…

linux:字符串拷贝的五种方法:使用指针下标,指针变量加偏移量,指针变量自加等

字符串数组名做函数形参&#xff0c;会退化正指针变量&#xff0c;需要使用指针变量操作字符串 代码&#xff1a; #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <s…