RabbitMQ的介绍和使用

news2024/12/26 12:05:19
1.同步通讯和异步通讯

        举个例子,同步通讯就像是在打电话,因此它时效性较强,可以立即得到结果,但如果你正在和一个MM打电话,其他MM找你的话,你们之间是不能进行消息的传递和响应的

        异步通讯就像是微信,你可以和多个MM进行消息的传递,对方可以立即响应或者有空了在”回“你

        同步调用问题

        微服务间基于Feign的调用就属于同步方式,存在一些问题。

        1.代码耦合:如果后续需要添加业务,需要不断修改支付服务的代码

        2.性能下降,吞吐量下降:支付服务一直在等待,订单等服务的响应,cpu资源一直在占用。

        3.级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题。

2.异步调用方案

        异步调用常见的就是事件驱动模式:Broker是事件代理者,一旦用户支付成功,这就是一个事件,这个事件就会被Broker管理,订单服务、仓储服务、短信服务。就会找Broker,这个叫做订阅事件。一旦用户支付成功之后,Broker就会通知被订阅过事件的服务,支付服务完成事件发布之后,就结束了服务,返回给用户

        优点:

        解决代码解耦:添加业务时不需要再更改支付服务的代码,支付服务只需要发布事件就行。至于后面的业务支付服务可以不用考虑。

        性能提升,吞吐量提供:相比较同步服务,业务处理时间的累加,支付服务还需要等待其他服务完成并响应,通过异步的方式,支付服务发布时间之后就结束服务,无需等待其他服务响应。提升了性能,和吞吐量

        服务没有依赖关系,不用担心级联失败问题

        流量削峰: 有多个事件发布,可以囤积到broker上,订阅该事件的服务可以按自己的处理能力来稳步进行。broker起到缓冲作用

        缺点:

        依赖Broker的可靠性、安全性、吞吐能力

        架构复杂了,业务没有明显的流程线,不好追踪管理

3.什么是MQ

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker

常见的MQ

RabbitMQ,适用于中小型企业开发,如果对性能要求比较高的并且需要定制服务的大型企业推荐使用Kafka。下面会介绍RabbitMQ的使用。

4.RabbitMQ概述和安装

RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com

RabbitMQ的部署

环境:centos7,docker在线拉取的方式部署。

#拉取RabbitMQ镜像

输入:docker pull rabbitmq:3-management

#设置默认用户名密码并启动容器

docker run --name rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

端口15672是rabbitMQ的ui界面,5672是服务端口

ui界面展示:

RabbitMQ结构和概念

PubLisher是我们的消息(事件)发送者,consumer是消息的消费者,PubLisher将来会把消息发送到我们的exchange(交换机)上,交换机负责路由,并把消息投射到queue(队列),queue负责暂存消息,consumer负责从queue里面获取消息处理消息。

 

5.RabbitMQ的常见消息模型

一、基本消息队列(BasicQueue)

HelloWorld是最基本的消息队列模型,实现的话,包含三个角色

publisher:消息发布者,将消息发送到队列queue

queue:消息队列,负责接受并缓存消息

consumer:订阅队列,处理队列中的消息

官方提供的编码方式非常麻烦,下面我们介绍学习一下SpringAMQP,它可以大大简化我们消息发送和接收API。

SpringAMQP简介

AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言的平台无关,更符合微服务中独立性的要求。

Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

利用SpringAMQP实现基础消息队列功能

通过rabbitTemplate提供的convertAndSend就可以实现消息的发送

引入相关依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <scope>test</scope>
</dependency>

publisher(消息发布者)的application.yml的配置

test的测试代码:

@RunWith(SpringRunner.class)
@SpringBootTest
class PublisherApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        String queuename="simple queue";
        String message="hello,spring AMQP";
        rabbitTemplate.convertAndSend(queuename,message);
    }

}

通过rabbitTemplate实现对队列消息的监听

引入依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <scope>test</scope>
</dependency>

