【Java】SpringBoot中实现Redis Stream队列

news2025/1/11 18:46:43

SpringBoot实现Redis Stream队列

前言

简单实现一下在SpringBoot中操作Redis Stream队列的方式,监听队列中的消息进行消费。

jdk:1.8

springboot-version:2.6.3

redis:5.0.1(5版本以上才有Stream队列)

准备工作

1pom

redis 依赖包(version 2.6.3)

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2 yml

spring: 
  redis:
    database: 0
    host: 127.0.0.1

3 RedisStreamUtil工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
public class RedisStreamUtil {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 创建消费组
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link String}
	 */
	public String oup(String key, String group) {
		return redisTemplate.opsForStream().createGroup(key, group);
	}

	/**
	 * 获取消费者信息
	 *
	 * @param key   键名称
	 * @param group 组名称
	 * @return {@link StreamInfo.XInfoConsumers}
	 */
	public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
		return redisTemplate.opsForStream().consumers(key, group);
	}

	/**
	 * 查询组信息
	 *
	 * @param key 键名称
	 * @return
	 */
	public StreamInfo.XInfoGroups queryGroups(String key) {
		return redisTemplate.opsForStream().groups(key);
	}

	// 添加Map消息
	public String addMap(String key, Map<String, Object> value) {
		return redisTemplate.opsForStream().add(key, value).getValue();
	}

	// 读取消息
	public List<MapRecord<String, Object, Object>> read(String key) {
		return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
	}

	// 确认消费
	public Long ack(String key, String group, String... recordIds) {
		return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
	}

	// 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁
	public Long del(String key, String... recordIds) {
		return redisTemplate.opsForStream().delete(key, recordIds);
	}

	// 判断是否存在key
	public boolean hasKey(String key) {
		Boolean aBoolean = redisTemplate.hasKey(key);
		return aBoolean != null && aBoolean;
	}
}


代码实现

生产者发送消息

生产者发送消息,在Service层创建addMessage方法,往队列中发送消息。

代码中addMap()方法第一个参数为key,第二个参数为value,该key要和后续配置的保持一致,暂时先记住这个key。

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**
     * 发送一个消息
     *
     * @return {@code Object}
     */
    @Override
    public Object addMessage() {
        RedisUser redisUser = new RedisUser();
        redisUser.setAge(18);
        redisUser.setName("hcr");
        redisUser.setEmail("156ef561@gmail.com");

        Map<String, Object> message = new HashMap<>();
        message.put("user", redisUser);

        String recordId = redisStreamUtil.addMap("mystream", message);
        return recordId;
    }
}

controller接口方法

@RestController
@RequestMapping("/redis")
@Slf4j
@RequiredArgsConstructor
public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    @GetMapping("/addMessage")
    public Object addMessage() {
        return redisStreamMqService.addMessage();
    }
}

调用测试,查看redis中是否正常添加数据。

接口返回数据

1702622585248-0

查看redis中的数据
在这里插入图片描述

消费者监听消息进行消费

创建RedisConsumersListener监听器

