教你如何基于Redis来实现高性能延时消息队列!

news2025/1/10 3:15:47

最近在倒腾自建博客后端系统,需要用到延时任务的功能,但手头只有一套MySQL和Redis,如果搞一套MQ成本有点大,于是想着用redis实现延时消息队列。有些场景用数据库的定时扫表也能简单实现延时消息的功能,不过对于我这边的实际场景(比如计数系统)其实是把数据存到redis中,如果用数据库实现延时消息会对数据库有比较大的压力。

系统设计

这里参考了有赞的延迟队列设计

数据结构设计

事件消息体

type EventEntity struct {
	EventId    int64
	Topic      string
	Body       string
	EffectTime time.Time
}
复制代码
  • EVENT_POOL: 使用redis的hash,里面存储了任务事件的完整信息,key=prefix+namespace+topic,field=EventId, val=EventEntity;
  • EVENT_BUCKET: 使用redis的zset,里面存储了任务事件的有序集合,key=prefix+namespace+topic,score=EffectTime, member=EventId;
  • EVENT_QUEUE: 使用redis的list, list中存储了到期待消费任务的EventId。

延迟队列的执行流程

1、当有新增延时任务过来时,会在EVENT_POOL对应的topic中添加一条记录,同时也会把任务添加到EVENT_BUCKET中,按生效时间排序;

2、搬运线程会定时扫描EVENT_BUCKET中已经到期的任务,将这些任务push到EVENT_QUEUE对应topic的队列当中,之后将这些任务从EVENT_BUCKET中删除;

3、EVENT_QUEUE每个topic会有一个监听线程,当发现当前topic队列中有待消费的任务,则会将任务pop出来,并从EVENT_POOL中查询任务详情,交给consumer消费。

代码实现

核心代码

发布延时任务

func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error {
	pipeline := q.redisClient.WithContext(ctx).Pipeline()
	defer pipeline.Close()

    // 向EVENT_POOL中添加任务
	pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
	// 将任务id添加到EVENT_BUCKET中,按生效时间排序
	pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z{
		Member: strconv.FormatInt(event.EventId, 10),
		Score:  float64(event.EffectTime.Unix()),
	})
	_, err := pipeline.Exec()
	if err != nil {
		logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
		return err
	}
	return nil
}
复制代码

搬运线程扫描到期任务

func (q *DelayQueue) carryEventToQueue(topic string) error {
	ctx := context.Background()
	// 扫描zset中到期的任务
	members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeBy{Min: "0", Max: util.ToString(time.Now().Unix())}).Result()
	if err != nil && err != redis.Nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
		return err
	}
	if len(members) == 0 {
		return nil
	}

	errMap := make(map[string]error)
	// 将任务添加到对应topic的待消费队列里
	for _, m := range members {
		eventId := m.Member.(string)
		err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
			errMap[eventId] = err
		}
	}

	// 从Bucket中删除已进入待消费队列的事件
	var doneMembers []interface{}
	for _, m := range members {
		eventId := m.Member.(string)
		if _, ok := errMap[eventId]; !ok {
			doneMembers = append(doneMembers, eventId)
		}
	}
	if len(doneMembers) == 0 {
		return nil
	}

	err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
	if err != nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
	}
	return nil
}

复制代码

监听线程消费任务

这里使用了List的BLPop命令,当有数据时会立即返回,没有数据则会一直阻塞直到有数据过来;这样可以避免定时扫描列表浪费资源。

func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error {
	for {
		ctx := context.Background()
		kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
			continue
		}
		if len(kvPair) < 2 {
			continue
		}

		eventId := kvPair[1]
		data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
		if err != nil && err != redis.Nil {
			logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
			if q.persistFn != nil {
				_ = q.persistFn(&EventEntity{
					EventId: util.String2Int64(eventId),
					Topic:   topic,
				})
			}
			continue
		}
		event := &EventEntity{}
		_ = jsoniter.UnmarshalFromString(data, event)

		for _, subscriber := range subscriberList {
			util.Retry(3, 0, func() (success bool) {
				err = subscriber.Handle(ctx, event)
				if err != nil {
					logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
					return false
				}
				return true
			})
		}

		err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
		}
	}
}
复制代码

其他

1、优雅关闭

