Golang使用消息队列(RabbitMQ)

news2024/11/24 22:27:14

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

在这里插入图片描述

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"strings"
)

func InitRabbitMQ() *amqp.Connection {
	mq := "amqp"
	host := "127.0.0.1"
	port := "5672"
	user := "root"
	pwd := "root"
	dns := strings.Join([]string{mq, "://", user, ":", pwd,
		"@", host, ":", port, "/"}, "")
	conn, err := amqp.Dial(dns)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	return conn
}

func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {
	// 队列信息
	exchangeName := "main_exchange"
	queueName := fmt.Sprintf("user_queue_%s", userID)
	messageTTL := int32(300000)

	// 声明主交换机
	err := ch.ExchangeDeclare(
		exchangeName, // 交换机名
		"direct",     // Exchange type
		false,        // Durable
		false,        // Auto-deleted
		false,        // Internal
		false,        // No-wait
		nil,          // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare an main exchange: %v", err)
	}

	// 声明用户队列
	_, err = ch.QueueDeclare(
		queueName, // 队列名
		false,     // Durable
		false,     // Delete when unused
		false,     // Exclusive
		false,     // No-wait
		amqp.Table{
			"x-dead-letter-routing-key": "dead",          // routing-key
			"x-dead-letter-exchange":    "dead_exchange", // 死信交换机
			"x-message-ttl":             messageTTL,      // TTL
		},
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定
	err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)
	if err != nil {
		log.Fatalf("Failed to bind queue to exchange: %v", err)
	}

	return ch
}

func InitDeadExchangeAndQueue(ch *amqp.Channel) {
	// 声明死信交换机
	err := ch.ExchangeDeclare(
		"dead_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Failed to declare an dead exchange: %v", err)
	}

	// 声明一个死信队列
	_, err = ch.QueueDeclare(
		"dead_queue",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定
	err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)
	if err != nil {
		log.Fatalf("Failed to bind queue to exchange: %v", err)
	}
}

