延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

news2024/11/16 9:18:59

一、接着上文

RDelayedQueue作为redisson封装的一个分布式延迟队列,直接拿来使用还是比较简单的。

本文主要包括以下几部分:

  • 保存至延迟队列(生产者)
  • 读取延迟队列(消费者)
  • 从延迟队列移除任务

在这里插入图片描述

二、redission配置


import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson配置类
 *
 * @author xxx
 */
@Configuration
public class RedissonConfig {
    @Value("${spring.application.name}")
    private String serverName;

    @Bean
    public RedissonClient redissonClient(RedisProperties redisProperties) {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
        singleServerConfig.setPassword(redisProperties.getPassword());
        singleServerConfig.setKeepAlive(true);
        singleServerConfig.setDatabase(redisProperties.getDatabase());
        singleServerConfig.setConnectionMinimumIdleSize(2);
        singleServerConfig.setConnectionPoolSize(4);
        singleServerConfig.setClientName(serverName);
        return Redisson.create(config);
    }
}
spring:
  application:
    name: delay-task-service
  redis:
    host: 192.168.8.18
    port: 6379
    database: 0
    timeout: 3000

三、保存至延迟队列(生产者)

作为延迟任务的生产者,你需要根据预期的回调时间,计算出delay延迟时间。

伪代码见下:

public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);

delayedQueue.offer(event.getTransNo(), delay < 0 ? 1 : delay, TimeUnit.SECONDS);

四、读取延迟队列(消费者)

    public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

    private final RedissonClient redissonClient;
    
    @PostConstruct
    public void init() {
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())
                .execute(() -> {
                    while (true) {
                        try {
                            RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDISSON_QUEUE_NAME);
                            RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

                            String transNo = blockingDeque.take();

                            if (null == transNo) {
                                return;
                            }

                            if (log.isInfoEnabled()) {
                                log.info("开始执行延迟队列中的任务,transNo={}", transNo);
                            }
                            // 异步执行你的操作
                            notifyTaskService.handleTask(transNo, null);
                        } catch (Exception e) {
                            log.error("延时队列的任务执行出现异常", e);
                        }
                    }
                });
    }

五、从延迟队列移除任务

public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

delayedQueue.remove(transNo);

六、总结

本文主要是摘要一些源码,仅供参考。

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1630597.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

静态路由深研究

在创建静态路由时&#xff0c;可以同时指定出接口和下一跳。对于不同的出接口类型&#xff0c;也可以只指定出接口或只指定下一跳。对于点到点接口&#xff08;如串口&#xff09;&#xff0c;可以指定出接口或者下一跳。对于广播接口&#xff08;如以太网接口&#xff09;和VT…

NDK 编译(二)—— NDK 编译与集成 FFmpeg

NDK 编译系列文章共三篇&#xff0c;目录如下&#xff1a; NDK 编译&#xff08;一&#xff09;—— Linux 知识汇总 NDK 编译&#xff08;二&#xff09;—— NDK 编译与集成 FFmpeg NDK 编译&#xff08;三&#xff09;—— CMake 原生构建工具 在使用 NDK 进行音视频开发时&…

Taro引入echarts【兼容多端小程序(飞书/微信/支付宝小程序)】

近期接到公司新需求&#xff0c;开发飞书小程序&#xff0c;并且原型中含有大量的图表&#xff0c;本想使用飞书内置图表组件 —— chart-space&#xff0c;但官方表示已经停止维护了&#xff0c;无奈之下&#xff0c;只能另寻他路&#xff0c;于是乎&#xff0c;图表之王&…

CRM客户关系管理-客户资源企业化销售管理平台

什么是CRM 客户关系管理&#xff08;Customer Relationship Management&#xff0c;简称CRM&#xff09;&#xff0c;是指企业为提高核心竞争力&#xff0c;利用相应的信息技术以及互联网技术协调企业与顾客间在销售、营销和服务上的交互&#xff0c;从而提升其管理方式&#…

dockerfile 搭建lamp 实验模拟

一 实验目的 二 实验 环境 1, 实验环境 192.168.217.88一台机器安装docker 并做mysql nginx php 三台容器 2&#xff0c; 大致框架 3&#xff0c; php php:Nginx服务器不能处理动态页面&#xff0c;需要由 Nginx 把动态请求交给 php-fpm 进程进行解析 php有三…

记录些AI Agents设计模式和NL2SQL知识

吴恩达分享的四种 自我反思&#xff08;Reflection&#xff09;&#xff1a;可以自我修正&#xff1b;使用工具&#xff08;Tool Use&#xff09;&#xff1a;链接其他系统去做一些事情&#xff0c;比如把电脑里面的未归档文件做好归档&#xff1b;规划&#xff08;Planning&a…

【炼金术士】BatchSize对网络训练的影响

文章目录 1 BatchSize对于网络训练的影响2 调整学习率可以提高大BatchSize的性能3 实际训练时的建议3.1 设置初始学习率的方法3.2 多卡训练时学习率的设置 参考资料&#xff1a; 【深度学习】Batch Size对神经网络训练的影响【AI不惑境】学习率和batchsize如何影响模型的性能&…

Windows使用SSH登录本机Linux虚拟机

