Spring Boot 整合RabbitMQ

news2025/1/16 5:48:56

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、RabbitMQ概念概念
    • 1.1、生产者和消费者
    • 1.2、队列
    • 1.3、交换机、路由键、绑定
      • 1.3.1、交换机类型
  • 2、RabbitMQ运转流程
    • 2.1、生产者发送消息流程
    • 2.2、消费者接收消息的过程
    • 2.3、AMQP协议
  • 3、RabbitMQ windows安装
    • 3.1、下载
    • 3.2、安装
  • 4、Spring Boot 整合RabbitMQ
    • 4.1、在user-service添加依赖
    • 4.2、配置文件添加
    • 4.3、增加RabbitMQ配置类
    • 4.4、新增消费监听类
    • 4.5、消息生产端

前言

一般MQ用于系统解耦、削峰使用,常见于微服务、业务活动等场景。 MQ(消息队列)在微服务、业务活动等场景中的应用主要表现为系统解耦和削峰。

系统解耦

场景描述:在微服务架构中,服务与服务之间需要通信。如果采用直接调用方式,服务间会存在强依赖关系,一个服务的改动可能引发连锁反应。
MQ作用:服务间可以通过消息队列进行通信,一个服务将消息放入队列,另一个服务从队列中取出消息进行处理。这种方式下,服务间实现了解耦,降低了相互的依赖。

削峰

场景描述:在业务活动期间,由于用户请求量短时间内剧增,可能导致系统压力过大甚至崩溃。
MQ作用:通过消息队列实现请求的缓冲。在高并发场景下,系统可以将请求放入消息队列,然后异步处理这些请求,从而平滑系统的处理负载,确保系统的稳定性。

综上所述,MQ因其独特的队列属性和消息传递模式,在分布式、微服务架构中发挥着重要的作用,提高了系统的可用性和稳定性。

1、RabbitMQ概念概念

RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
在这里插入图片描述

1.1、生产者和消费者

  • Producer:生产者,就是投递消息的一方。消息一般可以包含2个部分:消息体和标签(Label)。消息的标签用来描述这条消息,比如一个交换器的名称和一个路由键。
  • Consumer:消费者,就是接受消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)
  • Broker:消息中间件的服务节点。一个RabbitMQ Broker看做一台RabbitMQ服务器
    在这里插入图片描述

1.2、队列

Queue:队列,是RabbitMQ的内部对象,用于存储消息
在这里插入图片描述
在这里插入图片描述

1.3、交换机、路由键、绑定

Exchange:交换器。生产者将消息发送到Exchange(交换器,通常也可以用大写的"X"来表示),有交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
在这里插入图片描述
RoutingKey:路由键。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbitMQ中通过绑定将交换器与队列联合起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
在这里插入图片描述

1.3.1、交换机类型

  • Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。
    在这里插入图片描述
    在这里插入图片描述
  • Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
    在这里插入图片描述
  • Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
    在这里插入图片描述
  • Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
    在这里插入图片描述
    自学参考:https://blog.csdn.net/qq_38550836/article/details/95358353

2、RabbitMQ运转流程

2.1、生产者发送消息流程

  • 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
  • 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
  • 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
  • 生产者通过路由键将交换器和队列绑定起来
  • 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
  • 相应的交换器根据接收到的路由键查找相匹配的队列。
  • 如果找到,则将从生产者发送过来的消息存入相应的队列。
  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  • 关闭信道
  • 关闭连接

2.2、消费者接收消息的过程

  • 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
  • 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
  • 等待RabbitMQ Broker回应并投递相应队列中队列的消息,消费者接收消息。
  • 消费者确认(ack)接收到的消息。
  • RabbitMQ从队列中删除相应已经被确认的消息。
  • 关闭信道
  • 关闭连接
    在这里插入图片描述无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。

2.3、AMQP协议

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
在这里插入图片描述

  • Broker:接收和分发消息的应用,RabbitMQ 就是 Message Broker
  • Virtual Host:虚拟 Broker,将多个单元隔离开
  • Connection:publisher / consumer 和 broker 之间的 tcp 连接
  • Channel:connection 内部建立的逻辑连接,通常每个线程创建单独的 channel
  • Routing key:路由键,用来指示消息的路由转发,相当于快递的地址
  • Exchange:交换机,相当于快递的分拨中心
  • Queue:消息队列,消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,用于 message 的分发依据

3、RabbitMQ windows安装

3.1、下载

https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe

3.2、安装

配置环境变量
在这里插入图片描述
在这里插入图片描述

cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin

开启rabbitmq-plugins插件

rabbitmq-plugins enable rabbitmq_management
在这里插入图片描述

打开地址
http://127.0.0.1:15672/
在这里插入图片描述
输入用户名/密码:guest/guest

4、Spring Boot 整合RabbitMQ

4.1、在user-service添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2、配置文件添加

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4.3、增加RabbitMQ配置类

