6. SpringBoot 整合 RabbitMQ

news2025/1/16 2:46:38

二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

创建一个 普通的 Spring Boot Web 项目

整合 RabbitMQ

pom.xml 添加依赖


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.11.RELEASE</version>
    </parent>
        

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--引入 amqp 即rabbitmq 的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
        </dependency>

    </dependencies>
        

application.yml 配置文件配置

#配置服务器端相应消息
server:
  port: 8088
  servlet:
    context-path: /Server
#配置rabbitmq的相关信息
spring:
  rabbitmq:
    host: 127.0.0.1  # 配置主机
    port: 5672  # 配置端口
    virtual-host: yjl   # 配置虚拟主机
    username: guest   # 配置用户名
    password: guest  # 配置密码
    connection-timeout: 15000
    # 配置回调
    publisher-confirm-type: correlated
#配置rabbit 队列,交换器,路由等相关信息
rabbit:
  fanout:
    exchange: fanout_logs
    queue1: debug_console
    queue2: debug_file
    queue3: debug_db
  direct:
    exchange: direct_logs
    queue1: debug_console
    queue2: debug_file
    queue3: debug_db
  topic:
    exchange: topic_logs
    queue1: topic_log_console
    queue2: topic_log_file
  ttl:
    x_exchange: x
    queue_a: QA
    queue_b: QB
    queue_c: QC
    y_dead_exchange: y
    y_dead_queue_d: QD
    delayed_exchange: delayed_exchange2
    delayed_queue: delayed.queue
    delayed_routing_key: delayed_routing
  confirm:
    # 确认
    exchange: confirm_exchange_1
    queue: confirm_queue
    routing-key: key1
    backup_exchange: backup_exchange
    backup_queue: backup_queue
    warn_queue: warn_queue    

项目结构

项目结构如下:

image.png

SendMessageService 为 生产者发送消息的接口服务。

RecieveMessageService 为 消费者接收到消息后,进行的业务操作流程。

SendMessageController 为生产者创建消息的 Controller 入口。

创建队列

手动在 RabbitMQ 上创建一个队列 debug_console, 如果不存在的话。

image.png

简单的生产者发送消息

    @Resource
    private SendMessageService sendMessageService;

    @RequestMapping("/queue")
    public String queue() {
        Integer randNum = (int) (Math.random() * 1000 + 1);
        sendMessageService.sendQueue(randNum);
        return "存储到队列中的数据是:" + randNum;

    }

    @RequestMapping("/work")
    public String work() {
        sendMessageService.sendWork();
        return "批量生成循环数字";
    }

往队列发送消息, 使用 RabbitTemplate rabbitTemplate (与 RedisTemplate, JdbcTemplate 形似)

SendMessageServiceImpl.java

@Service
public class SendMessageServiceImpl implements SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbit.direct.queue1}")
    private String queueName;


    // 最普通的.
    @Override
    public void sendQueue(Integer randNum) {
        // 只发送一条消息
        rabbitTemplate.convertAndSend(queueName, String.valueOf(randNum));
    }


    @Override
    public void sendWork() {
        for (int i = 0; i < 10; i++) {
            // 发送多条消息
            rabbitTemplate.convertAndSend(queueName, "第" + i + "条消息,消息内容是:" + i);
        }
    }
}

队列消息消费

ReceiveMessageServiceImpl.java

    @Override
    public void handlerMessage(String message) {
        log.info(">>>> 获取到消息 {},开始进行业务处理",message);
        // 接下来,就是具体的业务去处理这些消息了.
    }
@Component
@Slf4j
public class DirectMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue1}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"debug", "info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("控制台打印输出:" + message);
        receiveMessageService.handlerMessage("控制台打印输出 direct:" + message);
    }
}

验证

访问网址: http://localhost:8088/Server/send/queue

image.png

访问网址: http://localhost:8088/Server/send/work

image.png

普通的消息异步处理是完成了。 但重要的,应该是 Fanout, Direct 和 Topic 的主题处理.

Fanout 交换机消息配置

创建交换机,队列并绑定 FanoutConfig

package com.yjl.amqp.config.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Fanout 形式的 生产
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
public class FanoutConfig {

    @Value("${rabbit.fanout.queue1}")
    private String queue1;
    @Value("${rabbit.fanout.queue2}")
    private String queue2;

    @Value("${rabbit.fanout.exchange}")
    private String exchange;


