利用 Redis 实现延迟队列(点赞场景)

news2025/1/9 15:53:11

🌈点赞场景在前段时间有很多人都在争论,我也看了一些视频和文档,最后觉得b站技术的这篇写得很好

【点个赞吧】 - B站千亿级点赞系统服务架构设计 - 哔哩哔哩

🌈所以我也尝试用 Redis 的延迟队列来写一个点赞处理的 demo(这都是基于高并发情况下),完全没有落地。后续有时间会专门写一个点赞的技术方案

🌈至于为什么选择 Redis 延迟队列而不是用常见的 MQ,因为我还在学习阶段,想更好地学习 Redis 的延迟队列

🌈由于只是个菜只因,接触到和学到的技术并不多,可能还有很多情况没有考虑到

目录

1. Redis 实现延迟队列的方案

2. Redis 过期事件监听实现延时任务

3. Redis 过期事件监听实现延时任务功能有什么缺陷

3.1. 时效性差

3.2. 丢消息

3.3. 多服务实例下存在消息重复消息的问题

4. 为什么选用Redission作为延迟队列

5. 使用 Redis 实现延时任务有什么注意的地方?

6. 如何使用 Redisson 实现延迟队列(点赞场景)

6.1. 基础配置

6.2. 延迟队列实战


1. Redis 实现延迟队列的方案

基于 Redis 实现延时任务的功能无非就下面两种方案:

  1. Redis 过期事件监听
  2. Redisson 内置的延时队列

这里选用的是用 Redission 内置的延迟队列,所以实现的着重点放在 Redission

2. Redis 过期事件监听实现延时任务

Redis 2.0 引入了 发布订阅(Pub/Sub) 功能。在 Pub/Sub 模型中,引入了一个名为 channel(频道) 的概念,类似于消息队列中的 topic(主题)

Pub/Sub 涉及两个主要角色:发布者(Publisher)订阅者(Subscriber,也称为消费者)

  • 发布者 通过 PUBLISH 命令将消息发送到指定的 channel
  • 订阅者 通过 SUBSCRIBE 命令订阅感兴趣的 channel,并且可以同时订阅一个或多个 channel

Pub/Sub 模式 中,生产者需要指定将消息发送到哪个 channel,而消费者通过订阅对应的 channel 来获取消息。Redis 内部也存在一些默认的 channel,这些通道用于 Redis 自身发送消息,而非用户代码生成。只需监听这些 channel,即可获取与 过期 key 相关的通知,从而实现延时任务的功能。

这一特性被 Redis 官方称为 Keyspace Notifications,其主要作用是 实时监控 Redis 中键和值的变化。通过它,开发者能够及时捕捉键的变化(如过期、删除等事件),从而执行相应的处理逻辑

3. Redis 过期事件监听实现延时任务功能有什么缺陷

3.1. 时效性差

官方文档的一段介绍解释了时效性差的原因,地址:

Redis keyspace notifications | Docs

Redis 中的 过期事件消息 只有在 Redis 服务器真正删除 key 时才会发布,而不是在 key 到达过期时间后立即发布。

常见的过期数据删除策略有两种:

  1. 惰性删除:仅当访问 key 时,才会检查其是否过期。这种方式对 CPU 友好,但可能导致大量过期 key 未及时删除,继续占用内存。
  2. 定期删除:Redis 会定期抽取一部分 key,检查并删除过期的 key。为了减少删除操作对 CPU 的影响,Redis 会限制删除操作的执行时长和频率。虽然定期删除更有利于释放内存,但也可能增加 CPU 负载。

Redis 结合了这两种策略,采用 定期删除惰性删除 的方式。定期删除保证了内存的回收,而惰性删除则在取用时保证 CPU 性能。

因此,可能会出现这样一种情况:虽然设置了 key 的过期时间,但当该时间到达时,key 可能尚未被删除,导致 过期事件 未及时发布。

其他的文章测试

请勿过度依赖Redis的过期监听-阿里云开发者社区

3.2. 丢消息

Redis 的 pub/sub 模式中的消息并不支持持久化,这与消息队列不同。在 Redis 的 pub/sub 模式中,发布者将消息发送给指定的频道,订阅者监听相应的频道以接收消息。当没有订阅者时,消息会被直接丢弃,在 Redis 中不会存储该消息。