配置consumer(消息接收者),的application.yml文件

spring:
  rabbitmq:
    host: 192.168.10.8 #主机名
    port: 5672         #端口
    username: root      #用户名
    password: 123       #密码
    virtual-host: /     #虚拟主机名

监听类的代码

@Component
public class SimpleListener {
    @RabbitListener(queues = "simple queue")
    public void ListenSimpleQueue(String msg)
    {
        System.out.println("消费者接收到simple queue的消息:"+msg);
    }
}

ps:消息一旦消费就会从队列中删除,RabbitMQ没有消息回溯功能。

二、工作消息队列(WorkQueue)

工作队列的结构如下:

工作消息队列的结构,相比于基础消息队列多了个消费者,因为rabbitMQ阅后即焚的特性这两个消费者属于共同工作的关系,如果有50个消息,他们两个消费者就会一人分一半,也就是一人25条,为啥会多一个消费者?这是因为如果一个消费者每次处理40个消息,但是publisher一次发布50个消息,多出来的消息会存储在queue里面,又因为queue是占用内存的假以时日,内存就会爆满,新的消息就存不进去了,多一个消费者每次就可以处理80条消息,可以有效解决这个问题。

work queue,工作队列,可以提高消费处理速度,避免队列消息堆积。但是这里有一个消息预取机制 ,消费者会提前把消息拿过来,因此消息是平局分配,并不是“能者多劳”的模式,通过设置prefetch的值来实现每次只能获取一条消息,处理完成才能接取下一个消息。实现“能者多劳”的模式。

  1. 发布订阅(Publish、Subscribe)

基础消息队列和工作消息队列都是一条信息只被一个消费者消费,消费完就删除,显然不能实现我们之前预想的完成支付之后,通知仓储、短信等服务。这就需要我们了解学习发布订阅模式。发布订阅模式与之前案例的区别就是允许同一消息发送给多个消费者,实现方式是加入exchange(交换机),结构如下:

publisher将消息发送给exchange(交换机),交换机把这个消息转发给队列,因此,发布者(publisher)并不需要知到转发给了那个 队列或多个队列,转发给多个队列,这种方式就能实现被多个消费者消费,那么交换机到底是发给一个还是多个呢?这是由交换机类型来决定的。常见的exchange的类型包括:

广播:Fanout

路由:Direct

主题:Topic

注意: exchange负责消息路由,而不是储存,路由失败则消息丢失。

广播-Fanout Exchange

Fanout Exchange 会将接收到的消息路由到每一个绑定的queue。

实现思路:

1.在consumer服务中,利用代码声明队列、交换机、并将两者绑定。

在consumer服务上添加@Configuration注解,并声明FanoutExchange、Queue和关系对象Binding,代码如下:

@Configuration
public class FanoutConfig {
    //声明交换机对象
    @Bean
    public FanoutExchange fanoutExchange()
    {
        return new FanoutExchange("fanout");
    }
    //声明队列1
    @Bean
    public Queue fanoutqueue1()
    {
        return new Queue("fanoutqueue1");
    }
    //绑定队列一到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutqueue1,FanoutExchange fanoutExchange)
    {
        return BindingBuilder.bind(fanoutqueue1).to(fanoutExchange);
    }
    //声明队列2
    @Bean
    public Queue fanoutqueue2()
    {
        return new Queue("fanoutqueue2");
    }
    //绑定队列二到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutqueue2,FanoutExchange fanoutExchange)
    {
        return BindingBuilder.bind(fanoutqueue2).to(fanoutExchange);
    }
}

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues = "fanoutqueue1")
public void ListenSimpleQueue3(String msg) throws InterruptedException {
    System.out.println("消费者222接收到fanoutqueue1的消息:"+msg);
    Thread.sleep(100);
}
@RabbitListener(queues = "fanoutqueue2")
public void ListenSimpleQueue4(String msg) throws InterruptedException {
    System.out.println("消费者222接收到fanoutqueue2的消息:"+msg);
    Thread.sleep(100);
}