package com.xxxx.user.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    /******************direct**********************/
    /**
     * 创建direct队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue("directQueue");
    }

    /**
     * 创建direct交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    /**
     * 把队列和交换机绑定在一起
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding bindingDirect(@Qualifier("directQueue") Queue queue, DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("routingKey");
    }

    /******************topic**********************/
    @Bean
    public Queue topicQuerue1(){
        return new Queue("topicQuerue1");
    }

    @Bean
    public Queue topicQuerue2(){
        return new Queue("topicQuerue2");
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }
    @Bean
    public Binding bindingTopic1(@Qualifier("topicQuerue1") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.key1");
    }

    /**
     * 通配符:* 表示一个词,# 表示零个或多个词
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding bindingTopic2(@Qualifier("topicQuerue2") Queue queue,@Qualifier("topicExchange") TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
    }

    /******************fanout**********************/
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanoutQueue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanoutQueue2");
    }
    @Bean
    public Queue fanoutQueue3(){
        return new Queue("fanoutQueue3");
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding bindingFanout1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanout2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanout3(@Qualifier("fanoutQueue3") Queue queue,@Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

4.4、新增消费监听类

package com.xxxx.user.consumer;

import com.xxxx.drp.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = "directQueue")
public class DataDirectReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到directQueue队列信息:" + data);
    }

    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到directQueue队列信息:" + data);
    }
}

package com.xxxx.user.consumer;

import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = {"topicQuerue1","topicQuerue2"})
public class DataFanoutReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到topicQuerue队列信息:" + data);
    }

    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到topicQuerue队列信息:" + data);
    }
}
package com.xxxx.user.consumer;

import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RabbitListener(queues = {"fanoutQueue1","fanoutQueue2","fanoutQueue3"})
public class DataTopicReceiver {
    @RabbitHandler
    public void process(String data){
        log.info("收到topicQuerue队列信息:" + data);
    }

    @RabbitHandler
    public void process(UserInfo data){
        log.info("收到topicQuerue队列信息:" + data);
    }
}

4.5、消息生产端

package com.xxxx.user;

