【RabbitMQ】快速入门及基本使用

news2025/1/21 2:59:16

一、引言

1、、消息队列

Ⅰ、什么是消息队列?

        消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数。也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

Ⅱ、Message queue 释义

        服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信) 消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)

Ⅲ、消息队列特点

  1. 异步性:消息队列允许发送者发送消息后继续其它任务,而不需要等待接收者的响应。这种异步性能够提高系统的吞吐量和响应速度。
  2. 解耦性:消息队列将发送者和接收者解耦,使它们不需要知道彼此的存在。发送者只需将消息发送到队列中,而不需要关心谁将接收该消息。接收者通过订阅队列来获取消息,而不需要关心消息的发送者。这种解耦性能够提高系统的可扩展性和可维护性。
  3. 可靠性:消息队列通常具有持久化机制,可以确保消息的可靠性传输。即使在发送者或接收者出现故障的情况下,消息也不会丢失。此外,消息队列还具有消息重试和消息确认等机制,确保消息能够被正确处理。
  4. 消息类型和格式:消息队列中的消息是有类型的,并且具有格式。这使得消息可以被按类型读取,并遵循一定的格式。
  5. 多进程支持:消息队列允许一个或多个进程向它写入或者读取消息。
  6. 数据持久性:当从消息队列中读出消息后,消息队列中对应的数据都会被删除,确保数据不会重复消费。
  7. 分布式特性:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性。

Ⅳ、好处

  1. 高吞吐量:由于消息的传输速度比普通的文件快,所以能够实现高吞吐量。
  2. 支持异步操作:一个线程在处理完自己的任务之后,可以把结果发送到另一个线程。
  3. 支持并发操作:多个线程可以同时处理消息队列中的消息。
  4. 支持分布式系统:由于消息队列的解耦特性,分布式系统中的组件可以独立扩展。
  5. 提供数据持久化机制:消息队列可以将数据进行持久化,确保数据不会因为处理过程中的失败而丢失。
  6. 提供错误恢复机制:当系统的一部分组件失效时,消息队列可以保证即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  7. 提高响应速度:通过异步处理,消息队列可以显著提高系统的响应速度。
  8. 解耦:在项目启动之初来预测将来项目会碰到什么需求是极其困难的。使用消息队列可以在处理过程中间插入一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  9. 扩展性:由于消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
  10. 灵活性:使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
  11. 可恢复性:当体系的一部分组件失效时,不会影响到整个系统。
  12. 送达保证:消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。
  13. 排序保证:在许多情况下,数据处理的顺序都很重要。消息队列能保证数据会按照特定的顺序来处理。
  14. 缓冲:在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行——写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
  15. 理解数据流:在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
  16. 异步通信:很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

2、消息队列相关

Ⅰ、AMQP

        一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议 消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.

技术选型

3、什么是RabbitMQ

        RabbitMQ是一个开源的消息队列系统,使用Erlang语言开发,基于AMQP(高级消息队列协议)实现。它最初起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ的主要特性包括易用性、扩展性、高可用性以及可靠性。消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。在事件驱动(发布-订阅)架构中,RabbitMQ扮演着Broker的角色。

  • Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
  • Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
  • Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
  • ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
  • Message Queue:消息队列,用于存储还未被消费者消费的消息.
  • Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内 容.
  • BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来.

二、快速入门

1、Docker安装部署RabbitMQ

【注意】获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面

docker pull rabbitmq:management

--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

-e:指定环境变量:
       RABBITMQ_DEFAULT_VHOST:默认虚拟机名
        RABBITMQ_DEFAULT_USER:默认的用户名
        RABBITMQ_DEFAULT_PASS:默认用户名的密码

docker run -d \
--name 容器名\
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management

容器启动后,查看容器日志

docker logs 容器名

开启防火墙

systemctl start firewalld

开放端口

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent

更新防火墙规则

firewall-cmd --reload

Ⅰ、配置用户

进入管理后台

密码:admin

账号:admin

http://ip:15672

创建用户

点进用户进行分配

2、springboot连接配置

Ⅰ、新建项目

 新建一个空项目,里面新建两个spring boot模块,并且导入依赖

接收者(publisher)消费者(consumer)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Ⅱ、配置yml

