RabbitMQ:消息中间件

news2025/1/22 21:36:52

文章目录

  • 概念
  • 管理界面简介
  • 4中常见交换器类型
    • 1.Direct交换器:
    • 2.Fanout交换器
    • 3.Topic交换器
    • 4.headers交换器
  • 对象类型消息传递
  • 同步等待
  • 使用代码创建队列
  • 待续......

概念

在微服务架构中项目之间项目A调用项目B 项目B调用项目C项目C调用项目D。。
用户必须等待项目之间内容依次的运行结束后才会给用户返回结果
这是一个同步的调用,用户等待时间可能比较长,用户体验度比较差

消息中间件: 可以理解成一个队列,把用户需要处理的任务交给放到队列,任务在队列中进行排队等待执行,用户无需等待代码的执行的时间.
在这里插入图片描述

管理界面简介

1.Overview: 此面板为RabbitMQ基础信息展示面板,列举了服务器的信息,如:节点名称、内存占用、磁盘占用等。
2.Connections: 此面板中展示所有连接到RabbitMQ的客户端链接。只展示基于5672端口的链接。
3.Channels: 此面板中展示各链接中的具体信道。标记方式为链接(编号),如:192.168.91.1:12345(1)。
4.Exchanges: 此面板中展示RabbitMQ中已有的交换器,并注明交换器名称、类型等基本信息。其中只有direct交换器有默认交换器(AMQP default),当使用direct交换器时,如果没有明确指定名称,使用AMQP default交换器,也可明确指定名称。但是其他类型交换器没有默认的,都需要指定名称。
5.Queues: 此面板展示RabbitMQ中的队列信息。

4中常见交换器类型

1.Direct交换器:

direct会根据路由键把消息放入到指定的队列中。
在Queues界面创建队列q1,在Exchanges创建direct交换机fs.direct绑定交换机q1,指定路由key,fs.q1
一.生产者
1.导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

2.配置yml

# 配置RabbitMQ相关信息
# 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
# guest用户只能本地访问RabbitMQ。
spring:
  rabbitmq:
    host: localhost # RabbitMQ服务器的IP。默认localhost
    port: 32769 # RabbitMQ服务器的端口。
    username: guest # RabbitMQ的访问用户名。默认guest。
    password: guest # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

3.在测试包下发送消息:
类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
建议使用接口: 优先级是 RabbitOperations > AmqpTemplate

Spring AMQP可以发送的消息类型必须是Message类型。
Spring AMQP可以帮助程序员自动封装消息类型Message对象(默认封装),自动转换封装的消息体类型是Object,只要类型可序列化即可。

消息发送到队列后,无需等待,是异步操作,生产者接着往下执行.

@SpringBootTest
class RabbitmqApplicationTests {
    @Autowired
    private RabbitOperations rabbitOperations;
    @Test
    void contextLoads() {
		//仅发送,不需要等待返回值,异步,推荐
        rabbitOperations.convertAndSend("fs.dire"(交换机名称),"xy.q1"(路由key),"你好 rabbitmq"(消息内容));
        //发送,有返回值,使得rabbitmq变成同步的了,需要等待返回值,失去了rabbitmq的意义
        //rabbitOperations.convertSendAndReceive("","","");
    }
}

二.消费者
1.pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.yml

spring:
  rabbitmq:
    host: localhost
    port: 32769
    username: guest
    password: guest

3.监听队列
修饰符: public
返回值: 异步消息必须是void
方法名: 自定义
参数表: 一个参数,类型可以是Message或者具体的消息体类型(Object)。message是Spring AMQP中消息的唯一类型。代表完整消息,有头和体组成。如果对消息头没有任何处理要求,则直接定义消息体具体类型即可。
方法实现: 根据具体要求,定义即可。

注意:方法可以抛出任意类型的异常。只要抛出异常,则代表消费错误,RabbitMQ不删除队列中的消息。
注解是RabbitListener。

如果有多个消费监听,默认采用轮询消费,消费监听不需要相互等待,并发执行.

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})(队列名称可以指定多个)
    public void received1(String m){
        System.out.println(m);
    }

	@RabbitListener(queues = {"q1"})(队列名称可以指定多个)
    public void received2(String m){
        System.out.println(m);
    }
}

2.Fanout交换器

扇形交换器: 会把消息发送给所有的绑定在当前交换器上的队列,可以不写路由键
1.在Queues界面创建队列f1,f2,在Exchanges创建fanout交换机fs.fanout绑定交换机f1,f2,不指定路由
2.编写测试代码

    @Autowired
    private RabbitOperations rabbitOperations;
    @Test
    void contextLoad1s() {
    	//没有绑定路由键,设为null即可
        rabbitOperations.convertAndSend("fs.fanout",null,"你好 rabbitmq");
    }

