RabbitMQ基本介绍及简单上手

news2025/1/11 17:16:14

(一)什么是MQ

  MQ(message queue)本质上是队列,满足先入先出,只不过队列中存放的内容是消息而已,那什么是消息呢? 消息可以是字符串,json也可以是一些复杂对象

 我们应用场景通常是再分布式系统之间

    系统通信方式

一般我们的系统调用方式有两种

一种是同步通信,也就是a--->b,直接调用对方服务,数据从a出发直接到达b

另一种叫异步通信,也就是a----->容器---->b,数据从一段出发,先进入一个容器进行临时存储,当满足条件后,由容器发给另一端,这个容器的实现就是MQ

(二)MQ的作用

MQ主要的工作是接收并发送消息,在不同场景下可以有不同的作用

这里说一些比较常见的作用

1.异步解耦

  在业务流程中,有一些操作时很耗时的,并且可能阻塞其他任务,但是这些操作并不要求立即执行,我们就可以借助MQ来把这些操作异步化

2.流量削峰

 我们在一些特殊时间可能会有访问量突增的情况,那我们应用一般来说是不能立即全部处理的,所以我们可以使用MQ把访问放到消息队列中,然后依次的处理这些请求

3.消息分发

  当多个系统要对同一个数据做出响应时,我们可以使用MQ来进行消息分发,比如支付成功后,支付系统就可以向MQ发送消息,然后其他系统来订阅这个消息,就不需要轮询数据库了

4.延迟通知

 在一些定时要进行处理的一些场景中,我们可以使用MQ的延迟消息功能,就比如在电商平台,用户下单后一定时间未支付,我们可以把限定的时间放到队列中,时间到了自动取消订单

(三)RabbitMQ的核心概念

我们这里用RabbitMQ来学习消息队列

AMQP

首先我们来看一下这个,AMQP是一种高级消息队列,定义了一套确定的消息交换功能,包含交换机和队列如何映射等,这些组件共同工作,使消息能够正常被接收和发送,AMQP还定义了一个网络协议,允许客户端和AMQP模型进行通信

我们RabbitMQ就是实现了AMQP协议的,RabbitMQ就是AMQP协议的Erlang的实现

所以他们两者的结构模型是一样的

首先我们来看一下RabbitMQ的工作流程

RabbitMQ是⼀个消息中间件,也是⼀个⽣产者消费者模型.它负责接收,存储并转发消息.

首先我们先来了解一下图中出现的一些概念

1.Producer和Consumer

Producer:生产者,是MQ的客户端,用来向RabbitMQ发送消息的

Consumer:消费者,也是MQ的客户端,是用来从RabbitMQ中取消息的

Brokeer就是RabbitMQ,用来接收和转发消息的

我们也可以简化下上面的图,只关注这三者

2.Connection和Channel

Connection:连接,是客户端和RabbitMQ服务器建立的一个TCP连接,这个连接时建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息

Channel:信道,每个建立的connection都可以由很多个信道,每个信道是一个独立的虚拟连接,消息的发送和接收都是通过channel的

注:信道的作用是为了把所有的读写操作都给复用到同一个TCP连接上,这个可以减少建立和关闭连接的开销,提高性能

3.Virtual host

Virtual host:虚拟机,这个是消息队列提供的一种逻辑上的隔离机制(本质上没有隔离),对于Rabbit一个Broker可以有多个虚拟机,当多个不同的⽤⼾使⽤同⼀个 RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个⽤⼾在⾃⼰的vhost创建exchange/queue等

4.queue

queue:队列,是RabbitMQ中的内部对象,是存储消息的地方

多个消费者可以订阅同一个队列

5.exchange

exchange:交换机 ,是消息到达broker中的第一站(因为虚拟机是逻辑上的本质上是不存在的),它负责接收生产者传来的消息,并根据routingkey(bingingkey)来把这些消息路由到一个或者多个queue中,exchange起到了消息路由的作用

6.RabbitMQ的工作流程

1.生产者生产消息

2.生产者与RabbitMQ建立一个连接并且开启一个信道

3.生产者声明一个交换机

4.生产者声明一个队列

5.生产者发送消息到Broker

6.Broker接收消息,并存入到相列中,如果没有相应的队列,就根据生产者的配置来进行一些处理

(四)RabbitMQ入门程序

  首先我们要使用RabbitMQ我们要在服务器上进行安装,那安装就先不说了,我们可以在官方文档或者b站找到很多资料

1.引入依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>

生产者代码:

public class Produce {
    public static void main(String[] args) throws IOException, TimeoutException {
          //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("62.234.46.219");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wd");
        connectionFactory.setPassword("w85315678");
        connectionFactory.setVirtualHost("bit");
        Connection connection = connectionFactory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换机(这里使用默认交换机)
        //声明队列
        /**
         * var1 队列名称
         * var2 是否持久化
         * var3 是否独占(这个队列只有一个消费者)
         * var4 是否自动删除
         * argument 一些高级参数
         * AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
         */
        channel.queueDeclare("hello",true,false,false,null);
        //发送消息
        /**
         * var1 交换机名称
         * var2 交换机通过什么映射到queue中(内置交换机routingkey与队列名称保持一致)
         * var3 属性配置
         * body 消息
         * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
         */
        String s1="hello wd";
        channel.basicPublish("","hello",null,s1.getBytes());
        //释放信道
        channel.close();
        //释放连接
        connection.close();
    }
}

