RabbitMQ 的介绍与使用

news2025/4/17 17:57:39

一. 简介

1> 什么是MQ

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。

那么为什么会产生消息队列呢?有几个原因:

  • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

  • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

2> 什么是RabbitMQ

RabbitMQ 是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要寄的邮件放入邮箱时,您可以确信信使最终会将邮件发送给您的收件人。在本例中,RabbitMQ 是邮箱、邮局和信使。

RabbitMQ 与邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息

RabbitMQ 和一般意义上的消息传递使用了一些术语。

  • 生产_仅仅意味着发送。发送消息的程序称为_生产者

  • _队列_是 RabbitMQ 中邮箱的名称。虽然消息会流经 RabbitMQ 和您的应用程序,但它们只能存储在_队列_中。_队列_仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区。许多_生产者_可以发送消息到一个队列,并且许多_消费者_可以尝试从一个_队列_中接收数据。这就是我们表示队列的方式

其是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

3> 相关概念

通常我们谈到队列服务,会有三个概念:发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上,多做了一层抽象,在发消息者和队列之间,加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系,转而变成发消息者把消息给交换器,交换器根据调度策略再把消息给队列。那么,其中比较重要的概念有4个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的 粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ 服务器 都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的 消息。这里有一个比较重要的概念:路由键。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据
    该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

二. 实现

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

1. 简单使用
1>配置 pom 包,主要是添加 spring-boot-starter-amqp 的支持
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
2>配置文件application.yml

配置 RabbitMQ 的安装地址、端口以及账户信息

# 配置文件
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: 123456
    # RabbitMQ配置
  rabbitmq:
    host: 192.168.146.1
    port: 5672
    username: admin
    password: 123456

我这里还配置了数据库

3>队列配置
package com.nianxi.mybatisplus.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }
}
4>发送者

rabbitTemplate 是 Spring Boot 提供的默认实现

package com.nianxi.mybatisplus.mapper;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}
5>接收者
package com.nianxi.mybatisplus.mapper;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}
6> 测试
package com.nianxi.mybatisplus;

import com.nianxi.mybatisplus.mapper.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMqHelloTest {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void hello() throws Exception {
        helloSender.send();
    }
}

注意:发送者和接收者的 queue name 必须一致,不然不能接收

2.RabbitTemplate

**RabbitTemplate**是SpringAMQP提供的一个高级消息操作模板,**用于在与RabbitMQ进行交互时进行消息的发送和接收操作。**它是对底层AMQP协议的封装,简化了与RabbitMQ的交互过程, 是SpringAMQP中的核心类,提供声明式方式处理RabbitMQ,包括发送和接收消息、消息转换、属性设置及回调机制。通过配置和正确使用,简化了RabbitMQ的集成与操作。

1> 发送消息

**RabbitTemplate**提供了多种发送消息的方法,包括同步发送和异步发送。通过指定交换机、路由键和消息体,我们可以将消息发送到 RabbitMQ 服务器上的指定位置。此外,RabbitTemplate还支持消息的确认机制,以确保消息被成功发送和接收。

rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
2> 接收消息

除了发送消息外,**RabbitTemplate**还提供了接收消息的功能。通过调用相关方法,我们可以从指定的队列中接收消息,并进行相应的处理。这通常涉及到监听队列、处理消息和确认消息接收等步骤。

Message receivedMessage = rabbitTemplate.receive("queueName");
MyMessage myMessage = rabbitTemplate.receiveAndConvert("queueName", MyMessage.class);
3> 消息转换

**RabbitTemplate支持消息的自动转换。这意味着我们可以将 Java 对象作为消息体发送,而RabbitTemplate会自动将其转换为可序列化的格式(如 JSON 或 XML)。同样地,当从队列中接收消息时,RabbitTemplate**也可以自动将消息体转换回 Java 对象。

Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(messageConverter);
4> 消息属性设置

在发送消息时,我们可以设置各种消息属性,如消息的优先级、持久化标志、过期时间等。这些属性可以通过**MessageProperties对象进行设置,并在发送消息时传递给RabbitTemplate**。

import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.MessageBuilder;  
import org.springframework.amqp.core.MessageProperties;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Service;  
  
