基于Redis实现高性能延时消息队列

news2025/1/8 4:00:58

最近在倒腾自建博客后端系统,需要用到延时任务的功能,但手头只有一套MySQL和Redis,如果搞一套MQ成本有点大,于是想着用redis实现延时消息队列。有些场景用数据库的定时扫表也能简单实现延时消息的功能,不过对于我这边的实际场景(比如计数系统)其实是把数据存到redis中,如果用数据库实现延时消息会对数据库有比较大的压力。

系统设计

 

数据结构设计

事件消息体

type EventEntity struct {
	EventId    int64
	Topic      string
	Body       string
	EffectTime time.Time
}
复制代码
  • EVENT_POOL: 使用redis的hash,里面存储了任务事件的完整信息,key=prefix+namespace+topic,field=EventId, val=EventEntity;
  • EVENT_BUCKET: 使用redis的zset,里面存储了任务事件的有序集合,key=prefix+namespace+topic,score=EffectTime, member=EventId;
  • EVENT_QUEUE: 使用redis的list, list中存储了到期待消费任务的EventId。

延迟队列的执行流程

1、当有新增延时任务过来时,会在EVENT_POOL对应的topic中添加一条记录,同时也会把任务添加到EVENT_BUCKET中,按生效时间排序;

2、搬运线程会定时扫描EVENT_BUCKET中已经到期的任务,将这些任务push到EVENT_QUEUE对应topic的队列当中,之后将这些任务从EVENT_BUCKET中删除;

3、EVENT_QUEUE每个topic会有一个监听线程,当发现当前topic队列中有待消费的任务,则会将任务pop出来,并从EVENT_POOL中查询任务详情,交给consumer消费。

代码实现

核心代码

发布延时任务

func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error {
	pipeline := q.redisClient.WithContext(ctx).Pipeline()
	defer pipeline.Close()

    // 向EVENT_POOL中添加任务
	pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
	// 将任务id添加到EVENT_BUCKET中,按生效时间排序
	pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z{
		Member: strconv.FormatInt(event.EventId, 10),
		Score:  float64(event.EffectTime.Unix()),
	})
	_, err := pipeline.Exec()
	if err != nil {
		logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
		return err
	}
	return nil
}
复制代码

搬运线程扫描到期任务

func (q *DelayQueue) carryEventToQueue(topic string) error {
	ctx := context.Background()
	// 扫描zset中到期的任务
	members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeBy{Min: "0", Max: util.ToString(time.Now().Unix())}).Result()
	if err != nil && err != redis.Nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
		return err
	}
	if len(members) == 0 {
		return nil
	}

	errMap := make(map[string]error)
	// 将任务添加到对应topic的待消费队列里
	for _, m := range members {
		eventId := m.Member.(string)
		err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
			errMap[eventId] = err
		}
	}

	// 从Bucket中删除已进入待消费队列的事件
	var doneMembers []interface{}
	for _, m := range members {
		eventId := m.Member.(string)
		if _, ok := errMap[eventId]; !ok {
			doneMembers = append(doneMembers, eventId)
		}
	}
	if len(doneMembers) == 0 {
		return nil
	}

	err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
	if err != nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
	}
	return nil
}

复制代码

监听线程消费任务

这里使用了List的BLPop命令,当有数据时会立即返回,没有数据则会一直阻塞直到有数据过来;这样可以避免定时扫描列表浪费资源。

func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error {
	for {
		ctx := context.Background()
		kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
			continue
		}
		if len(kvPair) < 2 {
			continue
		}

		eventId := kvPair[1]
		data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
		if err != nil && err != redis.Nil {
			logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
			if q.persistFn != nil {
				_ = q.persistFn(&EventEntity{
					EventId: util.String2Int64(eventId),
					Topic:   topic,
				})
			}
			continue
		}
		event := &EventEntity{}
		_ = jsoniter.UnmarshalFromString(data, event)

		for _, subscriber := range subscriberList {
			util.Retry(3, 0, func() (success bool) {
				err = subscriber.Handle(ctx, event)
				if err != nil {
					logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
					return false
				}
				return true
			})
		}

		err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
		}
	}
}
复制代码

其他

1、优雅关闭

DelayQueue对象中使用wg、isRunning、stop三个变量来实现优雅关闭,具体可参考源码。

