golang实现延迟队列(delay queue)

news2025/1/9 1:47:35

golang实现延迟队列

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

  1. 定义Task结构体,包含
  • ExecuteTime time.Time
  • Job func()
  1. 定义DelayQueue
  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
	ExecuteTime time.Time
	Job         func()
}

type DelayQueue struct {
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
	for len(d.Tasks) > 0 {
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{}
	firstTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:
在这里插入图片描述

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

  1. 初始化redis连接
  2. 延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
	err := initClient()
	if err != nil {
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
		Score:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
	for {
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1462744.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

支付功能设计及实现思路

支付功能设计 主要包括:订单表,订单日志表,订单队列,定时任务。 主要考虑:事务性、幂等性、安全性。 表结构设计 订单表: 订单表,最主要的就是订单号、支付状态。 CREATE TABLE t_order (…

IOday1作业

1> 使用fgets统计给定文件的行数 2> 使用fputs和fgets完成两个文件的拷贝 3> 完成注册登录功能 做个小菜单,功能1是注册功能,输入注册账户和注册密码,将账户和密码写入文件中 功能2是登录功能,提示并输入登录账户和登录…

无人机设计技术,四旋翼无人机整机及控制系统技术浅谈

四旋翼无人机的飞行控制技术是无人机研究的重点之一。它使用直接力矩, 实现六自由度(位置与姿态)控制,具有多变量、非线性、强耦合和干扰敏感的特性。此外, 由于飞行过程中,微型飞行器同时受到多种物理效应的作用,还很容易受到气流等外部环境的干扰,模型准确性和传感…

LLM 模型融合实践指南:低成本构建高性能语言模型

编者按:随着大语言模型技术的快速发展,模型融合成为一种低成本但高性能的模型构建新途径。本文作者 Maxime Labonne 利用 mergekit 库探索了四种模型融合方法:SLERP、TIES、DARE和passthrough。通过配置示例和案例分析,作者详细阐…

一线城市打工人的大龄焦虑:都市容不下躯壳,老家容不下灵魂(含华为 OD 面试原题)

互联网的大龄焦虑 今天看到一个老生常谈的话题「大龄焦虑:都市容不下躯壳,老家容不下灵魂」。 现如今,内卷已不是互联网行业专属名称,早已渗透到一线城市中的各行各业。 但地域落差对职业的影响,互联网行业还是稳稳的位…

关于YOLOv5训练结果results.txt绘制对比图

目录 用两个results.txt在一幅图中画2条对比曲线 用一个results.txt在一幅图中画多条对比曲线: 用两个results.txt在一幅图中画2条对比曲线 # -*- coding:utf-8 -*- import matplotlib.pyplot as plt import numpy as npcolumn [epoch, train_GIOU_loss, train_o…

美国CFTC启用举报奖励机制!打击人工智能投资欺诈行为

最近几周,美国监管机构对欺诈者利用人工智能 (AI) 的说法来引诱投资者实施诈骗发出警告。掌握人工智能诈骗原始信息的个人可以匿名举报 ,并有资格根据商品期货交易委员会 (CFTC) 和证券交易委员会 (SEC) 举报计划获得金钱奖励。CFTC 关于人工智能诈骗的咨…

Docker部署Halo容器并结合内网穿透实现公网访问本地个人博客

文章目录 1. Docker部署Halo1.1 检查Docker版本如果未安装Docker可参考已安装Docker步骤:1.2 在Docker中部署Halo 2. Linux安装Cpolar2.1 打开服务器防火墙2.2 安装cpolar内网穿透 3. 配置Halo个人博客公网地址4. 固定Halo公网地址 本文主要介绍如何在CentOS 7系统使…

迷你世界制作甜甜圈

local R,r20,5--中心半径(内径与外径和的1/4)、扩充半径(高的一半) local x0,y0,z00,20,0--起点坐标 local d10000--单次遍历方块数 local id600--方块id local s,all,t-1,0,0--步骤、总方块数、循环 local demath.random(2,18)/2 local x,y,z0,0,0--当前坐标 local func…

毕业神刊!1区、CCF推荐SCI,最新26天录用!6天见刊

一手刊源 • 毕业快刊 春节一过,距离2024年上半年毕业的日子近在眼前,你的毕业论文已安排投稿了吗?时间着急的作者,不妨重点关注本期小编推荐的毕业快刊:无中科院预警记录,非三大出版社,指标情…

还在为选择办公软件而烦恼吗?不妨试试ONLYofficeV8.0

目录 一.优势一DOC 1.丰富的文字处理功能 2.按用户既定的规则编辑 3.使用AI助手 4.保持创意 5.深入分析文本 6.改善团队工作流程 7.轻松对比文档 8.扩展编辑功能 二.优势二sheet 1.数据分析 2.轻松实现精准计算 3.轻松分析数据 4.可视化呈现数据 5.增强团队协作…

通过盲注脚本复习sqllabs第46关 order by 注入

Less-46 从本关开始,我们开始学习 order by 相关注入的知识。本关的 sql 语句为$sql "SELECT * FROM users ORDER BY $id";尝试?sort1 desc或者 asc,显示结果不同,则表明可以注入。(升序 or 降序排列)从上述的 sql 语句中我们可以…

Netty-初体验

Netty 引入依赖服务端服务端处理读写业务的Handler客户端实现客户端handler Netty具备设计优雅、使⽤⽅便、性能强劲等优点&#xff0c; 引入依赖 <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>…

CVE-2024-0918 TEW-800MB RCE漏洞分析

漏洞描述 固件版本为1.0.1.0的TEW-800MB路由器存在命令注入漏洞。如果攻击者获得了web管理权限&#xff0c;他们可以将命令注入到httpd未知函数中的post请求参数DeviceURL中&#xff0c;从而获得shell权限。。 参考链接 TEW-800MB (notion.site)https://warp-desk-89d.notio…

盘点那些世界名校计算机专业采用的教材

清华、北大、MIT、CMU、斯坦福的学霸们在新学期里要学什么&#xff1f;今天我们来盘点一下那些世界名校计算机专业采用的教材。 书单目录 1.《深入理解计算机系统》&#xff08;原书第3版&#xff09;2. 《算法导论》&#xff08;原书第3版&#xff09;3. 《计算机程序的构造和…

【已解决】Zip压缩文件设置了密码怎么办?

Zip是一种常见的压缩格式文件&#xff0c;文件还可以设置密码保护。那设置了密码的Zip文件要如何打开呢&#xff1f;不清楚的小伙伴一起来看看吧。 想要打开带密码的Zip文件&#xff0c;我们需要用到适用于Zip格式的解压缩软件&#xff0c;比如WinRAR或者7-Zip软件都是可以的。…

【数据结构】_队列

目录 1.概念 2.队列的使用 3.队列模拟实现 4.循环队列 5.双端队列 6.栈与队列的互相实现 6.1 用队列实现栈 6.2 用栈实现队列 1.概念 &#xff08;1&#xff09;队列是只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff1b; &am…

PyTorch深度学习实战(37)——CycleGAN详解与实现

PyTorch深度学习实战&#xff08;37&#xff09;——CycleGAN详解与实现 0. 前言1. CycleGAN 基本原理2. CycleGAN 模型分析3. 实现 CycleGAN小结系列链接 0. 前言 CycleGAN 是一种用于图像转换的生成对抗网络(Generative Adversarial Network, GAN)&#xff0c;可以在不需要配…

分布式事务-CAP定理+BASE理论+解决方案(两阶段提交+TCC事务)

事务就是指一个操作单元&#xff0c;操作单元中的所有操作要么同时成功要么同时失败。单体项目中的事务一般都是使用数据库提供的事务机制来完成的&#xff0c;但是分布式事务的事务参与者位于不同的节点上&#xff0c;也就是分布在不同的服务器上。分布式事务的最大问题就是各…

安卓游戏开发之音频技术优劣分析

一、引言 在安卓游戏开发中&#xff0c;音频处理技术扮演着至关重要的角色&#xff0c;它不仅能够增强游戏的沉浸感和玩家体验&#xff0c;还能通过声音效果传达关键的游戏信息。以下将对几种常见的安卓游戏音频处理技术进行优劣分析&#xff0c;并结合应用场景来阐述其特点。 …