DelayQueue对象中使用wg、isRunning、stop三个变量来实现优雅关闭,具体可参考源码。

type DelayQueue struct {
	namespace   string
	redisClient *redis.Client
	once        sync.Once
	wg          sync.WaitGroup
	isRunning   int32
	stop        chan struct{}
	persistFn   PersistFn
}
复制代码
// gracefully shudown
func (q *DelayQueue) ShutDown() {
	if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0) {
		return
	}
	close(q.stop)
	q.wg.Wait()
}
复制代码

2、消费失败后持久化任务

可为DelayQueue对象设置持久化方法persistFn,用来在监听线程消费任务失败时将任务id持久化以便人工处理。

...

q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
if err != nil && err != redis.Nil {
	logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
	if q.persistFn != nil {
		_ = q.persistFn(&EventEntity{
			EventId: util.String2Int64(eventId),
			Topic:   topic,
		})
	}
	continue
}

...
复制代码

源码地址

redis_delay_queue: github.com/hudingyu/re…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/71238.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Frida-Dexdump 脱壳工具下载使用以及相关技术介绍

Frida-Dexdump 脱壳工具下载使用以及相关技术介绍 文章目录Frida-Dexdump 脱壳工具下载使用以及相关技术介绍前言一、查壳、反编译、APK工具推荐二、查壳1.方式12.方式二三、脱壳1.启动frida服务2.方式一3.方式二四、反编译总结前言 本案例使用的App是&#xff1a;引力播.apk&…

