Spring boot 使用Redis 消息发布订阅

news2025/1/16 4:59:07

Spring boot 使用Redis 消息发布订阅

文章目录

  • Spring boot 使用Redis 消息发布订阅
    • Redis 消息发布订阅
        • Redis 发布订阅 命令
    • Spring boot 实现消息发布订阅
      • 发布消息
      • 消息监听
      • 主题订阅
    • Spring boot 监听 Key 过期事件
      • 消息监听
      • 主题订阅

最近在做请求风控的时候,在网上搜集了大量的解决方案,最后使用Redis 消息发布订阅 比较符合业务。做一下记录

img

Redis 消息发布订阅

img

Redis 发布订阅 命令:redis命令手册

1、Redis 中"pub/sub"的消息,为"即发即失",server 不会保存消息,如果 publish 的消息没有任何 client 处于 “subscribe” 状态,消息将会被丢弃;如果 client 在 subcribe 时,链接断开后重连,那在么此期间的消息也将丢失。

2、Redis server 将会"尽力"将消息发送给处于 subscribe 状态的 client,但是仍不会保证每条消息都能被正确接收。

**优点:**支持发布订阅,支持多组生产者、消费者处理消息

缺点:

  1. 消费者下线数据会丢失

  2. 不支持数据持久化,Redis宕机则数据也会丢失

  3. 消息堆积,缓存区溢出,消费者会被强制踢下线,数据也会丢失

Redis 发布订阅 命令
命令描述
Redis Unsubscribe 命令指退订给定的频道。
Redis Subscribe 命令订阅给定的一个或多个频道的信息。
Redis Pubsub 命令查看订阅与发布系统状态。
Redis Punsubscribe 命令退订所有给定模式的频道。
Redis Publish 命令将信息发送到指定的频道。
Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。

Spring boot 实现消息发布订阅

1、引入 Redis 依赖

    <!--Spring Boot redis 启动器-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

2、Redis 数据库配置

spring:
  data:
    redis:
      database: 0
      host: localhost
      port: 6379
      password:

发布消息

	/**
     * redis 将信息发送到指定的频道
     * @param topic   :消息所属的主题/频道
     * @param context :消息内容
     * @return
     */
	redisTemplate.convertAndSend(topic, context);

@RequiredArgsConstructor
@Service
public class RequestRateLimiterService {

	private final RedisTemplate<String, Object> redisTemplate;

	// Redis 中的 key 前缀
	private static final String REDIS_KEY_PREFIX = "select_rate_limit:";

	// Redis 中的通道名称
	private static final String REDIS_CHANNEL = "select_rate_limit_channel";

    // 根据用户名 请求风控
	public boolean allowRequest(String username) {
		
		// 每分钟最大请求次数
		Long MAX_REQUESTS_PER_MINUTE = 60L;

		String key = REDIS_KEY_PREFIX + username;
		Long currentRequests = redisTemplate.opsForValue().increment(key);
		if (currentRequests != null && currentRequests > MAX_REQUESTS_PER_MINUTE) {
			redisTemplate.convertAndSend(REDIS_CHANNEL, username);
			return false; // 超过阈值,拒绝请求
		}
		if (currentRequests != null && currentRequests == 1) {
			redisTemplate.expire(key, 1, TimeUnit.MINUTES); // 设置过期时间为1分钟
		}
		return true; // 允许请求
	}

}

消息监听

1、 Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理。

/**
 * Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理
 * <p>
 * 1、可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter.
 * 2、自动多线程处理,打印日志即可看出,即使手动延迟,也不会影响后面消息的接收。
 *
 */