3.在publisher中编写测试方法,向fanout发送消息。

@Test
public void contectFonoutExchange()
{
    //交换机名称
    String exchangeName="fanout";
    //消息内容
    String message="hello everyone";
    //发送消息
    rabbitTemplate.convertAndSend(exchangeName,"",message);
}

路由-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey,一个队列可以绑定多个BindingKey。

发布者发送消息时,指定消息的RoutingKey

Exchange(交换机)将消息路由到Bindingkey与消息RoutingKey一致的队列

实现思路:

1.利用@RabbitListenner声明Exchange、Queue、RountingKey

2.在consumer服务中编写两个消费者方法,分别监听queue1和queue2

在我们的监听类里面增加两个方法,用来声明交换机、队列和RountingKey

//发布订阅DirectExchange
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "queue1"),
        exchange = @Exchange(name = "direct",type= ExchangeTypes.DIRECT),
        key = {"red","blue"}
))
public void listenerDirectqueue1(String msg)
{
    System.out.println("消费者接收到direct.queue1的消息"+msg);
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "queue2"),
        exchange = @Exchange(name = "direct",type= ExchangeTypes.DIRECT),
        key = {"red","yellow"}
))
public void listenerDirectqueue2(String msg)
{
    System.out.println("消费者接收到direct.queue2的消息"+msg);
}

3.在publisher中编写测试方法,向Exchange发送消息。

@Test
public void contextDirectExchange()
{
    String exchangeName="direct";
    String msgblue="hello blue";
    String msgRed="hello red";
    String msgYellow="hello yellow";
    rabbitTemplate.convertAndSend(exchangeName,"blue",msgblue);
}

@Test
public void contextDirectExchange()
{
    String exchangeName="direct";
    String msgblue="hello blue";
    String msgRed="hello red";
    String msgYellow="hello yellow";
    rabbitTemplate.convertAndSend(exchangeName,"red",msgRed);
}

话题-TopicExchange

TopicExchange与DirectExchange类似,区别在于rountingKey必须是多个单词的列表,并且以"."分割。Queue与Exchange指定BindingKey可以使用通配符

#:代表0个或多个单词

*:代表一个单词

我们使用Direct的时候一个队列如果绑定了很多key,会非常麻烦,通配符的引入就把key的绑定简化许多,原来绑定多个key现在只需要绑定一个key。

实现思路:

1.利用@RabbitListenter声明Exchange、Queue、RoutingKey

2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

//topic话题
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
        key ="china.#"
))
public void listennertopicqueue1(String msg)
{
    System.out.println("消费者接收到topic.queue1的消息"+msg);
}
@RabbitListener(bindings = @QueueBinding(

        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
        key ="#.news"
))
public void listennertopicqueue2(String msg)
{
    System.out.println("消费者接收到topic.queue2的消息"+msg);
}

3.在publisher中编写测试方法,向交换机topic发送消息

@Test
public void contextTopicExchange()
{
    String exchangeName="topic";
    String msg="我是懒大王";
    rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}

6.消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

Spring的消息对象处理是由MessageConcerter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化,这种序列化方式,比较浪费内存资源,如果需要修改,只需要定义一个MessageConverter类型的Bean即可。推荐用JSON方式序列化,步骤如下:

我们在publisher服务引入依赖

我们在publisher服务中声明MessageConverter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1648517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于GEE遥感影像处理和长时序土地分类以及生物量估算分析

简介 Google Earth Engine云平台是目前全球范围内测绘领域内使用最为广泛的遥感云计算平台&#xff0c;其凭借强大的数据存储和云计算能力&#xff0c;极大了提高了全球科研工作者的科研产出&#xff0c;每年借助GEE平台发布的各类期刊论文超1000篇&#xff0c;在海量遥感数据的…

鉴源实验室丨汽车入侵检测系统介绍及测试

