MQ:RabbitMQ

news2025/1/11 12:49:23

同步和异步通讯

同步通讯:

需要实时响应,时效性强

耦合度高

每次增加功能都要修改两边的代码

性能下降

需要等待服务提供者的响应,如果调用链过长则每次响应时间需要等待所有调用完成

资源浪费

调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

级联失败

如果一个服务的提供者出现了问题,所有调用方都会出问题,出现雪崩问题

异步通讯:

在发布方和接收方之间存在中间人(Broker)

发布方只需将消息发布到中间人

接受方只需要从中间人订阅消息

实现解耦

吞吐量提升

无需等待订阅者处理完成,响应快速

故障隔离

服务不存在直接调用,不存在级联失败问题

资源占用问题

调用之间不会阻塞,不会产生无效资源占用问题

解耦

每个服务之间灵活插拔,实现解耦

流量削峰

所有发布事件由Broker直接接受,接受者按照自己的速度从Broker处理事件,实现缓冲

MQ

MessageQueue消息队列

即上述过程中的Broker

几种常见的MQ对比
请添加图片描述

MQ的基本结构

请添加图片描述

publisher:发布者

consumer:消费者

exchange:交换机,负责消息路由

queue:队列,存储消息,消息的缓冲区

队列绑定交换机

virtualHost:虚拟主机

channel:表示通道,操作MQ的工具,连接消息发布者和交换机,连接消息接受者和队列

RabbitMQ整体工作流程

发布者发布消息给交换机

交换机将消息路由到与其绑定的队列

消费者监听与其对应的队列获取消息

RabbitMQ消息模型

生产者->(交换机)->队列->消费者
基本消息队列BasicQueue

请添加图片描述

工作消息队列WorkQueue

请添加图片描述

多个消费者并发消费队列,消费者之间是竞争关系

发布订阅(Publish,Subscribe)

根据交换机类型不同分为三种

广播

请添加图片描述

消费者各自拥有

生产者将消息发送到交换机,具体发给哪个队列,生产者无法决定,由交换机决定.

交换机把消息发送给绑定过的所有队列,队列的所有消费者都能拿到消息.实现一条消息被多个消费者消费

生产者->所有消费者

路由

请添加图片描述

需求不同的消息被不同队列消费,就需要用到Direct类型的Exchange.在Direct模型下,需要指定RoutingKey(路由key).在消息发送方向交换机发送消息时,必须指定消息的路由key

交换机在接受到生产者消息后,将消息递交给routingkey完全匹配的队列

主题

请添加图片描述

topic类型相当于可以使用通配符匹配routingkey的路由类型

通配符规则

#:匹配一个或者多个词

*:匹配恰好一个词

SpringAMQP

Spring官方基于RabbitMQ提供的一套消息收发的模版工具:SpringAMQP

提供了三个功能:

自动声明队列,交换机以及绑定关系

基于注解的监听器模式,异步接收消息

封装RabbitTemplate工具用于发送消息

引入依赖
 <!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实现通信

新建队列->publisher发送->consumer接收

基于rabbitTemplate API进行发送

简单队列通信

发送:

    @GetMapping("/simple")
    public void publishMessage(){
        String queueName = "simplequeue";
        String message = "simplequeuemessage";
        rabbitTemplate.convertAndSend(queueName,message);
        System.out.println("simple_success");
    }

接收:

@Component
public class MqListener {
    @RabbitListener(queues = "simplequeue")
    public void simpleListener(String msg){
        System.out.printf("%s 简单队列收到消息:%s", Convert.toStr(LocalDateTime.now()),msg);
    }
}

工作队列通信

多个消费者监听一个队列,不同消费者因为自身的能力不同对消息处理的时间也不同

如果不进行额外设置的话,会将队列中的消息平均分配给所有消费者

造成处理能力浪费的情况

所以我们可以通过配置

    listener:
      simple:
        prefetch: 1#每个消费者每次只能取一条

来限制每个消费者预取的数量,实现能者多劳的工作场景

发送:

    @GetMapping("/work")
    public void PublishWorkMessage(){
        String queueName = this.WORK;
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend(queueName,System.currentTimeMillis());
        }
    }

