使用redisMQ-spring-boot-starter实现消息队列和延时队列

news2024/11/15 21:30:14

简介

redisMQ-spring-boot-starter是一个轻量级的、基于Redis实现的消息队列中间件,它有如下优点:

  • 开箱即用,你几乎不用添加额外的配置
  • 支持消息队列、延时队列,并提供精细化配置参数
  • 提供消息确认机制
  • 支持虚拟空间,不同虚拟空间的数据互相隔离
  • 支持web控制台,实时查看各个队列的消费情况

开始使用

引用依赖

springboot3.0以下版本:

<dependency>
    <groupId>io.github.lengmianshi</groupId>
    <artifactId>redisMQ-spring-boot-starter</artifactId>
    <version>1.0.2</version>
</dependency>

<!-- 以下配置可以改为你自己的版本 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

注:spring-boot-starter-data-redis依赖于spring-data-redis,如果发生依赖冲突,要确保spring-data-redis的版本不低于2.1.0.RELEASE,可在你的pom.xml中锁定版本:

<dependencyManagement>
    <dependencies>
          <dependency>
              <groupId>org.springframework.data</groupId>
              <artifactId>spring-data-redis</artifactId>
              <version>2.1.2.RELEASE</version>
          </dependency>
    </dependencies>
</dependencyManagement>

springboot3.0:

<dependency>
    <groupId>io.github.lengmianshi</groupId>
    <artifactId>redisMQ-spring-boot-starter</artifactId>
    <version>2.0.2</version>
</dependency>
        
<!-- 以下配置可以改为你自己的版本 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>5.1.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

配置redis

一般引入redis的项目都会事先配置,如果你的项目没配置过,则可在application.yml中加上如下配置:

springboot3.0以下版本:

spring:
  redis:
    host: test.redis.com  #改成你的
    password: vC403V2KMc0Kghz #改成你的
    port: 6379 #改成你的
    jedis:
      pool:
        max-active: 100
        max-idle: 10
        min-idle: 10
    timeout: 2000

springboot3.0:

spring:
  data:
    redis:
      host: test.redis.com #改成你的
      password: vC403V2KMc0Kghz #改成你的
      port: 6379 #改成你的
      jedis:
        pool:
          max-active: 100
          max-idle: 10
          min-idle: 10
      timeout: 2000

消息队列

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;

/**
 * 1次只发送一条消息
 */
@Test
public void test1() {
    JSONObject message = new JSONObject();
    message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
    message.put("year", 2022);
    redisQueueTemplate.sendMessage("test_queue", message);
}

/**
 * 批量发送消息
 */