SSH&#xff08;Secure Shell&#xff09;&#xff0c;一种网络协议&#xff0c;可以在安全外壳下实现数据传输通信&#xff0c;所以主要用于计算机间加密登录&#xff0c;可以简单理解为远程控制。除了计算机间直接互联&#xff0c;在git中也可以看到&#xff0c;常见的协议有…

购买 DDoS 高防 IP 防护哪家好?

DDoS 高防 IP 哪里买会比较好?在这场攻与守的游戏里&#xff0c;DDoS 高防 IP 是一种针对 DDoS 攻击的防护措施&#xff0c;通过将网站或应用的 IP 地址映射到高防 IP 上&#xff0c;实现对流量的清洗和过滤&#xff0c;从而有效抵御 DDoS 攻击。在选择 DDoS 高防 IP 服务提供…

Ubuntu下部署gerrit+报错分析(超详细)

Ubuntu下部署gerrit代码平台 之前安装过几次 最后都在Apache代理这里失败了&#xff0c;如下图&#xff0c;总是gerrit.config与Apache2.config配置有问题&#xff0c;后面换了使用ngnix代理&#xff0c;简单多了 安装Mysql、gerrit、jdk、git 这一步也是非必须得&#xff0…

无监督学习的评价指标

轮廓系数&#xff08;Silhouette Coefficient&#xff09; 轮廓系数用于判断聚类结果的紧密度和分离度。轮廓系数综合了样本与其所属簇内的相似度以及最近的其他簇间的不相似度。 其计算方法如下&#xff1a; 1、计算簇中的每个样本i 1.计算a&#xff08;i&#xff09; &#x…

实时采集麦克风并播放(springboot+webscoekt+webrtc)

项目技术 springbootwebscoektwebrtc 项目介绍 项目通过前端webrtc采集麦克风声音&#xff0c;通过websocket发送后台&#xff0c;然后处理成g711-alaw字节数据发生给广播UDP并播放。 后台处理项目使用线程池(5个线程)接受webrtc数据并处理g711-alaw字节数组放到Map容器中&…

opencv基础篇 ——(九)图像几何变换

图像几何变换是通过对图像的几何结构进行变换来改变图像的形状、大小、方向或者透视关系。常见的图像几何变换包括缩放、旋转、平移、仿射变换和透视变换等。下面对这些几何变换进行简要介绍&#xff1a; 矩阵的转置&#xff08;transpose &#xff09;&#xff1a; 对于图像来…

吴恩达2022机器学习专项课程(一) 7.1 逻辑回归的成本函数第三周课后实验:Lab4逻辑回归的损失函数

问题预览/关键词 上节课回顾逻辑回归模型使用线性回归模型的平方误差成本函数单个训练样本的损失损失函数&#xff0c;成本函数&#xff0c;代价函数的区别线性回归损失函数和逻辑回归损失函数的区别逻辑回归模型的成本函数是什么&#xff1f;逻辑回归模型的损失函数实验逻辑回…

STL——List常用接口模拟实现及其使用

认识list list的介绍 list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 list的底层是双向链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点中通过指针指向其前一个元素和后一个元素…

linux tcpdump的交叉编译以及使用

一、源码下载 官网&#xff1a;点击跳转 二、编译 1、解压 tar -xf libpcap-1.10.4.tar.xz tar -xf tcpdump-4.99.4.tar.xz 2、配置及编译 //libpcap&#xff1a; ./configure --hostarm-linux --targetarm-linux CCarm-linux-gcc --with-pcaplinux --prefix$PWD/build//t…

对象与JSON字符串互转

1、JSON字符串转化成JSON对象 JSONObject jsonobject JSON.parseObject(str); 或者 JSONObject jsonobject JSONObject.parseObject(str); 功能上是一样的&#xff0c;都是将JSON字符串&#xff08;str&#xff09;转换成JSON对象 jsonobject 。注意str一定得是以键值对存在…

STM32之HAL开发——电容按键

电容按键原理 电容器 (简称为电容) 就是可以容纳电荷的器件&#xff0c;两个金属块中间隔一层绝缘体就可以构成一个最简单的电容。如图 32_1 (俯视图)&#xff0c;有两个金属片&#xff0c;之间有一个绝缘介质&#xff0c;这样就构成了一个电容。这样一个电容在电路板上非常容…

二维数组求最大值(C语言)

一、N-S流程图&#xff1b; 二、运行结果&#xff1b; 三、源代码&#xff1b; # define _CRT_SECURE_NO_WARNINGS # include <stdio.h>int main() {//初始化变量值&#xff1b;int i, j, max 0, row 0, colum 0;int arr[3][4] { {1, 2, 3}, {4, 5, 16}, {7, 8, 9} …

线上办理离婚快速离婚,无需双方见面异地可办

现在离婚有两种方式 一种是协议离婚&#xff0c;双方都同意的情况下&#xff0c;可以去民政局协议离婚&#xff0c;有30天冷静期&#xff0c;冷静期过后需要双方再次去民政局办理离婚手续。 另一种是诉讼离婚&#xff0c;一方不同意离婚&#xff0c;可以选择诉讼离婚。可以全…