3.3. 多服务实例下存在消息重复消息的问题

Redis 的 pub/sub 模式目前只有广播模式,这意味着当生产者向特定频道发布一条消息时,所有订阅相关频道的消费者都能够收到该消息。

这个时候,我们需要注意多个服务实例重复处理消息的问题,这会增加代码开发量和维护难度。

4. 为什么选用Redission作为延迟队列

Redisson 是一个开源的 Java Redis 客户端,提供了许多开箱即用的功能,包括多种分布式锁的实现和延迟队列。Redisson 内置的延迟队列 RDelayedQueue 利用 Redis 的 SortedSet 实现延时任务功能。

SortedSet 是一个有序集合,每个元素都有一个分数,代表其优先级或时间权重。Redisson 通过将需要延迟执行的任务插入到 SortedSet 中,并为它们设置相应的过期时间作为分数来实现延迟队列。

Redisson 在客户端启动一个定时任务,当时间到达时,它使用 zrangebyscore 命令扫描 SortedSet 中已过期的元素(即分数小于或等于当前时间的元素)。这些过期元素会被从 SortedSet 中移除,并加入到就绪消息列表(List 结构)中。

当任务被移到就绪消息列表时,Redisson 通常还会通过 Redis 的发布/订阅机制(Pub/Sub)通知消费者有新任务到达。就绪消息列表是一个阻塞队列,消费者可以使用阻塞操作(如 BLPOP key 0,其中0表示无限等待)来监听。由于 Redis 的 Pub/Sub 机制是事件驱动的,它避免了轮询开销,只有在有新消息时才会触发处理逻辑。

需要注意的是,Redisson 的定时任务调度器并不是以固定时间间隔频繁调用 zrangebyscore 命令进行扫描,而是根据 SortedSet 中最近的到期时间动态调整下一次检查的时间点。

相比于使用 Redis 过期事件监听实现延时任务,Redisson 延迟队列具有以下优势:

  1. 减少丢失消息的可能性RDelayedQueue 中的消息会被持久化,即使 Redis 宕机,根据持久化机制,可能仅丢失少量消息,影响不大。此外,还可以使用数据库扫描作为补偿机制。
  2. 避免消息重复消费:所有客户端从同一个目标队列获取任务,避免了重复消费的问题。

尽管 Redisson 提供了便利的延迟队列功能,但在实际项目中,如果需要更高的吞吐量和可靠性,通常优先选择使用消息队列的延时消息方案。消息队列可以通过保障消息消费的可靠性和控制生产者与消费者数量来实现更好的性能。

5. 使用 Redis 实现延时任务有什么注意的地方?

在任务时间跨度较大且任务数量众多的场景中,需要特别注意内存管理。大量任务可能会导致内存占用过高,而长时间保存任务则会造成资源浪费。为了解决这些问题,可以结合使用 MySQL 和 Redis 来优化任务管理:

  1. 短期任务:对于延迟时间较短的任务(例如几分钟到几个小时内执行的任务),可以继续存储在 Redis 中,以便快速访问和处理。
  2. 长期任务:对于延迟时间较长的任务(例如几天或几周后执行的任务),则可以存储在 MySQL 中。通过这种方式,可以有效减少 Redis 的内存占用。
  3. 定期扫描:使用定时任务(例如 XXL-JOB 或 Spring Task)定期扫描 MySQL 中即将到期的任务(例如未来 2 小时内到期的任务),并将这些任务推送到 Redis 中进行处理。这种做法可以确保任务在适当的时候被加载到内存中。
  4. 优化查询:在定期扫描 MySQL 时,可能需要处理大量数据。为提高查询效率,可以使用索引或进行分库分表等优化措施。

将 Redis 和 MySQL 结合使用的优势

  1. 节省缓存资源:通过将长期任务存储在 MySQL 中,避免了在 Redis 中存储大量长期任务导致的内存浪费。
  2. 可靠性和成本:MySQL 提供的事务机制可以保证任务数据的可靠性,同时存储成本也相对较低。
  3. 避免大 key 问题:如果仅使用一个 RDelayedQueue,任务数量过大会产生大 key 问题。可以通过将任务按某种逻辑(如时间段、任务类型)分片存储到多个 RDelayedQueue 中来避免这一问题。