@Test
public void test2() {
    List messageList = new ArrayList<>();
    for (int i = 0; i < 5000; i++) {
        JSONObject mess = new JSONObject();
        mess.put("index", i);
        messageList.add(mess);
    }

    redisQueueTemplate.sendMessageAll("test_queue", messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class QueueConsumer {
    //使用默认参数
    @RedisQueueListener(queue = "test_queue")
    public void test(JSONObject message){
        System.out.println(message);
    }

    //指定单个实例下使用5个消费线程
    @RedisQueueListener(queue = "test_queue2", consumers = 5)
    public void test2(JSONObject message){
        System.out.println(message);
    }

    //单个实例5个线程,手动确认
    @RedisQueueListener(queue = "test_queue3", consumers = 5, autoAck = false)
    public void test3(JSONObject message){
        System.out.println(message);
    }

}

@RedisQueueListener注解支持的所有参数:

package com.leng.project.redisqueue.annotation;

import java.lang.annotation.*;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisQueueListener {
    /**
     * 队列名
     *
     * @return
     */
    String queue() default "";

    /**
     * 消费者线程数
     *
     * @return
     */
    int consumers() default 1;

    /**
     * 是否自动确认
     *
     * @return
     */
    boolean autoAck() default true;

    /**
     * 一次从队列中取多少数据
     *
     * @return
     */
    int prefetch() default 50;

    /**
     * 获取消息的频率,单位秒
     * @return
     */
    long frequency() default 2;
}

其中:

  • consumers:单个实例下启动多少个消费线程,默认为1
  • autoAck:是否自动确认消息,默认为true。自动确认与手动确认的区别:
    • 自动确认:消费线程从队列中取出消息,如果消费失败,则该条消息丟失
    • 手动确认:消费线程从队列中取出消息,并将消息写入待确认队列中;如果消费失败,则一段时间后(15分钟)会重新入队,消费端要做幂等性处理
  • prefetch:一个消费线程一次性从队列中取出多少条消息,因为涉及锁的竞争,不宜过小,默认为50
  • frequency:单个消费线程每隔多少秒获取一次消息,默认为2,最小值为1。有人可能会奇怪,消息不是应该即时消费吗?不是越快越好吗?实际上,有些业务场景对消息的实时性要求很低,几天、几个月、甚至一年才执行一次,这时我们完全可以把frequency调大,以减轻redis的压力

延时队列

延时队列的常用场景如用户下单,xx分钟后没有支付则自动关闭订单;已支持的订单,xxx天后自动确认收货等。

生产者发送消息

@Autowired
private RedisQueueTemplate redisQueueTemplate;

/**
 * 1次只发送1条消息
 */
public void test1(){
    JSONObject message = new JSONObject();
    message.put("bondId", "17f62f1dfb5afb12e8d67cd651c1df53");
    message.put("year", 2022);
    //延时5秒
    redisQueueTemplate.sendDelayMessage("test_delay_queue", message, 5, TimeUnit.SECONDS);
}

/**
 * 批量发送,每条消息的延时时长一样
 */
public void test2(){
    List messageList = new ArrayList<>();
    for (int i = 0; i < 5000; i++) {
        JSONObject mess = new JSONObject();
        mess.put("index", i);
        messageList.add(mess);
    }
    //延时5秒
    redisQueueTemplate.sendDelayMessageAll(queue, messageList, 5, TimeUnit.SECONDS);
}

/**
 * 批量发送,每条消息的延时时长各不相同
 */
public void test3(){
    List messageList=new ArrayList<>();
    for(int i=0; i< 5000; i++){
        JSONObject mess=new JSONObject();
        mess.put("index",i);
        
        //每条消息可以使用不同的延时时长,这里为了简便,统一写成5了
        DelayMessageParam param=new DelayMessageParam(mess,5,TimeUnit.SECONDS);
        messageList.add(param);
    }

    redisQueueTemplate.sendDelayMessageAll(queue,messageList);
}

注:示例中每条MQ消息都用JSONObject包装,这只是我的个人习惯,你也可以使用实体类

消费者消费消息

@RedisDelayQueueListener注解的参数与@RedisQueueListener完全相同;消费方法的参数只能有1个,并且类型要与生产者发送消费的类型保存一致:

@Component
public class DelayQueueConsumer {
  /**
   * 使用默认参数
   * @param message
   */
  @RedisDelayQueueListener(queue = "test_delay_queue")
    public void test(JSONObject message){
        System.out.println(message);
    }

  /**
   * 单个实例5个消费线程
   * @param message
   */
  @RedisDelayQueueListener(queue = "test_delay_queue2", consumers = 5)
    public void test2(JSONObject message){
        System.out.println(message);
    }

  /**
   * 单个实例5个消费线程,手动确认
   * @param message
   */
  @RedisDelayQueueListener(queue = "test_delay_queue3", consumers = 5, autoAck = false)
    public void test3(JSONObject message){
        System.out.println(message);
    }

}

虚拟空间

参考了RabbitMQ的设计。虚拟空间很有必要,例如,开发环境和测试环境的数据如果没有隔离,在调试时被测试环境的消费端干扰。
配置虚拟空间:

queue:
  virtual-host: '/dev'  #默认为 /

Web管理平台

浏览器访问:http://ip:port/queue.html,默认的账号密码为admin/admin

配置账号:

queue:
  console:
    #是否启用web控制台
    enable: true
    username: admin #登录用户名
    password: 123456 #密码

登录成功后的界面,可查看所有虚拟空间的队列及消费情况:
image.png

注:如果你的系统使用了权限控制框架,如shiro、spring-security等,则需要对如下3个资源放行:

  • /queue.html
  • /queue/**
  • /static/**

ps:
项目地址:https://github.com/lengmianshi/redisMQ-spring-boot-starter-parent,欢迎提bug

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1465027.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为算法题 go语言或者ptython

1 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元素在答案里不能重复出现。 你可以按任意顺序返…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的景区垃圾识别系统(Python+PySide6界面+训练代码)

摘要&#xff1a;本文介绍了一个先进的基于深度学习的景区垃圾检测系统&#xff0c;该系统集成了最新的YOLOv8算法&#xff0c;并与YOLOv7、YOLOv6、YOLOv5等前代算法进行了性能对比&#xff0c;通过对比实验证明了其在图像、视频、实时视频流和批量文件处理中对景区垃圾进行精…

【深度学习笔记】3_1 线性回归

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 3.1 线性回归 线性回归输出是一个连续值&#xff0c;因此适用于回归问题。回归问题在实际中很常见&#xff0c;如预测房屋价格、气温、销售额等连续值的问题。与回归问…

HTTP基本概念-HTTP 常见的状态码有哪些?

资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) HTTP 常见的状态码有哪些? 1xx 类状态码属于提示信息&#xff0c;是协议处理中的一种中间状态&#xff0c;实际用到的比较少。 2xx 类状态码表示服务器成功处理了客户端的请求&#xff0c;也是我们最愿…

数据库事物复习

事务 比如说将张三的银行账户拿出一千给李四&#xff0c;首先需要查询张三的账户余额&#xff0c;扣除1000&#xff0c;然后如果给李四加上1000的过程中出现异常会回滚事务&#xff0c;临时修改的数据会回复回去。 -- 1. 查询张三账户余额 select * from account where name …

OpenAI Sora模型,官方技术文档翻译

技术报告地址&#xff1a;https://openai.com/research/video-generation-models-as-world-simulators 本技术报告的重点是&#xff08;1&#xff09;将所有类型的视觉数据转化为统一表示&#xff0c;从而能够大规模训练生成模型的方法&#xff1b;以及&#xff08;2&#xff0…

互联网广告投放与IP地理位置定位

随着互联网的发展和普及&#xff0c;互联网广告投放成为各行业推广营销的重要方式之一。而结合IP地理位置定位技术&#xff0c;可以实现精准定向&#xff0c;提高广告投放的效果和精准度。IP数据云将探讨互联网广告投放与IP地理位置定位的关系&#xff0c;分析其优势和应用场景…

基于springboot+vue的智能物流管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

设计模式-创建型模式-原型模式

原型模式&#xff08;Prototype Pattern&#xff09;&#xff1a;使用原型实例指定创建对象的种类&#xff0c;并且通过克隆这些原型创建新的对象。原型模式是一种对象创建型模式。原型模式其实就是从一个对象再创建另外一个可定制的对象&#xff0c;而且不需知道任何创建的细节…

Nginx知识笔记

一、前言 首先&#xff0c;我们来看一张关于正向代理和反向代理的图片 简单理解正向代理和反向代理的概念&#xff1a; 正向代理&#xff1a;在客户端配置代理服务器(和跳板机功能类似&#xff0c;比如公司很多机器需要通过跳板机才允许登录&#xff0c;正向代理的典型用途是…

npmjs官网(查询依赖包)

npmjs官网 可以方便的查看依赖包的安装、使用说明及相关注意事项等。 以wechat-http为例&#xff1a;

1.CSS单位总结

CSS 单位总结 经典真题 px 和 em 的区别 CSS 中的哪些单位 首先&#xff0c;在 CSS 中&#xff0c;单位分为两大类&#xff0c;绝对长度单位和相对长度单位。 绝对长度单位 我们先来说这个&#xff0c;绝对长度单位最好理解&#xff0c;和我们现实生活中是一样的。在我们…

rabbitmq知识梳理

一.WorkQueues模型 Work queues&#xff0c;任务模型。简单来说就是让多个消费者绑定到一个队列&#xff0c;共同消费队列中的消息。 当消息处理比较耗时的时候&#xff0c;可能生产消息的速度会远远大于消息的消费速度。长此以往&#xff0c;消息就会堆积越来越多&#xff0c…

男性美颜SDK解决方案,专属男性美化新体验

随着科技的发展&#xff0c;美颜技术已广泛应用于摄影、社交、直播等领域&#xff0c;满足了用户对美的追求。然而&#xff0c;传统的美颜算法往往更偏向于女性用户&#xff0c;忽视了男性用户对于自然、真实美的需求。美摄科技针对这一市场痛点&#xff0c;推出了专为男性设计…

APP的UI自动化demo(appium+java)

文章目录 appium连接手机java代码实现-第一版第二版-接入testng和隐式等待显示等待 appium连接手机 准备工作 1、查看连接手机模拟器是否连接成功&#xff0c;获取设备名称 执行命令&#xff1a;adb devices 2、查看android内核版本号—>paltformVersion 执行命令&#xf…

NLP 使用Word2vec实现文本分类

&#x1f368; 本文为[&#x1f517;365天深度学习训练营学习记录博客 &#x1f366; 参考文章&#xff1a;365天深度学习训练营 &#x1f356; 原作者&#xff1a;[K同学啊 | 接辅导、项目定制]\n&#x1f680; 文章来源&#xff1a;[K同学的学习圈子](https://www.yuque.com/…

某胜物流软件三个接口sql注入漏洞(附漏洞检测脚本)

免责声明 文章中涉及的漏洞均已修复&#xff0c;敏感信息均已做打码处理&#xff0c;文章仅做经验分享用途&#xff0c;切勿当真&#xff0c;未授权的攻击属于非法行为&#xff01;文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直接或者间接的…

云呐矿井智能化运维工是什么?智能机器人运维岗位

煤矿智能运维是指利用先进的信息技术和自动控制&#xff0c;在煤矿生产过程中对煤矿设备进行监测、维护和管理。其职责和工作任务主要包括: 工作环境:  面对复杂的地质条件和极端的气候环境&#xff0c;煤矿智能运维工程师往往需要在地下煤矿、监测中心等环境中工作。因此&a…

2024年【陕西省安全员C证】模拟考试题库及陕西省安全员C证操作证考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 陕西省安全员C证模拟考试题库考前必练&#xff01;安全生产模拟考试一点通每个月更新陕西省安全员C证操作证考试题目及答案&#xff01;多做几遍&#xff0c;其实通过陕西省安全员C证模拟考试题库很简单。 1、【多选题…

TS04——四通道灵敏度自校准电容触摸控制电路,可通过外部电容可独立调节灵敏率和外部电阻调节内部频率,广泛应用于移动设备,门钥匙锁矩阵的应用

TS04是一块四通道灵敏度自校准电容触摸控制电路。 主要特点&#xff1a; ● 灵敏度 自动校准的电容触摸电路 ● 并行接口 ● 通过外部电容可独立调节灵敏率 ● 通过外部电阻调节内部频率 ● 嵌入高频的噪音消除电路 ● 电流工作小 ● 封装形式: SOP14、 16QFN 应用&#xff1a…