@Service  
public class MessageSender {  
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) {  
        // 创建MessageProperties  
        MessageProperties properties = new MessageProperties();  
        // 设置优先级,值范围0-9,其中0为最低优先级,9为最高优先级  
        properties.setPriority(priority);  
        // 设置消息持久化  
        properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);  
        // 设置消息的过期时间,单位为毫秒  
        properties.setExpiration(String.valueOf(ttl));  
  
        // 使用MessageBuilder构建Message对象  
        Message msg = MessageBuilder.withBody(message.getBytes())  
                .setContentEncoding("UTF-8")  
                .setContentType("text/plain")  
                .setMessageId(UUID.randomUUID().toString()) // 可选,设置消息ID  
                .setTimestamp(new Date()) // 可选,设置时间戳  
                .setHeaders(Collections.singletonMap("x-custom-header", "value")) // 可选,设置自定义头  
                .andProperties(properties)  
                .build();  
  
        // 发送消息  
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);  
    }  
}
5> 回调机制

**RabbitTemplate**支持发送消息时的回调机制。这意味着在发送消息后,我们可以注册一个回调函数来处理发送结果或接收响应。这对于需要异步处理发送结果或接收响应的场景非常有用。

**setConfirmCallback方法是RabbitTemplate**类中的一个回调方法,用于处理消息的确认(acknowledgment)结果。当消息成功发送到RabbitMQ的交换机时,会触发确认回调,你可以在该回调中处理相应的逻辑。

  • correlationData:关联数据,可以是任意类型的对象,通常用于唯一标识消息。

  • ack:布尔值,表示消息是否成功发送到交换机。true表示成功,false表示失败。

  • cause:失败的原因,当ackfalse时,此参数会提供一个可选的异常信息。

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
    // 消息发送成功
    System.out.println(“Message sent successfully”);
    } else {
    // 消息发送失败,进行处理
    System.out.println("Message sent failed: " + cause);
    }
    });

6> 异步消息处理

RabbitTemplate支持异步消息处理,你可以注册ConfirmCallbackReturnCallback来处理消息的确认和返回结果。ConfirmCallback用于确认消息是否成功发送到交换机,ReturnCallback用于处理无法路由到队列的消息。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        // 消息发送成功
    } else {
        // 消息发送失败,进行处理
    }
});
 
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    // 处理无法路由到队列的消息
});
3.使用 RabbitTemplate 的步骤
1> 配置 RabbitTemplate

在使用**RabbitTemplate**之前,我们需要对其进行配置。这通常涉及到设置连接工厂、交换机、队列和绑定等。这些配置可以通过 XML 配置或 Java 配置完成。

2> 创建 RabbitTemplate 实例

一旦配置完成,我们可以创建一个**RabbitTemplate**实例。这个实例将使用我们提供的配置来与 RabbitMQ 服务器进行交互。

3> 发送消息

使用**RabbitTemplate**的发送方法,我们可以将消息发送到指定的交换机和路由键。我们可以指定消息体、消息属性和其他发送选项。

4> 接收消息

要接收消息,我们可以使用**RabbitTemplate**的接收方法或结合监听器来监听指定的队列。当消息到达时,我们可以处理消息并执行相应的业务逻辑。

5> 处理异常和错误

在使用**RabbitTemplate**时,我们还需要考虑异常和错误处理。例如,当发送消息失败或接收消息时发生异常时,我们需要有相应的处理机制来确保系统的稳定性和可靠性。

4.RabbitTemplate 的优势与注意事项
优势
  1. 简化操作RabbitTemplate封装了底层细节,使得开发者能够专注于业务逻辑的实现,而无需关心底层的消息传输细节。
  2. 灵活性RabbitTemplate提供了丰富的配置选项和扩展点,使得开发者能够根据实际需求进行定制和优化。
  3. 性能优化RabbitTemplate内部进行了性能优化,如连接池管理、消息缓存等,以提高消息传输的效率和可靠性。
