微服务学习|初识MQ、RabbitMQ快速入门、SpringAMQP

news2024/11/18 12:40:30

初识MQ

同步通讯和异步通讯

同步通讯是实时性质的,就好像你用手机与朋友打视频电话,但是,别人再想与你视频就不行了,异步通讯不要求实时性,就好像你用手机发短信,好多人都能同时给你发短信,你都可以收到,而且不用及时回复。

同步调用的问题

微服务间基于Feign的调用就属于同步方式,存在一些问题

比如用户调用支付服务时,它需要先后调用订单服务、仓储服务、短信服务等,都调用结束后,支付服务再返回用户相关信息,故这个过程的响应时间实际上就是所有这些相关服务执行之后所用时间之和,这样是非常影响效率的。但是也有优点,时效性较强,可以立即得到结果

同步调用存在的问题

1.如果我们想对支付服务增加一些功能,增加一些别的服务,为了让支付功能调用这个新服务,我们需要改动相关的代码

2.调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。就如支付服务必须要等订单服务、仓储服务、短信服务都执行后,才能给出响应,很慢。

3.上述支付服务在等待其余服务响应的时候,资源是不释放的,一直在等待,高并发场景下嫉妒浪费系统资源

4.如果其中一个服务出现问题,例如仓储服务出现问题,则整个响应就无法进行了,就整个崩掉了。

异步调用方案

异步调用常见实现就是事件驱动模式

比如用户发起支付服务,此时会将该消息发给Broker,然后就不用管后面了,支付服务直接给用户响应,不需要管订单、仓储、短信这些服务的执行了,这几个服务提前订阅了这个Broker,所以,支付服务消息来了,会从Broker中获取到消息,然后执行自己的逻辑就好。

优势一:服务解耦

当我们对支付服务增加一些新的功能,新服务时,只需要将该服务订阅这个Broker即可,不需要改动支付服务的代码

优势二:性能提升,吞吐量提高

因为是异步调用,支付服务不需要关注其相关的其他服务执行,只需将支付消息发给Broker即可,然后就能直接给用户响应,非常快,后续相关的服务只需要从Broker中收到该消息后执行相关的业务操作就可。

优势三:服务没有强依赖,不担心级联失败问题

其中一个服务发生故障,整个支付服务不会因为这个服务故障了而停掉,支付服务依然会直接给用户响应。

优势四:流量削峰

因为有Broker的缘故,如果并发量比较大,这些消息都会暂存在Broker中,慢慢的让其余各服务去处理,所以能够很好的将流量削峰。

异步通信的缺点

依赖于Broker的可靠性、安全性、吞吐能力

架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ?

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。简单来讲,Kafka对并发场景性能更高,但是没有RabbitMQ安全,所以一般情况下就用RabbitMQ就可,中规中矩。

RabbitMQ快速入门

单机部署

我们在Centos7虚拟机中使用Docker来安装。

方式一:在线拉取

方式二:从本地加载

在课前资料已经提供了镜像包

上传到虚拟机中后,使用命令加载镜像即可

安装MQ

执行下面的命令来运行MQ容器

这里第一个15672端口是用来登录RabbitMQ控制台的,第二个5672端口是用来进行异步调用的。这里也要设置RabbitMQ控制台的登录账号itcast密码123321

用宿主机ip+第一个端口15672来访问RabbitMQ控制台,账号itcast密码123321,登录即可

RabbitMQ的结构和概念

RabbitMQ中的几个概念
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtualhost: 虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息模型

MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法

HelloWorld案例

官方的Helloworld是基于最基础的消息队列模型来实现的,只包括三个角色:
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息

编写publisher消息发布者代码,新建一个连接工厂,配置本机RabbitMQ的异步调用端口,以及账号密码,然后建立一个连接,connection对象。

连接对象connection生成后,我们可以在RabbitMQ的控制台,看到了这个新生成的连接

然后用这个connection连接对象创建一个通道channel对象

执行完后,控制台也能够看到这个新建的通道

然后用channel对象新建一个名为simple.queue的队列,并给这个队列发消息

可以在控制台看到这个新创建的名为simple.queue的队列,并且队列中已经有了我们刚发送的消息

订阅该队列的consumer编写,前三步与publisher一致

最后一步变成了利用channel将消费者与队列绑定,在handleDelivery()中定义consumer的消费行为

SpringAMQP

什么是SpringAMQP

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:
1.在父工程中引入spring-amqp的依赖

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列

步骤1: 引入AMQP依赖

