Spring Boot集成Redisson实现延迟队列

news2025/1/15 13:20:29

项目场景:

   在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

   一般实现的方法有几种:使用 redisson、rocketmq、rabbitmq等消息队列的延时投递功能。


解决方案:

   一般项目集成redis的比较多,所以我这篇文章就说下redisson延迟队列,如果使用rocketmq或rabbitmq需要额外集成中间件,比较麻烦一点。

1.集成redisson

maven依赖

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson-spring-boot-starter</artifactId>
	<version>3.21.1</version>
</dependency>

yml配置,单节点配置可以兼容redis的配置方式

# redis配置
spring:
  redis:
    database: 0
    host: 127.0.0.1
    password: redis@pass
    port: 6001

 更详细的配置参考:Spring Boot整合Redisson的两种方式-CSDN博客

2.配置多线程

因为延迟队列可能会多个任务同时执行,所以需要多线程处理。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {
    /**
     * 异步任务自定义线程池
     */
    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {
    	ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(50);
        //配置最大线程数
        executor.setMaxPoolSize(500);
        //配置队列大小
        executor.setQueueCapacity(300);
        //允许线程空闲时间
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("taskExecutor-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //调用shutdown()方法时等待所有的任务完成后再关闭
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //等待所有任务完成后的最大等待时间
		executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}

3.具体业务

比如消息通知、关闭订单等 ,这里加上了@Async注解,可以异步执行

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class AsyncService {

	@Async
	public void executeQueue(Object value) {
		System.out.println();
		System.out.println("当前线程:"+Thread.currentThread().getName());
		System.out.println("执行任务:"+value);

		//打印时间方便查看
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("执行任务的时间:"+sdf.format(new Date()));
		//自己的业务逻辑,可以根据id发送通知消息等
		//......
	}
}

4.延迟队列(关键代码)

这里包括添加延迟队列,和消费延迟队列,@PostConstruct注解的意思是服务启动加载一次,参考Spring Boot中多个PostConstruct注解执行顺序控制_多个postconstruct执行顺序-CSDN博客

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@Service
public class TestService {

	@Resource
	private AsyncService asyncService;
	@Resource
	private ThreadPoolTaskExecutor executor;
	@Autowired
	private RedissonClient redissonClient;

	/**
	 * 添加延迟任务
	 */
	public void addQueue() {
		//获取延迟队列
		RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue");
		RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
		for (int i = 1; i <= 10; i++) {
			long delayTime = 5+i; //延迟时间(秒)
//			long delayTime = 5; //这里时间统一,可以测试并发执行
			delayedQueue.offer("延迟任务"+i, delayTime, TimeUnit.SECONDS);
		}
		//打印时间方便查看
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("添加任务的时间:"+sdf.format(new Date()));
	}

	/**
	 *	服务启动时加载,开始消费延迟队列
	 */
	@PostConstruct
	public void consumer() {
		System.out.println("服务启动时加载>>>>>>");
		//获取延迟队列
		RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue");

		//启用一个线程来消费这个延迟队列
		executor.execute(() ->{
			while (true){
				try {
//					System.out.println("while中的线程:"+Thread.currentThread().getName());
					//获取延迟队列中的任务
					Object value = delayedQueue.poll();
					if(value == null){
						//如果没有任务就休眠1秒,休眠时间根据业务自己定义
						Thread.sleep(1000);	//这里休眠时间越短,误差就越小
						continue;
					}
					//异步处理延迟队列中的消息
					asyncService.executeQueue(value);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
	}
}

5.测试接口 

import com.test.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

	@Autowired
	private TestService testService;

	/*
	 * 添加延迟任务
	 */
	@GetMapping(value = "/addQueue")
	public String addQueue() {
		testService.addQueue();
		return "success";
	}

}

6.测试结果


 总结:

  1. Redisson的的RDelayedQueue是基于Redis实现的,而Redis本身并不保证数据的持久性。如果Redis服务器宕机,那么所有在RDelayedQueue中的数据都会丢失。因此,我们需要在应用层面进行持久化设计,例如定期将RDelayedQueue中的数据持久化到数据库。
  2. 在设计延迟任务时,我们应该根据实际需求来合理设置延迟时间,避免设置过长的延迟时间导致内存占用过高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1623352.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K-近邻算法的 sklearn 实现

实验目的与要求 掌握基于 K-近邻分类算法的编程方法通过编程理解 K-近邻分类算法和该算法的基本步骤 实验器材 硬件&#xff1a;PC 机&#xff08;参与实验的学生每人一台&#xff09;软件环境&#xff1a;Python3.7 Pycharm 实验内容 使用 sklearn 库中的 neighbors 模块实…

【java、maven环境变量配置问题】

这里写目录标题 软件版本查询所遇问题及解决方法1、java环境变量修改后不起效果&#xff1a;变量值2、java环境变量修改后不起效果&#xff1a;变量名结论&#xff1a; 软件版本查询 查询 java jdk 版本&#xff1a;java -version 查询 maven 版本&#xff1a; mvn -v 所遇问…

如何安装最新版Docker Compose?

Docker Compose 是一个用于定义和运行多容器 Docker 应用程序的工具。通过 Compose&#xff0c;您可以使用 YAML 文件来配置应用服务&#xff0c;然后只需一个简单的命令便能创建和启动所有服务。在本篇博客中&#xff0c;我们将详细介绍如何在 Linux 系统上安装 Docker Compos…

恭喜!喜提美国匹兹堡大学儿童医院访问学者邀请函

➡️【院校简介】 匹兹堡UPMC儿童医院该院是匹兹堡大学医学中心的一部分&#xff0c;也是大匹兹堡唯一一家专门护理26岁以下婴儿&#xff0c;儿童&#xff0c;青少年和年轻人的医院。该医院隶属于匹兹堡大学医学院&#xff0c;设有一个获得州级认证的一级儿科创伤中心&#xf…

ESP32开发WebSocket报错TRANSPORT_WS: Sec-WebSocket-Accept not found

我的芯片是ESP32-S3&#xff0c;用ESP-IDF框架进行开发的时候&#xff0c;用官方的WebSocket的example创建了项目。然后把WebSocket连接uri替换为自己的服务器后&#xff0c;运行到esp_websocket_client_start开始连接后&#xff0c;直接报错&#xff1a; E (10615) TRANSPORT…

C++|运算符重载(3)|日期类的计算

前面介绍了运算符重载相关规则和方法&#xff0c;今天用运算重载函数实现对日期类的操作。 目录 前面准备 实现功能&#xff1a; -运算符 Date类和int 相减 Date类和Date类相减 运算符 &#xff0c;-运算符 ,!运算符 >,>运算符 <,<运算符 &#xff0c;-…

MIS微调SAM模型实时交互UI界面

前言 SAM模型的基本介绍可见SAM&#xff08;Segment Anything Model&#xff09;大模型使用--point prompt_sam大模型-CSDN博客 针对Meta团队去年发布的SAM大模型在医学图像分割领域表现性能较差的情况&#xff0c;笔者收集了一些MIS领域的数据集对SAM的架构进行fine tune&am…

银河麒麟安装SSH工具

查看22端口是否启用 netstat -ntlp|grep 22 二、安装SSH工具 1、安装openssh 执行指令# sudo apt-get install openssh-server 2、更新ubuntu源 执行指令# sudo apt-get update 3、安装openssh-server 执行指令# sudo apt-get install openssh-server 4、安装ufw防火…

分享:抖音阳哥说的人力RPO项目有哪些优势?

在数字化浪潮的推动下&#xff0c;人力资源行业也迎来了前所未有的变革。抖音平台上&#xff0c;阳哥以其独到的见解和丰富的经验&#xff0c;对人力RPO(招聘流程外包)项目进行了深入解读。今天&#xff0c;我们就来探讨一下人力RPO项目究竟有哪些优势。 人力RPO项目的一大优势…

【C语言__联合和枚举__复习篇10】

目录 前言 一、联合体 1.1 联合体的概念 1.2 联合体与结构体关于声明和内存布局的比较 1.3 联合体的大小如何计算 1.4 使用联合体的2个示例 二、枚举体 2.2 枚举体的概念 2.2 枚举体的优点 前言 本篇主要讨论以下问题&#xff1a; 1. 联合体是什么&#xff0c;它有什么特点 …

SpringMVC笔记——SpringMVC基础Tomcat环境配置

Tomcat安装配置 下载Apache Tomcat 进入官网https://tomcat.apache.org/&#xff0c;选择tomcat 9 这边使用idea开发&#xff0c;建议直接下载压缩包 无法访问下载的可以直接用我的下载链接&#xff1a;https://cloudreve.zxbdwy.online/s/6nSA 提取码&#xff1a;w1pwk3将压…

玩转微服务-SonarQube

这里写目录标题 第一节 SonarQube1.1 简介1.2 四个组成部分1.2.1 SonarQube服务器1.2.2 SonarQube数据库1.2.3 插件1.2.4 Scanner 1.3 工作流程 第二节 SonarQube的安装2.1 安装2.2 插件 第三节 P3C规范3.1 简介3.2 SonarQube 配置 P3C规范3.3 IDEA配置 P3C规范 第四节 Maven项…

基于opencv的单目相机标定

openCv版本&#xff1a;4.4.0 从源码处拷贝标定代码出来使用&#xff0c;需要拷贝samples/cpp/tutorial_code/calib3d/camera_calibration 需要的文件如下&#xff1a; -rw-rw-r-- 1 rog rog 28490 Jul 18 2020 camera_calibration.cpp -rw-rw-r-- 1 rog rog 3152 Jul 18 …

初识C++·类和对象(中)(3)

前言&#xff0c;最难的已经结束了&#xff0c;来点轻松了放松一下。 目录 1 流重载 2 const成员 3 取地址及const取地址操作符重载 1 流重载 C语言中printf和scanf是有局限性&#xff0c;只能直接打印内置类型&#xff0c;对于自定义类型就哦豁了&#xff0c;所以在C中就…

5月计算机各省报名时间汇总报名流程

&#x1f4e3;5月有5省可进行计算机报名 天津&#xff1a;5月6日-5月10日 福建&#xff1a;5月6日9:00-5月12日17:00 广西&#xff1a;5月6日9:00-5月12日23:55 重庆&#xff1a;5月6日9:00-5月12日24:00 西藏&#xff1a;预计5月6日-12日 &#x1f50d;计算机等级考试报…

linux DNS域名解析服务

目录 一.DNS DNS系统的作用 域名结构&#xff1a; 根域 顶级域 二级域 子域 主机 二.DNS解析过程 迭代查询&#xff1a; 递归查询&#xff1a; 三.实验模拟 主、从服务器设置 1.搭建本地DNS服务器------(主服务器配置) 1&#xff09;初始化系统 ​编辑2&#xf…

win c++使用lua环境配置 5.3.5版本

编译lua 下载lua源码&#xff0c;github仓库 使用vs编译源码&#xff0c;新建一个静态库项目(只会生成lib文件)&#xff0c;想要dll的话就新建dll项目&#xff08;有一个lib文件和dll文件&#xff09; 把lua源码下面的文件夹都是&#xff0c;复制到vs项目中 lib目录是我手动…

Linux蓝牙驱动模拟HID设备(把Linux系统模拟成蓝牙鼠标和蓝牙键盘)

by fanxiushu 2024-04-24 转载或引用请注明原始作者。 在经过windows的蓝牙驱动开发模拟成HID设备的大风大浪之后&#xff0c; 现在回到linux下实现相同功能&#xff0c;简直就是如小孩嬉闹一样的轻松。 但无论如何&#xff0c;作为模拟蓝牙HID设备的windows&#xff0c;linux一…

冰箱、保险柜门不要对准外面

风水真的和我们的生活息息相关&#xff0c;它会影响到我们的事业财运、婚姻感情、健康平安等等。很多人在不知情的情况下&#xff0c;触犯了风水里的禁忌&#xff0c;那结果可想而知&#xff01; 峰民特别提示您&#xff1a;千万不要将冰箱、保险柜的门对准外面。冰箱是不能与…

什么是重放攻击(Reply attack)?

什么是重放攻击(Reply attack)? 重放攻击&#xff0c;也称为回放攻击&#xff0c;是一种网络攻击方式。重放攻击是一种中间人攻击&#xff0c;攻击者通过截获合法的数据传输并重新发送它们来欺骗接收方&#xff0c;让接收方误以为是合法的消息。重放攻击是非常常见的&#xf…