Spring Boot 集成 Redisson 实现消息队列

news2025/1/5 15:11:56

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Redis消息队列注解
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {
    /**
     * 队列名
     */
    String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * 初始化Redis队列监听器
 *
 * @author 十八
 * @createTime 2024-09-09 22:49
 */
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {

    final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    @Resource
    private RedissonClient redissonClient;
    private ExecutorService executorService;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisQueueListener> map = applicationContext.getBeansOfType(RedisQueueListener.class);
        executorService = createThreadPool("redis-queue");
        for (Map.Entry<String, RedisQueueListener> entry : map.entrySet()) {
            RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);
            if (redisQueue != null) {
                String queueName = redisQueue.value();
                executorService.submit(() -> listenQueue(queueName, entry.getValue()));
            }
        }
    }

    private ExecutorService createThreadPool(String namePrefix) {
        return Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors(),
                new NamedThreadFactory(namePrefix)
        );
    }

    private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {
        RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);
        log.info("Redis队列监听: {}", queueName);
        while (true) {
            if (shutdownRequested.get() || redissonClient.isShutdown()) {
                log.info("Redisson已关闭,停止监听队列: {}", queueName);
                break;
            }

            try {
                Object message = blockingQueue.take();
                redisQueueListener.invoke(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("监听队列线程被中断", e);
                break;
            } catch (Exception e) {
                log.error("监听队列线程错误", e);
            }
        }
    }

    public void shutdown() {
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException ex) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        shutdownRequested.set(true);
        if (redissonClient != null && !redissonClient.isShuttingDown()) {
            redissonClient.shutdown();
        }
    }

    private static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public NamedThreadFactory(String prefix) {
            this.namePrefix = prefix;
        }

        @Override
        public Thread newThread(@NotNull Runnable r) {
            return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        }
    }
}

RedisQueueListener

/**
 * Redis消息队列监听实现
 *
 * @author 十八
 * @createTime 2024-09-09 22:51
 */
public interface RedisQueueListener<T> {

    /**
     * 队列消费方法
     *
     * @param content 消息内容
     */
    void invoke(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

/**
 * Redis 消息队列服务
 *
 * @author 十八
 * @createTime 2024-09-09 22:52
 */
@Component
public class RedisQueueService {

    @Resource
    private RedissonClient redissonClient;

    /**
     * 添加队列
     *
     * @param queueName 队列名称
     * @param content   消息
     * @param <T>       泛型
     */
    public <T> void send(String queueName, T content) {
        RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);
        blockingQueue.add(content);
    }

    /**
     * 添加延迟队列
     *
     * @param queueName 队列名称
     * @param content   消息类型
     * @param delay     延迟时间
     * @param timeUnit  单位
     * @param <T>       泛型
     */
    public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(content, delay, timeUnit);
    }

    /**
     * 发送延迟队列消息(单位毫秒)
     *
     * @param queueName 队列名称
     * @param content   消息类型
     * @param delay     延迟时间
     * @param <T>       泛型
     */
    public <T> void sendDelay(String queueName, T content, long delay) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);
    }
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author 十八
 * @createTime 2024-09-10 00:09
 */
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {
    @Override
    public void invoke(String content) {
        log.info("队列消息接收 >>> {}", content);
    }
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 十八
 * @createTime 2024-09-10 00:11
 */
@RestController
@RequestMapping("queue")
public class QueueController {

    @Resource
    private RedisQueueService redisQueueService;

    @PostMapping("send")
    public void send(String message) {
        redisQueueService.send("test", message);
        redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);
    }

}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2120412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GD32E230 RTC报警中断功能使用

GD32E230 RTC报警中断使用 GD32E230 RTC时钟源有3个&#xff0c;一个是内部RC振动器产生的40KHz作为时钟源&#xff0c;或者是有外部32768Hz晶振.,或者外部高速时钟晶振分频作为时钟源。 &#x1f516;个人认为最难理解难点的就是有关RTC时钟异步预分频和同步预分频的计算。在对…

C++第二节入门 - 缺省参数和函数重载

一、缺省参数 1、概念 缺省参数是声明或定义函数时为函数的参数指定一个缺省值。 在调用该函数的时候&#xff0c;如果没有指定实参则采用该形参的缺省值&#xff0c;否则使用指定的实参&#xff01; #include<iostream> using namespace std;void Func(int a 0) {c…

2024 水博会,国信华源登场,数智创新助力水利高质量发展

9月4日-6日&#xff0c;由中国水利学会和中国水利工程协会共同主办的2024中国水博览会暨第十九届中国&#xff08;国际&#xff09;水务创新技术交流会在重庆国际博览中心召开。 本次水博会以“展水利前沿新技术 览新质生产力场景”为主题&#xff0c;国信华源携最新智能监测预…

【佳学基因检测】如何升级一个不再维护的软件包中的PHP代码?

如何升级一个不再维护的软件包中的PHP代码&#xff1f; 为什么要升级一个不再维护但是仍在使用的软件包中的PHP代码&#xff1f; 升级一个不再维护但仍在使用的软件包中的 PHP 代码是一个复杂但重要的过程。虽然这些软件包可能已经不再活跃地维护或更新&#xff0c;但升级其代…

通信工程学习:什么是ATM异步转移模式

ATM&#xff1a;异步转移模式 ATM&#xff1a;Asynchronous Transfer Mode&#xff08;异步转移模式&#xff09;是一种先进的通信技术&#xff0c;它采用固定长度的信元&#xff08;Cell&#xff09;作为信息传输、复用、交换及处理的基本单位&#xff0c;并通过异步时分复用的…

挖矿木马-Linux