注意事项
  1. 配置正确性:确保RabbitTemplate的配置正确无误,包括连接工厂、交换机、队列和绑定等的设置。错误的配置可能导致消息无法正确发送或接收。
  2. 异常处理:在使用RabbitTemplate时,要充分考虑异常处理机制,确保在发生异常时能够及时发现并处理。
  3. 资源释放:在使用完RabbitTemplate后,要确保释放相关资源,如关闭连接、释放连接池中的连接等,以避免资源泄漏和性能问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2307634.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV给图像添加噪声

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 如果你已经有了一张干净的图像&#xff0c;并希望通过编程方式向其添加噪声&#xff0c;可以使用 OpenCV 来实现这一点。以下是一个简单的例子&a…

Elasticsearch:使用阿里云 AI 服务进行嵌入和重新排名

作者&#xff1a;来自 Elastic Toms Mura 将阿里云 AI 服务功能与 Elastic 结合使用。 更多阅读&#xff0c;请参阅 “Elasticsearch&#xff1a;使用阿里 infererence API 及 semantic text 进行向量搜索”。 在本文中&#xff0c;我们将介绍如何将阿里云 AI 功能与 Elastics…

管理后台环境配置

后端配置及启动 a. 软件安装 1. Java sdk 1.8 2. maven 3.6 3. intellij IDEA 2024 4. Visual C Redistributable 5. mongodb 7.0 6. mysql 8.0 双击安装&#xff1a;mysql-installer-community-8.0.41.0.msi 版本选择&#xff1a;Full&#xff0c;包括服务器和客户端 …

数字IC低功耗后端设计实现之power gating和isolation技术

考虑低功耗设计需求&#xff0c;下图中间那个功能模块是需要做power domain的&#xff0c;即这个模块需要插MTCMOS。需要开启时&#xff0c;外面的VDD会和这个模块的LOCAL VDD形成通路&#xff0c;否则就是断开即power off状态。 这些低功耗设计实现经验&#xff0c;你真的懂了…

【网络编程】几个常用命令:ping / netstat / xargs / pidof / watch

ping&#xff1a;检测网络联通 1. ping 的基本功能2. ping 的工作原理3. ping 的常见用法4. ping 的输出解释5. ping 的应用场景6. 注意事项 netstat&#xff1a;查看网络状态 1. netstat 的基本功能2. 常见用法3. 示例4. 输出字段解释5. netstat 的替代工具6. 注意事项 xargs&…

sqlilab 46 关(布尔、时间盲注)

sqlilabs 46关&#xff08;布尔、时间盲注&#xff09; 46关有变化了&#xff0c;需要我们输入sort&#xff0c;那我们就从sort1开始 递增测试&#xff1a; 发现测试到sort4就出现报错&#xff1a; 我们查看源码&#xff1a; 从图中可看出&#xff1a;用户输入的sort值被用于查…

《Effective Objective-C》阅读笔记(下)

目录 内存管理 理解引用计数 引用计数工作原理 自动释放池 保留环 以ARC简化引用计数 使用ARC时必须遵循的方法命名规则 变量的内存管理语义 ARC如何清理实例变量 在dealloc方法中只释放引用并解除监听 编写“异常安全代码”时留意内存管理问题 以弱引用避免保留环 …

穷举vs暴搜vs深搜vs回溯vs剪枝(典型算法思想)—— OJ例题算法解析思路