接收:

    @RabbitListener(queues = "workqueue")
    public void workListener1(String msg){
        System.out.printf("%s 工作队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

    @RabbitListener(queues = "workqueue")
    public void workListener2(String msg){
        System.out.printf("%s 工作队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

交换机

上述两种工作方式都是不包含交换机,消息直接发送到队列的通信方式

我们可以通过引入交换机来实现消息的路由,决定具体要发送到哪个队列

消息通信流程 发布者->交换机->交换机决定的队列->消费者

交换机类型

包含以下四种:

FanOut:广播,将消息传递给所有绑定交换机的队列
Direct:基于RoutingKey发送消息给对应的队列
Topic:通配符订阅,基于通配符RoutingKey发送消息给对应的队列
Headers:头匹配,基于MQ的消息头匹配

FanOut

创建多个队列->创建交换机进行绑定->发布者发布->消费者接收

发布:

    @GetMapping("/fanout")
    public void publishFanoutMessage(){
        String exchangeName = this.FANOUT;
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(exchangeName,"",System.currentTimeMillis() + "from:fanout");
        }
    }

接收:

    @RabbitListener(queues = "cfjg_queue1")
    public void queue1Listener(String msg){
        System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }
    @RabbitListener(queues = "cfjg_queue2")
    public void queue2Listener(String msg){
        System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

Direct

创建多个队列->创建交换机进行路由key绑定->发布者发布->消费者接收

发布:

    @GetMapping("/direct")
    public void publishDirectMessage(){
        String exchangeName = this.DIRECT;
        for (int i = 0; i < 10; i++) {
            if(i%2 == 0){
                rabbitTemplate.convertAndSend(exchangeName,this.Queue1,"directTo:queue1--" + i);
            }else if(i%2 !=  0){
                rabbitTemplate.convertAndSend(exchangeName,this.Queue2,"directTo:queue2--" + i);
            }
            if(i%5 == 0){
                rabbitTemplate.convertAndSend(exchangeName,this.ALL,"directTo:all--" + i);
            }
        }
    }

接收:

    @RabbitListener(queues = "cfjg_queue1")
    public void queue1Listener(String msg){
        System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }
    @RabbitListener(queues = "cfjg_queue2")
    public void queue2Listener(String msg){
        System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

Topic

创建多个队列->创建交换机进行通配符路由key绑定->发布者发布->消费者接收

路由key由多个由.分割的单词组成

绑定时使用#或*通配符进行绑定

#:代表任意多个单词

*.代表任意一个单词

eg:cfjg.test

可以由cfjg.#或者*.test进行匹配

发布:

    @GetMapping("/topic")
    public void publishTopicMessage(){
        for (int i = 0; i < 100; i++) {
            String tmp = i % 2 == 0 ? "two" : "one";
            String routingKey = "cfjg." + tmp;
            System.out.println(routingKey);
            rabbitTemplate.convertAndSend(this.TOPIC,routingKey,"topicTo:queue--" + i);
        }
    }

接收:

    @RabbitListener(queues = "cfjg_queue1")
    public void queue1Listener(String msg){
        System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }
    @RabbitListener(queues = "cfjg_queue2")
    public void queue2Listener(String msg){
        System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

声明队列和交换机

SpringAMQP提供了一个Queue类用来创建队列

public class Queue extends AbstractDeclarable implements Cloneable {}

提供了一个Exchange接口用来表示不同类型的交换机

请添加图片描述

SpringAMQP提供了ExchangeBuilder和BindingBuilder来简化创建和绑定队列和交换机的过程

我们可以在消费者中编写一个配置类来对队列和交换机进行声明

@Configuration
public class MqConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("cfjg.fanout");
    }
	
    //声明队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("test_queue");
    }
	
    //声明绑定
    @Bean
    public Binding bingingQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
}
direct和topic模式需要每个key都进行一次绑定(同控制台操作)

基于注解声明

Spring提供了基于注解方式进行声明的途径

通过注解可以声明

绑定
队列
交换机
路由key
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = "cfjg_queue2"),
                    exchange = @Exchange(name = "cfjg.fanout" , type = ExchangeTypes.FANOUT),
                    key = {"red","blue"}
            )
    )
    public void fanoutQueue(String msg){
        System.out.println(msg);
    }