import cn.hcr.utils.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
@RequiredArgsConstructor
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    public final RedisStreamUtil redisStreamUtil;

    /**
     * 监听器
     *
     * @param message
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        // stream的key值
        String streamKey = message.getStream();
        //消息ID
        RecordId recordId = message.getId();
        //消息内容
        Map<String, String> msg = message.getValue();
        log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);

        //处理逻辑

        //逻辑处理完成后,ack消息,删除消息,group为消费组名称
        StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);
        xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));
        redisStreamUtil.del(streamKey, recordId.getValue());
    }
}

创建RedisConfig配置类,配置监听

package cn.hcr.config;

import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@Slf4j
public class RedisConfig {

    @Resource
    private RedisStreamUtil redisStreamUtil;

    /**
     * redis序列化
     *
     * @param redisConnectionFactory
     * @return {@code RedisTemplate<String, Object>}
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer
                        .StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条消息
                        .batchSize(5)
                        .executor(executor)
                        .pollTimeout(Duration.ofSeconds(1))
                        .errorHandler(throwable -> {
                            log.error("[MQ handler exception]", throwable);
                            throwable.printStackTrace();
                        })
                        .build();
        
        //该key和group可根据需求自定义配置
        String streamName = "mystream";
        String groupname = "mygroup";

        initStream(streamName, groupname);
        var listenerContainer = StreamMessageListenerContainer.create(factory, options);
        // 手动ask消息
        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
        // 自动ask消息
           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
        listenerContainer.start();
        return subscription;
    }

    private void initStream(String key, String group) {
        boolean hasKey = redisStreamUtil.hasKey(key);
        if (!hasKey) {
            Map<String, Object> map = new HashMap<>(1);
            map.put("field", "value");
            //创建主题
            String result = redisStreamUtil.addMap(key, map);
            //创建消费组
            redisStreamUtil.oup(key, group);
            //将初始化的值删除掉
            redisStreamUtil.del(key, result);
            log.info("stream:{}-group:{} initialize success", key, group);
        }
    }
}


redisTemplate:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。
Template:该bean用于配置redis序列化

subscription:配置监听

initStream:初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

【streamKey】= mystream,
【recordId】= 1702623008044-0,
【msg】=
{user=[
    "cn.hcr.pojo.RedisUser",
    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}
    ]
}

总结

以上就是在SpringBoot中简单实现Redis Stream队列的Demo,如有需要源码或者哪里不清楚的请评论或者发送私信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习算法---回归

1. 线性回归&#xff08;Linear Regression&#xff09; 原理&#xff1a; 通过拟合一个线性方程来预测连续响应变量。线性回归假设特征和响应变量之间存在线性关系&#xff0c;并通过最小化误差的平方和来优化模型。优点&#xff1a; 简单、直观&#xff0c;易于理解和实现。…

力扣题:数字与字符串间转换-12.16

力扣题-12.16 [力扣刷题攻略] Re&#xff1a;从零开始的力扣刷题生活 力扣题1&#xff1a;640. 求解方程 解题思想&#xff1a;首先将方程按照“”进行划分&#xff0c;然后分别记录x的因数和常数项&#xff0c;最后进行返回的判断即可 class Solution(object):def solveEqu…

娱乐新拐点:TikTok如何改变我们的日常生活?

在数字时代的浪潮中&#xff0c;社交媒体平台不断涌现&#xff0c;其中TikTok以其独特的短视频内容在全球范围内掀起了一场娱乐革命。本文将深入探讨TikTok如何改变我们的日常生活&#xff0c;从社交互动、文化传播到个人创意表达&#xff0c;逐步改写了娱乐的新篇章。 短视频潮…

【Idea】SpringBoot项目中,jar包引用冲突异常的排查 / SM2算法中使用bcprov-jdk15to18的报错冲突问题

问题描述以及解决方法&#xff1a; 项目中使用了bcprov-jdk15to18 pom依赖&#xff0c;但是发现代码中引入的版本不正确。 追溯代码发现版本引入的是bcprov-jdk15on&#xff0c;而不是bcprov-jdk15to18&#xff0c;但是我找了半天pom依赖也没有发现有引入bcprov-jdk15on依赖。…

【MySQL】:表的增加和查寻

表的增查 一.Create(增)1.基本插入2.插入是否更新3.替换 二.Retrieve(查)1.select列1.全列查询2.指定列查询3.查询字段为表达式4.结果去重 2.where条件查询1.运算符2.运算符使用 3.结果排序4.筛选分页结果 一.Create(增) 1.基本插入 对于表的增加&#xff0c;前面已经用过很多…

什么是供应链安全及其工作原理?

6000公里长的丝绸之路将丝绸、谷物和其他货物从中国运送到帕尔米拉。尽管蒙古治下的和平保护丝绸之路免受海盗、强盗和内部盗窃的侵害&#xff0c;但商人仍然装备精良&#xff0c;并依赖于大型商队旅行和战略性放置的小型堡垒所提供的安全。 为什么供应链安全很重要&#xff1…

(1)(1.8) MSP(MultiWii 串行协议)(4.1 版)

文章目录 前言 1 协议概述 2 配置 3 参数说明 前言 ArduPilot 支持 MSP 协议&#xff0c;可通过任何串行端口进行遥测和传感器。这允许 ArduPilot 将其遥测数据发送到 MSP 兼容设备&#xff08;如大疆护目镜&#xff09;&#xff0c;用于屏幕显示&#xff08;OSD&#xff…

微服务保护--熔断降级

1.熔断降级介绍 熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例&#xff0c;如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求&#xff1b;而当服务恢复时&#xff0c;断路器会放行访问该服务的请求。 断路器控制熔断和放行…

ISCTF(a)

where_is_the_flag 答案应该被分成了三份了 蚁剑连接看看 第一个 第二个 第三个&#xff0c;在www下 Yunxi{0797d78c-0cb2-4cfb-87e6-f9c102f716f3} 命令执行 POST : 1 system ( tac flag.php ); 1 system ( tac /flag2 ); 1 system ( env ); 1z_Ssql 使用万能密码 后…

【LeetCode刷题笔记(7-1)】【Python】【四数之和】【哈希表】【中等】

文章目录 四数之和题目描述示例 1示例 2提示解决方案1&#xff1a;【四层遍历查找】解决方案2&#xff1a;【哈希表】【三层遍历】 结束语 四数之和 四数之和 题目描述 给你一个由 n 个整数组成的数组 nums &#xff0c;和一个目标值 target 。请你找出并返回满足下述全部条件…

鸿蒙原生应用/元服务开发-Stage模型能力接口(五)

说明 Common模块将二级模块API组织在一起方便开发者进行导出。本模块首批接口从API version 9开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。本模块接口仅可在Stage模型下使用 二、 导入模块 import common from ohos.app.ability.common; 三、 …

动手学深度学习-自然语言处理:应用

自然语言处理中的应用主要有哪些&#xff1f; 自然语言处理&#xff1a;应用 情感分析及数据集 情感分析研究人们在文本中的情感&#xff0c;这被认为是一个文本分类问题&#xff0c;它将可变长度的文本序列进行转换为固定长度的文本类别。经过预处理后&#xff0c;我们可以使…

ros2+xml格式launch文件示例代码(重要内容)

源自githubeasy_ros2_launch_talk/easy_launch_demo/launch/demo_launch.xml at main tylerjw/easy_ros2_launch_talk GitHub <launch><arg name"robot_ip" default"xxx.yyy.zzz.www" /><arg name"use_fake_hardware" default…

「Verilog学习笔记」RAM的简单实现

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点&#xff0c;刷题网站用的是牛客网 timescale 1ns/1ns module ram_mod(input clk,input rst_n,input write_en,input [7:0]write_addr,input [3:0]write_data,input read_en,input [7:0]read_addr,output reg…

鸿蒙开发ArkTS语言-XML解析

XML概述 XML&#xff08;可扩展标记语言&#xff09;是一种用于描述数据的标记语言&#xff0c;旨在提供一种通用的方式来传输和存储数据&#xff0c;特别是Web应用程序中经常使用的数据。XML并不预定义标记。因此&#xff0c;XML更加灵活&#xff0c;并且可以适用于广泛的应用…

黑马点评04集群下的并发安全

实战篇-08.优惠券秒杀-集群下的线程并发安全问题_哔哩哔哩_bilibili 为了应对高并发&#xff0c;需要把项目部署到多个机器构成集群&#xff0c;所以需要配置nginx。 1.如何模拟集群 通过idea的ctrl d修改配置&#xff0c;实现多个tomcat运行模拟集群 然后在nginx上配置节点&…

Swift爬虫采集唯品会商品详情

我有个朋友之前在唯品会开的店&#xff0c;现在想转战其他平台&#xff0c;想要店铺信息商品信息全部迁移过去&#xff0c;如果想要人工手动操作就有点麻烦了&#xff0c;然后有天找到我 &#xff0c;让我看看能不能通过技术手段实现商品信息迁移。嫌来无事&#xff0c;写了下面…

高速大文件传输对企业的重要性

随着信息技术的迅猛发展&#xff0c;企业在日常运营中越来越频繁地涉足大文件的传输领域。大文件传输是指在企业或个人之间传递容量较大的文件&#xff0c;例如高分辨率图像、视频、数据库备份等。过去&#xff0c;这样的传输可能需要数小时甚至更长时间&#xff0c;但随着业务…

一种用于心音分类的轻量级1D-CNN+DWT网络

这是由National Institute of Technology Rourkela, Central University of Rajasthan发布在2022 ICETCI的论文&#xff0c;利用离散小波变换(DWT)得到的多分辨率域特征对1D-CNN模型进行心音分类训练。 预处理& DWT 由于FHS和各种病理声的频率范围在500hz以下[5]&#xff…

嵌入式人工智能(钱多?好学?前景好?)

概念 嵌入式人工智能&#xff08;Embedded AI&#xff09;是指将人工智能&#xff08;AI&#xff09;技术集成到各种设备和系统中&#xff0c;使其具备智能化和自主性。与传统的中央化计算模型不同&#xff0c;嵌入式人工智能将AI能力嵌入到设备本身&#xff0c;使其能够在本地…