RabbitMQ笔记

news2025/1/19 23:02:47

一、MQ与RabbitMQ概述


1. MQ简述


MQ(Message Queue)消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。

一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)

在这里插入图片描述

分布式系统有两种通信方式:直接远程调用 和 借助第三方(MQ)完成间接通信。(发送方称为生产者,接收方称为消费者)


2. MQ的优势与劣势


2.1 MQ的优势


MQ的优势:(应用解耦、异步、削峰)

  • 应用解耦:提高系统容错性和可维护性;
  • 异步提速:提升用户体验和系统吞吐量;
  • 削峰填谷:提高系统稳定性。

1、应用解耦

在这里插入图片描述
在这里插入图片描述


2、异步提速

在这里插入图片描述

在这里插入图片描述


3、削峰填谷

在这里插入图片描述

填谷:

在这里插入图片描述

使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。


2.2 MQ的劣势


引入MQ会遇到下列问题:

  • 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
  • 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
  • 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
  • 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)

3. 常见的MQ产品


市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用Rdis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特征来综合考虑。

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言Erlang(二郎神,高并发语言)JavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议,社区封装了http协议支持
可用性一般
单机吞吐量一般(万级)高(十万级)非常高(十万级)
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
功能特性并发能力强,性能极其好,延迟低,社区活跃,管理界面丰富老牌产品,成熟度高,文档较多MQ功能比较完备,扩展性佳只支持主要的MQ功能,毕竟是为大数据领域准备的。
  • 追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;

  • 追求可靠性:RabbitMQ、RocketMQ;

  • 追求吞吐能力:RocketMQ、Kafka;

  • 追求消息低延迟:RabbitMQ、Kafka。


4. RabbitMQ简述


RabbitMQ官网地址:http://www.rabbitmq.com/

在这里插入图片描述

RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。

AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)

RabbitMQ中的一些角色:(AMQP协议消息中间件类似)

  • publisher:生产者;
  • consumer:消费者;
  • exchange :交换机,负责消息路由;
  • queue:队列,存储消息;
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离。

在这里插入图片描述

RabbitMQ工作模式:

RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第六种RPC远程调用不属于mq)

在这里插入图片描述

  • 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

JMS(JavaMessage Service)

  • JMS,Java消息服务应用程序接口,即Java操作消息中间件的API;
  • JMS是JavaEE规范的一种,类比JDBC;
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有。

二、RabbitMQ安装与配置


1. 基于docker快速安装RabbitMQ


扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml

1、拉取镜像

docker pull rabbitmq:3.8-management

2、运行容器

 docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v /docker/rabbitmq/plugins:/plugins \
 --name rabbitmq  \
 --hostname my-rabbit \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management
  • \ 代表换行
  • -e 指定环境变量
  • -e RABBITMQ_DEFAULT_USER=admin用户名
  • -e RABBITMQ_DEFAULT_PASS=123456密码
  • -v 挂载目录或文件 (数据卷)
  • -p 15672:15672 用于web管理页面使用的端口 (管理员页面)
  • -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里使用)
  • -d 后台运行
  • –name rabbitmq 容器名字
  • –hostname my-rabbit(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);

3、启动xxx插件

# 进入容器
docker exec -it rabbitmq /bin/bash

# 启动xxx插件
rabbitmq-plugins enable xxx

RabbitMQ管理端:

管理端访问地址:http://ip:15672/

在这里插入图片描述
在这里插入图片描述


2. 创建用户和虚拟机


1、添加一个新用户:

在这里插入图片描述

添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:

在这里插入图片描述


2、创建虚拟机

在这里插入图片描述

为指定用户授权:

在这里插入图片描述

最后该用户就可以操作这个虚拟机了:

在这里插入图片描述


三、RabbitMQ快速入门


使用简单模型中的基本模式完成消息传递:

在这里插入图片描述

官方的HelloWorld示例是基于简单消息队列模型来实现的,其中包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue;
  • queue:消息队列,负责接受并缓存消息;
  • consumer:订阅队列,处理队列中的消息。

1. 基础环境搭建


1、创建父工程mq-demo,并在pom文件中导入如下依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--SpringAMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

2、创建子模块publisher、consumer,并编写启动类和yml配置文件:

# 日志输出格式配置
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

在这里插入图片描述


2. publisher消息发布者实现


消息收发流程:Connection连接、Channel通道、queue队列、exchange 交换机。

publisher消息发布者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

1、编写publisher测试代码:

package com.baidou.mq.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.200.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

在这里插入图片描述


2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)

在这里插入图片描述

查看连接信息:

在这里插入图片描述


继续按F8,查看通道信息:

在这里插入图片描述

在这里插入图片描述


继续按F8,查看队列信息:

在这里插入图片描述
在这里插入图片描述


最后直接放行程序,查看队列中的消息:

在这里插入图片描述


3. consumer消费者实现


consumer消费者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

1、编写消费者代码

package com.baidou.mq.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.200.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

在这里插入图片描述

2、测试(启动程序后会一直执行,不用的时候将程序结束即可)

在这里插入图片描述
在这里插入图片描述


