SpringBoot Redis 消息队列

news2024/12/23 4:39:56

文章目录

  • 参考
  • 消息队列
  • list
    • 源码
  • pub/sub
    • 源码

参考

https://www.cnblogs.com/uniqueDong/p/15904837.html
https://www.cnblogs.com/wzh2010/p/17205390.html
https://blog.csdn.net/qq_16557637/article/details/121015736
https://developer.aliyun.com/article/1095035
https://blog.csdn.net/sco5282/article/details/132904956

消息队列

消息队列可以实现消息解耦、消息路由、异步处理、流量削峰填谷。主流消息队列有kafka, rabbitmq, rocketmq
Redis也可以实现消息队列。方式有

  1. list
  2. pub/sub
  3. stream

list

redis的list底层是链表,满足先进先出。
list实现队列比较方便。同时可以满足有序,消息去重。缺点是

  1. 没有订阅功能,消费者要主动查询队列。而为了避免频繁查询队列消耗CPU资源,可以采用阻塞式查询。redis中阻塞查询命令是brpop
  2. 无法保证可靠性。缺少消息确认机制,无法及时感知遗漏消息,导致数据不一致。

源码

完整项目在https://gitcode.com/zsss1/redis_mq/overview
pom.xml添加redisson依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>
 <dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.40.2</version>
 </dependency>

Redission封装依赖。

@SpringBootTest(classes = DemoApplication.class)
public class RedisListTest {
    @Autowired
    private RedissonClient client;

    private static final String REDIS_QUEUE = "list_queue";

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListTest.class);
    
    @Test
    public void test_redis_list_mq() throws Exception {
        RedissonBlockingDeque r;
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                producer("message" + i);
            }
        }).start();

        new Thread(() -> {
            consumer();
        }).start();

        Thread.currentThread().join();
    }

    // 消费者,阻塞
    public void consumer() {
        RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);
        boolean isCheck = true;
        while (isCheck) {
            try {
                String message = deque.takeLast();
                System.out.println("consumer: " + message);
            } catch (InterruptedException e) {
                LOGGER.error("consumer failed, cause: {}", e.getMessage());
            }
        }
    }

    // 生产者
    public void producer(String message) {
        RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);
        System.out.println(deque.getClass());
        try {
            deque.putFirst(message);
        } catch (InterruptedException e) {
            LOGGER.error("producer failed, msg: {}, cause: {}", message, e.getMessage());
        }
    }
}

pub/sub

发布订阅模式是一种消息传递模式。发送者将消息发送到频道,订阅者订阅频道即可及时收到消息。
它支持组生产者与消费者。但是它会丢失消息。
Redis在server端为每个消费者保留一块内存区域,存储该消费者订阅的数据。如果消费者处理速度慢,内存区域满了,那么Redis会断开消费者连接,这会导致消息丢失。

源码

  1. 定义频道。
public class TopicChannel {
    public static final String SEND_PHONE = "send_phone";
    public static final String SEND_EMAIL = "send_email";
}
  1. 定义监听频道的订阅者。分清org.springframework.data.redis.connection.MessageListenerorg.redisson.api.listener.MessageListener
public class MyMessageListener implements MessageListener {

    private static Map<String, Consumer<String>> RULE = new HashMap<>();

    static {
        RULE.put(TopicChannel.SEND_EMAIL, MyMessageListener::sendEmail);
        RULE.put(TopicChannel.SEND_PHONE, MyMessageListener::sendPhone);
    }

    public static void sendEmail(String msg) {
        System.out.println("listen email:" + msg);
    }

    public static void sendPhone(String msg) {
        System.out.println("listen phone:" + msg);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] byteChannel = message.getChannel();
        byte[] byteBody = message.getBody();
        try {
            String channel = new String(byteChannel);
            String body = new String(byteBody);
            System.out.println("channel: + " + channel + ", body: " + body);
            RULE.get(channel).accept(body);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
  1. 在redis注册订阅者。
@Component
public class RedisConfig {
    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(new MyMessageListener());
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // messageListenerAdapter 订阅 SEND_EMAIL 频道
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_EMAIL));
        // messageListenerAdapter 订阅 SEND_PHONE 频道
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_PHONE));
        return container;
    }
}
  1. 测试