    // 构建队列 Bean 和 Exchange Bean
    @Bean(value = "fanout_queue1")
    public Queue queue1() {
        return new Queue(queue1);
    }

    @Bean(value = "fanout_queue2")
    public Queue queue2() {
        return new Queue(queue2);
    }

    @Bean(value = "fanout_exchange")
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(exchange);
    }


    //进行绑定
    @Bean
    Binding bindingFanoutExchange1(@Qualifier("fanout_queue1") Queue queue,
                                   @Qualifier("fanout_exchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    Binding bindingFanoutExchange2(@Qualifier("fanout_queue2") Queue queue,
                                   @Qualifier("fanout_exchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

}

监听队列 FanoutMqConsumer

也可以使用 RabbitListener 进行绑定

package com.yjl.amqp.config.fanout;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * fanout 的消费
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class FanoutMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(queues = {"${rabbit.fanout.queue1}", "${rabbit.fanout.queue2}"})
    public void fanoutQueueConsumer1An2(String message) {
        log.info("队列 fanout:" + message);
        receiveMessageService.handlerMessage("第一个消费者和第二个消费者获取消息 fanout:" + message);
    }

    // 也可以通过 RabbitListener 进行配置
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.fanout.queue3}"),
                    exchange = @Exchange(type = "fanout", name = "${rabbit.fanout.exchange}"),
                    key = {}
            )
    })
    public void fanoutQueueConsumer3(String message) {
        log.info("第三个消费者获取消息 fanout:" + message);
        receiveMessageService.handlerMessage("第三个消费者获取消息 fanout:" + message);
    }
}

发送消息

SendMessageController.java

    @RequestMapping("/fanout")
    public String fanout() {
        sendMessageService.fanout();
        return "fanout生成消息";
    }

SendMessageServiceImpl.java

    @Override
    public void fanout() {
        for (int i = 1; i <= 5; i++) {
            rabbitTemplate.convertAndSend(fanoutExchange, "", "fanout 发送消息:" + i);
        }
    }

验证

输入网址: http://localhost:8088/Server/send/fanout

image.png

Direct 交换机消息配置

通过注解绑定和消费队列消息 DirectMqConsumer

ackage com.yjl.amqp.config.direct;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 用途描述
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class DirectMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue1}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"debug", "info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("控制台打印输出:" + message);
        receiveMessageService.handlerMessage("控制台打印输出 direct:" + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue2}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerFile(String message) {
        log.info("文件 打印输出:" + message);
        receiveMessageService.handlerMessage("文件打印输出 direct:" + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue3}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"warn", "error"}
            )
    })
    public void fanoutQueueConsumerDb(String message) {
        log.info("Db 打印输出:" + message);
        receiveMessageService.handlerMessage("DB 打印输出 direct:" + message);
    }

}

发送消息

SendMessageController.java

  @RequestMapping("/direct")
    public String direct() {
        sendMessageService.direct();
        return "direct 生成消息";
    }

SendMessageServiceImpl.java

     @Override
    public void direct() {
        rabbitTemplate.convertAndSend(directExchange, "debug", "debug 消息");
        rabbitTemplate.convertAndSend(directExchange, "info", "info 消息");
        rabbitTemplate.convertAndSend(directExchange, "warn", "warn 消息");
        rabbitTemplate.convertAndSend(directExchange, "error", "error 消息");
    }

验证

输入网址: http://localhost:8088/Server/send/direct

image.png

Topic 交换机消息配置

创建交换机,队列并绑定 TopicConfig

package com.yjl.amqp.config.topic;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Topic 形式的 生产
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
public class TopicConfig {

    @Value("${rabbit.topic.queue1}")
    private String queue1;

    @Value("${rabbit.topic.exchange}")
    private String exchange;


    // 构建队列 Bean 和 Exchange Bean
    @Bean(value = "topic_queue1")
    public Queue queue1() {
        return new Queue(queue1);
    }

    @Bean(value = "topic_exchange")
    TopicExchange topicExchange() {
        return new TopicExchange(exchange);
    }


    //进行绑定
    @Bean
    Binding bindingTopicExchange(@Qualifier("topic_queue1") Queue queue,
                                 @Qualifier("topic_exchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange)
                .with("*.orange.*");
    }

}

监听队列 TopicMqConsumer

package com.yjl.amqp.config.topic;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * topic 的队列配置
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class TopicMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;