作者 | 张诏景 上海控安可信软件创新研究院工控网络安全组 来源 | 鉴源实验室 社群 | 添加微信号“TICPShanghai”加入“上海控安51fusa安全社区” 01 入侵检测系统背景 智能网联汽车不再是一个孤立的嵌入式系统了&#xff0c;信息安全问题越来越被重视。国内外发布了多项标…

Jmeter 命令行压测 生成 HTML 测试报告,你真的会?

通常 Jmeter 的 GUI 模式仅用于调试&#xff0c;在实际的压测项目中&#xff0c;为了让压测机有更好的性能&#xff0c;多用 Jmeter 命令行来进行压测。 同时&#xff0c;JMeter 也支持生成 HTML 测试报告&#xff0c; 以便从测试计划中获得图表和统计信息。 以上定义的文件路…

无人售货机是否是下个风口?

当前&#xff0c;众多大中城市正逐步转变为无人零售的新兴试验场&#xff0c;其广阔的发展前景与潜在价值日益受到瞩目。据统计数据显示&#xff0c;无人零售领域已吸引超过650亿元的投资注入。未来五年内&#xff0c;无人零售渠道在中国快消品市场有望迎来爆发性增长&#xff…

2024智能电网与能源系统国际学术会议(ICSGES2024)

2024智能电网与能源系统国际学术会议&#xff08;ICSGES2024) 会议简介 我们诚挚邀请您参加将在南京隆重举行的2024年智能电网与能源系统国际学术会议&#xff08;ICSGES2024&#xff09;。南京&#xff0c;一座历史与现代交织的城市&#xff0c;将为这场盛会提供独特的学术…

10页面结构分析

我们打开一个网页&#xff0c;都会有一个清晰的结构和布局上图中的标签就是用来划分各个部分区域用的。其中比较常用重要的是header、footer和nav&#xff0c;需要重点掌握。 下面是部分代码及效果演示 <header> <h2>网页头部</h2> </header><sec…

安泰ATA-309C:功率放大器的分类及区别是什么

功率放大器是一种电子器件&#xff0c;用于将低功率信号放大到更高功率&#xff0c;以驱动负载或增强信号强度。功率放大器根据其工作原理、电路拓扑和应用领域的不同&#xff0c;可以分为多种类型。下面将介绍几种常见的功率放大器分类及其区别。 A类功率放大器&#xff1a;A类…

做好源代码防泄密的10条准则

#深度好文计划# 近年来&#xff0c;电脑以及互联网应用在中国的普及和发展&#xff0c;已经深入到社会每个角落&#xff0c; 政府&#xff0c;经济&#xff0c;军事&#xff0c;社会&#xff0c;文化和人们生活等各方面都越来越依赖于电脑和网络。企业需要花费大量的时间精力去…

搞定 TS 装饰器,让你写 Node 接口更轻松

前言 亲爱的小伙伴&#xff0c;你好&#xff01;我是 嘟老板。你是否用过 TypeScript 呢&#xff1f;对 装饰器 了解多少呢&#xff1f;有没有实践应用过呢&#xff1f;今天我们就来聊聊 装饰器 的那点事儿&#xff0c;看看它有哪些神奇的地方。 什么是装饰器 咱们先来看一段…

Spring中的Bean相关理解

在Spring框架中&#xff0c;Bean是一个由Spring IoC容器实例化、配置和管理的对象。Bean是一个被Spring框架管理并且被应用程序各个部分所使用的对象。Spring IoC容器负责Bean的创建、初始化、依赖注入以及销毁等生命周期管理。 注&#xff1a;喜欢的朋友可以关注公众号“JAVA学…

学习软考----数据库系统工程师25

关系规范化 1NF&#xff08;第一范式&#xff09; 2NF&#xff08;第二范式&#xff09; 3NF&#xff08;第三范式&#xff09; BCNF&#xff08;巴克斯范式&#xff09; 4NF&#xff08;第四范式&#xff09; 总结