消费者代码:

public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("62.234.46.219");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wd");
        connectionFactory.setPassword("w85315678");
        connectionFactory.setVirtualHost("bit");
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //声明队列(如果队列没有才创建,有就不创建)
        channel.queueDeclare("hello",true,false,false,null);
        //消费消息
        /**
         * String basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue 队列名称
         * autoAck是否自动确认
         * callback接收消息后要执行的逻辑
         */
        DefaultConsumer consumer=new DefaultConsumer(channel){
            //从队列中获取到消息就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到: "+new String(body));
            }
        };
        channel.basicConsume("hello",true,consumer);
        //释放资源
        channel.close();
        connection.close();
    }
}

消费者代码中有一个类叫Consumer

也就是我们channel.basicConsume()中的最后一个参数

这个参数是来定义消费者行为的,当我们需要从MQ中接收消息时,我们要提供一个实现了Consumer接口的对象(这里我使用的是内部类)

DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口

核心方法就是我们上面重写的方法

那上面我们没有说参数都是什么意思,这里来说一下

consumerTag:消费者标签,通常是消费者在订阅队列时指定的

envelope:有一些封包信息,如队列名称,交换机等

properties:一些配置信息

body:消息的具体内容

(五)可能出现的错误

1.

我们上面销毁的过程中,一定要先销毁channel再销毁connection或者直接销毁connection不销毁channel

如果先销毁connection再销毁channel就会报错

2.

我们在消费者代码中,如果不自己声明队列,且MQ中没有这个队列时,我们就会报错

3.

如果端口或者ip错误,也会报错

4.账号密码不正确也会报错 

5.用户对虚拟机没有操作权限会报错

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Rust自学】11.3. 自定义错误信息

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 11.3.1. 添加错误信息 在 11.2. 断言(Assert) 中我们学习了assert!、assert_eq!和assert_ne!这三个宏&#xff0c;而这篇文章讲的就是它…

某漫画网站JS逆向反混淆流程分析

文章目录 1. 写在前面1. 接口分析2. 反混淆分析 【&#x1f3e0;作者主页】&#xff1a;吴秋霖 【&#x1f4bc;作者介绍】&#xff1a;擅长爬虫与JS加密逆向分析&#xff01;Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Pyth…

网络分析与监控:阿里云拨测方案解密

作者&#xff1a;俞嵩(榆松) 随着互联网的蓬勃发展&#xff0c;网络和服务的稳定性已成为社会秩序中不可或缺的一部分。一旦网络和服务发生故障&#xff0c;其带来的后果将波及整个社会、企业和民众的生活质量&#xff0c;造成难以估量的损失。 2020 年 12 月&#xff1a; Ak…

STL——二叉搜索树

目录 二叉搜索树的概念 ⼆叉搜索树的性能分析 ⼆叉搜索树的插⼊ ⼆叉搜索树的查找 ⼆叉搜索树的删除 中序遍历结果为升序序列 二叉搜索树的概念 ⼆叉搜索树⼜称⼆叉排序树&#xff0c;它或者是⼀棵空树&#xff0c;或者是具有以下性质的⼆叉树 • 若它的左⼦树不为空&#…

【文件I/O】UNIX文件基础

IO编程的本质是通过 API 操作 文件。 什么是 IO I - Input 输入O - Output 输出 这里的输入和输出都是站在应用&#xff08;运行中的程序&#xff09;的角度。外部特指文件。 这里的文件是泛指&#xff0c;并不是只表示存在存盘中的常规文件。还有设备、套接字、管道、链接…

VS调试MFC进入系统源代码配置

调试MFC代码有时候能进入MFC的源代码,有时候不能.之前一直没有深入研究.后面经过查资料发现每次调试必能进入源代码的配置.很简单,只需要3步. 1.打开工具->选项->调试->符号,勾选Microsoft符号服务器. 2.打开项目->属性->配置属性->常规,MFC的使用修改成&qu…

车载网络:现代汽车的数字心跳

在汽车领域&#xff0c;“智能汽车”一词毫不夸张。如今的汽车已不再是原始的机械工程&#xff0c;而是通过先进的车载网络无缝连接的精密数字生态系统。这些滚动计算机由复杂的电子控制单元(ECU)网络提供动力&#xff0c;ECU是负责管理从发动机性能到信息娱乐系统等一切事务的…

mycat介绍与操作步骤

文章目录 1.分库分表2.mycat 入门2.1 概述2.2 案例&#xff1a;水平分表1&#xff09;准备工作2&#xff09;配置3&#xff09;启动并测试 3.mycat 配置详解3.1 schema.xml3.2 rule.xml3.3 server.xml 4.mycat 分片&#xff1a;垂直拆分1&#xff09;准备工作2&#xff09;配置…