回溯算法的模版 void backtrack(vector<int>& path, vector<int>& choice, ...) {// 满⾜结束条件if (/* 满⾜结束条件 */) {// 将路径添加到结果集中res.push_back(path);return;}// 遍历所有选择for (int i 0; i < choices.size(); i) {// 做出选择…

【Java项目】基于Spring Boot的校园博客系统

【Java项目】基于Spring Boot的校园博客系统 技术简介&#xff1a;采用Java技术、Spring Boot框架、MySQL数据库等实现。 系统简介&#xff1a;校园博客系统是一个典型的管理系统&#xff0c;主要功能包括管理员&#xff1a;首页、个人中心、博主管理、文章分类管理、文章信息…

计算机毕业设计SpringBoot+Vue.js图书进销存管理系统(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

算法-数据结构(图)-迪杰斯特拉最短逻辑算法( Dijkstra)

迪杰斯特拉算法&#xff08;Dijkstras Algorithm&#xff09; 是一种用于计算单源最短路径的经典算法&#xff0c;由荷兰计算机科学家 艾兹赫尔迪杰斯特拉&#xff08;Edsger W. Dijkstra&#xff09; 于1956年提出。它的主要目标是找到从图中的某个源节点到所有其他节点的最短…

C语言【进阶篇】之指针——涵盖基础、数组与高级概念

目录 &#x1f680;前言&#x1f914;指针是什么&#x1f31f;指针基础&#x1f4af;内存与地址&#x1f4af;指针变量&#x1f4af; 指针类型&#x1f4af;const 修饰指针&#x1f4af;指针运算&#x1f4af;野指针和 assert 断言 &#x1f4bb;数组与指针&#x1f4af;数组名…

关于命令行下的 git( git add、git commit、git push)

文章目录 关于 gitgit 的概念git 操作&#xff08;git add、git commit、git push 三板斧&#xff09;安装 git新建仓库及配置git clone.gitignoregit addgit commitgit push其他 git 指令git pull&#xff08;把远端的东西拉到本地进行同步&#xff09;其他指令 关于 git git…

DaoCloud 亮相 2025 GDC丨开源赋能 AI 更多可能

2025 年 2 月 21 日至 23 日&#xff0c;上海徐汇西岸&#xff0c;2025 全球开发者先锋大会以 “模塑全球&#xff0c;无限可能” 的主题&#xff0c;围绕云计算、机器人、元宇宙等多元领域&#xff0c;探讨前沿技术创新、应用场景拓展和产业生态赋能&#xff0c;各类专业论坛、…

极速探索 HarmonyOS NEXT:开启国产操作系统开发的新篇章

极速探索 HarmonyOS NEXT&#xff1a;开启国产操作系统开发的新篇章 一、引言二、HarmonyOS NEXT 是什么&#xff1f;背景核心特性 三、HarmonyOS NEXT 的发展历程从 LiteOS 到 HarmonyOS 的逐步演进HarmonyOS NEXT 5.0 的发布 四、HarmonyOS NEXT 对科技的影响技术突破开发者生…

火狐浏览器多开指南:独立窗口独立IP教程

无论是跨境电商从业者需要管理多个店铺账号&#xff0c;还是海外社交媒体营销人员要运营多个社交平台账号&#xff0c;亦或是从事多账号广告投放的人员&#xff0c;都面临着一个共同的挑战 —— 如何高效管理多个账号&#xff0c;并确保每个账号的独立性。 在这种情况下&#…

内容中台是什么?内容管理平台解析

内容中台的核心价值 现代企业数字化转型进程中&#xff0c;内容中台作为中枢系统&#xff0c;通过构建统一化的内容管理平台实现数据资产的高效整合与智能调度。其核心价值体现在打破传统信息孤岛&#xff0c;将分散于CRM、ERP等系统的文档、知识库、产品资料进行标准化归集&a…

1.2 Kaggle大白话:Eedi竞赛Transformer框架解决方案02-GPT_4o生成训练集缺失数据

目录 0. 本栏目竞赛汇总表1. 本文主旨2. AI工程架构3. 数据预处理模块3.1 配置数据路径和处理参数3.2 配置API参数3.3 配置输出路径 4. AI并行处理模块4.1 定义LLM客户端类4.2 定义数据处理函数4.3 定义JSON保存函数4.4 定义数据分片函数4.5 定义分片处理函数4.5 定义文件名排序…

sql server笔记

创建数据库 use master gocreate database stuuuuu//删除数据库if db_id ($$$) is not nullDrop database [$$$] go//新建表USE [studyTest] GOSET ANSI_NULLS ON GOSET QUOTED_IDENTIFIER ON GOCREATE TABLE [dbo].[Table_1]([id] [int] NULL,[name] [varchar](10) NULL ) ON…

uni小程序wx.switchTab有时候跳转错误tab问题,解决办法

在一个子页面里面使用uni.switchTab或者wx.switchTab跳转到tab菜单的时候&#xff0c;先发送了一个请求&#xff0c;然后执行跳转到tab菜单&#xff0c;但是这个时候&#xff0c;出错了........也是非常的奇怪&#xff0c;不加请求就没问题......但是业务逻辑就是要先执行某个请…