茶多酚复合纳米纤维膜

茶多酚复合纳米纤维膜是一种结合了茶多酚与纳米纤维技术的创新材料。茶多酚作为茶叶中多酚类物质的总称&#xff0c;具有抗氧化、抗辐射、抗*等多种药理作用&#xff0c;是一种非常有益的天然物质。而纳米纤维膜则因其超细纤维结构、高比表面积和高孔隙率等特性&#xff0c;在过…

深度解读:Agent AI智能体如何重塑我们的现实和未来|TodayAI

​​​​​​​ 一、 引言 在当今时代&#xff0c;人工智能&#xff08;AI&#xff09;技术的快速发展正不断改变着我们的生活与工作方式。尤其是Agent AI智能体&#xff0c;作为AI技术中的一种重要形式&#xff0c;它们通过模拟人类智能行为来执行各种复杂任务&#xff0c;从…

基于51单片机的电子钟秒表LCD1602仿真设计( proteus仿真+程序+设计报告+原理图+讲解视频)

基于51单片机的电子钟秒表LCD1602仿真设计( proteus仿真程序设计报告原理图讲解视频&#xff09; 这里写目录标题 1. 主要功能&#xff1a;2. 讲解视频&#xff1a;3. 仿真4. 程序代码5. 设计报告6. 原理图7. 设计资料内容清单&&下载链接 仿真图proteus7.8及以上 程序…

pynq7020系列的资源有多少

pynq系列的资源有多少 对比 查找表107&#xff0c;273 39.14 140&#xff0c;537 51.28查找表随机存储器17&#xff0c;457 12.12 19&#xff0c;524 13.56触发器67&#xff0c;278 12.27 81&#xff0c;453 14.95 Block RAMs ( 36 KB ) 264.5 29.00 457 50.11 Table 1: Zynq-…

英语口语情景对话视频软件分享!

在当今全球化的时代&#xff0c;英语已成为一种通用的国际语言。为了提高英语口语能力&#xff0c;越来越多的人选择使用英语口语情景对话视频软件。本文将为您推荐几款备受欢迎的英语口语情景对话视频软件&#xff0c;帮助您轻松提高英语口语水平。 AI外语陪练 AI外语陪练软件…

计量校准使用测量不确定度具备什么意义?有哪些作用?

一些企业在收到校准报告/证书时&#xff0c;会看到证书之中有“测量不确定度”一栏&#xff0c;因此会有很多人咨询&#xff0c;什么是不确定度&#xff0c;它的作用是什么&#xff1f; 计量校准报告的不确定度 如果以书面形式理解的话&#xff0c;相信很多朋友都不明白不确定…

适合小白使用的编译器(c语言和Java编译器专属篇)

本节课主要讲如何安装适合编程小白的编译器 废话不多说&#xff0c;我们现在开始 c/c篇 首先&#xff0c;进入edge浏览器&#xff0c;在搜索框输入visual studio &#xff0c;找到带我画圈的图标&#xff0c;点击downloads 找到community版&#xff08;社区版&#xff09;的下…

什么是web3D?应用场景有哪些?如何实现web3D展示?

Web3D是一种将3D技术与网络技术完美结合的全新领域&#xff0c;它可以实现将数字化的3D模型直接在网络浏览器上运行&#xff0c;从而实现在线交互式的浏览和操作。 Web3D通过将多媒体技术、3D技术、信息网络技术、计算机技术等多种技术融合在一起&#xff0c;实现了它在网络上…

【JAVA |基础】运算符、程序逻辑控制以及方法的使用

目录 一、前言 二、操作符 1.算术运算符 2.赋值运算符 3.比较运算符 4.逻辑运算符 5.条件&#xff08;三目、三元&#xff09;运算符 6.位运算符(都是基于二进制来计算) 三、 程序逻辑控制 1.顺序结构 2.分支结构 if语句 Switch语句 3.循环结构 while语句 for循环…