SpringCloud 微服务全栈体系(十)

news2025/1/12 16:14:00

第十章 RabbitMQ

一、初识 MQ

1. 同步和异步通讯

  • 微服务间通讯有同步和异步两种方式:

    • 同步通讯:就像打电话,需要实时响应。

    • 异步通讯:就像发邮件,不需要马上回复。

在这里插入图片描述

  • 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
1.1 同步通讯
  • 之前学习的 Feign 调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:

在这里插入图片描述

  • 总结:

    • 同步调用的优点:

      • 时效性较强,可以立即得到结果
    • 同步调用的问题:

      • 耦合度高
      • 性能和吞吐能力下降
      • 有额外的资源消耗
      • 有级联失败问题
1.2 异步通讯
  • 异步调用则可以避免上述问题:

    • 我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。

    • 在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单 id。

    • 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。

    • 为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到 Broker,不关心谁来订阅事件。订阅者从 Broker 订阅事件,不关心谁发来的消息。

在这里插入图片描述

  • Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

  • 好处:

    • 吞吐量提升:无需等待订阅者处理完成,响应更快速

    • 故障隔离:服务没有直接调用,不存在级联失败问题

    • 调用间没有阻塞,不会造成无效的资源占用

    • 耦合度极低,每个服务都可以灵活插拔,可替换

    • 流量削峰:不管发布事件的流量波动多大,都由 Broker 接收,订阅者可以按照自己的速度去处理事件

  • 缺点:

    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于 Broker 的可靠、安全、性能
  • 好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是 MQ 技术。

2. 技术对比

  • MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的 Broker。
2.1 比较常见的 MQ 实现
  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka
2.2 几种常见 MQ 的对比
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

  • 追求可靠性:RabbitMQ、RocketMQ

  • 追求吞吐能力:RocketMQ、Kafka

  • 追求消息低延迟:RabbitMQ、Kafka

二、快速入门

1. 安装 RabbitMQ

1.1 单机部署
  • 在 Centos7 虚拟机中使用 Docker 来安装。
1.1.1 下载镜像
  • 方式一:在线拉取
docker pull rabbitmq:3-management
  • 方式二:从本地加载 - 资料已经提供了镜像包:
    见专栏 -> 全栈资料包 -> 资源包/02_cloud

    • 上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
1.1.2 安装 MQ
  • 执行下面的命令来运行 MQ 容器:
docker run \
 -e RABBITMQ_DEFAULT_USER=alex \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
1.2 集群部署
集群分类
  • 在 RabbitMQ 的官方文档中,讲述了两种集群的配置方式:

    • 普通模式:普通模式集群不进行数据同步,每个 MQ 都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有 2 个 MQ:mq1,和 mq2,如果你的消息在 mq1,而你连接到了 mq2,那么 mq2 会去 mq1 拉取消息,然后返回给你。如果 mq1 宕机,消息就会丢失。
    • 镜像模式:与普通模式不同,队列会在各个 mq 的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
1.3 MQ 的基本结构

在这里插入图片描述

  • RabbitMQ 中的一些角色:

    • publisher:生产者
    • consumer:消费者
    • exchange:交换机,负责消息路由
    • queue:队列,存储消息
    • virtualHost:虚拟主机,隔离不同租户的 exchange、queue、消息的隔离

2. RabbitMQ 消息模型

  • RabbitMQ 官方提供了 5 个不同的 Demo 示例,对应了不同的消息模型:

在这里插入图片描述

3. 导入 Demo 工程

  • 资料提供了一个 Demo 工程,mq-demo:
    见专栏 -> 全栈资料包 -> 资源包/02_cloud

在这里插入图片描述

  • 导入后可以看到结构如下:

在这里插入图片描述

  • 包括三部分:

    • mq-demo:父工程,管理项目依赖
    • publisher:消息的发送者
    • consumer:消息的消费者

4. 入门案例

  • 简单队列模式的模型图:

