Golang RabbitMQ实现的延时队列

news2024/12/27 10:34:51

文章目录

  • 前言
  • 一、延时队列与应用场景
  • 二、RabbitMQ如何实现延时队列
    • 实现延时队列的基本要素
    • 整体的实现原理如下
  • 三、Go语言实战
    • 生产者
    • 消费者


前言

之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。

一、延时队列与应用场景

延迟队列是一种特殊类型的消息队列,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景。使用延迟队列可以灵活地控制消息的发送和处理时间,适用于很多场景,如订单超时处理、提醒任务等

具体应用场景有如下:

  1. 订单取消:当订单生成时,将订单消息发送到延迟队列中,并设置延迟时间为十分钟。消费者在十分钟后接收到订单消息并进行关闭操作。

  2. 店铺商品提醒:在店铺创建时,将提醒消息发送到延迟队列中,并设置延迟时间为十天。消费者在十天后接收到消息并发送提醒通知。

  3. 用户登录提醒:用户注册成功后,将提醒消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并发送短信提醒。

  4. 退款通知:当用户发起退款时,将通知消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并通知相关运营人员。

  5. 会议提醒:在会议预定时,将提醒消息发送到延迟队列中,并设置延迟时间为预定时间前十分钟。消费者在指定时间点前十分钟接收到消息并发送会议参加通知。

通过使用延迟队列,可以在指定的时间点触发任务,避免了轮询的低效方式,并且能够满足大量数据和时效性的需求。这种方法提供了更高的性能和实时性,并有效减轻了系统的负载。
下图是订单超时处理的流程图。
在这里插入图片描述

二、RabbitMQ如何实现延时队列

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的。
通过设置消息的 TTL 和 DLX 等参数,可以将消息转发到一个指定的队列中,以便在一定的时间后再进行处理。

实现延时队列的基本要素

1、存在一个倒计时机制:Time To Live(TTL)
2、当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)

基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
**基于第二点,**是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

整体的实现原理如下

发送者将消息发送到延时队列上并设置过期时间,当过期时间到达时,消息会被自动转发到指定的交换机和队列中供接收者消费。
1、建立与 RabbitMQ 服务器的连接并创建通道。
2、发送者通过 ch.Publish 方法将消息发送到延时队列(“test_delay”)上,设置消息的过期时间。
3、延时队列中的消息在到达过期时间后会自动被发送到 “logs” 交换机,由交换机将消息广播给所有绑定的队列。
4、接收者通过监听 “test_logs” 队列接收并处理消息。当有消息到达时,会触发回调函数进行处理。
也就是说要实现延时队列,消费者必须试实现两个队列。
一个是延时队列(“test_delay”),另一个是接收延时消息的队列(“test_logs”)。

这两个队列的作用如下:
延时队列(“test_delay”):这个队列用于接收需要延时发送的消息。发送者通过将消息发送到延时队列,设置消息的过期时间。当消息过期时,RabbitMQ 会自动将消息转发到指定的交换机和队列中。
接收延时消息的队列(“test_logs”):这个队列用于接收延时消息。在示例中,这个队列是通过将 “test_logs” 队列绑定到 “logs” 交换机上来实现的。交换机会将消息广播给所有绑定的队列,因此当延时消息到达过期时间后,会被发送到这个队列中供消费者进行处理。
通过使用两个队列,消息可以被延时发送到指定的队列,并在过期后自动转发到接收队列,实现了延时发送和消费的功能。

三、Go语言实战

生产者

首先建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,通过 ch.Publish 方法将消息发送到延时队列上。这里使用的是空字符串作为交换机(exchange),表示不选择任何交换机,只将消息发送到指定的队列(“test_delay”)。在消息的属性中,设置了消息的过期时间为 5 秒。


func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()
	body := "hello"
	// 将消息发送到延时队列上
	err = ch.Publish(
		"", 				// exchange 这里为空则不选择 exchange
		"test_delay",     	// routing key
		false,  			// mandatory
		false,  			// immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			Expiration: "5000",	// 设置五秒的过期时间
		})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

消费者