【Python】Python之Selenium基础教程+实战demo:提升你的测试+测试数据构造的效率!

这里写目录标题 什么是Selenium&#xff1f;Selenium基础用法详解环境搭建编写第一个Selenium脚本解析脚本脚本执行结果常用的元素定位方法常用的WebDriver方法等待机制 Selenium高级技巧详解页面元素操作处理弹窗和警告框截图和日志记录多窗口和多标签页操作 一个实战的小demo…

Apache XMLBeans 一个强大的 XML 数据处理框架

Apache XMLBeans 是一个用于处理 XML 数据的 Java 框架&#xff0c;它提供了一种方式将 XML Schema (XSD) 映射到 Java 类&#xff0c;从而使得开发者可以通过强类型化的 Java 对象来访问和操作 XML 文档。下面将以一个简单的案例说明如何使用 Apache XMLBeans 来解析、生成和验…

带格式 pdf 翻译

支持 openAI 接口&#xff0c;国内 deepseek 接口兼容 openAI 接口&#xff0c; deepseek api 又非常便宜 https://pdf2zh.com/ https://github.com/Byaidu/PDFMathTranslate

ubuntu22.04降级安装CUDA11.3

环境&#xff1a;主机x64的ubuntu22.04&#xff0c;原有CUDA12.1&#xff0c;但是现在需要CUDA11.3&#xff0c;本篇文章介绍步骤。 一、下载CUDA11.3的run文件 下载网址&#xff1a;https://developer.nvidia.com/cuda-11-3-1-download-archive?target_osLinux&target_…

9 异常

如果你希望在软件调试上有所突破,或者想了解如何通过异常进行反调试,或者想自己写一个调试器,那么就必须要深入了解异常,异常与调试是紧密相连的,异常是调试的基础。 异常产生后,首先是要记录异常信息(异常的类型、异常发生的位置等),然后要寻找异常的处理函数,我们…

springBoot整合ELK Windowsb版本 (elasticsearch+logstash+kibana)

springBoot整合ELK Windowsb版本 【elasticsearchlogstashkibana】 下载软件启动服务1、elasticsearch2、kibana3、logstash 集成springboot1、添加依赖2、在logback.xml添加相关配置3、修改logstash 配置4、重启logstash 最后测试 下载软件 elasticsearch 官网 https://www.…

详解Sonar与Jenkins 的集成使用!

本文阅读前提 本文假设读者熟悉Jenkins和SonarQube的基础操作。 核心实现功能 Jenkins中运行的job来调用SonarScanner&#xff0c;最后可实现测试结果与SonarQube中同步查看。 Jenkins中安装Sonar相关插件 配置Sonarqube Dashboard>Manage Jenkins>Systems 指定son…

鸿蒙面试 2025-01-10

写了鉴权工具&#xff0c;你在项目中申请了那些权限&#xff1f;&#xff08;常用权限&#xff09; 位置权限 &#xff1a; ohos.permission.LOCATION_IN_BACKGROUND&#xff1a;允许应用在后台访问位置信息。 ohos.permission.LOCATION&#xff1a;允许应用访问精确的位置信息…

php 使用simplexml_load_string转换xml数据格式失败

本文介绍如何使用php函数解析xml数据为数组。 <?php$a <xml><ToUserName><![CDATA[ww8b77afac71336111]]></ToUserName><FromUserName><![CDATA[sys]]></FromUserName><CreateTime>1736328669</CreateTime><Ms…

【多空资金博弈】综合副图指标,资金做多线,短线做多雷达,中长线共振,大资金进场会涨等技术信号

如上图&#xff0c;副图指标【多空资金博弈】&#xff0c;红线做多资金线&#xff0c;绿色线为做空资金线&#xff0c;紫色柱线为短线做多雷达信号&#xff0c;紫色圆柱叠加文字为大资金进场信号&#xff0c;堆量柱线和紫色空心柱线为底部吸筹建仓信号&#xff0c;三条横向虚线…

Win11家庭版转专业版

Win11家庭版转专业版&#xff08;亲测有效&#xff09; 第一步 【断网】输入这个密钥&#xff1a; R8NJ8-9X7PV-C7RCR-F3J9X-KQBP6 第二步 点击下一步会自动重启 第三步 【联网】输入这个密钥&#xff1a; F3NWX-VFMFC-MHYYF-BCJ3K-QV66Y 注意 两次输入密钥的地方一致 …

【云商城】高性能门户网构建

第3章 高性能门户网构建 网站门户就是首页 1.OpenResty 百万并发站点架构 ​ 1).OpenResty 特性介绍 ​ 2).搭建OpenResty ​ 3).Web站点动静分离方案剖析 2.Lua语法学习 ​ 1).Lua基本语法 3.多级缓存架构实战 ​ 1).多级缓存架构分析 用户请求网站&#xff0c;最开始…