目录 介绍步骤 介绍 1、挖矿木马靶机中切换至root用户执行/root目录下的start.sh和attack.sh 2、题目服务器中包含两个应用场景&#xff0c;redis服务和hpMyAdmin服务&#xff0c;黑客分别通过两场景进行入侵&#xff0c;入侵与后续利用线路路如下&#xff1a; redis服务&…

Tomcat Request Cookie 丢失问题

优质博文&#xff1a;IT-BLOG-CN 一、问题描述 生产环境偶尔(涉及到多线程处理)出现"前端传递Cookie为空"的告警&#xff0c;导致前端请求丢失&#xff0c;出现请求失败问题。告警内容如下 前端传递Cookie为空 告警内容&#xff1a;服务端获取request Cookie为空&…

2024网安周今日开幕,亚信安全亮相30城

2024年国家网络安全宣传周今天在广州拉开帷幕。今年网安周继续以“网络安全为人民&#xff0c;网络安全靠人民”为主题。2024年国家网络安全宣传周涵盖了1场开幕式、1场高峰论坛、5个重要活动、15场分论坛/座谈会/闭门会、6个主题日活动和网络安全“六进”活动。亚信安全出席20…

每日一练:螺旋矩阵

一、题目要求 给你一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[1,2,3,6,9,8,7,4,5]示例 2&#xff1a; 输入&#xff1a;ma…

Java | Leetcode Java题解之第396题旋转函数

题目&#xff1a; 题解&#xff1a; class Solution {public int maxRotateFunction(int[] nums) {int f 0, n nums.length, numSum Arrays.stream(nums).sum();for (int i 0; i < n; i) {f i * nums[i];}int res f;for (int i n - 1; i > 0; i--) {f numSum - …

NISP 一级 | 3.1 网络基础知识

关注这个证书的其他相关笔记&#xff1a;NISP 一级 —— 考证笔记合集-CSDN博客 0x01&#xff1a;Internet 和 TCP/IP 协议 因特网&#xff08;Internet&#xff09;通过 TCP/IP 协议将遍布在全世界各地的计算机互联&#xff0c;从而形成超级计算机网络。因特网为用户提供了非…

【50 Pandas+Pyecharts | 暑期档电影票房数据分析可视化】

文章目录 &#x1f3f3;️‍&#x1f308; 1. 导入模块&#x1f3f3;️‍&#x1f308; 2. Pandas数据处理2.1 读取数据2.2 提取电影名称 &#x1f3f3;️‍&#x1f308; 3. Pyecharts数据可视化3.1 电影总票房排行3.2 各电影票房占比3.3 2023中国各省地区大学数量分布3.4 《抓…

浅谈产线工控安全,产线工控安全的有效方案

随着工业4.0的发展&#xff0c;产线日益智能化&#xff0c;生产网已经发展成一个组网的计算机环境&#xff0c;虽然都进行了隔离&#xff0c;但仍需和外部进行数据交互&#xff0c;导致有病毒入侵可能。 产线工控安全事件不断 深信达MCK主机加固方案&#xff0c;针对产线工控…

工业无人机性能参数特点!!!

一、基本性能参数 动力系统&#xff1a;工业无人机多采用电动或油动动力系统&#xff0c;以提供足够的推力和续航能力。电动无人机通常具有较低的噪音和振动&#xff0c;适合城市或近距离作业&#xff1b;而油动无人机则具有更长的续航时间和更大的载重能力&#xff0c;适合远…

安科瑞Acrel-1000DP分布式光伏监控系统平台的设计与应用-安科瑞 蒋静

针对用户新能源接入后存在安全隐患、缺少有效监控、发电效率无法保证、收益计算困难、运行维护效率低等通点&#xff0c;提出的Acrel-1000DP分布式光伏监控系统平台&#xff0c;对整个用户电站全面监控&#xff0c;为用户实现降低能源使用成本、减轻变压器负载、余电上网&#…

MySQL基础作业三

查询 1.分别查询student表和score表的所有记录 mysql> select *from student; ---------------------------------------------------- | id | name | sex | birth | department | address | ---------------------------------------------------- | 901 | 张三丰…

大模型×认知科学:多维潜空间洞悉复杂认知

最近的一篇来自于Cognitive Sciences的精炼综述带给了我一些对于当下AI的某种反向思考&#x1f914;&#xff0c;分享给大家&#xff1a; 这篇综述讨论了如何通过多种降维技术揭示认知科学中的潜在表征空间&#xff0c;并探讨了选择适合研究目标的嵌入算法时需要考虑的关键因素…

每日一题,力扣leetcode Hot100之206反转链表

原来的链表是1-2-3-4-5-null 反转后是5-4-3-2-1-null 只需要循环遍历&#xff0c;并且借一个temp便可以完成反转 class Solution:def reverseList(self, head: ListNode) -> ListNode:cur, pre head, Nonewhile cur:tmp cur.next # 暂存后继节点 cur.nextcur.next pre…

AndroidStudio清除重置Http Proxy代理的方式

问题背景 在国内做代码开发的都知道&#xff0c;在国际互联网我们存在看不见的墙&#xff0c;导致无法访问一些代码库和资源&#xff0c;所以在使用开发工具拉取第三方库的时候总会遇到无法连接或者连接超时的情况&#xff0c;所以就会使用一些安全的网络代理工具&#xff0c;辅…

[项目][WebServer][项目介绍及知识铺垫][上]详细讲解

目录 1.何为WWW?2.HTTP分层1.整体2.细节3.DNS?4.协议之间是如何协同运作的&#xff1f; 3.Http相关概念1.特点2.URI && URL && URN3.HTTP URL格式 1.何为WWW? WWW是环球信息网的缩写&#xff0c;常简称为Web分为Web客户端和Web服务器程序&#xff0c;WWW可…