3.f1,f2队列都多了一个消息

3.Topic交换器

主题交换器: 路由键可包括特殊字符实现通配。特殊字符包括: 星号和 ‘#’。
星号 : 代表一个单词。多个单词使用’.'分割。
‘#’ : 代表0~n个字符,即任意字符串。//如abc.# 代表以 abc. 开头的所有路由key
1.在Queues界面创建队列t1,t2,t3在Exchanges创建Topic交换机fs.topic绑定交换机t1,t2,t3
2.指定t1路由键: abc.t1,指定t2路由键: abc.*,指定t3路由键: abc.#
3.编写测试代码

    @Autowired
    private RabbitOperations rabbitOperations;

    @Test
    void contextLoawd1s() {
        rabbitOperations.convertAndSend("fs.topic","abc.t1","你好 rabbitmq");//t1,t2,t3
        rabbitOperations.convertAndSend("fs.topic","abc.123","你好 rabbitmq");//t2,t3
        rabbitOperations.convertAndSend("fs.topic","abc.123.234","你好 rabbitmq");//t3
    }

4.headers交换器

headers交换器和direct交换器的主要区别是在传递消息时可以传递header部分消息
生产消息
在Queues界面创建队列h1,在Exchanges创建headers交换机fs.headers绑定交换机h1,指定路由key,fs.h1

    @Autowired
    private RabbitOperations rabbitOperations;
    
    @Test
    void conwtextLoawd1s() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("fs","java");
        Message message = new Message("我是消息".getBytes(), messageProperties);
        rabbitOperations.convertAndSend("fs.headers","fs.h1",message);
    }

消费消息

    @Test
    void conwtextLoawd1s() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("fs","java");
        Message message = new Message("我是消息".getBytes(), messageProperties);
        rabbitOperations.convertAndSend("fs.headers","fs.h1",message);
    }

对象类型消息传递

1.生产者
创建User类
注意要继承序列化接口Serializable
同时指定序列化版本号serialVersionUID且和消费者一致(如果不指定,消费者的User类必须和生产者的User类完全一致,否则,不会认为是统一类型)
User类在生产者和消费者中的包路径必须一致

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
    private final Long serialVersionUID = 1L;
    private String name;
    private Integer age;
}

发送消息
    	@Autowired
    	private RabbitOperations rabbitOperations;
    
        User user = new User("张三",18);
        rabbitOperations.convertAndSend("fs.direct","xy.q1",user);

2.消费者
在消费者中创建User类
注意要继承序列化接口Serializable
同时指定序列化版本号serialVersionUID且和消费者一致(如果不指定,消费者的User类必须和生产者的User类完全一致,否则,不会认为是统一类型)
User类在生产者和消费者中的包路径必须一致
直接用对应类型接收

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})
    public void received(User(类型和发送来的一致即可) user){
        System.out.println(user);
    }
}

统一用Message对象接收,常用语Headers交换器

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})
    public void received(Message message) throws IOException, ClassNotFoundException {
        byte[] body = message.getBody();
        
        ByteArrayInputStream bai = new ByteArrayInputStream(body);
        ObjectInputStream ois = new ObjectInputStream(bai);
        
        Object object = ois.readObject();
        if(object instanceof User){
            User user = (User) object;
            System.out.println(user);
        }
    }
}

同步等待

等待结果返回,才能往下执行
默认等待5s,可以配置,超时返回null
yml配置

spring:
  rabbitmq:
    host: localhost # RabbitMQ服务器的IP。默认localhost
    port: 32769 # RabbitMQ服务器的端口。
    username: guest # RabbitMQ的访问用户名。默认guest。
    password: guest # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

生产者:

    @Test
    void contextLowads() {
    	//发送,有返回值,使得rabbitmq变成同步的了,需要等待返回值,等待结果返回,才能往下执行
        String aa = (String)rabbitOperations.convertSendAndReceive("fs.direct", "xy.q1", "aa");
        System.out.println(aa);
}

消费者:

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"q1"})
    public String received(Message message) {
        return "hello";
    }
}

使用代码创建队列

一.在生成者端创建队列
是发送消息时创建,而不是启动项目时。

