SpringBoot日常:封装rabbitmq starter组件

news2024/9/9 4:03:00

文章目录

    • 逻辑实现
      • RabbitExchangeEnum
      • RabbitConfig
      • RabbitModuleInfo
      • RabbitModuleInitializer
      • RabbitProperties
      • RabbitProducerManager
      • POM.xml
      • spring.factories
    • 功能测试
      • application.yml配置
      • 生产者:
      • 消费者:
      • 测试结果:
      • 总结

本章内容主要介绍编写一个rabbitmq starter,能够通过配置文件进行配置交换机、队列以及绑定关系等等。项目引用该组件后能够自动初始化交换机和队列,并进行简单通信。
如若有其他需求,可自行扩展,例如消息消费的确认等
参考文章:SpringBoot日常:自定义实现SpringBoot Starter

逻辑实现

下面直接进入主题,介绍整体用到的文件和逻辑内容

RabbitExchangeEnum

交换机枚举类,四种交换机类型,分别是直连交换机、主题交换机、扇出交换机和标题交换机

/**
 * @Author 码至终章
 * @Version 1.0
 */
public enum RabbitExchangeEnum {

    DIRECT,
    TOPIC,
    FANOUT,
    HEADERS;
}

RabbitConfig

初始化配置文件

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author 码至终章
 * @Version 1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * 通过yaml配置,创建队列、交换机初始化器
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
        return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);
    }
}

RabbitModuleInfo

配置信息的映射的文件,用于接收配置文件中配置的交换机和队列属性

import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;

import java.util.Map;

/**
 * 队列和交换机机绑定关系实体对象
 *
 * @Author 码至终章
 * @Version 1.0
 */
@Data
public class RabbitModuleInfo {

    /**
     * 路由Key
     */
    private String routingKey;
    /**
     * 队列信息
     */
    private Queue queue;
    /**
     * 交换机信息
     */
    private Exchange exchange;

    /**
     * 交换机信息类
     */
    @Data
    public static class Exchange {
        /**
         * 交换机类型
         * 默认直连交换机
         */
        private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;
        /**
         * 交换机名称
         */
        private String name;
        /**
         * 是否持久化
         * 默认true持久化,重启消息不会丢失
         */
        private boolean durable = true;
        /**
         * 当所有队绑定列均不在使用时,是否自动删除交换机
         * 默认false,不自动删除
         */
        private boolean autoDelete = false;
        /**
         * 交换机其他参数
         */
        private Map<String, Object> arguments;
    }

    /**
     * 队列信息类
     */
    @Data
    public static class Queue {
        /**
         * 队列名称
         */
        private String name;
        /**
         * 是否持久化
         * 默认true持久化,重启消息不会丢失
         */
        private boolean durable = true;
        /**
         * 是否具有排他性
         * 默认false,可多个消费者消费同一个队列
         */
        private boolean exclusive = false;
        /**
         * 当消费者均断开连接,是否自动删除队列
         * 默认false,不自动删除,避免消费者断开队列丢弃消息
         */
        private boolean autoDelete = false;
        /**
         * 绑定死信队列的交换机名称
         */
        private String deadLetterExchange;
        /**
         * 绑定死信队列的路由key
         */
        private String deadLetterRoutingKey;


        private Map<String, Object> arguments;
    }

}

RabbitModuleInitializer

执行初始化逻辑详情文件,具体的逻辑为根据配置文件信息创建对应的交换机和队列,并设置其属性和绑定关系。