因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

步骤2: 在publisher中编写测试方法,向simple.queue发送消息

1.在publisher服务中编写application.yml,添加mq连接信息

2.在publisher服务中新建一个测试类,编写测试方法

运行后,可在RabbitMQ控制台看到该队列,以及队列中有消息了

什么是AMQP?

应用间消息通信的一种协议,与语言和平台无关。

SpringAMOP如何发送消息?

引入amqp的starter依赖
配置RabbitMo地址
利用RabbitTemplate的convertAndSend方法

步骤3: 在consumer中编写消费逻辑,监听simple.queue

1.在consumer服务中编写application.yml,添加mq连接信息

2.在consumer服务中新建一个类,编写消费逻辑

SpringAMQP如何接收消息?
引入amqp的starter依赖
配置RabbitMQ地址
定义类,添加@Component注解
类中声明方法,添加@RabbitListener注解,方法参数就是消息

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

Work Queue 工作队列

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积,也就是两个消费者同时订阅一个队列,共同处理队列中的消息

案例:模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下
1.在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

2.在consumer服务中定义两个消息监听者,都监听simple.queue队列
3.消费者1每秒处理50条消息,消费者2每秒处理10条消息

publisher服务中定义测试方法,每20ms发送一个消息,发送50次,也就是1秒向消息队列中发送50条消息

定义两个消息监听者,都监听simple.queue队列,消费者1一秒钟能处理50条消息,消费者2一秒钟能处理5条消息,故理论上这两个消费者能在1秒内处理完publisher发送的所有(50条)消息

但是,运行起来发现,并不是按照能力强的处理的消息多这样来分配的,而是这两个监听者,各分到了一般的消息,消费者1处理偶数号的消息,消费者2处理奇数的消息,这样最终结果就是消费者1很快的处理完了总共消息的一半,25条消息,而消费者2却花了好多秒去处理分给自己25条消息,最终的结果就是这两个消费者不能够在1秒内处理完所有消息。

实际上,它是有一个预取机制造成的这样结果,进入到消息队列中的消息会被提前预取给消费者,默认的是一人一个,这样平均分配的,消费者还没处理完,但是消息都已经全部预取出来给了对应消费者,我们可以配置预取机制,让每个消费者预取消息的数量为1,每次只能获取一消息,处理完成才能获取下一个消息,这样就可以做到能者多劳的结果。

消费预取限制

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限

配置完后再启动,就发现了消费者1执行的消息要比消费者2执行的消息多了,而不是简单的将消息都平均分配给监听的两个消费者。

Work模型的使用

多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量

发布 ( Publish )、订阅 ( Subscribe )

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

常见exchange类型包括:
Fanout:广播
Direct: 路由
Topic: 话题

发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

案例:利用SpringAMQP演示FanoutExchange的使用

实现思路如下:
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

3.在publisher中编写测试方法,向itcast.fanout发送消息

步骤1: 在consumer服务声明Exchange、Queue、 Binding

在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下

启动后,可以在RabbitMQ控制台看到我们声明的交换机与两个队列完成绑定。

步骤2: 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2

步骤3: 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法

项目启动,两个消费者都收到了这个publisher发送的消息

交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列

发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式 (routes)。每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例:利用SpringAMQP演示DirectExchange的使用

实现思路如下:
1.利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3.在publisher中编写测试方法,向itcast.direct发送消息

步骤1: 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2.
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

步骤2: 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法,convertAndSend的第二个参数就是RoutingKey,如果是red,则两个消费者都可收到,是blue,则只有消费者1收到,yellow则只有消费者2收到。

描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

案例:利用SpringAMQP演示TopicExchange的使用

实现思路如下:
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

3.在publisher中编写测试方法,向itcast.topic发送消息

步骤1: 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

步骤2: 在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法,如果RoutingKey是china.news,则两个消费者都能够接收到消息,如果为china.weather,则只有消费者1能收到消息,如果是usa.news,则只有消费者2能收到消息

消息转换器

案例:测试发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是object,也就是说我们可以发送任意对象类型的消息,SpringAMOP会帮我们序列化为字节后发送

先在consumer中的fanoutConfig中声明一个队列object.queue

在publisher中发送消息以测试

执行完publisher发消息后,RabbitMQ控制台查看object.queue队列中刚收到的消息,看到为乱码,被序列化了

Spring的对消息对象的处理是由org.springframeworkamqp.support.converterMessageConverter来处理的。而默认实现是simpleMessageConverter,基于]DK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessaeConverter 类型的Bean即可。推荐用ISON方式序列化,步骤如下:
我们在publisher服务引入依赖