@Configuration
public class RabbitMQConfig {
    // 发送消息时如果不存在这个队列,会自动创建这个队列。
    // 注意:是发送消息时,而不是启动项目时。
    // 相当于:可视化操作时创建一个队列
    // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
    @Bean
    public Queue queue(){
        return new Queue("queue.second");
    }

    // 如果没有这个交换器,在发送消息创建这个交换器
    // 配置类中方法名就是这个类型的实例名。相当于<bean id="" class="">的id属性,返回值相当于class
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct.first.ex");
    }

    // 配置类中方法参数,会由Spring 容器自动注入
    @Bean
    public Binding directBingding(DirectExchange directExchange,Queue queue){
        // with(“自定义路由键名称”)
        return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
        // withQueueName() 表示队列名就是路由键名称
        // return BindingBuilder.bind(queue).to(directExchange).withQueueName();
    }
}

二.消费者

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.second",autoDelete = "false", durable = "true"),
                    exchange = @Exchange(name = "direct.second.ex",autoDelete = "false", type = ExchangeTypes.DIRECT),
                    key = {"routing.key.second.1"}
            )
    })
    public void onMessage(String messageBody){
        System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
    }

待续…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/434174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

后端太难了,不 All in 了

作者&#xff1a;阿秀 校招八股文学习网站&#xff1a;https://interviewguide.cn 这是阿秀的第「256」篇原创 小伙伴们大家好&#xff0c;我是阿秀。 欢迎今年参加秋招的小伙伴加入阿秀的学习圈&#xff0c;目前已经超过 2200 小伙伴加入&#xff01;去年认真准备和走下来的基…

Segment Anything Model代码讲解(二)之image_encoder

image_encoder代码解析 在transformer的结构中&#xff0c;编码是非常重要的部分。接下来看image_encoder的代码部分目录 class ImageEncoderViT def initdef forward class Block def initdef forward class Attention def initdef forward def window_partitiondef window_…

【C++】引用(下)【深度全面解析】

&#x1f339;作者:云小逸 &#x1f4dd;个人主页:云小逸的主页 &#x1f4dd;Github:云小逸的Github &#x1f91f;motto:要敢于一个人默默的面对自己&#xff0c;强大自己才是核心。不要等到什么都没有了&#xff0c;才下定决心去做。种一颗树&#xff0c;最好的时间是十年前…

自定义类型——枚举与联合体

枚举 枚举顾名思义就是一一列举 把可能的取值一一列举 枚举类型的定义 enum Day//星期 {Mon,Tues,Wed,Thur,Fri,Sat,Sun };enum Sex//性别 {MALE,FEMALE,SECRET }&#xff1b;以上定义的 enum Day &#xff0c; enum Sex 都是枚举类型 { }中的内容是枚举类型的可能取值&…

English Learning - L2 第 14 次小组纠音 复习元音 [ɔɪ ] [aʊ] [əʊ] [ɪə] 弱读 2023.4.12 周三

English Learning - L2 第 14 次小组纠音 复习元音 [ɔɪ ] [aʊ] [əʊ] [ɪə] 弱读 2023.4.12 周三 共性问题coin voice /kɔɪn/ /vɔɪs/ 中 ɔɪvowel pounds /ˈvaʊəl/ /paʊndz/ 中的 aʊshow /ʃəʊ/beer nearly /bɪə/ /ˈnɪəlɪ/ 中的 ɪəbest bed ten /best…

U-Boot 烧写与启动

1.uboot 编译好以后就可以烧写到板子上使用了&#xff0c;这里我们跟前面裸机例程一样&#xff0c;将 uboot 烧写到 SD 卡中&#xff0c;然后通过 SD 卡来启动来运行 uboot。使用 imxdownload 软件烧写&#xff0c;命令如 下&#xff1a; chmod 777 imxdownload //给予 imxdo…

超参数的设置;使用适当的尺度来选择超参数;批量归一化;测试时的批量标准化:

超参数的设置&#xff1a; 超参数之间也有重要性差异。通常来说&#xff0c;学习因子α是最重要的超参数&#xff0c;也是需要重点调试的超参数。动量梯度下降因子β、各隐藏层神经元个数#hidden units和mini-batch size的重要性仅次于α。然后就是神经网络层数#layers和学习因…

【Python38安装PyAudio过程出现错误如:Failed building wheel for PyAudio等一系列问题】

安装PyAudio过程出现错误&#xff1a;Failed building wheel for PyAudio 目前成功解决解决过程&#xff08;1&#xff09; 解决方法1 查看pip支持安装whl文件的命名方式:没解决&#xff08;2&#xff09;解决方法2 直接用终端解决 目前成功解决 环境&#xff1a;Windows11、p…