@Component
public class RequestRateLimitSubscriber implements MessageListener {
    // 直接从容器中获取
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    /**
     * 监听到的消息必须进行与发送时相同的方式进行反序列
     * 1、订阅端与发布端 Redis 序列化的方式必须相同,否则会乱码。
     *
     * @param message :消息实体
     * @param pattern :匹配模式
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 消息订阅的匹配规则,如 new PatternTopic("basic-*") 中的 basic-*
        String msgPattern = new String(pattern);
        // 消息所属的通道,可以根据不同的通道做不同的业务逻辑
        String channel = (String) redisTemplate.getStringSerializer().deserialize(message.getChannel());
        // 接收的消息内容,可以根据自己需要强转为自己需要的对象,但最好先使用 instanceof 判断一下
        Object body = redisTemplate.getValueSerializer().deserialize(message.getBody());
 
        log.info("收到 Redis 订阅消息: channel={} body={} pattern={} ", channel, body, msgPattern);
 
        // 模拟数据处理 ********
        // 发送警告通知,可以通过邮件、短信等方式进行通知
        log.info("------------数据处理完成.......");
    }
}

主题订阅

1、自定义 RedisTemplate 序列化方式(发布者和订阅者必须相同)。

2、配置主题订阅 - Redis 消息监听器绑定监听指定通道。

/**
 * 自定义 RedisTemplate 序列化方式
 * 配置主题订阅 - Redis 消息监听器绑定监听指定通道
 */
@Configuration
public class RedisConfig {
    // 自定义的消息订阅监听器,当收到阅订的消息时,会将消息交给这个类处理
    @Resource
    private RequestRateLimitSubscriber requestRateLimitSubscriber;
 
    //  自定义 RedisTemplate 序列化方式   
    @Bean
	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
		RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
		redisTemplate.setKeySerializer(RedisSerializer.string());// key 序列化规则
		redisTemplate.setHashKeySerializer(RedisSerializer.string());// hash key 序列化规则
		redisTemplate.setValueSerializer(RedisSerializer.java());// value 序列化规则
		redisTemplate.setHashValueSerializer(RedisSerializer.java()); // hash value 序列化规则
		redisTemplate.setConnectionFactory(factory); //绑定 RedisConnectionFactory
		return redisTemplate; //返回设置好的 RedisTemplate
	}
    /**
     * 配置主题订阅
     * RedisMessageListenerContainer - Redis 消息监听器绑定监听指定通道
     * 1、可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。
     * 2、订阅的通道可以配置在全局配置文件中,也可以配置在数据库中,
     * <p>
     * addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定
     * addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定
     *
     * @param connectionFactory
     * @return
     */
	@Bean
	public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		// 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取
		container.setConnectionFactory(factory);
		// 订阅名称叫 select_rate_limit_channel 的通道, 类似 Redis 中的 subscribe 命令
		container.addMessageListener(requestRateLimitSubscriber, new ChannelTopic("*"));
		// 订阅名称以 'basic-' 开头的全部通道, 类似 Redis 的 pSubscribe 命令
		container.addMessageListener(requestRateLimitSubscriber, new PatternTopic("*"));
		return container;

	}
}

Spring boot 监听 Key 过期事件

1、Redis 数据库可以通过命令设置 Key 的有效时间,当一个 Key 过期后会自动从数据库中删除,释放空间。得益于于这个特性,可以很轻松地实现诸多类似于 “Session” 管理、数据缓存等功能。它们都有一个共同点就是,数据不会永久保存!

2、在有些场景中,可能希望在某些 Key 过期的时候获取到通知,进行一些业务处理。或者是干脆用于 “定时通知/任务” 功能,例如:下单 30 分钟后未支付,则取消订单。那么可以在用户下单的时候使用订单号作为 key 设置到 Redis 数据库中,并且设置过期时间为 30 分钟。当超时后,可以在 “key 过期通知” 中获取到 key 也就是订单号,判断用户是否已经支付从而是否取消订单。

3、Redis 的 Key 过期通知功能本质上是通过 发布/订阅 功能实现的,所以它「不能保证通知消息的交付」,当 Key 过期时如果服务器停机、重启后则该通知消息会永久丢失。

消息监听

1、Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。

2、doHandleMessage 方法用于处理 Redis Key 过期通知事件,其中 Message 参数表示通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。

3、在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
 * Redis 缓存 Key 过期监听器
 * Spring Data Redis 专门提供了一个密钥过期事件消息侦听器:KeyExpirationEventMessageListener,
 * 自定义监听器类继承它,然后覆写 doHandleMessage(Message message) 方法即可。
 */
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);
    /**
     * 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
     *
     * @param listenerContainer : Redis消息侦听器容器
     */
    public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    /**
     * doHandleMessage 方法用于处理 Redis Key 过期通知事件,
     * 在 Redis Key 过期事件中,「只能获取到已过期的 Key 的名称,不能获取到值。」
     *
     * @param message:通知消息,只有 2 属性,分别表示消息正文(在这里就是过期的 Key 名称)以及来自于哪个 channel。
     */
    @Override
    public void doHandleMessage(Message message) {
        // 过期的 key
        String key = new String(message.getBody());
        // 消息通道
        String channel = new String(message.getChannel());
        logger.info("过期key={} 消息通道(channel)={}", key, channel);
    }
}