import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @Author cys
 * @Date 2024/6/17 14:23
 * @Version 1.0
 */
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {

    AmqpAdmin amqpAdmin;

    RabbitProperties rabbitProperties;

    public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {
        this.amqpAdmin = amqpAdmin;
        this.rabbitProperties = rabbitProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        log.info("初始化rabbitmq交换机、队列----------------start");
        declareRabbitModule();
        log.info("初始化rabbitmq交换机、队列----------------end");
    }

    /**
     * RabbitMQ 根据配置动态创建和绑定队列、交换机
     */
    private void declareRabbitModule() {
        List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();
        if (CollectionUtils.isEmpty(rabbitModuleInfos)) {
            return;
        }
        for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {
            configParamValidate(rabbitModuleInfo);
            // 队列
            Queue queue = convertQueue(rabbitModuleInfo.getQueue());
            // 交换机
            Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
            // 绑定关系
            String routingKey = rabbitModuleInfo.getRoutingKey();
            String queueName = rabbitModuleInfo.getQueue().getName();
            String exchangeName = rabbitModuleInfo.getExchange().getName();
            Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
            // 创建队列
            if (!isExistQueue(queueName)) {
                amqpAdmin.declareQueue(queue);
            }
            // 创建交换机
            amqpAdmin.declareExchange(exchange);
            // 队列 绑定 交换机
            amqpAdmin.declareBinding(binding);
        }
    }

    /**
     * RabbitMQ动态配置参数校验
     *
     * @param rabbitModuleInfo 队列和交换机机绑定关系
     */
    public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
        String routingKey = rabbitModuleInfo.getRoutingKey();
        Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");
        Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));
        Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));
        Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));
        Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name属性", routingKey));

    }

    /**
     * 转换生成RabbitMQ队列
     *
     * @param queue 队列
     * @return Queue
     */
    public Queue convertQueue(RabbitModuleInfo.Queue queue) {
        Map<String, Object> arguments = queue.getArguments();

        // 转换ttl的类型为long
        if (arguments != null && arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }
        // 是否需要绑定死信队列
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {
            if (arguments == null) {
                arguments = new HashMap<>(4);
            }
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);

        }
        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }


    /**
     * 转换生成RabbitMQ交换机
     *
     * @param exchangeInfo 交换机信息
     * @return Exchange
     */
    public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {

        AbstractExchange exchange = null;
        RabbitExchangeEnum exchangeType = exchangeInfo.getType();
        String exchangeName = exchangeInfo.getName();
        boolean isDurable = exchangeInfo.isDurable();
        boolean isAutoDelete = exchangeInfo.isAutoDelete();

        Map<String, Object> arguments = exchangeInfo.getArguments();

        switch (exchangeType) {
            case DIRECT:
                // 直连交换机
                exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case TOPIC:
                // 主题交换机
                exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case FANOUT:
                //扇形交换机
                exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
            case HEADERS:
                // 头交换机
                exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
                break;
        }
        return exchange;
    }

    /**
     * 判断队列是否存在
     *
     * @param queueName 队列名
     * @return boolean
     */
    private boolean isExistQueue(String queueName) {
        if (StringUtils.isBlank(queueName)) {
            throw new RuntimeException("队列名称为空");
        }

        boolean flag = true;
        Properties queueProperties = amqpAdmin.getQueueProperties(queueName);
        if (queueProperties == null) {
            flag = false;
        }
        return flag;
    }


}

RabbitProperties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author 码至终章
 * @Version 1.0
 */
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {

    private List<RabbitModuleInfo> modules;
}

RabbitProducerManager

发送消息的生产者方法

public class RabbitProducerManager {
    private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String rabbitRouting, Object message) {
        this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);
        log.info("向路由:{}, 发送消息成功:{}", rabbitRouting, message);
    }

    public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {
        this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);
        log.info("向路由:{}, 发送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});
    }

    public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}

POM.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.7.18</version>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
			<version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
    </dependencies>

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties

功能测试

application.yml配置

spring:
  profiles:
    active: dev
  ## rabbitmq链接配置  
  rabbitmq:
    host: 192.168.199.199
    port: 5672
    username: test
    password: 123456789
    virtual-host: test

cys:
  rabbit:
    modules:
      - exchange:
          name: mytest
          #type为RabbitExchangeTypeEnum枚举中的值。不配置默认为Direct
          type: DIRECT
        queue:
          name: default.queue
          arguments:
            # 队列中所有消息的最大存活时间。单位毫秒。 1分钟
            x-message-ttl: 60000
        # routing-key可以为空
        routing-key: default.queue.key

生产者:

@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {
    /**
     * 主键
     */
    @TableId(type = IdType.AUTO)
    @TableField(value = "cust_id")
    private Long custId;
}

@RestController
@RequestMapping("/mqtest")
public class MqController {

    @Autowired
    RabbitProducerManager rabbitProducerManager;
    @Autowired
    MailService mailService;

    @GetMapping("/mqtest")
    public void test(){
        TaskEntity taskEntity = new TaskEntity();
        taskEntity.setCustId(211212L);
        rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));
    }
}

消费者:

@Component
public class MyListener {

    @RabbitListener(queues = "default.queue")
    public void handMessage(String message){

        TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);
        System.out.println("接收到的消息"+taskEntity);

    }
}

测试结果:

请求接口/mqtest/mqtest
在这里插入图片描述

总结

