RabbitMQ不完整的笔记

news2025/1/15 19:37:27

同步的不足

1、拓展性差,当要添加功能时,需要在原来的功能代码上做修改,高耦合。
2、性能下降,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行
3、级联失败,由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚(回滚的范围取决于自己的设定)

而通过RabbitMQ就能解决上述问题,因为其是异步调用

安装 安装docker后,使用docker拉取RabbitMQ的镜像,进行部署

相关概念

publisher: 生产者,也就是发送消息的一方
consumer: 消费者,也就是消费消息的一方
queue: 队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
exchange: 交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列
virtual host: 虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
(与MySQL中的不同数据库相似)

SpringAMQP

RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互,而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP
提供的三个功能:

自动声明队列、交换机及其绑定关系

基于注解的监听器模式,异步接收消息

封装了RabbitTemplate工具,用于发送消息

案例入门

采用的方案结合注解的方式没有使用控制台。方便快速在Spring项目中开发
publisher发送消息到交换机,交换机发送消息到队列,consumer发送接受队列中的消息

导入AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

WorkQueues模型

让多个消费者绑定到一个队列,共同消费队列中的消息
有什么好处?
消息的处理速度就能提高
最佳实践
**消息发送到队列,模拟大量消息堆积的队列

/**
     * workQueue
     * 向队列中不停发送消息,模拟消息堆积。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接受
两个消费者接收队列中的消息,接受者1每20ms接收一个,消费者2每200ms接受一个

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
 
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

两个消息接收者都设置了Thead.sleep,模拟任务耗时:
消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
消费者2 sleep了200毫秒,相当于每秒处理5个消息

可是实际测试结果是消费者1和消费者2竟然每人消费了25条消息:
消费者1很快完成了自己的25条消息,消费者2却在缓慢的处理自己的25条消息。
出现这种现象表明队列对于消息的分配并没有考虑到每个消费者的实际能力

优化配置
在yml文件中配置prefetch: 1 (prefetch: 1表示每个消费者每次只能从队列中预取1个消息,消费完就能拿下一次,不需要等轮询。它可以帮助保证每个消息在被消费者处理时都能得到较为均匀的分配,避免某个消费者处理速度慢而导致其他消费者空闲的情况。)

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机

作用: 在接受生产者消息的同时最重要的是如何处理消息,比如是交给所有队列还是交给某个特定的队列

Fanout: 将消息交给所有绑定到交换机的队列,就像上面的案例,默认是每个队列平均的接受消息

Direct: 基于RoutingKey(路由key)发送给订阅了消息的队列,交换机不再把消息交给每个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routing Key与消息的Routing Key完全一致,这个队列才能接收到消息

总结
Direct交换机与Fanout交换机的差异?

Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机
在Direct交换机中它对于队列接收消息的选择性是单一的,只能被单个的Routing Key绑定,而如果在将队列绑定Key时使用通配符绑定 就能将队列同时绑定多个Key
比如:

#:匹配一个或多个词
*:匹配不多不少恰好1个词
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu

接下来正式开始编码使用RabbitMQ,直接使用注解声明队列和交换机
消息发送者

/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "hmall.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

消息接受者
声明Direct模式的交换机和队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),//声明了队列
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),//声明了direct的交换机
    key = {"red", "blue"}//声明了key
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
 
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

结果就是只有listenDirectQueue1才能接收到消息

一、使用 RabbitMQ 作为中间通信层,实现不同编程语言间的通信,使用消息队列完成定时任务,保证功能可靠性

1.1、虚拟机安装RabbitMQ

首先是在虚拟机中安装配置RabbitMQ,基于docker安装,

docker pull rabbitmq

创建并运行 RabbitMQ 容器

docker run -d -p 15672:15672 -p 5672:5672 \
	-e RABBITMQ_DEFAULT_VHOST=my_vhost  \
	-e RABBITMQ_DEFAULT_USER=admin \
	-e RABBITMQ_DEFAULT_PASS=admin \
	--hostname myRabbit \
	--name rabbitmq \
	rabbitmq

1.2、SpringBoot部署RabbitMQ

添加依赖spring-boot-starter-amqp
修改yaml配置

spring:
  #rabbitmq 配置
  rabbitmq:
    host: 192.168.79.202
    username: guest
    password: guest
    #虚拟主机
    virtual-host: /
    #端口
    port: 5672
    listener:
      simple:
        #消费者最小数量
        concurrency: 10
        #消费者最大数量
        max-concurrency: 10
        #限制消费者,每次只能处理一条消息,处理完才能继续下一条消息
        prefetch: 1
        #启动时是否默认启动容器,默认为 true
        auto-startup: true
        #被拒绝时重新进入队列的
        default-requeue-rejected: true
    template:
      retry:
        #启用消息重试机制,默认为 false
        enabled: true
        #初始重试间隔时间
        initial-interval: 1000ms
        #重试最大次数,默认为 3 次
        max-attempts: 3
        #重试最大时间间隔,默认 10000ms
        max-interval: 10000ms
        #重试的间隔乘数,
        #配置 2 的话,第一次等 1s,第二次等 2s,第三次等 4s
        multiplier: 1

        #在 RabbitMQ 中,initial-interval 和 max-interval 是用于指定消息重试机制的两个参数,
        #它们的区别如下:
        #1. initial-interval(初始间隔时间):表示第一次重试的时间间隔,也就是在消息第一次处
        #理失败后,等待多长时间再尝试重新发送消息。这个参数的默认值是 1 秒。
        #2.max-interval(最大间隔时间):表示重试过程中的最大时间间隔,也就是每次重试时,
        #最长等待多长时间再尝试重新发送消息。这个参数的默认值是 10 秒。

1.3、消息发送者

@Autowired
private AmqpTemplate rabbitTemplate;
//这里需要创建AmapTemplate对象,以便调用convertAndSend方法
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "hmall.topic";
    // 消息
    String message = "检测到摔倒";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, “ " message);
}

这里传入的参数跟使用什么交换机有关系,如果使用funout交换机 就队列交换机和消息,如果使用direct交换机 就需要传入key 交换机名字 消息

在这个例子中生产者使用默认的交换机 所以需要指定队列 而不用指定key(主要是完成语言之间的通信)

1.4、python部署RabbitMQ

python中使用pika操作RabbitMQ

# coding=utf-8
### 消费者

import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
# 这样生产者和消费者就没有必要的先后启动顺序了
channel.queue_declare(queue='hello')


# 回调函数
def callback(ch, method, properties, body):
    print('消费者收到:{}'.format(body))

# channel: 包含channel的一切属性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish发送的消息


channel.basic_consume(queue='hello',  # 接收指定queue的消息
                      auto_ack=True,  # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
                      on_message_callback=callback  # 设置收到消息的回调函数
                      )

print('Waiting for messages. To exit press CTRL+C')

# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
channel.start_consuming()

队列使用的是默认队列

1.5、整体流程

在前端界面点击 发送get请求 前端根据传过来的是1还是2 就来进行不同的活动

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MyController {
    
    @GetMapping("/example")
    public String exampleController(@RequestParam("param1") String param1, @RequestParam("param2") int param2) {
        // 处理参数并返回响应
        return ;
    }

}

是1 就进行摔倒检测
后端使用RabbitMq的生产者的方法 发送消息到队列, python端接收到发送到指定队列的消息后开始调用摔倒检测 摔倒检测因为是使用的yolov8所以没有在本机运行,调用阿里云的服务
持续检测阿里云返回的是什么 如果返回的是摔倒了,就进行后续的活动
是2就通知python进行人脸检测安防

实现了python和java的通信之后再来说说如何实现的定时任务的
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

然后在ServiceImpl填写具体的方法 使用RabbitMQ发送消息

 rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE, "ttl.test", orderInfo)

发送给ttl交换机 这个交换机绑定了ttl队列,同时ttl队列绑定了死信交换机,这个死信交换机绑定了死信队列

有由于这个ttl队列设置了过期时间,所以过期时间到后,消息就会到死信交换机

死信队列监听到之后就会开始处理,这样就能确保定时任务中不会确保定时没有成功的情况

RabbitMQ如何保证消息不丢失

生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功(避免了消息在发送交换机或者发送到队列丢失的情况)

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
当内存出问题还有磁盘兜底

通过交换机持久化,队列持久化,消息持久化

消费者确认
RabbitMQ支持消费者确认机制,消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息
最好采用auto的确认模式 即 自动ack 由spring检测istener代码是否出现异常,没有异常就返回ack,抛出异常就返回nack
如果消费者接受消息失败,也可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到以后,如果消息依然失败将消息投递到异常交换机,交由人工处理

RabbitMQ消息的重复消费问题

出现的原因是网络抖动等,消费者处理消息后因为网络问题没能成功发送确认给MQ,导致Spring的重试机制,就重复消费了消息
解决方案:
每条消息设置一个唯一的标识id
幂等方案 ,可以加锁

死信交换机(延迟队列)

延迟队列:进入队列的消息会被延迟消费的队列
场景:超时订单、限时优惠、定时发布

延迟队列=死信交换机+TTL
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
消息所在的队列设置了存活时间
消息本身设置了存活时间

在这里插入图片描述

延迟队列插件
可以使用DelayExchange插件
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。

具体怎么使用
在这里插入图片描述

如果有100万消息堆积在MQ,如何解决

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题

增加更多消费者,提高消费速度
在消费者内开启线程池加快消息处理速度
扩大队列容积,提高堆积上限

惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存
支持数百万条的消息存储
配置的方式添加
在这里插入图片描述
注解的方式添加
在这里插入图片描述

RabbitMQ高可用机制

在生产环境下,使用集群来保证高可用性
普通集群、镜像集群、仲裁队列
暂时不看

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1705720.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

江协科技STM32学习-0 购买套件

前言&#xff1a; 本文是根据哔哩哔哩网站上“江协科技STM32”视频的学习笔记&#xff0c;在这里会记录下江协科技STM32开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了江协科技STM32教学视频和链接中的内容。 引用&#xff1a; STM32入门教程-2023版 细致讲…

芋道源码 / yudao-cloud:前端技术架构探索与实践

摘要&#xff1a; 随着企业信息化建设的深入&#xff0c;后台管理系统在企业运营中扮演着至关重要的角色。本文将以芋道源码的yudao-cloud项目为例&#xff0c;深入探讨其前端技术架构的设计思路、关键技术与实现细节&#xff0c;并分享在开发过程中遇到的挑战与解决方案。 一、…

《Python编程从入门到实践》day34

# 昨日知识点回顾 json文件提取数据、绘制图表渐变色显示 # 今日知识点学习 第17章 17.1 使用Web API Web API作为网站的一部分&#xff0c;用于与使用具体URL请求特定信息的程序交互&#xff0c;这种请求称为API调用。 17.1.1 Git 和 GitHub Git&#xff1a;分布式版本控制系…

Java语言ADR药物不良反应系统源码Java+IntelliJ+IDEA+MySQL一款先进的药物警戒系统

Java语言ADR药物不良反应系统源码JavaIntelliJIDEAMySQL一款先进的药物警戒系统源码 ADR药物不良反应监测系统是一个综合性的监测平台&#xff0c;旨在收集、报告、分析和评价药品在使用过程中可能出现的不良反应&#xff0c;以确保药品的安全性和有效性。 以下是对该系统的详细…

【职业教育培训机构小程序】教培机构“招生+教学”有效解决方案

教培机构“招生教学”有效解决方案在数字化转型的浪潮中&#xff0c;职业教育培训机构面临着提升教学效率、拓宽招生渠道、增强学员互动等多重挑战。小程序作为一种新兴的移动应用平台&#xff0c;为解决这些痛点提供了有效途径。 一、职业教育培训机构小程序的核心功能 &…

当传统文化遇上数字化,等级保护测评的必要性

第二十届中国&#xff08;深圳&#xff09;国际文化产业博览交易会5月23日在深圳开幕。本届文博会以创办20年为契机&#xff0c;加大创新力度&#xff0c;加快转型升级&#xff0c;着力提升国际化、市场化、专业化和数字化水平&#xff0c;不断强化交易功能&#xff0c;打造富有…

[数据集][目标检测]RSNA肺炎检测数据集VOC+YOLO格式6012张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;6012 标注数量(xml文件个数)&#xff1a;6012 标注数量(txt文件个数)&#xff1a;6012 标注…

[集群聊天服务器]----(十一) 使用Redis实现发布订阅功能

接着上文&#xff0c;[集群聊天服务器]----(十)Nginx的tcp负载均衡配置–附带截图&#xff0c;我们配置nginx&#xff0c;使用了多台服务端来提高单机的并发量&#xff0c;接下来我们回到项目中&#xff0c;思考一下&#xff0c;各个服务端之间怎么进行通信呢&#xff1f; 配置…

专业145+总410+成电电子科技大学858信号与系统考研经验电子信息与通信工程,抗干扰,空天,资环,真题,大纲,参考书。

今年考研总分410,专业课858信号与系统145&#xff0c;顺利上岸成电&#xff0c;毕设已经搞得七七八八&#xff0c;就等毕业了&#xff0c;抽空整理回顾一下去年的复习&#xff0c;给群里的同学提供一些参加&#xff0c;少走弯路&#xff0c;对于整体复习的把握有个大概得规划。…

Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装

Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装 目录 Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装 一、简单介绍 二、获取设备的序列号 (Serial Number) 实现原理 1、Android 2、 Unity 三、注意…

notepad++ 模糊替换规则

AUTO_INCREMENT\d AUTO_INCREMENT0 ALTER TABLE .* AUTO_INCREMENT0;

计算机网络——在地址栏输入网址(URL)之后都发生了什么

网址&#xff0c;也叫域名&#xff0c;域名就像一个 IP 地址的可读版本&#xff0c;比如&#xff0c;百度的域名 www.baidu.com&#xff0c;他的 ip 是 110.242.68.3&#xff0c;输入 IP 一样可以跳转到百度搜索的页面&#xff0c;我想没有一个人没去记百度的 IP 吧。其实我们真…

Docker 快速更改容器的重启策略(Restart Policies)以及重启策略详解

目录 1. 使用 docker update 命令2. 在启动容器时指定重启策略3. 在 Docker Compose 文件中指定重启策略4. 总结 官方文档&#xff1a;Start containers automatically 1. 使用 docker update 命令 Docker 提供了 docker update 命令&#xff0c;可以在容器运行时更改其重启策…

Audition 2024 for Mac/Win:音频录制与编辑的卓越之选

随着数字媒体的不断发展&#xff0c;音频内容创作已经成为各行各业中不可或缺的一部分。无论是音乐制作、广播节目、播客录制还是影视配音&#xff0c;都需要高品质的音频录制和编辑工具来实现专业水准的作品。在这个充满竞争的时代&#xff0c;要想在音频创作领域脱颖而出&…

JAVASE总结一

1、 2、引用也可以是成员变量&#xff08;实例变量&#xff09;&#xff0c;也可以是局部变量&#xff1b;引用数据类型&#xff0c;引用&#xff0c; 我们是通过引用去访问JVM堆内存当中的java对象&#xff0c;引用保存了java对象的内存地址&#xff0c;指向了JVM堆内存当中…

java项目启动报错

java项目启动报错&#xff1a;java: java.lang.NoSuchFieldError: Class com.sun.tools.javac.tree.JCTree$JCImport does not have member field ‘com.sun.tools.javac.tree.JCTree qualid’ 原因&#xff1a;编译和运行的版本不一样 点击idea文件 点击项目结构 把这两个版本…

埃及媒体分发投放-新闻媒体通稿发布

埃及商业新闻 大舍传媒近日宣布将在埃及商业新闻领域展开新的媒体分发投放。作为埃及最具影响力的商业新闻平台之一&#xff0c;埃及商业新闻将为大舍传媒提供广阔的市场和受众群体。这一合作意味着大舍传媒将有机会通过埃及商业新闻的平台向埃及的商业精英和投资者传递最新的…

记录一次安装k8s初始化失败

实例化 kubeadm init --configkubeadm.yaml --ignore-preflight-errorsSystemVerification报错 [init] Using Kubernetes version: v1.25.0 [preflight] Running pre-flight checks error execution phase preflight: [preflight] Some fatal errors occurred:[ERROR CRI]: co…

引领智能校对行业的革新者:爱校对

我们很高兴向大家介绍爱校对&#xff0c;这是交互未来&#xff08;北京&#xff09;科技有限公司推出的一款前沿智能校对产品。爱校对的诞生&#xff0c;源自清华大学计算机智能人机交互实验室&#xff0c;结合了最先进的技术与理念&#xff0c;旨在为用户提供高效、精准的智能…

【Chapter5】死锁与饥饿,计算机操作系统教程,第四版,左万利,王英

文章目录 1.1 什么是死锁1.2 死锁的类型1.2.1 竞争资源引起的死锁1.2.2 进程间通信引起的死锁1.2.3 其他原因引起的死锁 1.3 死锁产生必要条件1.4 死锁的处理策略1.5 死锁的预防1.5.1 破坏资源独占条件1.5.2 破坏不可剥夺条件1.5.3 破坏保持申请条件1.5.4 破坏循环等待条件 1.6…