主题订阅

1、与上面稍微有点不同,因为 key 过期事件属于 Redis 内部消息,内部频道/通道,所以只需要往容器中注入 RedisMessageListenerContainer 就行,不需要 addMessageListener 手动设置监听器 监听指定的通道/频道(topic 表达式)。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisConfig {
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        
    //  container.setTaskExecutor(null);            // 设置用于执行监听器方法的 Executor
    //  container.setErrorHandler(null);            // 设置监听器方法执行过程中出现异常的处理器
    //  container.addMessageListener(null, null);   // 手动设置监听器 & 监听的 topic 表达式
        return container;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1291402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智能成绩表 - 华为OD统一考试(C卷)

OD统一考试(C卷) 分值: 100分 题目描述 小明来到某学校当老师,需要将学生按考试总分或单科分数进行排名,你能帮帮他吗? 输入描述 第1行输入两个整数,学生人数n和科目数量m。0<n<100,0<m<10 第2行输入m个科目名称,彼此之间用空格隔开。科目名称只由英文…

备战2024年1月AMC8美国数学竞赛新方式:刷在线真题集(附资源)

今天是2023年12月7日&#xff0c;距离暂定于2024年1月19日举办的AMC8美国数学竞赛的举办日期还有42天&#xff0c;有志于尽早出国留学&#xff0c;或者小升初冲击名校的孩子们相信已经在如火如荼地利用课余时间上辅导班或者自学。 为了帮助大家提高备考2024年1月份AMC8竞赛的效…

二百一十、Hive——Flume采集的JSON数据文件写入Hive的ODS层表后字段的数据残缺

一、目的 在用Flume把Kafka的数据采集写入Hive的ODS层表的HDFS文件路径后&#xff0c;发现HDFS文件中没问题&#xff0c;但是ODS层表中字段的数据却有问题&#xff0c;字段中的JSON数据不全 二、Hive处理JSON数据方式 &#xff08;一&#xff09;将Flume采集Kafka的JSON数据…

设计一算法,对单链表实现就地逆置

对单链表逆置&#xff0c;要联想到单链表的头插性质 举个例子&#xff1a;现在有一个空链表&#xff0c;我们依次对它进行头插123 那么形成的链表是321&#xff0c;这样就形成了逆置 //单链表就地逆置 //思路&#xff1a;把原表接到一个新表上&#xff0c;然后对原表进行头插 …

实例分割 Mask-RCNN

参考文章 使用LabelMe标注目标检测数据集并转换为COCO2017格式_labelme转coco-CSDN博客 数据集选择 voc 这次不选择voc&#xff0c;因为文件组织太难了 voc2012文件夹组织 COCO COCO介绍 MC COCO2017年主要包含以下四个任务&#xff1a;目标检测与分割、图像描述、人体关…

【论文阅读】Bayes’ Rays:神经辐射场的不确定性量化

【论文阅读】Bayes’ Rays&#xff1a;神经辐射场的不确定性量化 1. Introduction2. Related work3. Background3.2. Neural Laplace Approximations 4. Method4.1. Intuition4.2. Modeling perturbations4.3. Approximating H4.4. Spatial uncertainty 5. Experiments & A…

多域名SSL证书该怎么分类

多域名SSL证书多域名SSL证书是一种网络安全工具&#xff0c;它能够为多个域名提供加密和安全保障。多域名SSL证书可以同时保护多个不同的域名站点&#xff0c;为用户打造安全可靠的网络环境。今天就随SSL盾小编了解多域名SSL证书的分类。 1.多域名SSL证书按照验证方式分为DV基础…

Java项目学生管理系统六后端补充

班级管理 1 班级列表&#xff1a;后端 编写JavaBean【已有】编写Mapper【已有】编写Service编写controller 编写Service 接口 package com.czxy.service;import com.czxy.domain.Classes;import java.util.List;/*** author 桐叔* email liangtongitcast.cn* description*/ p…

老师可以做副业吗

当老师&#xff0c;除了教学工作之外&#xff0c;还可以怎样来丰富自己的职业体验和增加收入呢&#xff1f; 自媒体作者 许多教师选择成为自媒体作者&#xff0c;分享自己的教育心得、教学经验以及与学生相处的生活状态等。通过撰写文章、发布在社交媒体上&#xff0c;不仅可以…

单片机第三季-第六课:STM32标准库

1&#xff0c;为什么会有标准外设库 传统单片机软件开发方式&#xff1a; (1)芯片厂商提供数据手册、示例代码、开发环境&#xff1b; (2)单片机软件工程师面向产品功能&#xff0c;查阅数据手册&#xff0c;参考官方示例代码进行开发&#xff1b; (3)硬件操作的方式是用C语言…

大屏图表汇总echarts圆环

圆环效果示例 代码如下 storageStaChart() {let color [#009976,#15E6B5]let charts echarts.init(document.getElementById(storageStaChart));let option this.getPieOption(color);charts.setOption(option, true);}, getPieOption(color) {let data [];data.push({val…

2023年7月13日 Go生态洞察:Govulncheck v1.0.0的全面解析

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

学习mysql记录

环境: macbookpro m1 1. 安装mysql 使用苹果自带的包管理工具brew进行安装 1. brew install mysql (安装) 2. brew services start mysql (启动mysql服务) 1.1 如果提示zsh: mysql command not found, 终端执行以下命令 1. cd ~ (切到根目录) 2. vi .bash_profile (进入编辑…

从零开始,利用ChatGPT学会写作的完整指南

文章目录 前言了解ChatGPT访问OpenAI平台使用ChatGPT进行简单的对话定义写作主题逐步生成文章段落添加个性化和细节编辑和润色反复修改直至满意 图书推荐内容简介作者简介获取方式 前言 在数字时代&#xff0c;人工智能技术日益成熟&#xff0c;为我们提供了全新的学习和创作机…

【Linux】在磁盘中如何找到文件 -- 磁盘的物理结构与逻辑结构

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法…感兴趣就关注我吧&#xff01;你定不会失望。 本篇导航 0. 磁盘物理结构介绍1. 磁盘逻辑结构2. 文件系统划分3. 如何理解文件目录4. 对文件的增删查改5. 软链接与硬链接5.1 软链接5.2.…

【C++11(二)】lambda表达式以及function包装器

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; C11 1. 前言2. lambda表达式的提出3. lambda表达…

mixamo根动画导入UE5问题:滑铲

最近想做一个跑酷游戏&#xff0c;从mixamo下载滑铲动作后&#xff0c;出了很多动画的问题。花了两周时间&#xff0c;终于是把所有的问题基本上都解决了。 常见问题&#xff1a; 1.【动画序列】人物不移动。 2.【动画序列】人物移动朝向错误。 3.【蒙太奇】人物移动后会被拉回…

c语言五子棋

下面是一个简单的C语言五子棋实现示例&#xff1a; #include <stdio.h>#include <stdlib.h>#define BOARD_SIZE 15char board[BOARD_SIZE][BOARD_SIZE];void init_board() { int i, j; for (i 0; i < BOARD_SIZE; i) { for (j 0; j < BOARD_…

AIGC:使用变分自编码器VAE实现MINIST手写数字生成

1 变分自编码器介绍 变分自编码器&#xff08;Variational Autoencoders&#xff0c;VAE&#xff09;是一种生成模型&#xff0c;用于学习数据的分布并生成与输入数据相似的新样本。它是一种自编码器&#xff08;Autoencoder&#xff09;的扩展&#xff0c;自编码器是一种用于…

vue中shift+alt+f格式化防止格式掉其它内容

好处就是使得提交记录干净&#xff0c;否则修改一两行代码&#xff0c;习惯性按了一下格式化快捷键&#xff0c;遍地飘红&#xff0c;下次找修改就费时间 1.点击设置图标-设置 2.点击这个转成配置文件 {"extensions.ignoreRecommendations": true,"[vue]":…