redis stream restTemplate消息监听队列框架搭建

news2025/1/8 4:45:08

整体思路

        1. pom增加redis依赖;

        2. 消息监听器,实现StreamListener接口,处理消息到达逻辑;

        3. 将消息订阅bean及监听器注册到配置中;

1. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
    </parent>




<dependencies>

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2. 消息监听器实现代码

package cn.thuniwhir.fileserver.redis;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: TODO
 **/
@Component
public class RedisMQListener implements StreamListener<String, MapRecord<String, String, Object>> {

    private static final Logger log = LoggerFactory.getLogger(RedisMQListener.class);

    // 创建一个线程池
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    @Override
    public void onMessage(MapRecord message) {
        // 异步处理消息
        threadPoolExecutor.execute(()->{
            System.out.println(Thread.currentThread().getName() + ":接收到的消息:" + message.getId() + ";" + JSON.toJSONString(message.getValue()));
        });

    }
}

3. redis订阅bean及监听器注册

package cn.thuniwhir.fileserver.redis;

import cn.thuniwhir.fileserver.context.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.time.Duration;
import java.util.stream.Collectors;

/**
 * @Description: TODO
 **/
@Configuration
public class RedisMQConfig {

    @Autowired
    private RedisMQListener redisMQListener;

    @Autowired
    private RedisUtils redisUtils;

    private static RedisTemplate<Object, Object> redisTemplate;
    private static final Logger log = LoggerFactory.getLogger(RedisMQConfig.class);

    public RedisMQConfig(RedisTemplate<Object, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
        if (redisUtils.hasKey(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME)) {
            StreamInfo.XInfoGroups xInfoGroups = redisTemplate.opsForStream().groups(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME);
            if (xInfoGroups.isEmpty()) {
                redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
            } else {
                if (xInfoGroups.stream().filter(xInfoGroups1 -> xInfoGroups1.groupName().equals(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME)).collect(Collectors.toList()).isEmpty()) {
                    redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
                }
            }
        } else {
            redisTemplate.opsForStream().createGroup(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME);
        }
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofSeconds(1)).build();
        StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
        Subscription subscription = streamMessageListenerContainer.receiveAutoAck(Consumer.from(Constants.FILE_MQ_DISK_THRESHOLD_GROUPNAME, Constants.FILE_MQ_DISK_THRESHOLD_CONSUMER), StreamOffset.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, ReadOffset.lastConsumed()), redisMQListener);
        streamMessageListenerContainer.start();
        return subscription;
    }

}

4. 测试生产消息 消息监听成功

4.1 生产消息

@RequestMapping("/produceMessage")
    public JSONObject produceMessage(@RequestBody JSONObject jsonObject) {
        String key = jsonObject.getString("key");
        String value = jsonObject.getString("value");
        MapRecord<Object, String, Object> mapRecord = MapRecord.create(Constants.FILE_MQ_DISK_THRESHOLD_QUENAME, Collections.singletonMap(key, value));
        redisTemplate.opsForStream().add(mapRecord);
        System.out.println("produceMessage Thread Name:" + Thread.currentThread().getName());
        return formatResult(null);
    }

4.2 消息监听器监听消息到达 代码见第二节

4.3 测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1375384.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL】锁机制

文章目录 一、表级锁和行级锁二、排他锁和共享锁三、InnoDB行级锁行级锁间隙锁意向共享锁和意向排他锁 四、InnoDB表级锁五、死锁六、锁的优化建议 一、表级锁和行级锁 表级锁&#xff1a; 对整张表加锁。开销小&#xff0c;加锁快&#xff0c;不会出现死锁&#xff1b;锁粒度…

c++学习笔记-STL案例-机房预约系统1-准备工作

前言 准备工作包括&#xff1a;需求分析、项目创建、主菜单实现、退出功能实现 目录 1 机房预约系统需求 1.1 简单介绍 1.2 身份介绍 1.3 机房介绍 1.4 申请介绍 1.5 系统具体要求 1.6 预约系统-主界面思维导图 2 创建项目 2.1 创建项目 2.2 添加文件 ​编辑 3 创建…

VTK开发调试环境下载(VTK开发环境一步到位直接开发,无需自己配置编译 VS2017+Qt5.12.10+VTK)

一、无与伦比的优势 直接下载代码就可以调试的VTK代码仓库。 二、资源制作原理 这个资源根据VTK源码 编译出动态库文件 pdb lib dll 文件&#xff08; x64 debug &#xff09; 并将这两者同时放在一个代码仓库里&#xff0c;下载就能用。 三、使用方法&#xff08;vtk-so…

【hyperledger-fabric】部署Java应用远程访问智能合约

简介 首先是根据b站的视频 hyperledger-fabric【3】在 java 应用中访问合约 以及hyperledger-fabric【5】Java应用和私有数据&#xff0c;本文章主要讲述的是视频中我遇到的问题&#xff0c;以及相关知识点的总结。 遇到的问题 问题1&#xff1a;git clone下载下来的代码发现…

Unity Meta Quest 一体机开发(十三):【手势追踪】自定义交互事件 EventWrapper

文章目录 &#x1f4d5;教程说明&#x1f4d5;交互事件概述&#x1f4d5;自定义交互逻辑⭐方法一&#xff1a;Inspector 面板赋值⭐方法二&#xff1a;纯代码处理 此教程相关的详细教案&#xff0c;文档&#xff0c;思维导图和工程文件会放入 Spatial XR 社区。这是一个高质量…

XCTF:CatCatCat[WriteUP]