同样建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,声明了一个名为 “logs” 的交换机,类型为 “fanout”,并且可持久化,表示该交换机会将消息广播给所有绑定的队列。接着,声明了一个常规的队列 “test_logs”,并将其绑定到 “logs” 交换机上。之后,声明了一个延时队列 “test_delay”,并设置了该队列的 x-dead-letter-exchange 参数为 “logs”,即当消息过期时将消息发送到 “logs” 交换机。最后,通过 ch.Consume 方法监听 “test_logs” 队列,并在回调函数中处理接收到的消息。


func main() {
	// 建立链接
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明一个主要使用的 exchange
	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
	q, err := ch.QueueDeclare(
		"test_logs",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

    /**
     * 注意,这里是重点!!!!!
     * 声明一个延时队列, ß我们的延时消息就是要发送到这里
     */
	_, errDelay := ch.QueueDeclare(
		"test_delay",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		amqp.Table{
			// 当消息过期时把消息发送到 logs 这个 exchange
			"x-dead-letter-exchange":"logs",
		},   // arguments
	)
	failOnError(errDelay, "Failed to declare a delay_queue")

	err = ch.QueueBind(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // routing key
		"logs", // exchange
		false,
		nil)
	failOnError(err, "Failed to bind a queue")

	// 这里监听的是 test_logs
	msgs, err := ch.Consume(
		q.Name, // queue name, 这里指的是 test_logs
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/985165.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL基础】一条查询和更新语句的执行流程01-02

目录 MySQL的基本架构示意图连接器查询缓存分析器优化器执行器重要的日志模块&#xff1a;redo log重要的日志模块&#xff1a;binlog更新时redo log 和 binlog 两阶段提交 例如在执行下面这个查询语句&#xff1a; mysql> select * from T where ID10&#xff1b;MySQL的基…

Java反序列化之CommonsCollections CC1链分析

前言 cc链的研究可以说是非常适合java代码审计的入门篇了&#xff0c;十分考验java代码功力&#xff0c;其实也是基础功&#xff0c;跨过了这个门槛&#xff0c;在看看其他业务代码就会比较轻松了。不要说代码难&#xff0c;看不懂&#xff0c;作者也是刚入门java没几个月的小…

MATLAB中fillmissing函数用法

目录 语法 说明 示例 包含 NaN 值的向量 由 NaN 值组成的矩阵 插入缺失数据 使用移动中位数方法 使用自定义填充方法 包含缺失端点的矩阵 包含多个数据类型的表 fillmissing函数的功能是填充缺失的条目。 语法 F fillmissing(A,constant,v) F fillmissing(A,meth…

Redis 高可用及持久化

Redis 高可用 在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、99.999%等等&#xff09;。但是在Redis语境中&#xff0c;高可用的含义似乎要宽泛一些&#xff0c;除了保证提供…

字节8年经验之谈 —— 冒烟测试、回归测试是什么?

冒烟测试&#xff08;Smoke Testing&#xff09;和回归测试&#xff08;Regression Testing&#xff09;是软件测试中常用的两种测试类型。 冒烟测试&#xff08;Smoke Testing&#xff09;&#xff1a;冒烟测试是在软件开发的早期阶段进行的一种表面级功能验证测试。它主要用…

监控系统典型架构

监控系统典型架构如下&#xff1a; 从左往右看&#xff1a; 采集器是负责采集监控数据的&#xff0c;采集到数据之后传输给服务端&#xff0c;通常是直接写入时序库。 对时序库的数据进行分析和可视化。 告警引擎产生告警事件之后交给告警发送模块做不同媒介的通知。 可视化比…

【月报】Aavegotchi 开发进度更新 - 2023 年 9 月

嗨&#xff0c;Gotchigang&#xff01; 又一个月过去了&#xff0c;我们距离让 Gotchi 游戏走向大众的梦想又近了一步&#xff01; 本月&#xff0c;Gotchi 开发人员正在进行紧张的编程工作&#xff0c;以赶上一些重要的截止日期。 在本月的开发更新中&#xff0c;我们将分享…

2023高教社杯全国大学生数学建模竞赛C题思路分析+代码+论文

如下为C君的2023高教社杯全国大学生数学建模竞赛C题思路分析代码论文 C题蔬菜类商品的自动定价与补货决策 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差, 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&…

Android图表开发---MPAndroidChart

本章内容主要是MPAndroidChart开源框架中的LineChart api com.github.PhilJay:MPAndroidChart:v3.1.0 <com.github.mikephil.charting.charts.LineChartandroid:id"id/lineChart"android:layout_width"match_parent"android:layout_height"330dp…

飞机降落(dfs全排列)

4957. 飞机降落 - AcWing题库 数据量很小&#xff0c;直接爆搜 #include<bits/stdc.h> using namespace std; const int N20; int n,t,flag,st[N];//st记录是否已经降落&#xff0c;flag标记是否降落完成 struct Node {int t,d,l; }node[N]; void dfs(int u,int last)/…

使用Flask-Restful后handle_error干扰无法正常捕获全局异常的解决

1、发现问题 1.1、追踪Api源码&#xff0c;vscode举例&#xff0c;右键点击Api&#xff0c;选择转到定义&#xff0c;确定flask_restful包的位置 from flask_restful import Api1.2、vscode 打开flask_restful包作为一个项目 1.3、之前的问题是&#xff0c;抛出的HTTPExceptio…

无涯教程-JavaScript - BESSELY函数

描述 BESSELY函数针对x的指定顺序和值返回Bessel函数Yn(x)(也称为Weber函数或Neumann函数)。 语法 BESSELY(X, N)争论 Argument描述Required/OptionalXThe value at which to evaluate the function.RequiredNThe order of the function. If n is not an integer, it is tr…

数据结构与算法之字符串

文章目录 1.字符串定义2.串的几个基本概念2.1 空串:2.2空格串2.3子串2.4串相等2.5串比较 3.串的基本操作(此处以java为例)3.1赋值操作StrAssign(s,t)3.2 连接操作 Concat(s,t)3.3求串长StrLength(s)3.4比较StrCompare(st)3.5 求子串_SubString(s,start,len) 4.串的存储结构4.1 …

j解决Ubuntu无法安装pycairo和PyGObject

环境&#xff1a;虚拟机Ubuntu20.04&#xff0c;vscode无法安装pycairo和PyGObject 虚拟机Ubuntu20.04&#xff0c;vscode中运行Anaconda搭建的vens 的Python3.8.10 首先在vscode中点击ctrlshiftp&#xff0c;选择Python3.8.10的环境&#xff0c;自动激活Python 最近在搞无人…

基于SSM的人才招聘系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

WorkPlus打造统一用户管理平台,实现企业用户管理的一体化

在企业信息化的进程中&#xff0c;统一用户管理平台扮演着重要的角色。WorkPlus作为领先的品牌&#xff0c;致力于打造一体化的统一用户管理平台&#xff0c;帮助企业实现用户管理的便捷与高效。本文将重点介绍WorkPlus如何通过创新的解决方案&#xff0c;实现企业用户管理的统…

unity fbx动画按配置切割帧片段

主要参考该文章&#xff1a;人无两度s 《unity自动切割动画》 感谢作者分享 执行代码需要将模型与配置文件(.txt)放到同一目录下&#xff0c;批量选中模型后右键&#xff0c;代码中读取了选中的第一个模型同目录下可能存在的“动画帧分段.txt”&#xff0c;按其中的配置对选中…

[HNCTF 2022 Week1]——Web方向 详细Writeup

Week1 [HNCTF 2022 Week1]2048 f12查看源代码 可以看出游戏的分数是score 修改score的值 得到flag [HNCTF 2022 Week1]Interesting_include 得到源码 <?php //WEB手要懂得搜索 //flag in ./flag.phpif(isset($_GET[filter])){$file $_GET[filter];if(!preg_match(&qu…

axios封装/基础配置

步骤&#xff1a;装包 -> 封装axios实例 ->调用实例发送请求 1. 装包 npm install axios 2. 封装 axios基础配置 // axios实例封装 import axios from axios// 创建axios实例 const axiosInstance axios.create({baseURL:http://xxx.net, //基地址timeout:5000 //…

el-table 实现表、表格行、表格列合并

最近写vue开发项目的时候&#xff0c;很多地方用到了Element组件中的Table 表格。经过一周的边学边做&#xff0c;我总结了以下三种有关表格的合并方法。 一、合并表头 话不多说&#xff0c;先看效果图 代码如下&#xff1a; 表格结构如上&#xff0c;其中:header-cell-style对…