我们在publisher服务声明MessageConverter,在该服务的主启动函数中声明即可。

再次重新执行完publisher发消息后,RabbitMQ控制台查看object.queue队列中刚收到的消息,看到消息已经成为了json格式,而不是乱码了

我们在consumer服务引入Jackson依赖

我们在consumer服务定义MessageConverter,在该服务的主启动函数中声明即可

然后定义一个消费者,监听object.queue队列并消费消息

服务启动后,这个consumer消费者收到了这个json消息。

SpringAMQP中消息的序列化和反序列化是怎么实现的?
利用MessageConverter实现的,默认是JDK的序列化
注意发送方与接收方必须使用相同的MessageConverter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1250006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NX二次开发UF_CURVE_ask_curve_struct 函数介绍

文章作者:里海 来源网站:https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_curve_struct Defined in: uf_curve.h int UF_CURVE_ask_curve_struct(tag_t curve_id, UF_CURVE_struct_p_t * curve_struct ) overview 概述 Gets the structure p…

notion 3.0.0 版本最新桌面端汉化教程,支持MAC和WIN版本

notion客户端汉化(目前版本3.0.0) 最近notion桌面端更新了3.0.0版本后会导致老版本汉化失效,本项目实现了最新版Notion桌面端的汉化。 文件下载地址:汉化文件下载地址 项目说明 本项目针对新的客户端做了汉化文化,依…

【知网稳定检索】第九届社会科学与经济发展国际学术会议 (ICSSED 2024)

第九届社会科学与经济发展国际学术会议 (ICSSED 2024) 2024 9th International Conference on Social Sciences and Economic Development 第九届社会科学与经济发展国际学术会议(ICSSED 2024)定于2024年3月22-24日在中国北京隆重举行。会议主要围绕社会科学与经济发展等研究…

数据结构与算法编程题20

统计二叉树的叶结点个数。 #define _CRT_SECURE_NO_WARNINGS#include <iostream> using namespace std;typedef char ElemType; #define ERROR 0 #define OK 1 typedef struct BiNode {ElemType data;BiNode* lchild, * rchild; }BiNode,*BiTree;bool Create_tree(BiTre…

Feign接口请求返回异常 no suitable HttpMessageConvert found for response type

问题场景&#xff1a; 后端调用feign接口请求, 接口返回异常, no suitable HttpMessageConvert found for response type 问题描述 报错异常如下&#xff1a; //根据图片特征 去查询人员信息ResultVo<List> personVos ipbdFaceLibPersonApi.queryFacePersonByFeatur…

HTTP状态码:404 Not Found错误之谜

文章目录 HTTP 404 Not Found错误 404出现形式导致 HTTP 404 错误的原因&#xff1f;推荐阅读 HTTP 404 Not Found 错误 404&#xff0c;也称为“HTTP 404 Not Found”&#xff0c;是当无法找到所请求的资源时 Web 服务器返回的HTTP 状态代码。 简单来说&#xff0c;这意味着…

机器学习探索计划——数据集划分