spring:
  rabbitmq:
    host: 0.0.0.0 #虚拟机开启的IP地址
    username: spring #创建的用户
    password: 123456 #用户的密码
    port: 5672
    virtual-host: my_vhost 

Ⅲ、案例一

接收一个string类型

生产者

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
   @Bean
    public Queue firstQueue() {
        // 创建一个名为firstQueue的队列
        return new Queue("firstQueue");
    }
}

使用Controller 

@RestController
@SuppressWarnings("all")
public class SenderController {
    // 自动装配rabbitTemplate
    @Autowired
    private AmqpTemplate rabbitTemplate;

    // 发送消息到firstQueue队列
    @RequestMapping("/sendFirst")
    public String sendFirst() {
        // 将消息转换并发送到firstQueue队列
        rabbitTemplate.convertAndSend("firstQueue", "Hello World");
        return "🐉";
    }
}

访问http://localhost:8888/sendFirst

可以看到消息队列中有了一个

消费者

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {
    @RabbitHandler
    public void process(String msg) {
        log.warn("接收到:" + msg);
    }
}

运行之后可以看到我们发送的消息

Ⅳ、案例二

我们接受一个实体类

生产者(publisher)

新建一个实体类User 实现接口Serializable

@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private String username;
private String userpwd;
}

RabbitConfig 类里面添加一个队列

    @Bean
    public Queue secondQueue() {
        // 创建一个名为 secondQueue 的队列
        return new Queue("secondQueue");
    }

完整代码 RabbitConfig 

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
    @Bean
    public Queue firstQueue() {
        // 创建一个名为firstQueue的队列
        return new Queue("firstQueue");
    }

    @Bean
    public Queue secondQueue() {
        // 创建一个名为 secondQueue 的队列
        return new Queue("secondQueue");
    }
}

在Controller 层添加方法

        // 自动装配ObjectMapper
    @Autowired
    private ObjectMapper objectMapper;
// 发送消息到secondQueue队列
    @RequestMapping("/send2")
    public String send2() throws JsonProcessingException {
        // 创建一个User对象
        User wfzldr = new User("wfzldr", "1234567890");
        // 将User对象转换为json字符串
        String json = objectMapper.writeValueAsString(wfzldr);
        // 将消息转换并发送到firstQueue队列
        // 将消息转换并发送到secondQueue队列
        rabbitTemplate.convertAndSend("secondQueue", json);
        return "🐉";
    }

完整代码SenderController

@RestController
@SuppressWarnings("all")
public class SenderController {
   // 自动装配rabbitTemplate
    @Autowired
    private AmqpTemplate rabbitTemplate;
    // 自动装配ObjectMapper
    @Autowired
    private ObjectMapper objectMapper;

    // 发送消息到firstQueue队列
    @RequestMapping("/sendFirst")
    public String sendFirst() {
        // 将消息转换并发送到firstQueue队列
        rabbitTemplate.convertAndSend("firstQueue", "Hello World");
        return "🐉";
    }

    // 发送消息到secondQueue队列
    @RequestMapping("/send2")
    public String send2() throws JsonProcessingException {
        // 创建一个User对象
        User wfzldr = new User("wfzldr", "1234567890");
        // 将User对象转换为json字符串
        String json = objectMapper.writeValueAsString(wfzldr);
        // 将消息转换并发送到firstQueue队列
        // 将消息转换并发送到secondQueue队列
        rabbitTemplate.convertAndSend("secondQueue", json);
        return "🐉";
    }
}

消费者(consumer)

新建一个接受实体的Receiver

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "secondQueue")
public class EntityReceiver {
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitHandler
    public void process(String json) throws JsonProcessingException {
        User user = objectMapper.readValue(json, User.class);
        log.warn("接收到:" + user);
    }
}

我破门也需要在消费者里添加实体

@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private String username;
private String userpwd;
}

运行两个方法

 

3、创建公共模块

在项目里面新建一个publi的公共模块,在里面写公共的实体类

在公共模块的pom.xml文件里面把打包方式war改成 jar 

在对应的消费者或者生产者里面引入对应的模块即可

        <!--        引入公共模块-->
        <dependency>
            <groupId>org.example</groupId>
            <artifactId>public</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

我的分享就到这里,欢迎大家在评论区留言!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1397877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HCIA-HarmonyOS设备开发认证-序