到这为止,关于封装rabbitmq starter就结束了。当然,本文只是介绍了最基础的部分,后续大家可以在这基础上实现扩展,比如统一接受消息再通过事件监听、同一队列设置多个消费者线程等等,说到这里,如果只是丰富的小伙伴可能会想到spring-cloud-starter-stream-rabbit,大家也可以参考参考这个是如何实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1918106.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自闭症孩子能否正常上普校:一场充满挑战与希望的探讨

在教育的舞台上&#xff0c;自闭症孩子​​​​​​​能否正常融入普通学校&#xff0c;是一个备受关注且充满争议的话题。 支持自闭症孩子上普校的观点认为&#xff0c;普通学校能为他们提供更接近真实社会的环境。在普校中&#xff0c;自闭症孩子有机会与不同类型的同学交流互…

javaweb学习day5--《HTML篇》Springboot的模块创建、HTML的相关知识点详解

一、前言 从今天开始&#xff0c;就要启动后端的学习了&#xff0c;Springboot会贯穿到底&#xff0c;一定要跟着小编严谨的去搭建Springboot环境&#xff0c;依赖添加的过程可能需要2分钟左右&#xff0c;读者们要耐心等待一下&#xff0c;搭建好Springboot之后才算正式的开始…

【深度学习入门篇 ②】Pytorch完成线性回归!

&#x1f34a;嗨&#xff0c;大家好&#xff0c;我是小森( &#xfe61;ˆoˆ&#xfe61; )&#xff01; 易编橙终身成长社群创始团队嘉宾&#xff0c;橙似锦计划领衔成员、阿里云专家博主、腾讯云内容共创官、CSDN人工智能领域优质创作者 。 易编橙&#xff1a;一个帮助编程小…

WEB攻防-通用漏洞SQL注入-ACCESS一般注入与偏移注入

ACCESS数据库 Access数据库是Microsoft Office套件中一款小型关系型数据库管理系统 单个数据库文件&#xff1a;Access数据库通常以一个单独的文件形式存在&#xff08;如.accdb或旧版本的.mdb文件&#xff09;&#xff0c;这个文件包含了数据库的所有对象&#xff0c;如表、字…

确保智慧校园安全,充分利用操作日志功能

智慧校园基础平台系统的操作日志功能是确保整个平台运行透明、安全及可追溯的核心组件。它自动且详尽地记录下系统内的每一次关键操作细节&#xff0c;涵盖操作的具体时间、执行操作的用户账号、涉及的数据对象&#xff08;例如学生信息更新、课程调度变动等&#xff09;、操作…

Windows 桌面改造小技巧 · 一键去除快捷方式小箭头和小盾牌

Windows的桌面上&#xff0c;总会有一些不如意的小地方&#xff0c;比如快捷方式上的小箭头和小盾牌图标&#xff1a; 标志挡住了应用图标&#xff0c;显得很难受 这些角标作用如下&#xff1a; 快捷方式角标是用来提示你这是一个快捷方式的&#xff0c;其实这个角标还好&…

充气膜游泳馆安全吗—轻空间

充气膜游泳馆&#xff0c;作为一种新型的游泳场馆&#xff0c;以其独特的结构和众多优点&#xff0c;逐渐受到各地体育设施建设者的青睐。然而&#xff0c;关于充气膜游泳馆的安全性&#xff0c;一些人仍然心存疑虑。那么&#xff0c;充气膜游泳馆到底安全吗&#xff1f;轻空间…

进程的状态和优先级

一.进程的退出 进程内核PCB数据和代码&#xff0c;它们都要占据内存空间&#xff0c;进程退出的核心工作之一就是释放掉自己的PCB数据和代码。 为什么要创建出进程呢&#xff1f;一定是我要进程完成某些任务&#xff01; 当进程退出了&#xff0c;不光光只是退出了这么简单&…

巢链接小程序正式上线!全面开启AI共创新时代

巢链接小程序正式上线&#xff01;全面开启AI共创新时代 &#x1f680; 今天我们官宣&#xff0c;巢链接小程序正式上线啦&#xff01;这是一款致力于打造连接开发者、创业者和运营者的全新平台&#xff0c;旨在通过共创和分享实现共同成长。快来了解巢链接小程序的诸多功能&a…

普中51单片机:定时器与计数器详解及应用(七)