文章目录 导包手写数据划分函数使用sklearn内置的划分数据函数stratifyy理解举例 导包 import numpy as np from matplotlib import pyplot as plt from sklearn.datasets import make_blobs手写数据划分函数 x, y make_blobs(n_samples 300,n_features 2,centers 3,clus…

Linux中vim的编译链接和gcc

gcc,g,gdb的安装 命令行写gcc,g,gdb根据提示安装:sudo apt install gcc/g/gdb gcc分布编译链接 (1)预编译: gcc -E main.c -o main.i (2)编译: gcc -S main.i -o main.s (3)汇编: gcc -c main.s -o main.o (4)链接 gcc main.o -o main 执行: ./main 或者:全路径/main 编译链…

讲述 什么是鸿蒙 为什么需要鸿蒙 为什么要学习鸿蒙

首先 我们为什么要学习鸿蒙开发&#xff1f; 因为 鸿蒙发展前景巨大 鸿蒙自发布依赖 一直受社会各界关注 强两百的 App厂商 大部分接受了与鸿蒙的合作 硬件也有非常多与鸿蒙合作的厂商 鸿蒙的合作企业基本已经覆盖整个互联网客户的主流需求 所以鸿蒙的崛起不过是早晚的问题 …

NX二次开发UF_CURVE_ask_line_arc_data 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_line_arc_data Defined in: uf_curve.h int UF_CURVE_ask_line_arc_data(tag_t line_arc_feat_id, UF_CURVE_line_arc_t * line_arc_data ) overview 概述 Populates…

远程网络安全访问JumpServer:使用cpolar内网穿透搭建固定公网地址

文章目录 前言1. 安装Jump server2. 本地访问jump server3. 安装 cpolar内网穿透软件4. 配置Jump server公网访问地址5. 公网远程访问Jump server6. 固定Jump server公网地址 前言 JumpServer 是广受欢迎的开源堡垒机&#xff0c;是符合 4A 规范的专业运维安全审计系统。JumpS…

小程序中的大道理之三--对称性和耦合问题

再继续扒 继续 前一篇 的话题, 在那里, 提到了抽象, 耦合及 MVC, 现在继续探讨这些, 不过在此之前先说下第一篇里提到的对称性. 注: 以下讨论建立在前面的基础之上, 为控制篇幅起见, 这里将不再重复前面说到的部分, 如果您还没看过前两篇章, 阅读起来可能会有些困难. 这是第一…

电源控制系统架构(PCSA)之系统控制处理器组件

目录 6.4 系统控制处理器 6.4.1 SCP组件 SCP处理器Core SCP处理器Core选择 SCP处理器核内存 系统计数器和通用计时器 看门狗 电压调节器控制 时钟控制 系统控制 信息接口 电源策略单元 传感器控制 外设访问 系统访问 6.4 系统控制处理器 系统控制处理器(SCP)是…

【LeetCode】每日一题 2023_11_24 统计和小于目标的下标对数目(暴力/双指针)

文章目录 刷题前唠嗑题目&#xff1a;统计和小于目标的下标对数目题目描述代码与解题思路 结语 刷题前唠嗑 LeetCode&#xff1f;启动&#xff01;&#xff01;&#xff01; 题目&#xff1a;统计和小于目标的下标对数目 题目链接&#xff1a;2824. 统计和小于目标的下标对数…

Kafka 集群如何实现数据同步

Kafka 介绍 Kafka 是一个高吞吐的分布式消息系统&#xff0c;不但像传统消息队列&#xff08;RaabitMQ、RocketMQ等&#xff09;那样能够【异步处理、流量消峰、服务解耦】 还能够把消息持久化到磁盘上&#xff0c;用于批量消费。除此之外由于 Kafka 被设计成分布式系统&…

Qt 软件调试(二)使用dump捕获崩溃信息

Qt应用程序异常崩溃该怎么办&#xff0c;生成dump文件再回溯分析&#xff0c;可以快速且准确的帮助我们定位到崩溃的点。那么&#xff0c;本章我们分享下如何在Qt中生成dump文件。 一、使用minudump捕获崩溃信息 #include <QCoreApplication> #include <QDir> #i…

判断序列Series中的值是否都不一样 PandasSeries中的方法:is_unique()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 判断序列Series中的值是否都不一样 PandasSeries中的方法&#xff1a; is_unique() 选择题 请问下列程序运行的的结果是&#xff1a; import pandas as pd s1 pd.Series([1,2,3]) print("…

[PyTorch][chapter 64][强化学习-DQN]

前言&#xff1a; DQN 就是结合了深度学习和强化学习的一种算法&#xff0c;最初是 DeepMind 在 NIPS 2013年提出&#xff0c;它的核心利润包括马尔科夫决策链以及贝尔曼公式。 Q-learning的核心在于Q表格&#xff0c;通过建立Q表格来为行动提供指引&#xff0c;但这适用于状态…

现货黄金区间交易的两个要点

在现货黄金市场中&#xff0c;我们常碰到横盘区间行情。有区间&#xff0c;就终究会出现突破&#xff0c;因为金价不可能缺乏方向而一直在区间内运行。那既然要突破&#xff0c;我们又应当如何应对和交易呢&#xff1f;下面我们就来讨论一下。 切忌在突破发生时马上跟随突破方向…

EI期刊完整程序:MEA-BP思维进化法优化BP神经网络的回归预测算法,可作为对比预测模型,丰富内容,直接运行,免费

适用平台&#xff1a;Matlab 2020及以上 本程序参考中文EI期刊《基于MEA⁃BP神经网络的建筑能耗预测模型》&#xff0c;程序注释清晰&#xff0c;干货满满&#xff0c;下面对文章和程序做简要介绍。 适用领域&#xff1a;风速预测、光伏功率预测、发电功率预测、碳价预测等多…