四、SpringAMQP与RabbitMQ工作模型


1. SpringAMQP概述


AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP的官方地址:https://spring.io/projects/spring-amqp

在这里插入图片描述

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送消息 。

RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。

在这里插入图片描述


2. BasicQueue 简单队列模型


使用SpringAMQP实现基础消息队列功能:

1、在父工程中引入spring-amqp起步依赖:

<!--SpringAMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、消息发送

2.1、在publisher服务的application.yml中添加rabbitmq配置:

# 配置rabbitmq
spring:
  rabbitmq:
    host: 192.168.200.128 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 配置日志格式
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.baidou.mq.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 使用SpringAMQP实现简单队列模型的消息发送
 *
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    // 操作RabbitMQ的模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 测试简单队列模型
     */
    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

3、消息接收

3.1、在consumer服务的application.yml中添加rabbitmq配置:

# 配置rabbitmq
spring:
  rabbitmq:
    host: 192.168.200.128 # 主机名
    port: 5672       # 端口
    virtual-host: /  # 虚拟主机
    username: admin  # 用户名
    password: 123456 # 密码

# 配置日志格式
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS

3.2、在consumer服务的com.baidou.mq.listener包中创建SpringRabbitListener类:

package com.baidou.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听类
 *
 * @author 白豆五
 * @version 2023/04/27
 * @since JDK8
 */
@Component
public class SpringRabbitListener {

    /**
     * 订阅消息
     *
     * @param msg
     * @throws InterruptedException
     */
    @RabbitListener(queues = "simple.queue") // 指定监听的队列名称为simple.queue
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

4、测试

先启动consumer服务(启动类),然后在publisher服务中运行测试代码,发送MQ消息。

在这里插入图片描述


3. WorkQueue 工作队列模型


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/469404.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Pandas] 设置DataFrame的index索引起始值为1

导入数据 import pandas as pddf pd.DataFrame([[liver,E,89,21,24,64],[Arry,C,36,37,37,57],[Ack,A,57,60,18,84],[Eorge,C,93,96,71,78],[Oah,D,65,49,61,86]], columns [name,team,Q1,Q2,Q3,Q4]) df 上述DataFrame中的index索引列默认是从0开始的&#xff0c;那么我们…

【Spark】Spark是什么?能干什么?有什么特点?

一、什么是Spark 官网&#xff1a;http://spark.apache.org Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters. Spark是一种快速、通用、可扩展的大数据分析引擎&#xf…

MATLAB连续LTI系统的时域分析(十)

目录 1、实验目的&#xff1a; 2、实验内容&#xff1a; 1、实验目的&#xff1a; 1&#xff09;掌握利用MATLAB对系统进行时域分析的方法&#xff1b; 2&#xff09;掌握连续时间系统零输入响应的求解方法&#xff1b; 3&#xff09;掌握连续时间系统零状态响应、冲激响应和…

AD9739配置解析与数据输出指南

1 概述 本文用于AD9737芯片的配置使用情况&#xff0c;以及数据输出的格式说明情况&#xff0c;数据速率的计算情况等。 AD9739是ADI公司的一款14BIT&#xff0c;可达2.5GSPS采样率的DAC芯片。 2 AD9739的性能 支持的输入数据速率&#xff1a;1.6GSPS TO 2.5GSPS. industry lea…

基于3D渲染和基于虚拟/增强现实的IIoT原理的数字孪生平台的方案论文阅读笔记

基于3D渲染和基于虚拟/增强现实的IIoT原理的数字孪生平台的方案论文阅读笔记 论文原文链接&#xff1a;https://ieeexplore.ieee.org/abstract/document/9039804 本笔记对部分要点进行了翻译和批注&#xff0c;原文和翻译可参考链接阅读&#xff0c;此处不进行完整翻译。 论文…

【服务网格】Service Mesh 是什么?为我们解决了什么问题?

文章目录 背景一、Service Mesh 介绍Service Mesh的定义Service Mesh 诞生 二、Service Mesh 解决的问题三、Service Mesh 的原理四、Service Mesh具体是怎么实现的&#xff1f;Istio是什么&#xff1f;istio架构和主要功能Istio 1.5.1 性能总结Istio与Kubernetesistio的实战案…

四、MyBatis获取参数值的两种方式(重点)

文章目录 四、MyBatis获取参数值的两种方式&#xff08;重点&#xff09;4.1 单个字面量类型的参数4.2 多个字面量类型的参数4.3 map集合类型的参数4.4 实体类类型的参数4.5 使用Param标识参数 四、MyBatis获取参数值的两种方式&#xff08;重点&#xff09; MyBatis获取参数值…

IP-GUARD屏幕记录能实现平时不记录,特定操作触发记录吗?

支持触发性屏幕记录。部分策略有选项“记录屏幕”,勾选后,策略触发时,会自动记录客户端当时的屏幕情况,记录次数和间隔可通过配置修改。 所有包含了记录屏幕的策略有: 应用程序、上网浏览、流量控制、网络控制、邮件控制、IM传送控制、上传控制、文档控制、打印控制、敏感…