文章目录 引言定时器工作原理TMOD定时器/计数器工作模式寄存器定时器工作模式模式0(13位定时器/计数器)模式1(16位定时器/计数器)模式2(8位自动重装模式)模式3(两个8位计数器) 定时器配置流程代码演示——LED1间隔1秒闪烁代码演示——按键1控制LED流水灯状态代码演示——LCD160…

抖音机构号授权矩阵系统源码:打造自媒体帝国的新利器

在自媒体风起云涌的时代&#xff0c;抖音作为短视频领域的佼佼者&#xff0c;早已成为内容创作者们争相入驻的热门平台。然而&#xff0c;随着竞争加剧&#xff0c;如何在这场流量大战中脱颖而出&#xff0c;成为每一位自媒体人不得不面对的课题。今天&#xff0c;我们将带您深…

JVM:类加载器

文章目录 一、什么是类加载器 一、什么是类加载器 类加载器&#xff08;ClassLoader&#xff09;是Java虚拟机提供给应用程序去实现获取类和接口字节码数据的技术。类加载器只参与加载过程总的字节码获取并加载到内存这一部分。

【JavaWeb程序设计】JavaBean(一)

目录 一、<jsp:useBean>、<jsp:setProperty>、<jsp:getProperty>的使用 1. 运行截图 2. UserBean.java 3. login.html 4. display.jsp 二、设计求三角形面积 1. 运行截图 2. 设计View&#xff08;inputTriangle.jsp&#xff09; 3. 设计Model&#…

AI发展到现在,国内大模型行业还有哪些机会?

随着各种AI工具的发展&#xff0c;AI领域的竞争格局正在悄然变化。GPT-4o被许多人误解为重大突破的更新&#xff0c;实际上主要是在多模态功能上的增强&#xff0c;而核心智能水平似乎仍停留在GPT-4阶段。这一现象为我们思考AI发展路径和国内大模型行业的机遇提供了新的视角。 …

运维Tips | Ubuntu 24.04 安装配置 xrdp 远程桌面服务

[ 知识是人生的灯塔,只有不断学习,才能照亮前行的道路 ] Ubuntu 24.04 Desktop 安装配置 xrdp 远程桌面服务 描述:Xrdp是一个微软远程桌面协议(RDP)的开源实现,它允许我们通过图形界面控制远程系统。这里使用RDP而不是VNC作为远程桌面,是因为Windows自带的远程桌面连接软…

软件缺陷简介

缺陷种类 遗漏&#xff0c;指规定或预期的需求为体现在产品种错误&#xff0c;需求是明确的&#xff0c;在实现阶段未将需求的功能正确实现冗余&#xff0c;需求说明文档中未涉及的需求被实现了不满意&#xff0c;用户对产品的实现不满意也成为缺陷 缺陷等级划分 致命&#…

Qt QWebSocket网络编程

学习目标&#xff1a;Qt QWebSocket网络编程 学习前置环境 QT TCP多线程网络通信-CSDN博客 学习内容 WebSocket是一种通过单个TCP连接提供全双工通信信道的网络技术。2011年&#xff0c;IETF将WebSocket协议标准化为 RFC6455&#xff0c;QWebSocket可用于客户端应用程序和服…

金龙鱼:只是躺枪?

中储粮罐车运输油罐混用事件持续发酵&#xff0c;食用油板块集体躺枪。 消费者愤怒的火&#xff0c;怕是会让食用油企们一点就着。 今天&#xff0c;我们聊聊“油”茅——金龙鱼。 一边是业内人士指出&#xff0c;油罐混用的现象普遍存在&#xff0c;另一边是金龙鱼回应称&am…

Mac虚拟机跑Windows流畅吗 Mac虚拟机连不上网络怎么解决 mac虚拟机网速慢怎么解决

随着技术的发展&#xff0c;很多用户希望能在Mac电脑上运行Windows系统&#xff0c;从而能够使用那些仅支持Windows系统的软件。使用虚拟机软件可以轻松满足这一需求。但是&#xff0c;很多人可能会有疑问&#xff1a;“Mac虚拟机跑Windows流畅吗&#xff1f;”&#xff0c;而且…

3SRB5016-ASEMI逆变箱专用3SRB5016

编辑&#xff1a;ll 3SRB5016-ASEMI逆变箱专用3SRB5016 型号&#xff1a;3SRB5016 品牌&#xff1a;ASEMI 封装&#xff1a;SGBJ-5 批号&#xff1a;2024 现货&#xff1a;50000 最大重复峰值反向电压&#xff1a;1600V 最大正向平均整流电流(Vdss)&#xff1a;50A 功…