SpringBoot实战(十一)集成RebbitMQ

news2024/11/28 20:59:05

目录

    • 1.工作原理图
      • 1.1 配置阶段
      • 1.2 生产者
      • 1.3 消费者
    • 2.Maven依赖
    • 3.常用交换机类型
      • 3.1 direct 直连交换机
      • 3.2 fanout 广播交换机
      • 3.3 topic 主题交换机
    • 4.Direct 直连交换机
      • 4.1 yml配置
      • 4.2 配置类
      • 4.3 消息推送类
      • 4.4 消息监听类
      • 4.5 测试
    • 5.Fanout 广播交换机
      • 5.1 配置类
      • 5.2 消息推送类
    • 6.topic 主题交换机
      • 6.1 配置类
      • 6.2 消息推送类
    • 7.源码地址

1.工作原理图

在这里插入图片描述

1.1 配置阶段

  • Queue(消息队列) 和 Exchange(交换机)通过 RoutingKey(路由键)进行绑定。

在这里插入图片描述

1.2 生产者

  • 通过 Exchange(交换机)和 RoutingKey(路由键)唯一确认 Queue(消息队列),推送消息内容。

在这里插入图片描述

1.3 消费者

  • 根据 Queue(消息队列)名称,接收新消息内容。

在这里插入图片描述

2.Maven依赖

<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.0.0</version>
</dependency>

3.常用交换机类型

3.1 direct 直连交换机

  • 根据 Exchange(交换机)和 RoutingKey(路由键)将消息推送给对应队列。

大致流程:

  1. 一个队列绑定到一个直连交换机上,同时赋予一个 RoutingKey(路由键)。

  2. 当消息通过生产者发送给 Exchange(交换机)时,Exchange(交换机)就会根据 RoutingKey(路由键)去找绑定的 Queue(消息队列)。

    Exchange -> RoutingKey -> Queue

3.2 fanout 广播交换机

  • 也叫扇形交换机,这种交换机没有 RoutingKey(路由键)概念,就算你绑定了路由键也是无视的。这个交换机在接收到消息后,会直接转发到绑定它上面的所有队列。

    Exchange -> Queue