通过这种结合使用的方式,既能利用 Redis 的快速访问能力,又能依靠 MySQL 的持久化存储和事务支持,有效地管理大时间跨度和大量的延时任务。

6. 如何使用 Redisson 实现延迟队列

6.1. 基础配置

maven 依赖:

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.2</version>
</dependency>

基础 Redission 配置文件

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }
}

6.2. 延迟队列实战

这里就随便写一个需要频繁修改数据的场景,就例如一个点赞的场景

点赞功能虽然看起来简单,但如果系统流量大,用户频繁点赞,尤其是针对热门内容时,后台需要处理的大量并发请求就会成为性能瓶颈

点赞计数并发处理

  • 并发场景:在点赞的场景中,可能会有很多用户同时对同一个帖子进行点赞。为了避免计数错误(比如两个人同时点赞,只增加了一次的情况),就需要使用分布式锁或原子操作来确保计数的正确性。
  • 解决方案:使用 RAtomicLong 是一个比较好的选择,它提供原子操作,可以确保多个线程或多个分布式节点对同一个点赞计数进行安全的增加或覆盖操作。

缓存

  • 高频访问优化:在点赞场景中,某个内容可能会在短时间内被大量用户点赞。将点赞数保存在 Redis 这样的缓存系统中,可以极大地减轻数据库的压力,提高系统的响应速度。

延迟持久化

  • 持久化问题:每次点赞都立即写入数据库会对数据库产生巨大的压力,尤其是在高并发情况下。因此,通常的策略是将点赞数暂时缓存在 Redis 中,等待合适的时间再批量持久化到数据库。
  • 解决方案:通过 RDelayedQueue 实现延迟处理的功能。在点赞操作发生时,不立即持久化,而是将操作推迟 15 分钟再处理。这一做法既能确保点赞数不丢失,又减少了频繁持久化操作的开销。

使用 RBlockingQueue 代替 RQueue 的好处

  • 避免频繁轮询:在原本的代码中使用了 RQueue,这种队列会需要不断地去轮询,判断是否有新的任务需要处理,这对资源是一种浪费。
  • 阻塞队列的优化:RBlockingQueue 提供了阻塞机制,只有在有新元素到来时才会唤醒队列进行处理,节省了系统资源的消耗,减少不必要的 CPU 轮询开销。

public Long likeIncrementCount(String postId, Long directLikeNum, int countStrategy) {
    String key = LikeCacheKey.Like_COUNT.getKey(postId);
    RAtomicLong rAtomicLong = redissonClient.getAtomicLong(key);

    // 初始化操作数,如果 Redis 数据不存
    if (!rAtomicLong.isExists()) {
        getLikeNum(postId);
    }

    long likeCount;
    // 根据策略计数
    switch (countStrategy) {
        case ACCUMULATION.getType(): // 累加
            if (directLikeNum == null) {
                likeCount = rAtomicLong.incrementAndGet();
            } else {
                likeCount = rAtomicLong.addAndGet(directLikeNum);
            }
            break;
        case COVER.getType(): // 覆盖
            if (directLikeNum == null) {
                throw new IllegalArgumentException("Direct like number cannot be null when using override strategy");
            }
            rAtomicLong.set(directLikeNum);
            likeCount = directLikeNum;
            break;
        default: // 默认返回当前值
            likeCount = rAtomicLong.get();
            break;
    }

    // 设置过期时间
    rAtomicLong.expire(60, TimeUnit.MINUTES);

    // 使用RBlockingQueue避免频繁轮询
    RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(LikeCacheKey.Like_DYNAMIC.getKey());
    RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

    // 如果队列中不包含当前key,则添加到延迟队列中
    if (!delayedQueue.contains(key)) {
        // 延迟 10 分钟统计
        delayedQueue.offerAsync(key, 10, TimeUnit.MINUTES);
    }

    return likeCount;
}

接下来就是处理延迟队列

@Slf4j
@Component
@RequiredArgsConstructor
public class LikePersistenceTask implements ApplicationRunner {

    private final ScheduledExecutorService executorService = Executors.newSingleThreadExecutor();
    private final LikeService likeService;  
    private final RedissonClient redissonClient;