序 最近涉及到HarmonyOS鸿蒙系统设备开发&#xff0c;在网络上已经有很多相关资料&#xff0c;视频教程&#xff0c;我也移植了公司的一个stm32G474板卡&#xff0c;运行LiteOS-m L0系统。 一面看资料一面移植&#xff0c;遇到不少坑&#xff0c;当看到运行的LOGO时&#xff0…

关于ElasticSearch,你应该知道的

一、集群规划优化实践 1、基于目标数据量规划集群 在业务初期&#xff0c;经常被问到的问题&#xff0c;要几个节点的集群&#xff0c;内存、CPU要多大&#xff0c;要不要SSD&#xff1f; 最主要的考虑点是&#xff1a;你的目标存储数据量是多大&#xff1f;可以针对目标数据…

纯命令行在Ubuntu中安装qemu的ubuntu虚拟机,成功备忘

信息总体还算完整&#xff0c;有个别软件更新了名字&#xff0c;所以在这备忘一下 1. 验证kvm是否支持 ________________________________________________________________ $ grep vmx /proc/cpuinfo __________________________________________________________________…

vue.js安装

1:下载 Node.js 官网&#xff1a;https://nodejs.org/en/download 2:安装 node -v npm -v 3:配置 npm config set prefix "F:\node\node_global" npm config set cache "F:\node\node_cache" 按 win 键并输入“编辑系统环境变量”调出系统属性界面&a…

汽车微电机行业研究:预计2029年将达到188亿美元

微电机行业是技术密集型行业&#xff0c;其起源于欧洲的德国、瑞士等国家&#xff0c;发展于日本。随着改革开放&#xff0c;中国作为发展中国家&#xff0c;承接了德国、日本等发达国家的汽车微电机产业转移&#xff0c;技术扩散逐步向我国转移。 微特电机广泛应用于信息处理设…

5G+物联网:连接万物,重塑智慧社区,开启未来生活新纪元,助力智慧社区的革新与发展

一、5G与物联网&#xff1a;技术概述与基础 随着科技的飞速发展&#xff0c;第五代移动通信技术&#xff08;5G&#xff09;和物联网&#xff08;IoT&#xff09;已经成为当今社会的热门话题。这两项技术作为现代信息社会的核心基础设施&#xff0c;正深刻地改变着人们的生活和…

解决字符串类型转数字类型相加结果异常问题

js字符串类型转换数字类型有七种方法&#xff0c;分别是parseInt()&#xff0c;parseFloat()&#xff0c;Math.floor()&#xff0c;乘以数字&#xff08;*1&#xff09;&#xff0c;Number()&#xff0c;双波浪号 (~~number)&#xff0c;一元运算符&#xff08;number&#xff…

Linux:多线程

目录 1.线程的概念 1.1线程的理解 1.2进程的理解 1.3线程如何看待进程内部的资源? 1.4进程 VS 线程 2.线程的控制 2.1线程的创建 2.2线程的等待 2.3线程的终止 2.4线程ID 2.5线程的分离 3.线程的互斥与同步 3.1相关概念 3.2互斥锁 3.2.1概念理解 3.2.2操作理解…

x-cmd pkg | aliyun - 阿里云 CLI

目录 简介首次用户技术特点竞品和相关作品进一步阅读 简介 aliyun 是基于阿里云 OpenAPI 的管理工具&#xff0c;用于与阿里云服务交互&#xff0c;管理阿里云资源。 首次用户 使用 x env use aliyun 即可自动下载并使用 在终端运行 eval "$(curl https://get.x-cmd.com…

NativePHP:开发跨平台原生应用的强大工具

NativePHP 是一种创新性的技术&#xff0c;可以帮助开发者使用 PHP 语言构建原生应用程序。本文将介绍 NativePHP 的概念和优势&#xff0c;探讨其在跨平台应用开发中的应用&#xff0c;并提供一些使用 NativePHP 开发原生应用的最佳实践。 什么是 NativePHP&#xff1f; Nati…

解决系统开发中的跨域问题:CORS、JSONP、Nginx