多无人机空中机器人施工任务分配(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 空中机器人作为近年来新兴的热点得到了广泛的关注。小型空中机器人在没有外界卫星定位信号的前提下的导航是空中机器人的研究内…

cmip6数据处理、动力降尺度、统计降尺度、制备CMIP6的WRF驱动数据

收录了CMIP6数据处理方法&#xff0c;典型案例分析实践过程中出现的一些问题&#xff0c;及技术&#xff08;下拉查看&#xff09; 国际耦合模式比较计划进入新的阶段——第六阶段&#xff08;CMIP6&#xff09;&#xff0c;这将为气候变化研究领域提供更丰富的全球气候模式数…

Python字符串格式化的三种方式

Python格式化的三种方式 根据类型定义的格式化 - %s 字符串格式化使用操作符 % 来实现&#xff0c; 示例 my name is %s,my age is %s % (neo, 18) 格式符: %s 连接符&#xff1a;格式化字符串与格式符变量之间用一个 % 连接&#xff0c; % 两边各有一个空格 附&#xff1a;…

08、SpringBoot入门简介

1、简介 Spring Boot 是由 Pivotal 团队提供的全新框架&#xff0c;其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程。 人们把 Spring Boot 称为搭建程序的脚手架。其最主要作用就是帮我们快速的构建庞大的 Spring 项目&#xff0c;并且尽可能的减少一切 XML 配置…

我用Echarts图表分析巴西队历年战绩,预测卡塔尔世界杯能否夺冠

&#x1f431;个人主页&#xff1a;不叫猫先生 &#x1f64b;‍♂️作者简介&#xff1a;前端领域新星创作者、华为云享专家、阿里云专家博主&#xff0c;专注于前端各领域技术&#xff0c;共同学习共同进步&#xff0c;一起加油呀&#xff01; &#x1f4ab;系列专栏&#xff…

baostock量化怎样下载十档行情数据?

baostock量化对数据的下载其实就是通过计算的方式去决策股票的买卖。目前根据量化计算方式其实跟量子计算一点关系都没有。那么&#xff0c;都说在股票量化交易过程中&#xff0c;可以利用l2股票接口来获取策略选股的方案是很普遍的&#xff0c;利用数据接口下载十档行情&#…

[附源码]Python计算机毕业设计SSM基于协同过滤算法的甜品推荐系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Echart柱状图表排名

var charts { // 按顺序排列从大到小 cityList: [38号点, 8号点, 15号点, 16号点, 24号点], cityData: [7500, 6200, 5700, 4200, 3500] } var top10CityList charts.cityList var top10CityData charts.cityData var color [#ff9500, #02d8f9, #027fff] var color…

CAS:1351272-41-7;[1-(4-乙烯基苯基)-1,2,2-三苯基]乙烯;AIE材料

中文名称:[1-(4-乙烯基苯基)-1,2,2-三苯基]乙烯 英文名称:(2-(4-vinylphenyl)ethene-1,1,2-triyl)tribenzene CAS:1351272-41-7 分子式:C28H22 分子量:358.47 沸点:460.445.0 C(Predicted) 密度:1.0810.06 g/cm3(Predicted) 用途:仅用于科研,不用于人体 提供提下定制合成…

算法 KECP 被顶会 EMNLP 收录,极少训练数据就能实现机器阅读理解

作者&#xff1a;王嘉宁、汪诚愚、邱明辉、石秋慧、王洪彬、黄俊、高明 近日&#xff0c;阿里云机器学习平台 PAI 与华东师范大学高明教授团队合作在自然语言处理顶级会议 EMNLP2022 上发表基于 Prompt-Tuning 的小样本机器阅读理解算法 KECP&#xff08;Knowledge Enhanced C…

【人工智能】体验一下ChatGPT

体验一下ChatGPT 1.体验地址 chatGPT 注册openai的账号 注意:如果注册过程中一直不成功,可以清清缓存 2.接受短信的手机号 得有个国外手机号&#xff0c;或者1块钱去sms-activate.org注册一个虚拟的手机号 3.功能 最近OpenAI 发布了备受期待的原型通用 ChatGPT&#xf…

一段eslint jsx-a11y/anchor-is-valid警告背后的原因

我们在做React项目时&#xff0c;经常用到a标签做一些跳转动作&#xff0c;但是每次eslint都要提示 jsx-a11y/anchor-is-valid这段代码&#xff0c;我们可以通过/* eslint-disable jsx-a11y/anchor-is-valid */把这个规则屏蔽掉&#xff0c;但是显然不够优化&#xff0c;那到底…

8_2、Java基本语法之多线程的两种创建方式(jdk5之前)

一、前言 Java语言的JVM允许程序运行多个线程&#xff0c;它通过java.lang.Thread 类来体现。 二、JDK1.5之前创建新执行线程有两种方法 继承Thread类的方式 实现Runnable接口的方式 三、继承Thread类的方式 1、使用继承Thread类的方式创建一个线程&#xff1a; ①.创建一个…

c++17可变参函数模板详解

c语言中对于 可变参数的处理是用va_list等一系列宏去做的 他只会生成一个函数 但是理解起来非常麻烦 因为你不得不去了解很多关于汇编层面栈帧的知识 c对于可变参数函数模板进行了改进 他会生成多个函数 而不是在一个函数里玩 个人觉得c这种方式更加先进而且更好理解 接下来让…

线性代数的本质

注&#xff1a;目前没有精力去美化排版&#xff0c;所有博客仅作为自己学习记录所用 《线性代数的本质》课程链接&#xff08;bilibili&#xff09; 目录&#xff1a; P1-P4的内容&#xff1a; 1.线性代数的加法&#xff1a;为什么这样子来定义呢&#xff08;如图&#xff…

(附源码)SSM人力资源管理系统 毕业设计 271621

SSM人力资源管理系统 摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#…

Servlet(二):Servlet的运行原理HttpServlet、HttpServletRequest、HttpServletResponse类详解

Servlet运行原理Servlet API详解HttpServlet类HttpServletRequest类HttpServletResponse类Servlet API详解 Servlet API中包含了很多的内容&#xff0c;但我们主要用到的是以下三个类&#xff0c;HttpServlet&#xff0c;HttpServletRequest&#xff0c;HttpServletResponse …

2022-04-10-Docker

layout: post #标题配置 title: Docker #时间配置 date: 2022-04-10 22:50:00 0800 #目录配置 categories: Docker #标签配置 tag: 学习笔记 content {:toc} Docker 1.初识 Docker 1.1 docker概念 我们写的代码会接触到好几个环境&#xff1a;开发环境、测试环境以及生产环…

祖师爷香农,到底有多神?

1916年&#xff0c;第一次世界大战激战正酣。在这一年的4月30日&#xff0c;远离战场的美国密歇根州佩托斯基&#xff08;Petoskey&#xff09;&#xff0c;一个男婴呱呱坠地。这个男婴&#xff0c;就是我们这篇文章的主角——香农。香农的全名&#xff0c;叫做克劳德艾尔伍德香…