在这里插入图片描述

  • 官方的 HelloWorld 是基于最基础的消息队列模型来实现的,只包括三个角色:

    • publisher:消息发布者,将消息发送到队列 queue
    • queue:消息队列,负责接受并缓存消息
    • consumer:订阅队列,处理队列中的消息
4.1 publisher 实现
  • 思路:

    • 建立连接
    • 创建 Channel
    • 声明队列
    • 发送消息
    • 关闭连接和 channel
  • 代码实现:

package com.alex.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("alex");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
4.2 consumer 实现
  • 代码思路:

    • 建立连接
    • 创建 Channel
    • 声明队列
    • 订阅消息
  • 代码实现:

package com.alex.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("alex");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

5. 总结

  • 基本消息队列的消息发送流程:
  1. 建立 connection

  2. 创建 channel

  3. 利用 channel 声明队列

  4. 利用 channel 向队列发送消息

  • 基本消息队列的消息接收流程:
  1. 建立 connection

  2. 创建 channel

  3. 利用 channel 声明队列

  4. 定义 consumer 的消费行为 handleDelivery()

  5. 利用 channel 将消费者与队列绑定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1162413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

弹幕互动游戏运营模式你知道吗?

分享一个今年最新的风口项目,弹幕互动游戏。这种游戏类型是,主播真人出镜主持游戏过程,观众在直播间通过评论和礼物可以加入,并且操控游戏里的角色,等于是游戏和直播的结合。那普通人怎么把握这波风口呢? 说…

氮化硼纳米球 BN纳米球 hexagonal boron nitride

(西)氮化硼纳米球 (安)用途:科研 (瑞)平均粒径:50nm (禧)纯度:99.9% (生)比表面积:54.23m2/g &#xff0…

“利用Lazada API揭秘电商数据:一键获取海量商品详情!“

要使用Lazada API获取Lazada商品详情,您需要先注册Lazada开发者账号并获取授权码和密钥。然后,通过调用Lazada API的item_get接口,传入商品ID和国家域名后缀,即可获取到商品的详细信息。 以下是使用Lazada API获取Lazada商品详情…

一篇文章让你了解MySQL中的索引

索引是怎么提高查询效率的?可以为了提高查询效率增加索引么?mysql索引系统采用的数据结构是什么?为什么要使用B树?聚集索引相对于非聚集索引的区别?什么是回表?什么是索引覆盖?什么是最左匹配原…

BSP视频教程第28期:CANopen主从机组网实战,CAN词典工具使用方法以及吃透PDO玩法

视频教程汇总帖:【学以致用,授人以渔】2023视频教程汇总,DSP第12期,ThreadX第9期,BSP驱动第28期,USB实战第5期,GUI实战第3期(2023-11-01) - STM32F429 - 硬汉嵌入式论坛 …

华为防火墙 配置 SSLVPN

需求: 公司域环境,大陆客户端居家办公室需要连到公司域,这里可以在上海防火墙上面开通SSLVPN,员工就可以透过SSLVPN连通上海公司的内网,但是由于公司域控有2个站点,一个在上海,一个在台北&…

长距离工业RFID读写器的特点

长距离工业RFID读写器是一种特殊的RFID设备,能够在较远的距离内读取和写入RFID标签上的信息。这种读写器通常用于工业自动化、物流跟踪、车辆管理等领域,以实现高效、准确的跟踪和管理。 长距离工业RFID读写器采用先进的射频技术和信号处理技术&#xff…

C# 图解教程 第5版 —— 第14章 委托

文章目录 14.1 什么是委托14.2 委托概述14.3 声明委托类型14.4 创建委托对象14.5 给委托赋值14.6 组合委托14.7 为委托添加方法14.8 从委托移除方法14.9 调用委托14.10 委托的示例(*)14.11 调用带返回值的委托14.12 调用带引用参数的委托14.13 匿名方法1…

【笔记】excel怎么把汉字转换成拼音