文章目录 一、概述1.问题场景2.浏览器的同源策略3.解决思路 二、一点准备工作1.创建前端工程12.创建后端工程3.创建前端工程24.跨域问题 三、方法1&#xff1a;使用CORS四、方法2&#xff1a;JSONP五、方法3&#xff1a;Nginx1.安装和启动&#xff08;windows&#xff09;2.使用…

多维时序 | Matlab实现CNN-LSTM-Mutilhead-Attention卷积长短期记忆神经网络融合多头注意力机制多变量时间序列预测

多维时序 | Matlab实现CNN-LSTM-Mutilhead-Attention卷积长短期记忆神经网络融合多头注意力机制多变量时间序列预测 目录 多维时序 | Matlab实现CNN-LSTM-Mutilhead-Attention卷积长短期记忆神经网络融合多头注意力机制多变量时间序列预测效果一览基本介绍程序设计参考资料 效果…

Qt —— 编译Qt5版本QFTP库,并实现连接服务、获取列表、上传、下载、删除文件等操作(附源码、附基于Qt5编译好的QFTP库)

示例效果1 示例效果2 介绍 QFTP是Qt4的库,Qt5改用了QNetworkAccessManager来代替。但是Qt5提供的QNetworkAccessManager仅支持FTP的上传和下载,所以只能将QFTP库编译为Qt5的库来进行调用。 QFTP在Github的下载地址:https://github.com/qt/qtftp 客户端源码生成的release结果…

安捷伦AgilentE8363B网络分析仪

安捷伦AgilentE8363B网络分析仪 E8363B 是 Agilent 的 40 GHz 网络分析仪。网络分析仪是一种功能强大的仪器&#xff0c;可以以无与伦比的精度测量射频设备的线性特性。许多行业使用网络分析仪来测试设备、测量材料和监控信号的完整性 附加功能&#xff1a; 104 dB 的动态范围…

基于 Redis 实现高性能、低延迟的延时消息的方案演进

1、前言 随着互联网的发展&#xff0c;越来越多的业务场景需要使用延迟队列。比如: 任务调度:延时队列可以用于任务调度&#xff0c;将需要在未来某个特定时刻执行的任务放入队列中。消息延迟处理: 延时队列可以用于消息系统&#xff0c;其中一些消息需要在一段时间后才能被消…

beego的控制器Controller篇 — 错误处理

1 错误处理 在做 Web 开发的时候&#xff0c;经常需要页面跳转和错误处理&#xff0c;beego 这方面也进行了考虑&#xff0c;通过 Redirect 方法来进行跳转&#xff1a; func (this *AddController) Get() {this.Redirect("/", 302) } 如何中止此次请求并抛出异常…

基于ORB算法的图像匹配

基础理论 2006年Rosten和Drummond提出一种使用决策树学习方法加速的角点检测算法&#xff0c;即FAST算法&#xff0c;该算法认为若某点像素值与其周围某邻域内一定数量的点的像素值相差较大&#xff0c;则该像素可能是角点。 其计算步骤如下&#xff1a; 1&#xff09;基于F…

Java 数组、二维数组、值传递和引用传递的区别

数组可以存放多个同一类型的数据。数组也是一种数据类型&#xff0c;是引用类型。 即&#xff1a;数(数据)组(一组)就是一组数据。 数组是一种数据结构&#xff0c; 用来存储同一类型值的集合。通过一个整型下标可以访问数组中的每一个值。 数组的定义 在声明数组变量时&…

Unity—配置lua环境变量+VSCode 搭建 Lua 开发环境

每日一句&#xff1a;保持须臾的浪漫&#xff0c;理想的喧嚣&#xff0c;平等的热情 Windows 11下配置lua环境变量 一、lua-5.4.4版本安装到本地电脑 链接&#xff1a;https://pan.baidu.com/s/14pAlOjhzz2_jmvpRZf9u6Q?pwdhd4s 提取码&#xff1a;hd4s 二、高级系统设置 此电…

削峰填谷与应用间解耦:分布式消息中间件在分布式环境下并发流量控制的应用

这是《百图解码支付系统设计与实现》专栏系列文章中的第&#xff08;18&#xff09;篇&#xff0c;也是流量控制系列的第&#xff08;4&#xff09;篇。点击上方关注&#xff0c;深入了解支付系统的方方面面。 本篇重点讲清楚分布式消息中间件的特点&#xff0c;常见消息中间件…