func PublishMessage(ch *amqp.Channel, userID, fileID string) {
	// 用户信息
	message := fmt.Sprintf("%s|%s", userID, fileID)
	exchangeName := "main_exchange"
	// 发布用户消息
	err := ch.Publish(
		exchangeName, // Exchange
		userID,       // Routing key
		false,        // Mandatory
		false,        // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}
	log.Printf("Message sent to user %s: %s", userID, message)
}

func ConsumeTTL(ch *amqp.Channel) {
	// 声明死信交换机
	err := ch.ExchangeDeclare(
		"dead_exchange", // 交换机名
		"direct",        // Exchange type
		true,            // Durable
		false,           // Auto-deleted
		false,           // Internal
		false,           // No-wait
		nil,             // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare a dead letter exchange: %v", err)
	}

	// 创建消费者并阻塞等待消费死信队列中的消息
	megs, err := ch.Consume(
		"dead_queue", // Queue
		"",           // Consumer
		false,        // Auto-acknowledge
		false,        // Exclusive
		false,        // No-local
		false,        // No-wait
		nil,          // Args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)
	}

	// 使用无限循环一直监听
	fmt.Println("Waiting for message from dead_queue......")
	for d := range megs {
		// 实际中,处理消息的逻辑,例如删除文件或其他操作
		fmt.Println(string(d.Body))

		// 消费完成后手动确认消息
		err = d.Ack(false)
		if err != nil {
			log.Fatalf("Failed to ack message: %v", err)
		}
	}
}

func Consume(ch *amqp.Channel, userID string) {
	// 下面的信息可以通过前后端进行传递
	queueName := fmt.Sprintf("user_queue_%s", userID)

	// 消费消息
	megs, err := ch.Consume(
		queueName, // Queue
		"",        // Consumer
		true,      // Auto-acknowledge
		false,     // Exclusive
		false,     // No-local
		false,     // No-wait
		nil,       // Args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听
	d, ok := <-megs
	if !ok {
		log.Fatalf("Failed to get message: %v", err)
	}
	fmt.Println(string(d.Body))
	// 消息完成后确认消息
	err = d.Ack(true)
	if err != nil {
		log.Fatalf("Failed to ack message: %v", err)
	}
}

func main() {
	// 获取客户端
	client := InitRabbitMQ()
	defer client.Close()

	ch, err := client.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	//ConsumeTTL(ch)

	// 构造dead_exchange及dead_queue
	// InitDeadExchangeAndQueue(ch)

	// 假设这是web请求信息
	//var userID1 = "test-id1"
	//var fileID1 = "file1"

	// 构造main_exchange及user_queue
	//ch = InitMainExchangeAndQueue(ch, userID1)
	// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中
	//PublishMessage(ch, userID1, fileID1)

	//time.Sleep(20 * time.Second)

	// 模拟后端消费消息
	//Consume(ch, userID1)

	// 针对用户2:模拟其不被后端消费,过期到死信队列中
	var userID2 = "test-id2"
	var fileID2 = "file2"
	ch = InitMainExchangeAndQueue(ch, userID2)
	PublishMessage(ch, userID2, fileID2)
	// 注意这个消息没有被消费,理论上应当被死信队列消费
}

从dead_exchange中消费:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/904392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Vscode 编辑器 导出、导入和运行Excel中的VBA代码

使用Vscode 编辑器 导出、导入和运行Excel中的VBA代码 前言 Excel自带的 Microsoft Visual Basic for Applications 编辑器常被人称为上古编辑器&#xff0c;的确不适合代码编辑&#xff0c;这是其一&#xff0c;其二是当系统语言与Excel的安装语言不一致时&#xff0c;往往出现…

QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis)

QChart类用来 管理 图表的&#xff1a;数据序列&#xff08;series&#xff09;、图例&#xff08;legend&#xff09;和坐标轴&#xff08;axis&#xff09; 1、数据序列类 继承关系 2、坐标轴类 的继承关系 3、图例类 什么是图例&#xff1f; 图例&#xff1a;是集中于地图…

Docker搭建LNMP运行Wordpress平台

一、项目1.1 项目环境1.2 服务器环境1.3 任务需求 二、Linux 系统基础镜像三、Nginx1、建立工作目录2、编写 Dockerfile 脚本3、准备 nginx.conf 配置文件4、生成镜像5、创建自定义网络6、启动镜像容器7、验证 nginx 四、Mysql1、建立工作目录2、编写 Dockerfile3、准备 my.cnf…

如何做H5性能测试?

提起H5性能测试&#xff0c;可能许多同学有所耳闻&#xff0c;但是不知道该如何对H5做性能测试&#xff0c;或者不知道H5应该关注哪些性能指标。今天我们就来看下&#xff0c;希望阅读本文后&#xff0c;能够有所了解。 常用指标 1、H5性能相关参数介绍 白屏时间&#xff1a;…

FRP内网穿透,配置本地电脑作为服务器

FRP内网穿透&#xff0c;配置本地电脑作为服务器 下载FRP服务端客户端 参考链接&#xff1a; https://www.it235.com/实用工具/内网穿透/pierce.html https://www.cnblogs.com/007sx/p/17469301.html 由于没有公网ip&#xff0c;所以尝试内网穿透将本地电脑作为服务器&#xff…

第 6 章 递归(1)(应用场景,概念,调用机制,解决问题类型,重要规则)

6.1递归应用场景 看个实际应用场景&#xff0c;迷宫问题(回溯)&#xff0c; 递归(Recursion) 6.2递归的概念 简单的说: 递归就是方法自己调用自己,每次调用时传入不同的变量.递归有助于编程者解决复杂的问题,同时可以让代码变得简洁。 6.3递归调用机制 我列举两个小案例,…

代码随想录算法训练营之JAVA|第三十四天|509. 斐波那契数

今天是第 天刷leetcode&#xff0c;立个flag&#xff0c;打卡60天&#xff0c;如果做不到&#xff0c;完成一件评论区点赞最高的挑战。 算法挑战链接 509. 斐波那契数https://leetcode.cn/problems/fibonacci-number/ 第一想法 这个就是求斐波那契数&#xff0c;感觉应该不用…

Error creating bean with name ‘esUtils‘ defined in file

报错异常&#xff1a; 背景&#xff1a; esUtils在common服务中、启动media服务时候、报这个异常、后排查esUtils在启动时候发生异常引起的、在相关bean中加入try{}catch{}即可解决问题 String[] split url.split(","); HttpHost[] httpHosts new HttpHost[split.…

卷积网络手动实现和nn实现

代码中涉及的图片实验数据下载地址&#xff1a;https://download.csdn.net/download/m0_37567738/88235543?spm1001.2014.3001.5501 &#xff08;一&#xff09;手动实现卷积算法 代码&#xff1a; import os import torch.nn.functional as F from PIL import Image import…

装饰器读取不到被装饰函数的参数-已解决

def write_case_log(func):def wrapper(*args, **kwargs):logger.info("{}开始执行".format(func.__name__))func(*args,**kwargs)logger.info("{}执行中".format(args))logger.info("{}执行结束",format(func.__name__))return wrapper被装饰函…

卡方分箱(chi-square)

统计学&#xff0c;风控建模经常遇到卡方分箱算法ChiMerge。卡方分箱在金融信贷风控领域是逻辑回归评分卡的核心&#xff0c;让分箱具有统计学意义&#xff08;单调性&#xff09;。卡方分箱在生物医药领域可以比较两种药物或两组病人是否具有显著区别。但很多建模人员搞不清楚…

基于SSM的校园旧书交易交换网站

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

CVE-2023-21292 AMS框架层高危漏洞分析

文章目录 前言漏洞细节故事起源漏洞利用漏洞修复 总结 前言 本周在分析 Google 官方发布的 Android 2023 年8 月安全公告 涉及的漏洞补丁的时候&#xff0c;遇到一个有意思的漏洞&#xff1a;CVE-2023-21292。 之所以说它有意思是因为这个漏洞早在去年年底就在某平台上被国外…

shell脚本语句

一、语句 一、条件语句 一、以用户为例演示 一、显示当前登录系统的用户信息 w命令 二、显示有多少个用户 w | wc -l 显示有7个用户 前两个是固定标题&#xff0c;从第三个开始才是登录用户&#xff0c;所以要统计数量需要 命令&#xff1a;echo $[$(w | wc -l) -2] 显示…

知识储备--基础算法篇-Hash table

1.哈希表的基础概念 哈希表是一种数据结构&#xff0c;它使用哈希函数将键映射到存储桶或槽位中。它通过将键转换为索引来实现快速的插入、查找和删除操作。哈希表通常用于需要高效查找的场景&#xff0c;如字典、缓存和数据库中。 常见哈希结构 数组set&#xff08;集合&am…

系统架构设计专业技能 · 数据库设计

系列文章目录 系统架构设计专业技能 软件工程&#xff08;一&#xff09;【系统架构设计师】 系统架构设计高级技能 软件架构概念、架构风格、ABSD、架构复用、DSSA&#xff08;一&#xff09;【系统架构设计师】 系统架构设计高级技能 系统质量属性与架构评估&#xff08;…

高频面试题:多线程顺序打印ABC字符20次

一个关于多线程协作的题目经常会出现在大厂的面试中&#xff1a;有三个线程分别打印A、B、C&#xff0c;请让这三个线程按顺序打印出ABC20次。 我们知道&#xff0c;线程调度机制是非确定性的&#xff0c;如果不加上额外的并发控制&#xff0c;直接启动三个线程&#xff0c;那么…

【软件测试】Java和Python做自动化测试哪个更有优势?

Java和Python做自动化测试&#xff0c;哪个更有优势&#xff1f;这两个语言都是很流行的语言&#xff0c;所以从技术上很难说谁好谁不好的。因为要说好不好得看使用的环境和要求。就像生活在中国&#xff0c;你天天说英语&#xff0c;别人会说你“又拽鸟语啊”&#xff1f;但是…

Postman的高级用法—Runner的使用​

1.首先在postman新建要批量运行的接口文件夹&#xff0c;新建一个接口&#xff0c;并设置好全局变量。 2.然后在Test里面设置好要断言的方法 如&#xff1a; tests["Status code is 200"] responseCode.code 200; tests["Response time is less than 10000…

接口自动化中如何完成接口加密与解密?

加密是一种限制对网络上传输数据的访问权的技术。将密文还原为原始明文的过程称为解密&#xff0c;它是加密的反向处理。在接口开发中使用加密、解密技术&#xff0c;可以防止机密数据被泄露或篡改。在接口自动化测试过程中&#xff0c;如果要验证加密接口响应值正确性的话&…