深浅拷贝,类型检测及继承面试题

01 对象的深/浅拷贝 1.1 变量的存储 基本类型&#xff1a;基本类型的值存在栈内存中 引用类型&#xff1a; 引用类型的地址存储在栈内存中&#xff0c;他的值存储在堆内存中&#xff0c;通过指针(地址)连接 1.2 变量拷贝 基本类型&#xff1a;基本类型拷贝的是值 引用类型…

达梦数据库中,如何设置表的访问控制权限?

在工作中&#xff0c;大家都会遇到这样的场景&#xff0c;出于对数据库访问安全的考虑&#xff0c;对于某些用户我们不想让他们看到全库的表&#xff0c;只想给他们特定表的访问权限。那么在DM数据库中我们该如何去配置相应的权限呢&#xff1f; 我们下面来进行详细解析。 我们…

buuctf6

目录 [ACTF2020 新生赛]BackupFile [RoarCTF 2019]Easy Calc 利用PHP的字符串解析特性Bypass: http走私攻击 来首歌吧 荷兰宽带数据泄露​编辑 面具下的flag [ACTF2020 新生赛]BackupFile 1.打开环境 查看源代码得不到任何想要的信息&#xff0c;使用目录扫描来得到我们…

关于秒杀系统的一系列问题

阻塞队列怎么么实现&#xff1f;超卖问题&#xff1f;整体怎么实现&#xff1f; 5 设计一个秒杀系统 特点&#xff1a;高并发&#xff0c;请求量远大于库存量&#xff0c;只有少数能成功&#xff1b;逻辑比较简单&#xff0c;下单减库存&#xff1b; 设计理念&#xff1a;**限…

ESP8266基于Lua开发点灯示例

ESP8266基于Lua开发点灯示例 ✨基于ESPlorer IDE&#xff1a;https://github.com/4refr0nt/ESPlorer/releases&#x1f527;固件烧录工具&#xff1a;NodeMCU-PyFlasher&#x1f33f;esp8266烧录工具下载&#xff1a;https://github.com/marcelstoer/nodemcu-pyflasher/release…

LabVIEW CompactRIO 开发指南2 CompactRIO软件架构

第一章 CompactRIO软件架构 几乎所有的CompactRIO系统都至少有三个顶层VIs在三个不同的目标上异步执行:FPGA、实时操作系统(RTOS)和主机PC。如果开始软件开发时没有某种架构或流程图可供参考&#xff0c;那么可能会发现跟踪所有软件组件和通信路径是具有挑战性的。拥有一个在…

CentOS7上使用yum搭建LNMP架构并实现wordpress博客实战

前言 &#x1f3e0;个人主页&#xff1a;我是沐风晓月 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是沐风晓月&#xff0c;阿里云社区博客专家&#x1f609;&#x1f609; &#x1f495; 座右铭&#xff1a; 先努力成长自己&#xff0c;再帮助更多的人 &#xff0c…

魔兽worldserver.conf 服务端配置文件说明

魔兽worldserver.conf 服务端配置文件说明 我是艾西&#xff0c;今天把很多小伙伴需要的魔兽worldserver.conf 服务端配置文件说明分享给大家&#xff0c;大家可以自己研究参考下 worldserver.conf 这个文件是服务端的配置文件&#xff0c;可以在这里做很多个性化修改 注意&a…

SpringSecurity跌坑指南

SpringSecurity跌坑指南 1&#xff0c;事情原委 这两天开始了毕业设计&#xff0c;但是突然发现自己的java方面的基础比较薄弱&#xff0c;于是决定自己从头到尾的开发一个java项目 要说跌的最惨的坑&#xff0c;莫过于springsecurity&#xff0c;如果你只是想要在项目里面配…

〖ChatGPT实践指南 - 零基础扫盲篇⑦〗- 基于 Python 实现的 OpenAI-Library 的简单使用

文章目录 ⭐ python 安装 OpenAI library⭐ 创建 openai.py 进行测试⭐ openai.Completion.create() 方法的小拓展 该章节我们呢来学习一下 OpenAI-Library 的使用&#xff0c;OpenAI-Library 是 OpenAI 官方给我们提供的各种开发语言的库&#xff0c;供我们使用。在前面的章节…

【无标题】基于matlab的长短期神经网络lstm的股票预测

目录 背影 摘要 LSTM的基本定义 LSTM实现的步骤 基于长短期神经网络LSTM的股票预测 MATALB编程实现&#xff0c;附有代码 效果图 结果分析 展望 参考论文 背影 股票市场的波动十分巨大&#xff0c;由于一些不确定因素的影响&#xff0c;导致很难对股票进行投资盈利。因此&…

数据库 SQL Server 检测到基于一致性的逻辑 I/O 错误 校验和不正确 解决方法

目录 一、错误提示信息&#xff1a; 二、原因分析&#xff1a; 三、解决方法&#xff1a; 四、执行完成结果&#xff1a; 五、重要说明 一、错误提示信息&#xff1a; 本文验证是数据库中的baiduAi_0258表无法通过select语句ID条件查询&#xff0c; 如执行 select * fro…