    @Override
    public void run(ApplicationArguments args) {
        executorService.submit(this::processLikeData);
        log.info("启动一个后台线程,用于处理 Redis 点赞统计数据持久化。");
    }

    private void processLikeData() {
        RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(LikeCacheKey.LIKE_DYNAMIC.getKey());
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = blockingQueue.take();  // 阻塞直到有元素
                if (StringUtils.isNotBlank(key)) {
                    processKey(key);
                }
            } catch (InterruptedException e) {
                log.error("处理 Redis 点赞统计数据持久化线程被中断", e);
                Thread.currentThread().interrupt();  // 恢复中断状态
            } catch (Exception e) {
                log.error("处理 Redis 点赞统计数据持久化时发生错误", e);
            }
        }
    }

    private void processKey(String key) {
        String[] objs = LikeCacheKey.LIKE_COUNT.parseKeyArg(key);
        String postId = objs[0];
        Integer actionType = Integer.valueOf(objs[1]);
        likeService.persistenceLikeData(postId, actionType);
    }

    @PreDestroy
    public void shutdown() {
        log.info("正在关闭 LikePersistenceTask 线程池...");
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("线程池未能在指定时间内终止");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2111737.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

iOS——通知协议代理

通知 概要 观察者和被观察者都无需知晓对方&#xff0c;只需要通过标记在NSNotificationCenter中找到监听该通知所对应的类&#xff0c;从而调用该类的方法。并且在NSNotificationCenter中&#xff0c;观察者可以只订阅某一特定的通知&#xff0c;并对其做出相应操作&#xf…

《论多源数据集成及应用》写作框架,软考高级系统架构设计师

论文真题 在如今信息爆炸的时代,企业、组织和个人面临着大量的数据。这些数据来自不同的渠道和资源,包括传感器、社交媒体、销售记录等,它们各自具有不同的数据格式、分布和存储方式。因此如何收集、整理和清洗数据,以建立一个一致、完整的数据集尤为重要。多源数据集成可…

Leetcode 700-二叉搜索树中的搜索

给定二叉搜索树&#xff08;BST&#xff09;的根节点 root 和一个整数值 val。 你需要在 BST 中找到节点值等于 val 的节点。 返回以该节点为根的子树。 如果节点不存在&#xff0c;则返回 null 。 题解 如果root.val>val&#xff0c;则搜索左子树&#xff0c;如果roo…

finalshell 4.5.x在m1mac闪退

使用过程中会出现突然闪退&#xff0c;尤其在定位生产打开一堆窗口的情况下&#xff0c;绝绝子 闪退崩溃日志&#xff1a; Thread 116 Crashed:: Java: pool-4-thread-28 0 libsystem_kernel.dylib 0x18e926600 __pthread_kill 8 1 libsystem_pthread.dyl…

基于opencv实现双目立体匹配点云距离

双目相机或两个单目相机。 一、相机标定 MATLAB软件&#xff0c;打开双目标定app。 点击add images&#xff0c;弹出加载图像的窗口&#xff0c;分别导入左图和右图&#xff0c;设置黑白格长度&#xff08;标定板的长度一般为20&#xff09;。 点击确定&#xff0c;弹出加载…

ArrayList,LinkedList

ArrayList集合 底层原理 1.利用空参创建的集合&#xff0c;在底层创建一个默认长度为0的数组 2.添加第一个元素时&#xff0c;底层会创建一个新的长度为10的数组 3.存满时&#xff0c;会扩容1.5倍 4.如果一次添加多个元素&#xff0c;1.5倍还放不下&#xff0c;则新创建数…

【C++】list的使用与简单模拟实现

目录 1、list的介绍和使用&#xff1a; 1、结构&#xff1a; 2、接口函数&#xff1a; 迭代器遍历&#xff1a; 增删查改&#xff1a; 翻转与排序&#xff1a; 2、list的模拟实现&#xff1a; 1、节点的封装&#xff1a; 2、迭代器的封装&#xff1a; 3、list的模拟实…

Flutter中自定义气泡框效果的实现

在用户界面的设计中&#xff0c;气泡框&#xff08;Bubble&#xff09;是一种非常有效的视觉工具&#xff0c;它可以用来突出显示信息或提示用户。气泡框广泛应用于聊天应用、通知提示等场景。在 Flutter 中&#xff0c;虽然有很多现成的气泡框组件&#xff0c;但如果你想要更多…

关于 ubuntu系统install的cmake版本较低无法编译项目升级其版本 的解决方法

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/141933927 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

django摄影竞赛小程序论文源码调试讲解

2系统关键技术及工具简介 系统开发过程中设计的关键技术是系统的核心&#xff0c;而开发工具则会影响的项目开发的进程和效率。第二部分便描述了系统的设计与实现等相关开发工具。 2.1 Python简介 Python 属于一个高层次的脚本语言&#xff0c;以解释性&#xff0c;编译性&am…

Android Studio调试Flutter项目

run运行项目中途&#xff0c;点击Flutter Attach 等一会就可以调试&#xff01; 或者&#xff0c;直接Debug允行项目。

C++相关概念和易错语法(32)(单例模式、类型转换)

1.单例模式 &#xff08;1&#xff09;设计模式是什么&#xff1f; 简单来说&#xff0c;被反复使用&#xff0c;多数人知晓、经过分类的代码设计经验的总结就叫设计模式&#xff0c;它建立在特殊类的设计之上&#xff0c;实现特殊的功能&#xff0c;运用的知识也十分综合。如…

协议集合(学习笔记)

按照数据的传送方式&#xff0c;通信协议可分为以下2种。 串行通信&#xff1a;串行&#xff08;Serial&#xff09;指的是逐个传输数据位&#xff0c;一次只传输一个位。 并行通信&#xff1a;并行&#xff08;Parallel&#xff09;指的是同时传输多个数据位&#xff0c;一次…

VMware 中 kali Linux的安装与使用

文章目录 前言 一、安装虚拟机 二、使用步骤 总结 前言 随着信息技术的飞速发展&#xff0c;虚拟化技术已经成为现代企业和个人用户不可或缺的一部分。通过虚拟化技术&#xff0c;我们可以在一台物理计算机上运行多个独立的操作系统和应用程序&#xff0c;从而实现资源的高效利…

基于WiFi的智能照明控制系统的设计与实现(论文+源码)

1系统方案设计 本设计智能照明控制系统&#xff0c;结合STM32F103单片机、光照检测模块、显示模块、按键模块、太阳能板、LED灯模块、WIFI模块等器件构成整个系统&#xff0c;在功能上可以实现光照强度检测&#xff0c;并且在自动模式下可以自动调节照明亮度&#xff0c;在手动…

【spring】例子2:mvc web开发

领域类 开发时编译时用lombok提供支持 最终生成的包里不包含lombok

【Android】程序开发组件—探究Jetpack

引言 Jetpack是一个开发组件工具集&#xff0c;它的主要目的是帮助我们编写出更加简洁的代码&#xff0c;并简化我们的开发过程&#xff0c;在这么多的组件当中&#xff0c;最需要我们关注的其实还是架构组件&#xff0c;接下来就对Jetpack的主要架构组件进行学习&#xff01;…

数据结构-----栈、队列

一、栈 1、栈(stack)是限定仅在表尾进行插入和删除操作的线性表。 把允许插入和删除的一端称为栈顶&#xff08;top)&#xff0c;另一端称为栈底&#xff08;bottom)&#xff0c;不含任何数据元素的栈称为空栈。栈又称为后进先出&#xff08;Last In First Out)的线性表,简称L…

OpenAI gym: Trouble installing Atari dependency (Mac OS X)

题意&#xff1a; 使用OpenAI Gym库时&#xff0c;安装Atari环境可能会遇到一些依赖问题&#xff0c;尤其是在Mac OS X系统上 问题背景&#xff1a; Im new to OpenAI gym. Ive successfully installed OpenAI gym on my Mac OS X (High Sierra 10.13.3) laptop and made a D…

C语言程序设计(算法的概念及其表示)

一、算法的概念 一个程序应包括两个方面的内容: 对数据的描述:数据结构 对操作的描述:算法 著名计算机科学家沃思提出一个公式: 数据结构 +算法 =程序 完整的程序设计应该是: 数据结构+算法+程序设计方法+语言工具 广义地说,为解决一个问题而采取的方法和步骤…