从题目中下载到一张图片和一个txt文件 编码的开头是&#xff1a;U2FsdGVkX1所以是rabbit加密 尝试使用密钥&#xff1a;91 密码不对&#xff0c;无法解密所以从图片下手 使用010Editor搜索图片文本内容 尝试搜索password、flag等敏感字体 直接拿到rabbit解密需要的密钥是&am…

【AI视野·今日Sound 声学论文速览 第四十四期】Tue, 9 Jan 2024

AI视野今日CS.Sound 声学论文速览 Tue, 9 Jan 2024 Totally 27 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Sound Papers DJCM: A Deep Joint Cascade Model for Singing Voice Separation and Vocal Pitch Estimation Authors Haojie Wei, Xueke Cao, Wenbo Xu…

移动光缆使用规定

移动光缆使用的规定&#xff1a; 1.1在移动光缆的施工过程中&#xff0c;其弯曲半径应该是移动光缆外径的15倍&#xff0c;并且不能小于20倍。 1.2在进行移动光缆敷设时&#xff0c;需要确保施加的拉力不超过移动光缆所允许的张力的80%。移动光缆的最大张力允许达到100%的限制…

Appium 自动化测试

1.Appium介绍 1&#xff0c;appium是开源的移动端自动化测试框架&#xff1b; 2&#xff0c;appium可以测试原生的、混合的、以及移动端的web项目&#xff1b; 3&#xff0c;appium可以测试ios&#xff0c;android应用&#xff08;当然了&#xff0c;还有firefoxos&#xff09;…

比亚迪:从低谷中涅槃,内功造就辉煌

修炼内功才能绽放光芒吗? 比亚迪用自己的奋斗史证明~ 只有经历低谷的洗礼&#xff0c;才能铸就属于自己的辉煌。 比亚迪这家公司的发展历程可谓是一部从战略转型到今天这个行业翘楚的奋斗史&#xff0c;真是跌宕起伏令人唏嘘。早期比亚迪从一个传统企业转型到汽车行业&#xf…

【Kafka-3.x-教程】-【五】Kafka-监控-Eagle

【Kafka-3.x-教程】专栏&#xff1a; 【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门 【Kafka-3.x-教程】-【二】Kafka-生产者-Producer 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 【Kafka-3.x-教程】-【五…

《ORANGE’S:一个操作系统的实现》读书笔记(二十八)文件系统(三)

上一篇文章记录了文件系统&#xff0c;我们将硬盘映像进行了分区&#xff0c;并且清楚了如何获取分区信息。但是硬盘驱动程序目前只能处理DEV_OPEN消息&#xff0c;这显然是不够的&#xff0c;这篇文章记录对硬盘驱动的完善&#xff0c;让它能够处理更多的消息&#xff0c;并且…

C++力扣题目110--平衡二叉树

给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a; 一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过 1 。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;t…

YOLOv8 + openVINO 多线程数据读写顺序处理

多线程数据读写顺序处理 一个典型的生产者-消费者模型&#xff0c;在这个模型中&#xff0c;多个工作线程并行处理从共享队列中获取的数据&#xff0c;并将处理结果以保持原始顺序的方式放入另一个队列。 多线程处理模型&#xff0c;具体细节如下&#xff1a; 1.数据:数据里必…

第三站:C/C++基础-二维数组

二维数组的概念 一维数组本身是多个大小相同的内存块,从0开始逐渐递增所组成的在横向上的有序"组合", 二维数组就是很多个一维数组在纵向上的组合,每一个一维数组就是二维数组在纵向上的从0开始的逐渐递增的一个单位,(所以一维数组在二维数组的基础上,每一个内存块…

AcWing1210-连号区间

文章目录 题目输入格式输出格式数据范围样例输入样例1输出样例1输入样例2输出样例2样例解释 思路代码 题目 输入格式 输出格式 数据范围 样例 输入样例1 4 3 2 4 1 输出样例1 7 输入样例2 5 3 4 2 5 1 输出样例2 9 样例解释 思路 固定L&#xff0c;遍历R在[L,R]区域中找到最大…

Java并发之互斥一:管程

1、简单聊聊什么是管程模型 &#xff08;共享资源&#xff09;&#xff1a;定义一个共享变量&#xff0c;可以理解锁&#xff0c;令牌这类的东西&#xff08;互斥访问共享资源&#xff09;&#xff1a;获取这个锁、令牌的时候是排好队的&#xff0c;只允许单线程访问&#xff…

《射雕三部曲》人物关系可视化及问答系统

背景&#xff1a; 该项目旨在构建一个基于图数据库和知识图谱的《射雕三部曲》人物关系可视化及问答系统。通过分析小说中的人物关系&#xff0c;将其构建成图数据库&#xff0c;并结合问答系统和数据分析技术&#xff0c;提供用户可视化的人物关系展示和相关问题的回答。 介绍…

快速排序-排序算法

算法思想 快速排序采用的仍然是分治的思想。 Step1.每次在无序的序列中选取一个基准数。 Step2.然后将大于和小于基准数的元素分别放置于基准数两边。&#xff08;前面部分的元素均小于或等于基准数&#xff0c;后面部分均大于或等于基准数&#xff09; Step3.然后采用分治法&…

mySQL 汇总

登录MySQL winR 打开查询命令 输入 cmd 输入net start MySQL 打开mysql 报错:系统错误&#xff0c;拒绝访问 &#xff08;没权限&#xff01;&#xff09; 解决办法&#xff1a;搜索栏查询‘cmd’ 使用管理员身份运行 &#xff08;或鼠标右键‘开始’&#xff0c;windows po…