    @RabbitListener(queues = {"${rabbit.topic.queue1}"})
    public void fanoutQueueConsumer1An2(String message) {
        log.info("队列 topic:" + message);
        receiveMessageService.handlerMessage("console topic:" + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.topic.queue2}"),
                    exchange = @Exchange(type = "topic", name = "${rabbit.topic.exchange}"),
                    key = {"lazy.#", "*.*.rabbit"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("file topic:" + message);
        receiveMessageService.handlerMessage("file topic:" + message);
    }
}

发送消息

SendMessageController.java

   @RequestMapping("/topic")
    public String topic() {
        sendMessageService.topic();
        return "topic 生成消息";
    }

SendMessageServiceImpl.java

  @Override
    public void topic() {

        Map<String, String> messageMap = new HashMap<>();

        messageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        messageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");

        messageMap.put("quick.orange.fox", "被队列 Q1 接收到");
        messageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        messageMap.put("info", "一个 info 消息3 ");

        messageMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        messageMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        messageMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");

        messageMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        messageMap.forEach((routingKey, message) -> {
            try {

                rabbitTemplate.convertAndSend(topicExchange, routingKey,
                        message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

验证

输入网址: http://localhost:8088/Server/send/topic

image.png

这是 RabbitMQ 异步处理消息的常见用法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【云原生】一文带你吃透FlexManager数据传入华为云IOT

文章目录一、华为云设备接入服务1、创建设备接入基础版2、创建产品实例3、在产品下创建设备实例二、FlexManager部署一、华为云设备接入服务 设备接入服务&#xff08;IoTDA&#xff09;是华为云的物联网平台&#xff0c;提供海量设备连接上云、设备和云端双向消息通信、批量设…

【Linux】守护进程

文章目录任务管理进程组作业会话任务管理操作相关操作守护进程创建守护进程daemon函数创建守护进程所谓的项目部署任务管理 进程组 进程组是一个或多个进程的集合。每个进程除了有一个进程ID之外&#xff0c;还属于一个进程组。 每个进程组有一个唯一的进程组ID。每个进程组都…

CTFShow re2 (RC4

参考&#xff1a;CTFSHOW re2 本文&#xff1a;跟着大佬的博客一步一步做CTFShow re2的记录 IDA分析 有个比较函数 re一下 s "DH~mqqvqxB^||zllJq~jkwpmvez{" s1 for i in s:s1 chr(ord(i) ^ 0x1f) print(s1)得到 再四处看看 跟进sub_401028 四个sub点进去看看…

禅道登录-调用API方式

禅道提供了API机制方便于大家和其他的系统进行集成&#xff0c;API机制也都是基于http协议的&#xff0c;返回的数据以json格式存储。禅道的API都是需要先登录后才能进行接口调用&#xff08;登录返回的cookie需要在之后的每次请求中携带用于验证身份信息&#xff09;。网上关于…

DNS中有哪些值得学习的优秀设计

为什么要有DNS 如果我们想要访问某度&#xff0c;你可以在浏览器上的搜索栏里输入112.80.248.76这个IP地址&#xff0c;直达页面。 通过IP访问网页 这样的行为&#xff0c;合法&#xff0c;但有病。 大部分人&#xff0c;连自己对象的电话号码都记不住&#xff0c;又怎么可能…

shell脚本受限执行

shell 中运行的脚本或脚本的个代码断会禁用一些正常 shell 中可以执行的命令.这是限制脚本用户的权限和最小化运行脚本导致的破坏的安全措施.受限的内容包括&#xff1a;使用 cd 命令更改工作目录. 更改环境变量$PATH, $SHELL, $BASH_ENV,或$ENV 的值. 读或更改 shell 环境选项…

ARM 反汇编工具objdump的使用简介

一、反汇编的原理 & 为什么要反汇编 arm-linux-objdump -D led.elf > led_elf.dis-D, --disassemble-all Display assembler contents of all sectionsobjdump 是 gcc 工具链中的反汇编工具&#xff0c;作用是由编译链接好的 elf 格式的可执行程序反过来得到汇编源代…

【期末大作业】基于HTML+CSS+JavaScript网上订餐系统(23个页面)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

26-38-javajvm-类加载器子系统

26-javajvm-类加载器子系统&#xff1a; 1.内存结构概述 2.类加载子系统 2.1 类加载器子系统的作用 1&#xff09;、类加载器子系统负责从文件系统或者网络中加载Class文件&#xff0c;class文件在文件开头有特定的文件标识。 2&#xff09;、ClassLoader只负责class文件的加…

常用的shell命令

常用的shell命令 1、ls命令 功能&#xff1a;显示文件和目录的信息 ls 以默认方式显示当前目录文件列表 ls -a 显示所有文件包括隐藏文件 ls -l 显示文件属性&#xff0c;包括大小&#xff0c;日期&#xff0c;符号连接&#xff0c;是否可读写及是否可执行 ls -lh 显示文件的…

1547_AURIX_TC275_CPU子系统_数据存储接口

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 这个可以对比PMI来看一下&#xff0c;相比于PMI&#xff0c;DMI的结构简单很多。 1. 之前在看cache以及其对指令读取速度影响的时候&#xff0c;可能弄错了一个概念。如果cache不命中&…

【JavaEE】认识多线程(一)

✨哈喽&#xff0c;进来的小伙伴们&#xff0c;你们好耶&#xff01;✨ &#x1f6f0;️&#x1f6f0;️系列专栏:【JavaEE】 ✈️✈️本篇内容:了解多线程(初阶) &#x1f680;&#x1f680;代码存放仓库gitee&#xff1a;JavaEE初阶代码存放&#xff01; ⛵⛵作者简介&#x…

单变量微积分重点(2)

泰勒公式 用柯西定理证明 拉格朗日余项 麦克劳林展开式&#xff1a; 皮亚诺余项的泰勒公式&#xff1a; 弧长的微分 注意s(t)需要在后面证明&#xff08;定积分的知识&#xff09; 不定积分&#xff1a; 注意&#xff0c;不同的积分方法经常会得到不同的结果&#xff0c;但它们…

IDEA如何配置 Gradle 及 Gradle 安装过程(详细版)

IDEA如何配置 Gradle&#xff08;详细版&#xff09; 一、安装 Gradle 1、下载 Gradle 安装包 官网下载链接&#xff1a;https://gradle.org/releases/ 2、下载后解压 3、文件夹如图所示 二、环境变量配置 1、点击我的电脑->属性->高级系统设置->环境变量 2、新建&…

[附源码]JAVA毕业设计抗击新冠疫情专题宣传网站(系统+LW)

[附源码]JAVA毕业设计抗击新冠疫情专题宣传网站&#xff08;系统LW&#xff09; 目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 …

JSP+MySQL绿色环境保护网站的设计于实现

环保已经是当前中国的一个基本国策,国家领导人和各地政府也制定了一系列相关的政策来号召全民积极的参加到环保事业中来,为了能够更好的响应多家的号召我们开发了本JSP&#xff1a;MySQL&#xff1a;SSH 绿色环保网站,希望更多的人能够积极的参加到环保事业中来,本网站通过环境…

华为模拟器手把手安装教程-HCIE(华为网络工程师)

一、准备工作 请各位提前准备好eNSP_Setup安装程序、VirtualBox安装程序、Wireshark-win64位安装程序和WinPcap安装程序&#xff08;在wireshark安装过程中可以一起安装&#xff0c;也可以单独安装&#xff09;&#xff0c;获取相关安装程序文件可以联系小编哦&#xff01; 二、…

【蓝桥杯】第十四届模拟赛第一期及第二期填空汇总

目录 1.A题&#xff08;进制位数&#xff09; 位运算符 第一期 问题描述 解析 第二期 解析 代码 2.B题&#xff08;日期问题&#xff09; 第一期 问题描述 解析 代码实现 执行结果 第二期 问题描述 解析 3.C题&#xff08;数学问题&#xff09; 第一期 问题…

windows域控上批量修改域账号密码

目录 一、查询密码过期域账号信息 &#xff08;一&#xff09;根据OU组织架构查询密码过期账号 &#xff08;二&#xff09;查询域控所有密码过期账号 &#xff08;三&#xff09;导出dsquery查询的信息 二、批量修改过期域账号密码 &#xff08;一&#xff09;根据dsque…

【YOLO系列改进NO.47】改进激活函数为GELU

文章目录前言一、解决问题二、基本原理三、​添加方法四、总结前言 作为当前先进的深度学习目标检测算法YOLOv7&#xff0c;已经集合了大量的trick&#xff0c;但是还是有提高和改进的空间&#xff0c;针对具体应用场景下的检测难点&#xff0c;可以不同的改进方法。此后的系列…