(二)延时任务篇——通过redis的key监听,实现延迟任务实战

news2025/1/15 20:08:43

前言

本节内容是关于使用redis的过期key,通过开启其监听失效策略,模拟订单延迟任务的执行流程。其核心原理是通过使用redis订阅与发布的方式,将过期失效的key通过广播的方式,发布给客户端,客户端可以监听此消息进而消费消息。需要注意的是官方并不推荐此方式,因为其容易造成数据丢失,例如没有客户端消费消息,消息也会丢失。对于一些安全性要求比较低的场景,可以使用此方式实现延迟队列。

正文

  • 引入redis的pom依赖
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
  • application.yml中配置redis连接
spring:
  data:
    redis:
      host: 127.0.0.1
      port: 6379
      database: 0
      connect-timeout: 30000
      timeout: 30000
      lettuce:
        pool:
          enabled: true
          max-active: 200
          max-idle: 50
          max-wait: -1
          min-idle: 10
        shutdown-timeout: 100
  •  配置redis的缓冲池,并注入redis的消息监听容器
package com.yundi.tps.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.cache.support.CompositeCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.concurrent.TimeUnit;


@Configuration
public class RedisConfig {

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        
        // redis缓存管理器
        RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig()
                .serializeValuesWith(RedisSerializationContext
                        .SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()));
        RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(defaultCacheConfig)
                .transactionAware()
                .build();
        return new CompositeCacheManager(redisCacheManager);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        serializer.serialize(objectMapper);
        template.setValueSerializer(serializer);
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

  • 实现一个KeyExpirationEventMessageListener过期的监听器RedisKeyExpirationListener