3.3 topic 主题交换机

  • 这种交换机会根据通配符对 RoutingKey(路由键)进行匹配,只要 RoutingKey(路由键)满足匹配规则,就会被路由到对应的 Queue` 上。

通配符的匹配规则:

  • RoutingKey(路由键)必须是一串字符串,每个单词用”.“分隔;
  • 符号 “#” 表示匹配一个或多个单词;
  • 符号 “*” 表示匹配一个单词。

例如:

  • “*.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;
  • “#.123” 能够匹配到 “abc.123”,也能匹配不到 “abc.def.123”。

Exchange -> RoutingKey(demo.*/demo.#)-> Queue

4.Direct 直连交换机

4.1 yml配置

server:
  port: 8081

spring:
  application:
    name: springboot-rabbitmq
  rabbitmq:
    # 此处不建议单独配置host和port,单独配置不支持连接RabbitMQ集群
  	addresses: 127.0.0.1:5672
    username: guest
    password: guest
    # 虚拟host 可以不设置,使用server默认host
    virtual-host: /
    # 是否开启发送端消息抵达队列的确认
    publisher-returns: true
    # 发送方确认机制,默认为NONE,即不进行确认;SIMPLE:同步等待消息确认;CORRELATED:异步确认
    publisher-confirm-type: correlated
    # 消费者监听相关配置
    listener:
      simple:
        acknowledge-mode: auto # 确认模式,默认auto,自动确认;manual:手动确认
        prefetch: 1 # 限制每次发送一条数据
        concurrency: 3 # 同一个队列启动几个消费者
        max-concurrency: 3 # 启动消费者最大数量
        # 重试机制
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 3
          # 重试间隔时间(毫秒)
          initial-interval: 3000

4.2 配置类

RabbitDirectConfig.java

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p> @Title RabbitDirectConfig
 * <p> @Description 直连交换机配置
 * Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
 *
 * @author ACGkaka
 * @date 2023/1/12 15:09
 */
@Configuration
public class RabbitDirectConfig {

    /**
     * 队列,命名:TestDirectQueue
     *
     * @return 队列
     */
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
        // autoDelete:是否自动删除,当没有生产者或消费者使用此队列,该队列会自动删除。

        // 一般设置一下队列的持久化就好,其余两个默认false
        return new Queue("TestDirectQueue", true);
    }

    /**
     * Direct交换机,命名:TestDirectExchange
     * @return Direct交换机
     */
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange", true, false);
    }

    /**
     * 绑定 将队列和交换机绑定,并设置用于匹配键:TestDirectRouting
     * @return 绑定
     */
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
}

4.3 消息推送类

SendMessageController.java

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * <p> @Title SendMessageController
 * <p> @Description 推送消息接口
 *
 * @author ACGkaka
 * @date 2023/1/12 15:23
 */
@RestController
public class SendMessageController {

    /**
     * 使用 RabbitTemplate,这提供了接收/发送等方法。
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "Hello world.";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        // 将消息携带绑定键值:TestDirectRouting,发送到交换机:TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "OK";
    }
}

4.4 消息监听类

DirectReceiver.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * <p> @Title DirectReceiver
 * <p> @Description 直连型交换机监听类
 *
 * @author ACGkaka
 * @date 2023/1/12 15:59
 */
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消费者收到消息:" + testMessage.toString());
    }
}

4.5 测试

1)先注释消息监听类,请求如下地址: http://localhost:8081/sendDirectMessage

在这里插入图片描述

2)打开RabbitMQ页面,可以看到有一条就绪待消费的消息。

在这里插入图片描述

3)打开消息监听类注释,重启项目,可以看到消息被消费了。

在这里插入图片描述

在这里插入图片描述

5.Fanout 广播交换机

5.1 配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p> @Title RabbitFanoutConfig
 * <p> @Description 广播交换机配置
 * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
 *
 * @author zhj
 * @date 2023/1/16 5:42
 */
@Configuration
public class RabbitFanoutConfig {

    public static final String FANOUT_QUEUE_NAME_1 = "testFanoutQueue1";
    public static final String FANOUT_QUEUE_NAME_2 = "testFanoutQueue2";

    @Bean
    public Queue testFanoutQueue1() {
        return new Queue(RabbitFanoutConfig.FANOUT_QUEUE_NAME_1);
    }
    @Bean
    public Queue testFanoutQueue2() {
        return new Queue(RabbitFanoutConfig.FANOUT_QUEUE_NAME_2);
    }
    /**
     * 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。
     */
    @Bean
    FanoutExchange testFanoutExchange() {
        return new FanoutExchange("testFanoutExchange", true, false);
    }
    @Bean
    Binding bindingFanout1(Queue testFanoutQueue1, FanoutExchange testFanoutExchange) {
        return BindingBuilder.bind(testFanoutQueue1).to(testFanoutExchange);
    }
    @Bean
    Binding bindingFanout2(Queue testFanoutQueue2, FanoutExchange testFanoutExchange) {
        return BindingBuilder.bind(testFanoutQueue2).to(testFanoutExchange);
    }
}

5.2 消息推送类

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * <p> @Title SendMessageController
 * <p> @Description 推送消息接口
 *
 * @author ACGkaka
 * @date 2023/1/12 15:23
 */
@RestController
public class SendMessageController {

    /**
     * 使用 RabbitTemplate,这提供了接收/发送等方法。
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "Hello world.";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        // 将消息携带绑定键值:testFanoutRouting,发送到交换机:testFanoutExchange
        rabbitTemplate.convertAndSend("testFanoutExchange", "testFanoutRouting", map);
        return "OK";
    }
}

其余与 Direct 直连交换机基本相同,不再赘述。

6.topic 主题交换机

6.1 配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p> @Title RabbitTopicConfig
 * <p> @Description 主题交换机配置类
 *
 * @author zhj
 * @date 2023/1/16 6:02
 */
@Configuration
public class RabbitTopicConfig {

    public static final String TOPIC_QUEUE_NAME_1 = "testTopicQueue1";
    public static final String TOPIC_QUEUE_NAME_2 = "testTopicQueue2";
    public static final String TOPIC_QUEUE_NAME_3 = "testTopicQueue3";
    public static final String TOPIC_EXCHANGE_NAME = "testTopicExchange";
    public static final String TOPIC_ROUTING_NAME_1 = "test";
    public static final String TOPIC_ROUTING_NAME_2 = "test.topic";
    public static final String TOPIC_ROUTING_NAME_3 = "test.topic.message";

    @Bean
    public Queue testTopicQueue1() {
        return new Queue(RabbitTopicConfig.TOPIC_QUEUE_NAME_1);
    }
    @Bean
    public Queue testTopicQueue2() {
        return new Queue(RabbitTopicConfig.TOPIC_QUEUE_NAME_2);
    }
    @Bean
    public Queue testTopicQueue3() {
        return new Queue(RabbitTopicConfig.TOPIC_QUEUE_NAME_3);
    }
    /**
     * 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息
     */
    @Bean
    TopicExchange testTopicExchange() {
        return new TopicExchange(RabbitTopicConfig.TOPIC_EXCHANGE_NAME, true, false);
    }

    /**
     * 綁定队列 testTopicQueue1() 到 testTopicExchange 交换机,路由键只接受完全匹配 test.topic1 的队列接受者可以收到消息
     */
    @Bean
    Binding bindingTestTopic1(Queue testTopicQueue1, TopicExchange testTopicExchange) {
        return BindingBuilder.bind(testTopicQueue1).to(testTopicExchange).with(RabbitTopicConfig.TOPIC_ROUTING_NAME_1);
    }
    @Bean
    Binding bindingTestTopic2(Queue testTopicQueue2, TopicExchange testTopicExchange) {
        return BindingBuilder.bind(testTopicQueue2).to(testTopicExchange).with(RabbitTopicConfig.TOPIC_ROUTING_NAME_2);
    }
    @Bean
    Binding bindingTestTopic3(Queue testTopicQueue3, TopicExchange testTopicExchange) {
        return BindingBuilder.bind(testTopicQueue3).to(testTopicExchange).with(RabbitTopicConfig.TOPIC_ROUTING_NAME_3);
    }
}

6.2 消息推送类

import com.demo.config.RabbitTopicConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * <p> @Title SendMessageController
 * <p> @Description 推送消息接口
 *
 * @author ACGkaka
 * @date 2023/1/12 15:23
 */
@RestController
public class SendMessageController {

    /**
     * 使用 RabbitTemplate,这提供了接收/发送等方法。
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendTopicMessage")
    public String sendTopicMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "Hello world.";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        // 将消息携带绑定键值:testFanoutRouting,发送到交换机:testFanoutExchange
        rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_EXCHANGE_NAME, "test.#", map);
        return "OK";
    }
}

其余与 Direct 直连交换机基本相同,不再赘述。

7.源码地址

源码地址: https://gitee.com/acgkaka/SpringBootExamples/tree/master/springboot-rabbitmq





参考地址:

1.Springboot 整合RabbitMq ,用心看完这一篇就够了,https://blog.csdn.net/qq_35387940/article/details/100514134

2.【RabbitMQ的那点事】如何保证消息的正确发送,https://www.jianshu.com/p/15f0c1a105fb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/166409.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

javascript画全年日历

前些日子闲聊群里有人提了用js画全年日历的需求&#xff0c;趁闲暇时间画了个小demo&#xff0c;下面还是先上效果图吧。 高亮显示的是今天的日期和标记要高亮显示的日期&#xff0c;也添加了点击事件的钩子&#xff0c;自己可以实现钩子函数&#xff0c;从而操作点击的日期值。…

综述 | 深度强化学习在自动驾驶中的应用

本文是2020年的综述论文《Deep Reinforcement Learning for Autonomous Driving: A Survey》的部分内容节选。翻译稿全文共2万6千字&#xff0c;本文略掉了第3、4节强化学习理论的介绍及扩展部分。摘要随着深度表征学习(deep representation learning)的发展&#xff0c;强化学…

【8】SCI易中期刊推荐——图像处理领域(中科院4区)

🚀🚀🚀NEW!!!SCI易中期刊推荐栏目来啦 ~ 📚🍀 SCI即《科学引文索引》(Science Citation Index, SCI),是1961年由美国科学信息研究所(Institute for Scientific Information, ISI)创办的文献检索工具,创始人是美国著名情报专家尤金加菲尔德(Eugene Garfield…

windows下Docker部署Flask的教程

Docker默认安装路径是在C盘&#xff0c;Windows中修改Docker**默认安装****路径方法&#xff1a; 1.先创建 D:\Program Files\Docker 目录 2.运行命令&#xff0c;创建链接 mklink /J "C:\Program Files\Docker" "D:\codeSoftware\Docker"3.点击exe安装…

logstash 向多目标输出多份日志输出syslog

logstash默认情况下是内置了输入syslog日志的&#xff0c;但是不支持输出syslog&#xff0c;需要输出syslog的情况下&#xff0c;就需要手动安装logstash-output-syslog插件。安装方法如下&#xff1a;下载logstash-output-syslog插件&#xff0c;https://rubygems.org/downloa…

SpringBoot 注册自己的Servlet(三种方式)

SpringBoot 注册自己的Servlet&#xff08;三种方式&#xff09; 目录SpringBoot 注册自己的Servlet&#xff08;三种方式&#xff09;方法1:使用servlet3.0规范提供的注解方式注册Servlet1,声明servlet及映射2&#xff0c;加上ServletComponentScan 才会扫描加了这个注解运行结…

LeetCode 62. 不同路径

&#x1f308;&#x1f308;&#x1f604;&#x1f604; 欢迎来到茶色岛独家岛屿&#xff0c;本期将为大家揭晓LeetCode 62. 不同路径&#xff0c;做好准备了么&#xff0c;那么开始吧。 &#x1f332;&#x1f332;&#x1f434;&#x1f434; 一、题目名称 LeetCode 62. …

蚂蚁智能内容合规产品,提供一站式营销合规管控解决方案

随着互联网服务的不断深化&#xff0c;产品营销的形式从传统文本、长图文&#xff0c;增加到短视频、直播等新媒介形态&#xff0c;展现形式愈加丰富的同时&#xff0c;也为营销宣传内容合规审核带来了诸多难题。如何解决与日俱增的审核量与合规审核人员有限之间的矛盾&#xf…

旧手机闲置?教你用Termux搭建个移动服务器

目录 前言 准备工作 实践 安装Termux&#xff1a; 运行Termux&#xff1a; 环境配置&#xff1a; 效果展示 写在最后 前言 最近偶然看到网上有人用KSWEB搭建本地服务器&#xff0c;于是突发奇想也想在手机中搭建一个node环境试试&#xff0c;趁着周末有空&#xff0c;…

Vue3商店后台管理系统设计文稿篇(五)

记录使用vscode构建Vue3商店后台管理系统&#xff0c;这是第五篇&#xff0c;主要记录Vue3项目路由知识 文章目录一、Vue3路由二、安装Element Plus三、NPM设置淘宝镜像四、Yarn 设置淘宝镜像正文内容&#xff1a; 一、Vue3路由 路由用于设定访问路径, 将路径和组件映射起来&…

【vue系列-06】vue的组件化编程

深入理解vue的组件一&#xff0c;vue组件1&#xff0c;什么是vue组件2&#xff0c;单文件组件和非单文件组件3&#xff0c;非单组件的基本使用4&#xff0c;vue组件命名规范4.1&#xff0c;一个单词组成4.2&#xff0c;多个单词组成5&#xff0c;组件与组件间的嵌套6&#xff0…

Tomcat结构体系

总体结构Tomcat中最顶层的容器是Server&#xff0c;代表着整个服务器&#xff0c;从上图中可以看出&#xff0c;一个Server可以包含至少一个Service&#xff0c;用于具体提供服务。Service主要包含两个部分&#xff1a;Connector和Container。从上图可以看出 Tomcat 的心脏就是…

opencv的mat openvino的tensor libtorch的tensor

opencv的mat 对于矩阵数据&#xff0c;在opencv里面是通过使用mat这个数据结构来实现的&#xff0c;我觉得这个数据结构本身设计是用来做图片的存储&#xff0c;所以很多的教程都是关于三维矩阵的&#xff08;其中一个维度是channel&#xff09;&#xff0c;关于三维矩阵的定义…

通讯录小练习:柔性数组和文件操作实现

目录 一.程序功能 二.定义关键类型的头文件与枚举的应用 三.封装柔性数组的增容函数与缩容函数 四.添加联系人功能模块 五 .联系人信息打印模块 六. 查找指定联系人的模块 七.删除指定联系人模块 八.修改指定联系人信息模块 九.排序模块 九.文件操作模块 十.通讯录初…

如何实现外网远程登录访问jupyter notebook?

Jupyter Notebook是一个交互式笔记本&#xff0c;本质是一个 Web 应用程序&#xff0c;支持运行 40 多种编程语言&#xff0c;此前被称为 IPython notebook。Jupyter Notebook 便于创建和共享程序文档、支持实时代码、数学方程、可视化和 markdown&#xff0c;应用场景有数据清…

机器学习基础——k-近邻算法概述和简单实现

本章内容 k-近邻分类算法 从文本文件中解析数据 前言 众所周知&#xff0c;电影可以按照题材分类&#xff0c;然而题材本身是如何定义的?由谁来判定某部电影属于哪个题材?也就是说同一题材的电影具有哪些公共特征?这些都是在进行电影分类时必须要考虑的问题。没有哪个电影人…

Revit问题:降板表面填充图案和构件上色

一、Revit中如何为降板表面填充不同的图案 在平面图中该如何利用填充图案来区别降板跟楼板&#xff1f; 1、中间的楼板为降板(120)/-150mm,下面我们通过“过滤器”来为其填充表面图案。 2、通过快捷键VV打开“可见性/图形替换”对话框&#xff0c;单击选择“过滤器”一项。 3、…

2023/1 寒假期间自学c++计划安排

寒假一期学习总结 寒假一期学习是在线下进行的&#xff0c;总的来说&#xff0c;非常充实&#xff0c;也很有收获&#xff0c;成体系的学习了 二分&#xff0c;高精度&#xff0c;函数&#xff0c;结构体&#xff0c;STL 等等内容&#xff0c;既开心有学到了知识。 在这7天的集…

最新ios证书申请流程

苹果官方申请ios证书的方法&#xff0c;需要mac电脑&#xff0c;需要使用钥匙串管理先生成csr文件&#xff0c;然后去苹果开发者中心生成证书&#xff0c;然后再用mac电脑导出p12证书。假如我们没有mac电脑&#xff0c;又如何申请证书呢&#xff1f;这个教程我将教会大家如何使…

从汇编的角度了解C++原理——类的储存结构和函数调用

本文用到的反汇编工具是objconv&#xff0c;使用方法可以看我另一篇文章https://blog.csdn.net/weixin_45001971/article/details/128660642。 1、类的储存结构和函数调用 以这段代码为例。 编译后对obj文件反汇编&#xff0c;得到以下汇编代码&#xff0c;配合常量的值来分…