消息转换器

在MQ的消息传输中,会先将对象序列化为字节,接收消息时将字节反序列化为Java对象

但是默认的JDK序列化存在以下问题

数据体积过大,

存在安全漏洞,

可读性差

可以使用JSON进行序列化和反序列化

引入JackSon依赖来进行JSON序列化

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

通过在配置类中注册转换器Bean来实现消息发送时的自动序列化

	@Bean
    public MessageConverter messageConverter(){
        return new jackson2JsonMessageConverter();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1884169.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

来聊聊Redis定期删除策略的设计与实现

写在文章开头 我们都知道redis通过主线程完成内存数据库的指令操作&#xff0c;由于只有一个线程负责核心业务流程&#xff0c;所以对于每一个操作都要求尽可能达到尽可能的高效迅速&#xff0c;而本文就基于源码来聊聊redis的定期删除策略的设计与实现。 Hi&#xff0c;我是 …

上传头像到Domino中

大家好&#xff0c;才是真的好。 首先&#xff0c;说一个消息&#xff0c;2024年6月25号HCL发布了一则公告&#xff0c;就是从2024年12月10号开始结束Notes/Domino 11.0.x版本的市场订单申请&#xff0c;从从2025年6月26号开始停止对Notes/Domino 11.0.x版本的产品技术支持&am…

宝塔linux网站迁移步骤

网站迁移到新服务器步骤 1.宝塔网站迁移&#xff0c;有个一键迁移工具&#xff0c;参考官网 宝塔一键迁移API版本 3.0版本教程 - Linux面板 - 宝塔面板论坛 (bt.cn)2 2.修改域名解析为新ip 3.如果网站没有域名&#xff0c;而是用ip访问的&#xff0c;则新宝塔数据库的wp_o…

Ubuntu机器安装rdkit指定版本,通过conda安装不需要make,有手就行。

阿里云购买Ubuntu 22.0机器 IP没错&#xff0c;访问外网没问题 图片中的命令放在下面了。 useradd test-user -s /bin/bash mkdir /home/test-user chown -R test-user: /home/test-user passwd test-uservi /etc/sudoers wget -c https://repo.anaconda.com/archive/Anacon…

springcloud-gateway 网关组件中文文档

Spring Cloud网关 Greenwich SR5 该项目提供了一个基于Spring生态系统的API网关&#xff0c;其中包括&#xff1a;Spring 5&#xff0c;Spring Boot 2和项目Reactor。Spring Cloud网关的目的是提供一种简单而有效的方法来路由到API&#xff0c;并向它们提供跨领域的关注&#x…

[快易签]免越狱苹果签名工具快易签自用证书签名教程学会了可签一切应用

相关地址 快易签官网&#xff1a;快易签 定制版&#xff1a;快易签.定制款(含证书) 自签版&#xff1a;https://s1.kyq1.cn/ 免费源&#xff1a;https://app.eqishare.com/appstore 网盘&#xff1a;路灯网盘-iOS砸壳分享网-IPA分享网-巨魔商店IPA软件资源-后厂村路灯的网…

白话负载均衡、正反向代理(入门科普版)

什么是负载均衡&#xff1f;为什么需要负载均衡 从字面上理解&#xff0c;什么是负载&#xff0c;服务器承受访问量的大小是负载&#xff0c;但是单台服务器的访问性能是有限的&#xff0c;最典型的例子就是双十一、春运抢票这种&#xff0c;这时候就需要一种方案来解决这类问…

互联网框架五层模型详解

注&#xff1a;机翻&#xff0c;未校对。 What is the Five Layers Model? The Framework of the Internet Explained 五层模型互联网框架解释 Computer Networks are a beautiful, amazing topic. Networks involve so much knowledge from different fields, from physics…

idea启用多个环境

背景 在平常的后端开发中&#xff0c;需要与前端联调&#xff0c;比较方便的是让前端直接连自己的本地环境&#xff08;毕竟每次都要打包部署到测试环境实在是太麻烦了&#xff09;。但是这样子也有点不好&#xff0c;就是自己功能还没写好呢&#xff0c;结果前端连着自己的环…

LLaVA1.5训练数据和时间分析

LLaVA的PT+SFT训练_llava sft-CSDN博客文章浏览阅读379次。这个阶段,使用8个A100(80G)训练LLaVA-v1.5-13B大约需要20h。全量微调,非lora跑不起来啊,以前一直用swift,llama-factory这种框架式的代码库,但用原作者开源的代码也是有很多好处的。在这个阶段,使用 8 个 A100(…

Web端登录页和注册页源码

前言&#xff1a;登录页面是前端开发中最常见的页面&#xff0c;下面是登录页面效果图和源代码&#xff0c;CV大法直接拿走。 1、登录页面 源代码&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title>登录</ti…

云计算【第一阶段(24)】Linux文件系统与日志分析

一、文件与存储系统的inode与block 1.1、硬盘存储 最小存储单位&#xff1a;扇区(sector) 每个扇区大小&#xff1a;512字节 1.2、文件存取 最小存取单位&#xff1a;块(block)连续八个扇区组成&#xff1a;块(block) 每个块大小&#xff1a;4K文件数据&#xff1a;实际数据…

为什么我学个 JAVA 就已经耗尽所有而有些人还能同时学习多门语言

在开始前刚好我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「JAVA的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;我的入门语言是C&#xff0c…

网安小贴士(3)网安协议

一、前言 网络安全协议是构建安全网络环境的基础&#xff0c;它们帮助保护网络通信免受各种威胁和攻击。 二、定义 网络安全协议是指在计算机网络中用于确保网络通信和数据传输安全的协议。它们定义了在网络通信过程中的安全机制、加密算法、认证和授权流程等&#xff0c;以保…

SOC模块LoRa-STM32WLE5有哪些值得关注

SoC 是片上系统的缩写&#xff0c;是一种集成芯片&#xff0c;集成了计算机或其他电子系统的所有或大部分组件。这些组件通常包括中央处理器 (CPU)、内存、输入/输出接口和辅助存储接口。包含数字、模拟、混合信号和通常的 RF 信号处理功能&#xff0c;具体取决于应用。片上系统…

Kotlin扩展函数(also apply run let)和with函数

also apply run let with的使用例子 private fun testOperator() {/*** also*/val person Person("ZhangSan", 18)person.also {// 通常仅仅打印使用, 也可以通过it修改it.name "ZhangSan1"println("also inner name: " it.name)}println(&qu…

DevOps认证是什么?DevOps工具介绍

DevOps 这个词是由Development&#xff08;开发&#xff09; 和 Operations&#xff08;运维&#xff09;组合起来的&#xff0c;你可以把它理解成为一种让开发团队和运维团队紧密合作的方法。 DevOps从2009年诞生到现在已经14年多了&#xff0c;一开始大家还在摸索&#xff0…

【gitee使用教程】(创建项目仓库并上传代码简易版)

gitee使用教程&#xff0c;创建项目仓库并上传代码简易版 1.在码云上创建一个仓库2.将代码克隆到本地1.复制仓库地址2.找到你想要放置的文件位置&#xff0c;右键点击更多选项&#xff0c;选择Git Clone3.将复制的仓库地址填入URL 3. IDEA结合GIT和Gitee的简单使用idea需要识别…

69. x 的平方根(简单)

69. x 的平方根 1. 题目描述2.详细题解3.代码实现3.1 Python方法一&#xff1a;逐个遍历方法二&#xff1a;二分查找 3.2 Java 1. 题目描述 题目中转&#xff1a;69. x 的平方根 2.详细题解 不能使用系统内置的函数&#xff0c;寻找某个数&#xff08;假定为x&#xff09;的…

【传知代码】揭秘AI如何揪出图片中的“李鬼”(论文复现)

在数字化时代&#xff0c;我们时常被各种图像信息所包围。然而&#xff0c;这些图像中有时隐藏着不为人知的秘密——被篡改的文字或图像。这些被篡改的内容可能误导我们的判断&#xff0c;甚至在某些情况下造成严重的后果。幸运的是&#xff0c;随着人工智能&#xff08;AI&…