package com.yundi.tps.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {


    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    /**
     * Redis失效事件 key
     *
     * @param message
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        //notify-keyspace-events Ex
        // 匹配规则
        String patternRule = new String(pattern);
        log.info("patternRule:{}", patternRule);
        // 监听的通道
        byte[] channel = message.getChannel();
        log.info("channel:{}", new String(channel));
        // 过期的key
        String expireKey = message.toString();
        log.info("expireKey:{}", expireKey);
        //TODO 处理订单的后续业务逻辑

    }

}
  •  实现一个创建订单的延时任务接口,模拟订单超时
package com.yundi.tps.controller;

import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.yundi.xyxc.tps.common.ApiResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;


@Tag(name = "订单管理")
@RestController
@RequestMapping("/api/tps/order")
public class OrderController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Operation(summary = "创建订单")
    @PostMapping("save")
    public ApiResponse save() {
        String orderId = String.valueOf(IdWorker.getId());
        stringRedisTemplate.opsForValue().setIfAbsent(orderId, orderId, 60, TimeUnit.SECONDS);
        return ApiResponse.ok();
    }
}
  • 开启redis key的失效监听,在redis配置中添加以下配置
notify-keyspace-events Ex

  •  启动redis服务和客户端项目,发送延时订单任务,看客户端是否能够消费到此延迟任务

结语

需要注意的是,该方式实现的延迟任务安全性较低,对于安全性高的场景,并不推荐此种方式。关于使用redis的key监听,实现延迟任务实战内容到这里就结束了,下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1959043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何将旧电脑的数据迁移到新电脑?旧电脑数据迁移技巧

随着科技的不断发展&#xff0c;电脑硬件的更新换代速度也越来越快。当我们购买了一台新电脑时&#xff0c;如何将旧电脑的数据迁移到新电脑&#xff0c;成为了我们必须面对的问题。本文将详细介绍几种数据迁移的方法&#xff0c;帮助您顺利完成数据迁移&#xff0c;确保重要资…

【已解决】ERROR: No matching distribution found for torch.安装torch一次性解决方法

文章目录 环境异常原因直接解决方案成功 环境 python 安装 torch 异常 (base) ➜ ComfyUI git:(master) pip install -i https://pypi.mirrors.ustc.edu.cn/simple torch Looking in indexes: https://pypi.mirrors.ustc.edu.cn/simple ERROR: Could not find a version tha…

无人机WIFI集群组网技术详解及成本分析

一、技术详解 1. 无人机WIFI集群组网概述 无人机WIFI集群组网技术是指利用无人机作为移动平台&#xff0c;通过集成高性能的WIFI模块&#xff0c;实现多架无人机之间以及无人机与地面控制站之间的无线通信组网。该技术不仅能够提升无人机集群的协同作业能力&#xff0c;还能在…

【每日一题】python输入两个字,共随机出现100个,查询分别出现多少次

print(""" 分别输入两个字&#xff0c;共100个字&#xff0c;随机出现。 自动查询每个字出现的次数 """) str1input("输入一个字:") str2input("输入一个字:") import random m[str1,str2] i1 x0 y0 while i<9…

代码改进跑通 创新点 文章复现 人工智能

代码改进跑通➕创新点➕文章复现➕人工智能 高质量接创新点代码改进跑通复现代码&#xff0c;模型优化 python代跑时间序列预测分析代码编写python编 程 深度学习算法自然语言处理神经网络跑通指导爬虫调试 项目指导定制代做改进提升创新优化Python Matlab COpencvNlp Pytorch …

C++(week15): C++提高:(三)计算机网络

文章目录 一、计算机网络基础1.协议概念2.分层模型3.协议格式(1)以太网帧格式(2)IP段格式(3)TCP/UDP数据报格式4.TCP协议(1)TCP协议的特点(2)三次握手(3)四次挥手(4)SYN攻击5.状态迁移图的解析:11种状态6.TCP通信状态与程序结合分析二、网络编程(Socket编程)1.网络编程基础2.字…

《LeetCode热题100》---<哈希三道>

本篇博客讲解 LeetCode热题100道中的哈希篇中的三道题。分别是 1.第一道&#xff1a;两数之和&#xff08;简单&#xff09; 2.第二道&#xff1a;字母异位词分组&#xff08;中等&#xff09; 3.第三道&#xff1a;最长连续序列&#xff08;中等&#xff09; 第一道&#xff1…

各类型算法题整理(python、c++版)hot100

1. 组合数&#xff1a;n个数找k个数的组合 这题的核心是每次遍历从begin到n之间的所有数&#xff0c;并放到一个path里。当pathk的时候返回。要注意两点&#xff1a; &#xff08;1&#xff09;不要在path长度为k的时候清空path&#xff01;回溯不需要清空&#xff0c;因为回…

夏季如何预防脑血管疾病

众所周知&#xff0c;冬季是脑血管病的高发季节。然而&#xff0c;还有资料显示&#xff0c;在炎炎夏日&#xff0c;脑血管疾病的发病率也呈明显的上升趋势。为什么夏季也会高发脑血管病呢&#xff1f;我们来一起了解一下。 1. 出汗量大大增加&#xff0c;血容量就会减少&#…

【Java】韩顺平Java学习笔记 第22章 多用户通讯系统

文章目录 项目开发流程需求分析整体分析用户登录注意 拉取在线用户列表无异常退出私聊功能注意 发送文件服务端推送新闻接收离线消息和文件 项目开发流程 需求分析设计阶段实现阶段测试阶段实施阶段维护阶段 需求分析 用户登录拉取在线用户列表无异常退出&#xff08;客户端、…

【BES2500x系列 -- RTX5操作系统】系统执行流程 -- 引导程序(boot loader)--(十)

&#x1f48c; 所属专栏&#xff1a;【BES2500x系列】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#x1f49…

【C++】嵌套循环案例 乘法口诀表

乘法口诀表利用嵌套循环语句就可以实现 下面是一个实例 #include<iostream> using namespace std;int main() {for (int i 1; i < 10; i){for (int j 1; j < i; j){cout << j << " * " << i << " " << i *…

Agent终于能主动进化?揭秘首个让AI自我进化的训练框架!突破人类专家局限,告别手动调优!端到端符号化框架如何引领AI自我革命

随着大型语言模型(LLMs)的兴起和AI Agent框架的开源&#xff0c;基于这些强大模型的智能体在学术界和工业界受到了极大的关注&#xff0c;并在多个场景中取得了显著的成果。然而&#xff0c;尽管AI Agent在一些应用中已经落地&#xff0c;其研究和开发仍然主要依赖于“专家中心…

Apollo:目录分析, test ok

apollo: Apollo (阿波罗)是一个开放的、完整的、安全的平台,将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统,快速搭建一套属于自己的自动驾驶系统。 - Gitee.comhttps://github.com/ApolloAuto/apolloapollo 目录名称目录作用cyber消息中间件,替换ros作为消息层…

2024年【制冷与空调设备运行操作】考试题及制冷与空调设备运行操作新版试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 制冷与空调设备运行操作考试题根据新制冷与空调设备运行操作考试大纲要求&#xff0c;安全生产模拟考试一点通将制冷与空调设备运行操作模拟考试试题进行汇编&#xff0c;组成一套制冷与空调设备运行操作全真模拟考试…

python绘制图像无法显示汉字、数字

解决的问题&#xff1a;python绘制图像无法正确显示汉字、数字&#xff0c;图中汉字数字以方块形式显示。 直接先上代码&#xff1a; # 确保图表中的汉字可以显示 plt.rcParams["font.sans-serif"] ["SimHei"] plt.rcParams["axes.unicode_minus…

25.惰性队列

介绍 消费者由于各种原因而致使长时间不能消费消息造成堆积。比如有一百万条消息发送到mq中&#xff0c;消费者这时宕机了不能消费消息&#xff0c;造成了消息堆积。惰性队列就有必要了。 正常情况下&#xff0c;消息保存在内存中。消费者从内存中读取消息消费&#xff0c;速…

【设计模式】代理模式详解

1.简介 代理模式是常用的Java设计模式&#xff0c;该模式的特点是代理类与委托类共享相同的接口。代理类主要负责预处理消息、过滤消息、将消息转发给委托类&#xff0c;并在事后处理消息等。代理类与委托类之间通常存在关联关系&#xff0c;一个代理类对象与一个委托类对象关…

TPM管理咨询公司在项目实施过程中提供哪些培训和支持?

在竞争激烈的市场环境中&#xff0c;企业项目的成功实施不仅是技术的较量&#xff0c;更是管理智慧的体现。而TPM管理咨询公司&#xff0c;作为提升企业运营效率与竞争力的专业伙伴&#xff0c;深知在项目推进的每一步中&#xff0c;专业的培训与强大的支持体系对于确保项目顺利…

shell脚本编写、一键安装nginx、条件语句、 检测网段脚本、 打印九九乘法表、

1.shell脚本 1.编写及运行脚本 [root13git ~]# vim hello.sh [root13git ~]# bash hello.sh [root13git ~]# sh hello.sh [root13git ~]# source hello.sh //在当前进程执行 [root13git ~]# chmod x hello.sh [root13git ~]# ./hello.sh 2.一键安装nginx [root13g…