数据库实验 | 第2关:建立和调用存储过程(带输出参数)

任务描述 本关任务&#xff1a; 销售数据库有工作人员、销售单数据表 工作人员gzry数据表有雇员号gyh、姓名gyxm、出生日期csrq、学历xl、工资gz、部门bm、电话dh字段 销售单xsd数据表有销售单号xsdh、会员号hyh、雇员号gyh、销售日期xsrq、应付款yfk、实际付款sjfk字段 任…

JKind入门(二)引擎简介 BMC

如上文所说&#xff0c;JKind 使用了多个并行引擎&#xff0c;协调它们来证明需要检验属性。本文主要介绍 bounded model checking (BMC) 有界模型检验。其中会涉及到有关JKind的 K-induction &#xff08;k归纳引擎&#xff09;和 SMT求解机。 本来这些文章就是单纯就是自己的…

C语言进阶之内存操作函数

我们上一期学习的是字符串函数&#xff0c;只能操作字符串&#xff0c;如果我们想拷贝等等操作给一个整型数据或者浮点型数据&#xff0c;又该怎么办呢&#xff0c;就用到我们今天要学的内存操作函数 memcpy 内存拷贝 memmove 内存移动 memset 内存设计 memcpy操作 先来…

【Webpack】前端工程化与webpack

文章目录前端工程化1、小白眼中的前端开发 vs 实际的前端开发2、什么是前端工程化3、前端工程化的解决方案Webpack的基本使用1、什么是 webpack2、创建列表隔行变色项目3、在项目中安装webpack4、在项目中配置webpackWebpack中的插件1、webpack插件的作用2、webpack-dev -serve…

IntersectionObserver与无限滚动加载

学习链接 IntersectionObserver MDN Api IntersectionObserver API详解 Intersection observer 的概念和用法 过去&#xff0c;要检测一个元素是否可见或者两个元素是否相交并不容易&#xff0c;比如实现图片懒加载、内容无限滚动等功能时&#xff0c;都需要通过​getBound…

Java语法理论和面经杂疑篇《十一. JDK8新特性》

目录 1. Java版本迭代概述 1.1 发布特点&#xff08;小步快跑&#xff0c;快速迭代&#xff09; 1.2 名词解释 1.3 各版本支持时间路线图 1.4 各版本介绍 1.5 JDK各版本下载链接 1.6 如何学习新特性 2. Java8新特性&#xff1a;Lambda表达式 2.1 关于Java8新特性简介 …

C# | 上位机开发新手指南(十)加密算法——ECC

上位机开发新手指南&#xff08;十&#xff09;加密算法——ECC 文章目录 上位机开发新手指南&#xff08;十&#xff09;加密算法——ECC前言ECC的特性非对称性可逆性签名安全性高计算量和存储空间小 对比ECC与RSAC#中如何使用ECC加密与解密数据导入与导出秘钥签名与验证 结束…

PyQt Custom Widget

pyuic/pyside6-uic pip install PyQt6 pyqt6-tools或者 pip install PySide6假设你的自定义控件时from vtk.test2.testhead import testfaQ 首先拉一个QWidget 右键Promote to… 在header file里写上 vtk.test2.testhead&#xff08;写vtk/test2/testhead.h或者vtk/test2/te…

【改进灰狼优化算法】混沌灰狼优化算法(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

当我把chatGPT作为Java面试官,它问了我这些问题

向chatGPT提问 面试官&#xff1a;你好&#xff0c;欢迎参加我们的Java面试。请先自我介绍一下。 面试者&#xff1a;非常感谢&#xff0c;我是一名资深Java开发工程师&#xff0c;具有丰富的Java开发经验。我在过去的五年里&#xff0c;主要从事了企业级Java应用的设计、开发…

VSCode纯手工配置C/C++项目

面向大二同学不想用Visual Studio的需求&#xff0c;探索Visual Studio Code平台上单纯利用C/C纯手动配置的方法&#xff0c;实现Release版本和Debug版本的调试和运行&#xff0c;并指定版本进行调试。 前置依赖项&#xff1a; C/C1 VSCode扩展配置文件列表&#xff0c;将下面的…

设计模式-1

1&#xff0c;设计模式概述 1.1 软件设计模式的产生背景 "设计模式"最初并不是出现在软件设计中&#xff0c;而是被用于建筑领域的设计中。 1977年美国著名建筑大师、加利福尼亚大学伯克利分校环境结构中心主任克里斯托夫亚历山大&#xff08;Christopher Alexand…