@SpringBootTest(classes = DemoApplication.class)
public class MyListener {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Test
    public void test_pub() {
        redisTemplate.convertAndSend(TopicChannel.SEND_EMAIL, "pub email message");
        redisTemplate.convertAndSend(TopicChannel.SEND_PHONE, "pub phone message");
    }
}

测试结果

channel: + send_email, body: pub email message
listen email:pub email message
channel: + send_phone, body: pub phone message
listen phone:pub phone message

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264053.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

精通Redis(一)

目录 1.NoSQL 非关系型数据库 2.Redis 3.Redis的java客户端 4.Jedis 4.1Jedis快速入门 4.2Jedis连接池及使用 5.SpringDataRedis和RedisTemplate 1.NoSQL 非关系型数据库 基础篇-02.初始Redis-认识NoSQL_哔哩哔哩_bilibili NoSQL与SQL的区别就在于SQL是结构化的、关联…

研发效能DevOps: Vite 使用 Element Plus

目录 一、实验 1.环境 2.初始化前端项目 3.安装 vue-route 4.安装 pinia 5.安装 axios 6.安装 Element Plus 7.gitee创建工程 8. 配置路由映射 9.Vite 使用 Element Plus 二、问题 1.README.md 文档推送到gitee未自动换行 2.访问login页面显示空白 3.表单输入账户…

openbmc hwmon与sensor监控

1.说明 参考文档: https://github.com/openbmc/entity-manager/blob/master/docs/entity_manager_dbus_api.mdhttps://github.com/openbmc/entity-manager/blob/master/docs/my_first_sensors.md 1.1 简单介绍 注意: 本节是快速浏览整个sensor框架&#xff0c;了解大致open…

thinkphp框架diygw-ui-php进销存出库记录操作

将进销存的出库明细记录存储到数据库中&#xff0c;thinkphp框架diygw-ui-php后台通常涉及以下几个步骤&#xff1a; 数据库表定义 实现我们定义了三张表、一个产品表、出库订单表、出库订单产品明细表 生成API 进入DIY可视化API代码生成器&#xff0c;我们生成这三张表结应…

vertx idea快速使用

目录 1.官网下载项目 2.修改代码 2.1拷贝代码方式 为了能够快速使用&#xff0c;我另外创建一个新的maven项目&#xff0c;将下载项目的src文件和pom文件拷贝到新建的maven项目。 2.2删除.mvn方式 3.更新配置 4.配置application 5.idea启动项目 1.官网下载项目 从vert…

ComE(Community Embedding) -- 基于嵌入的社区检测优化算法

ComE&#xff08;Community Embedding&#xff09;是一种基于嵌入的社区检测优化算法。 它结合了节点嵌入技术与社区划分的目标&#xff0c;能够有效识别网络中的社区结构&#xff0c;并在社区划分过程中捕捉复杂的节点相互作用信息。 算法背景 传统的社区检测方法&#xff0c;…

CSS|14 z-index

z-index z-index表示谁压盖着谁&#xff0c;数值大的会压盖住数值小的。只有定位的元素才有z-index值&#xff0c;只有设置了固定定位、相对定位、绝对定位了的元素&#xff0c;才会拥有z-indexz-index的值是没有单位的&#xff0c;值是一个正整数&#xff0c;默认的z-index值…

重撸设计模式--代理模式

文章目录 定义UML图代理模式主要有以下几种常见类型&#xff1a;代理模式涉及的主要角色有&#xff1a;C 代码示例 定义 代理模式&#xff08;Proxy Pattern&#xff09;属于结构型设计模式&#xff0c;它为其他对象提供一种代理以控制对这个对象的访问。 通过引入代理对象&am…

vue中验证码的实现方式

在写登录页的时候有的系统会让你也进行一下验证码绘制&#xff0c;那么验证码如何实现的呢&#xff1f;我在写登录页的时候通过将登录框&#xff0c;验证码分开页面来写&#xff0c;最后将它们变成标签来导入到我的样式页面中&#xff0c;这样写不仅方便&#xff0c;更容易修改…

Spring(三)-SpringWeb-概述、特点、搭建、运行流程、组件、接受请求、获取请求数据、特殊处理、拦截器

文章目录 一、SpringWeb概述 二、SpringWeb特点 三、搭建SpringWeb&#xff08;在web项目中&#xff09; 1、导包 2、在web.xml文件中配置统一拦截分发器 DispatcherServlet 3、开启 SpringWEB 注解 4、处理器搭建 四、SpringWeb运行流程 五、SpringWeb组件 1、前端控…