1、准备好excel文件,复制需要转拼音列。 2、打开一个空白Word文档,并粘贴刚才复制的内容; 3、全选Word文档中刚粘贴的内容,点击「开始」选项卡「字体」命令组下的「拼音指南」,调出拼音指南对话框; 4、全…

java入门,哈希函数

一、前言 一听到哈希函数这种东西就感觉是数学,增加了人们的印象它很难。其中在数据结构中的HashMap的存储方式就用到了哈希函数,所以它也算是java的基础。看到哈希别惊慌,首先它只不过是个名称,我们理解它是个函数就行&#xff…

全球最强长文本大模型,一次可读35万汉字:Baichuan2-192K上线

大模型看书,从来没有这么快过。 国内大模型创业公司,正在技术前沿创造新的记录。 10 月 30 日,百川智能正式发布 Baichuan2-192K 长窗口大模型,将大语言模型(LLM)上下文窗口的长度一举提升到了 192K toke…

计算机体系结构图,冯诺依曼模型(控制器,运算器,指令集,存储器,cache),os(为什么要有os+如何管理举例,系统调用,用户操作接口)

目录 引入 硬件 -- 冯诺依曼模型 背景 早期 -- 硬件化 冯诺依曼结构 存储程序控制原理 核心思想 结构 cpu -- (运算器和控制器) 介绍 控制器 运算器 指令集 存储器 介绍 内部存储器 读写操作 高速缓冲存储器Cache 内存分类 RAM ROM 外部存储器 软件 -- …

Windows ObjectType Hook 之 ParseProcedure

1、背景 Object Type Hook 是基于 Object Type的一种深入的 Hook,比起常用的 SSDT Hook 更为深入。 有关 Object Type 的分析见文章 《Windows驱动开发学习记录-ObjectType Hook之ObjectType结构相关分析》。 这里进行的 Hook 为 其中之一的 ParseProcedure。文章实…

【入门Flink】- 03Flink部署

集群角色 Flik提交作业和执行任务,需要几个关键组件: 客户端(Client):代码由客户端获取并做转换,之后提交给JobManger JobManager:就是Fink集群里的“管事人”,对作业进行中央调度管理;而它获…

2021上半年下午网络工程师试题

2021上半年下午网络工程师试题 试题一(共20分) 阅读以下说明,回答问题1至问题4,将解答填入答题纸对应的解答栏内。 【说明】 某企业网络拓扑图如图1-1所示。该网络可以实现的网络功能有: 1.汇聚层交换机A与交换机B采用VRRP技术组网; 2.…

myabtis流式查询

1、流式查询简介 流式处理在大数据方面应用比较广泛。随着数据的爆发式增长,流式处理的方式也被应用到日常的工具中,如JDK的对于集合处理的Stream流、Redis5.0新增的数据结构Stream专门来处理消息等。 流式查询指的是查询成功后不是返回一个集合而是返回…

景联文科技:高质量数据采集清洗标注服务,助力大语言模型红蓝对抗更加精准高效

红蓝对抗是一种测试和评估大语言模型的方法。通过模拟真实世界测试AI模型的潜在漏洞、偏见和弱点,确保大型语言模型的可靠性和性能。 在红蓝对抗过程中,由主题专家组成的专业团队负责模拟攻击和提供反馈,他们试图诱导AI模型产生不当行为&…

电子商务平台对接电商供应链,不得不说的开放平台电商API接口

B2B电商开放平台的设计需要从以下几面去思考: 开放平台API接口的设计,主要是从功能需求的角度,设计满足业务需求的接口及对应的字段; 平台与商家之间信息的对接,对接的方法有哪些?对接过程中需要可能会遇到…

PaDiM 无监督异常检测和定位-论文和源码阅读

目录 1. 论文 1.1 检测效果 1.2 框架 1.2.1 特征提取embedding extraction 1.2.2 正样本学习Learning of the normality 1.2.3 计算异常图 inference: computation of the anomaly map 2. 源码 2.1 dataset 2.2 model 2.3 提取特征 1. 论文 https://arxiv.org/abs/…