type DelayQueue struct {
	namespace   string
	redisClient *redis.Client
	once        sync.Once
	wg          sync.WaitGroup
	isRunning   int32
	stop        chan struct{}
	persistFn   PersistFn
}
复制代码
// gracefully shudown
func (q *DelayQueue) ShutDown() {
	if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0) {
		return
	}
	close(q.stop)
	q.wg.Wait()
}
复制代码

2、消费失败后持久化任务

可为DelayQueue对象设置持久化方法persistFn,用来在监听线程消费任务失败时将任务id持久化以便人工处理。

...

q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
if err != nil && err != redis.Nil {
	logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
	if q.persistFn != nil {
		_ = q.persistFn(&EventEntity{
			EventId: util.String2Int64(eventId),
			Topic:   topic,
		})
	}
	continue
}

...
复制代码

源码地址

redis_delay_queue: github.com/hudingyu/re…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/73547.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Bitmap,布隆过滤器初步了解

布隆过滤器使用教程 文章目录布隆过滤器使用教程1.背景2.什么是Bitmap3.布隆过滤器3.1 什么是布隆过滤器3.2 布隆过滤器的作用3.3 布隆过滤器的基本原理4.布隆过滤器的实现Guava和Redisson4.1 实现思路4.2 SpringBoot实现这些操作Bitmap,guava,redisson布隆过滤器1.背景 最近公…

redis知识点汇总

一、Redis的数据类型和数据结构 1、Redis五种数据类型 String&#xff08;字符串&#xff09;、List&#xff08;列表&#xff09;、Hash&#xff08;哈希&#xff09;、Set&#xff08;集合&#xff09;和Sorted Set&#xff08;有序集合&#xff09;。 2、Redis的底层数据…

C# 数据类型分值类型及引用类型

一 程序中的变量与常量 程序的基本任务是&#xff1a;对数据进行处理&#xff1b; 数据分为变量(variable)与常量(literal) int age18; 变量是值可以改变&#xff0c;本质上是内存的空间&#xff0c;用来存储信息 常量的值是固定的&#xff0c;直接写出来的&#xff0c;称字面…

点击按钮,下载文件

实现文件的下载功能 1、使用a标签 直接下载仅适用于浏览器无法识别的文件。 如果是浏览器支持的文件格式&#xff0c;如html、jpg、png、pdf等&#xff0c;则不会触发文件下载&#xff0c;而是直接被浏览器解析并展示 <ahref"http://xxxxxx.rar"download>下载…

vue中的性能优化

文章目录一、Vue为什么要做性能优化二、如何做vue的性能优化1. 网络请求优化link标签项目静态资源压缩懒加载利用浏览器的缓存机制高效复用项目文件总结2. 代码优化3. 用户体验优化场景1场景2一、Vue为什么要做性能优化 性能优化的目的是使网站的加载速度更快&#xff0c;用户…

【语音处理】基于自适应差分脉冲编码调制(ADPCM)的实现研究附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步进步&#xff0c;matlab项目目标合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信息&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算…

音视频直播系统之 WebRTC 中的协议UDP、TCP、RTP、RTCP详解

一、UDP/TCP 如果让你自己开发一套实时互动直播系统&#xff0c;在选择网络传输协议时&#xff0c;你会选择使用UDP协议还是TCP协议 假如使用 TCP 会怎样呢&#xff1f;在极端网络情况下&#xff0c;TCP 为了传输的可靠性&#xff0c;将会进行反复重发信息的操作 在 TCP 协议…

Nagios篇之Nagios服务关联飞书实现消息告警

一、前言 通常情况下&#xff0c;我们在利用Nagios监控来做服务器监控时&#xff0c;告警是必不可少的&#xff0c;以便于运维人员能够及时发现异常&#xff0c;进而处理问题&#xff0c;所以关联Nagios就变得极为重要。 Nagios关联告警的形式很多&#xff0c;可以进行短信推送…

wpf-ListView中放置可动态调节范围的刻度尺

需求描述 某个ListView占据整个窗口&#xff0c;当窗口的宽度发生改变时&#xff0c;某一列中显示的、某一行的字符数目&#xff0c;能跟随窗口宽度变化而增减。 下面是我做完的效果&#xff1a;&#xff08;只展示窗口的一部分&#xff09; 此时是窗口缩放的极限&#xff0…