import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DataSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendDirect(){
        UserInfo userInfo = new UserInfo();
        userInfo.setUserAccount("tiger");
        userInfo.setPassword("12345");
        this.rabbitTemplate.convertAndSend("directExchange","routingKey",userInfo);
    }
    @Test
    public void sendTopic(){
        this.rabbitTemplate.convertAndSend("topicExchange","topic.key2","Hello world topic");
    }

    @Test
    public void sendFanout(){
        this.rabbitTemplate.convertAndSend("fanoutExchange","","Hello world topic");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1174550.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【m98】abseil-cpp的cmake构建

m79的代码有些头文件没有,比如#include "absl/numeric/bits.h"使用m98版本里的代码,支持cmake构建cmake版本 WIN32 DEBUG configure Selecting Windows SDK version 10.0.22000.0 to target Windows 10.0.22621. The CXX compiler identification is MSVC 19.37.32…

第七章 Python常用函内置函数

系列文章目录 第一章 Python 基础知识 第二章 python 字符串处理 第三章 python 数据类型 第四章 python 运算符与流程控制 第五章 python 文件操作 第六章 python 函数 第七章 python 常用内建函数 第八章 python 类(面向对象编程) 第九章 python 异常处理 第十章 python 自定…

解决Visual Studio 2010 运行时屏幕一闪而过,无结果显示的问题

安装配置&#xff1a;Visual Studio 2010 软件安装教程&#xff08;附下载链接&#xff09;——计算机二级专用编程软件https://blog.csdn.net/W_Fe5/article/details/134218817?spm1001.2014.3001.5502 1、 我们在运行时会出现窗口一闪而过&#xff0c;这时候我们右键Test_1…

C++初阶-类和对象(中)2

类和对象&#xff08;中&#xff09;2 一、赋值运算符重载运算符重载赋值运算符重载前置和后置重载 二、日期类的实现三、const成员四、取地址及const取地址操作符重载 一、赋值运算符重载 运算符重载 C为了增强代码的可读性引入了运算符重载&#xff0c;运算符重载是具有特殊…

​软考-高级-信息系统项目管理师教程 第四版【第14章-项目沟通管理-思维导图】​

软考-高级-信息系统项目管理师教程 第四版【第14章-项目沟通管理-思维导图】 课本里章节里所有蓝色字体的思维导图

c语言练习100(贪吃蛇的实现)

贪吃蛇的实现 先实现主界面&#xff0c;后续将会不断完善。&#xff08;逐渐添加更多的功能&#xff09; test.c #define _CRT_SECURE_NO_WARNINGS #include"snake.h" void Test() {Snake snake { 0 };//创建贪吃蛇//1.游戏开始 - 初始化游戏GameStart(&sn…

类锁和实例对象锁你分清了吗?

系列文章目录 文章目录 系列文章目录前言一、什么是锁竞争&#xff1f;二、什么是类锁&#xff1f;什么是实例对象锁&#xff1f;三、给类对象加锁不是锁住了整个类四、总结 前言 java选手们应该都对锁不陌生&#xff0c;加锁了就是为保证操作语句的原子性&#xff0c;如果你是…

二AcW826. 单链表

#include<iostream>using namespace std;const int N100010;//head头结点下标//e[i]值//ne[i]下一个位置的地址//idx当前已经用到了哪个点int head, e[N],ne[N],idx;void init(){head-1;idx0; }void add_to_head(int x)//插到head{e[idx]x;ne[idx]head;//以前head指针是指…

多媒体应用设计师 2023年(含答案回忆版)

以下是小红书上的回忆版 软考考完疯狂回忆&#xff0c;多媒体应用设计师选择题 1.pattern 2.effective 3.merge 4.applications 5.graphic 6.udp 7.rtp 8.rtsp 9.10cm 10.永久 11…97 12.工作技术管理标准 13.管理型元数据 14.premiere 15.wave 16.500km/h 17.3M 18.44000 19.…

11.1~11.2数电实验一些点+11.4~11.5报错复盘

方框写在前面是说这个数有多大&#xff0c;写在后面是说这类数有多少 前面的用于计数&#xff0c;每位无实际意义&#xff1b;后面每位都代表一个同类型的&#xff0c;即数组&#xff0c;每位有实际意义 使用四位格雷码作为深度为8的FIFO的读写指针 将格雷码转换成四位二进制…

Amlogic IR模块Linux驱动分析

目录 一、简介 1、了解IR协议 2、代码结构介绍 二、硬件原理及连接 2、芯片手册解读 三、驱动代码分析 1、设备树介绍 1&#xff09;reg 2&#xff09;protocol 3&#xff09;pinctrl 4&#xff09;map 2、linux驱动介绍 1&#xff09;makefile 2&#xff09;数据…

【C++--string模拟实现】

一、基本思路 新建一个项目&#xff0c;在项目中创建头文件string.h 源文件string.cpp 在头文件中&#xff0c;先定义一个新的命名空间&#xff08;为了防止与库中的string发生冲突&#xff09;&#xff0c;命名空间的名字可以按照自己意愿来命名。 接下来就可以在命名空间中…

高斯过程回归 | 高斯过程回归(GPR)区间预测

对于高斯过程,高斯指的是多元高斯分布,过程指的是随机过程。 我们都知道随机过程就是指函数的分布,那么多元高斯分布实际上应该是指无限元的高斯分布。 协方差函数也称为核函数,是高斯过程回归的重点。核函数的选取方式有很多,包括径向基函数(高斯核函数)、线性核函数、…

C++相关练习及详细讲解

目录 题1&#xff1a;输出数组中第k小的数在数组内找出查找数字在该数组第一次出现的索引 题1&#xff1a;输出数组中第k小的数 题目描述&#xff1a; 给定一个数组arr 输出数组中第k小的数 如果不存在 输出-1 输入格式&#xff1a; 第一行输入一个数字n 代表数组arr大小 第二…

S32K324 UDS Bootloader开发-下位机篇-Bootload软件(1)

文章目录 前言启动过程Bootloader开发链接文件编译文件跳转函数CAN收发相关发送接收初始化及使能CAN周期函数总结前言 上一篇文章介绍了S32K324 -UDS Bootlodaer开发中的需求,本文根据需求开发Bootloader软件。 本文参考NXP官网的S32K324 UBL,其中有一些Bug,也有一些和上位机…

C++ 实现红黑树

红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或 Black。 通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;红黑树确保没有一条路 径会比其他路径长出俩倍&#xff0c;因…

SMART PLC开放式以太网通信(UDP通信)

西门子S7-200 SMART PLC不仅支持开放式以太网通信,还支持MODBU-RTU,以及ModbusTcp通信,详细内容请参考下面文章: MODBUS-RTU主站通信 【精选】PLC MODBUS通信优化、提高通信效率避免权限冲突(程序+算法描述)-CSDN博客文章浏览阅读2.5k次,点赞5次,收藏10次。MODBUS通讯…

Ubuntu 20.04源码安装git 2.35.1

《如何在 Ubuntu 20.04 上从源代码安装 Git [快速入门]》和《如何在 Ubuntu 20.04 上安装 Git》是我参考的博客。 https://git-scm.com/是git官网。 lsb_release -r看到操作系统版本是20.04。 uname -r看到内核版本是5.4.0-156-generic。 sudo apt update更新一下源。 完…

操作系统复习(3)处理机调度与死锁

一、概述 1.1了解调度的层次 调度是指&#xff0c;在一个队列中&#xff0c;按照某种方法&#xff08;算法&#xff09;&#xff0c;选择一个合适的个体的过程。进程调度的功能就是按一定策略、动态地把CPU分配给处于就绪队列中的某一进程&#xff0c;并使之执行。 作业调度&…

设置DevC++支持c++11标准

1.点击编译选项 2. 设置语言标准 3.点击确认 4.测试代码 使用auto成功 测试&#xff01;