构建MacOS应用小白教程(打包 签名 公证 上架)

打包 在package.json中&#xff0c;dependencies会被打进 Electron 应用的包里&#xff0c;而devDependencies则不会&#xff0c;所以必要的依赖需要放到dependencies中。files中定义自己需要被打进 Electron 包里的文件。以下是一个完整的 mac electron-builder的配置文件。 …

2.4 网络概念(分层、TCP)

网络层与传输层概述 网络层&#xff1a; 抽象概念&#xff1a;网络层是基于 IP 的抽象概念&#xff0c;与数据链路层用 MAC 地址标记设备不同。MAC 地址是一种具体化的概念&#xff0c;绑定于所在的物理网络&#xff0c;而 IP 地址可以是固定的&#xff0c;也可以通过路由动态…

【JetPack】Room数据库笔记

Room数据库笔记 ORM框架&#xff1a;对齐数据库数据结构与面向对象数据结构之间的关系&#xff0c;使开发编程只考虑面向对象不需要考虑数据库的结构 Entity : 数据实体&#xff0c;对应数据库中的表 <完成面向对象与数据库表结构的映射> 注解&#xff1a; 类添加注解…

基于前端技术UniApp和后端技术Node.js的电影购票系统

文章目录 摘要Abstruct第一章 绪论1.1 研究背景与意义1.2 国内外研究现状 第二章 需求分析2.1 功能需求分析2.2 非功能性需求分析 第二章系统设计3.1 系统架构设计3.1.1 总体架构3.1.2 技术选型 3.2 功能架构 第四章 系统实现4.1 用户端系统实现4.1.1 用户认证模块实现4.1.2 电…

大模型微调---Lora微调实战

目录 一、前言二、LoRA实战2.1、下载模型到本地2.2、加载模型与数据集2.3、处理数据2.4、LoRA微调2.5、训练参数配置2.6、开始训练 三、模型评估四、完整训练代码 一、前言 LoRA是一种参数高效的微调技术&#xff0c;通过低秩转换对大型语言模型进行适应性更新&#xff0c;减少…

centos7下docker 容器实现redis主从同步

1.下载redis 镜像 docker pull bitnami/redis2. 文件夹授权 此文件夹是 你自己映射到宿主机上的挂载目录 chmod 777 /app/rd13.创建docker网络 docker network create mynet4.运行docker 镜像 安装redis的master -e 是设置环境变量值 docker run -d -p 6379:6379 \ -v /a…

SLAAC如何工作?

SLAAC如何工作&#xff1f; IPv6无状态地址自动配置(SLAAC)-常见问题 - 苍然满关中 - 博客园 https://support.huawei.com/enterprise/zh/doc/EDOC1100323788?sectionj00shttps://www.zhihu.com/question/6691553243/answer/57023796400 主机在启动或接口UP后&#xff0c;发…

2024.12.21辩论赛感受

背景 今天辩论赛的双方论点是&#xff1a; 正方&#xff1a;寒假留在研发中心的收获大 反方&#xff1a;寒假去做其他事情的收获 辩论赛&#xff0c;为了锻炼自己&#xff0c;选择了不想选择以及相对不好辩论的反方。出现的状况有一下几点&#xff1a; 1.发现自己脑子完全跟不…

【从零开始入门unity游戏开发之——C#篇21】C#面向对象的封装——`this`扩展方法、运算符重载、内部类、`partial` 定义分部类

文章目录 一、this扩展方法1、扩展方法的基本语法2、使用扩展方法3、扩展方法的注意事项5、扩展方法的限制6、总结 二、运算符重载1、C# 运算符重载2、运算符重载的基本语法3. 示例&#xff1a;重载加法运算符 ()4、使用重载的运算符5、支持重载的运算符6、不能重载的运算符7、…

C语言:文件IO

C语言&#xff1a;文件IO 文件操作 概述 什么是文件 文件是保存在外存储器&#xff08;一般代指磁盘&#xff0c;U盘&#xff0c;移动硬盘等&#xff09;的数据的集合 文件操作体现在哪几个方面 文件内容的读取文件内容的写入 数据的读取和写入可被视为针对文件进行输入&…