为什么全光谱台灯对孩子眼睛好呢?台灯全光谱到底是什么意思

相信大家在购买台灯时有经常看到“全光谱”、“高显色”等关键词&#xff0c;其实这指的是台灯的某方面特性&#xff0c;所谓全光谱&#xff0c;就是指光线的光谱成分完全&#xff0c;与自然光别无二致。 我们都知道&#xff0c;一束自然太阳光不是由某个单一成分构成&#xff…

Briefings in Bioinformatics2021 | 药物挖掘分子设计--生成模型综述

原文标题&#xff1a;Molecular design in drug discovery: a comprehensive review of deep generative models 论文地址&#xff1a;Molecular design in drug discovery: a comprehensive review of deep generative models | Briefings in Bioinformatics | Oxford Academ…

35_DMA基本原理

目录 DMA简介 DMA框图 STM32的DMA有一下一些特征 DMA1控制器 DMA处理 数据方向 仲裁器 DMA通道 可编程的数据量 指针增量 循环模式 存储器到存储器模式 通道传输数据量 中断 通道配置过程 DMA简介 DMA全称Direct Memroy Access, 既直接存储器访问。 DMA传输将…

重编内核导致ubuntu有线连接不出现的问题

网卡是intel的i225v 千兆网卡&#xff0c;系统为ubuntu18.0.4&#xff0c;原始内核为5.4.0-135-generic&#xff0c;但是重新编译出错&#xff0c;不知道少了什么东西&#xff0c;也没去深究&#xff0c;重新下载了5.9.0的内核&#xff1b;结果重新编译内核重启有线网卡就不能用…

2.MyBatis环境搭建

数据准备 CREATE TABLE user (id int(11) NOT NULL,username varchar(30) NOT NULL,sex varchar(1) NOT NULL,birthday varchar(10) NOT NULL,address varchar(100) NOT NULL,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;insert into user values(1,"steven&qu…

ADI Blackfin DSP处理器-BF533的开发详解2:开发环境的搭建

软硬件开发环境的搭建 纯流程化的东西&#xff0c;没什么技术含量&#xff0c;照着做就行了。 开发板和仿真器进行物理链接&#xff0c;也就是插上JTAG头。 特别特别特别注意&#xff0c;仿真器和开发板均不上电的情况下插JTAG头&#xff0c;不要带电插JTAG头&#xff0c;你…

ORB-SLAM2 ---- Frame::GetFeaturesInArea函数

目录 1.函数用处 2.步骤 3.code 4.函数解释 4.1 函数思想 4.2 代码解释 1.函数用处 找到在 以为中心&#xff0c;半径为的圆形内且金字塔层级在的特征点。 2.步骤 Step 1 计算半径为r圆左右上下边界所在的网格列和行的id Step 2 遍历圆形区域内的所有网格&#xff0c…

代码随想录Day44|完全背包、518.零钱兑换II、377.组合总和IV

文章目录完全背包518.零钱兑换II377.组合总和IV完全背包 文章链接:代码随想录 (programmercarl.com) 背包最大重量为4。 物品为&#xff1a;如果求组合数就是外层for循环遍历物品&#xff0c;内层for遍历背包。 如果求排列数就是外层for遍历背包&#xff0c;内层for循环遍历…

Linux多线程C++版(九) 线程同步和互斥-----线程信号量

目录1.基本概念2.信号量创建和销毁3.信号量加和减操作4.代码理解信号量5.信号量实例银行账户取款----实现互斥6.信号量实例计算和取结果----实现线程同步1.基本概念 信号量从本质上是一个非负整数计数器&#xff0c;是共享资源的的数目&#xff0c;通常被用来控制对共享资源的…

[附源码]JAVA毕业设计同学录网站(系统+LW)

[附源码]JAVA毕业设计同学录网站&#xff08;系统LW&#xff09; 项目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#…

从源码出发剖解正则表达式

✨✨hello&#xff0c;愿意点进来的小伙伴们&#xff0c;你们好呐&#xff01; &#x1f43b;&#x1f43b;系列专栏&#xff1a;【JavaSE】 &#x1f432;&#x1f432;本篇内容&#xff1a;详解正则表达式 &#x1f42f;&#x1f42f;作者简介:一名现大二的三非编程小白&…