RabbitMQ(消息队列)

news2025/1/12 5:51:36

RabbitMQ

它是消息中间件,是在消息的传输过程中保存消息的容器,实现应用程序和应用程序之间通信的中间产品。目前主流消息队列通讯协议是AMQP(二进制传输,支持多种语言)、JMS(HTTP传输,只支持Java)。

特点:每秒十万左右级别、消息延迟在微秒级、完整的消息确认机制、并发能力强、性能好。

常见MQ

  • ActiveMQ基于JMS,每秒数万级别
  • RabbitMQ基于AMQP,每秒十万级别
  • RocketMQ是阿里的产品,基于JMS,每秒十万级别,经历过双十一
  • Kafka自定义协议,每秒百万级别

体系结构

分为:服务器、交换器、队列;

服务器:负责管理所有的交换器和队列,一个RabbitMQ内有多个服务器,(为了避免每次发送消息都建立TCP连接,有了很多的服务器,每个线程建立单独的服务器进行通讯)每个服务器负责一部分交换器和队列,之间通过 HTTP 协议通信;

交换机:负责接收、路由、传递消息,支持多种交换器类型,每个类型有不同的消息传递方式和使用场景;

队列:负责存储消息,支持多种队列,都有不同的存储方式;

安装:

使用docker方式

# 拉取镜像
docker pull rabbitmq:3.8.12-management
# 注意:在拉取镜像,遇到missing signature key问题,需要提升docker的版本

# 运行容器
# -d 参数:后台运行 Docker 容器
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.12-management

# 启动成功,MQ的客户端页面,输入所设置的用户名和密码
# 访问:http://xxx:15672

# 如果访问不通,需要开放端口
firewall-cmd --zone=plublic -add-pord=5672/tcp --add-pord=15672/tcp --permanent
success
firewall-cmd --reload
success

发送消息

需要引包

<dependency>
     <groupId>com.rabbitmq</groupId>
     <artifactId>amqp-client</artifactId>
     <version>5.20.0</version>
</dependency>

生产者

// 生产者 - 产生消息
public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("IP地址");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("123456");
    // 创建连接
    Connection connection = connectionFactory.newConnection();
    // 创建频道
    Channel channel = connection.createChannel();
    // 创建队列,参数:队列名称、是否定义持久化队列、是否独占本次连接、是否在不使用的时候自动删除队列、其他参数
    channel.queueDeclare("new_queue", true, false, false, null);
    String message = "发送的消息的内容:123";
    // 参数:交换机名称,默认Default Exchange、队列名称、配置信息、消息内容
    channel.basicPublish("", "new_queue", null, message.getBytes());
    // 关闭资源  
    channel.close();
    connection.close();
}

创建一个队列,并有消息待查看,点击该队列的名称,在Get messages处,可以查看该消息的信息;

查看队列:

在这里插入图片描述
在这里插入图片描述
消费者

// 消费者 - 要消费消息
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("IP地址");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("123456");
    // 创建连接 Connection        
    Connection connection = factory.newConnection();
    // 创建频道  
    Channel channel = connection.createChannel();
    // 接收消息  
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        // 回调方法,当收到消息后,会自动执行该方法  
        // 参数:标识、获取一些信息,交换机等、配置信息、数据(消息内容)
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("consumerTag:" + consumerTag);
            System.out.println("Exchange:" + envelope.getExchange());
            System.out.println("RoutingKey:" + envelope.getRoutingKey());
            System.out.println("properties:" + properties);
            System.out.println("body:" + new String(body));
        }
    };
    // 监听程序,用来监听消息,参数:队列、是否自动确认、回调对象  
    channel.basicConsume("new_queue", true, consumer);
}

如果有被消费者消费,会在管理页中,该队列的 Ready进行-1;

工作模式

简单模式

生产者(只有一个)、消费者(只有一个)、消息储存在队列中;

工作队列模式

生产者(只有一个)、消费者(有多个)、消息储存在队列中;消费者谁抢到算谁的。

发布订阅模式

生产者(只有一个)、消费者(有多个)、交换机、多个队列;

生产者把消息发送给交换机,交换机处理消息取决于交换机的类型,交换机根据类型把消费存在对应的队列中,消费者(多个)满足规则都可以得到消息;

交换机有3种类型

➢ Fanout:广播,将消息交给所有绑定到交换机的队列

➢ Direct:定向,把消息交给符合指定routing key 的队列

➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

路由模式

队列与交换机的绑定,使用Direct,消费者监听的队列,该队列与交换机绑定的路由件匹配,该消费者可以收到消息。其他消费者也监听该队列,但路由件不匹配,不会收到消息。

查看交换机

在这里插入图片描述

交换机与队列的绑定

在这里插入图片描述
在这里插入图片描述

主题模式(通配符匹配)

这个与路由模式类似,只是这个支持通配符绑定,# 匹配一个或多个词,* 匹配不多不少恰好1个词。

创建

创建交换机

在这里插入图片描述

创建队列

在这里插入图片描述

队列与交换机绑定

在这里插入图片描述

整合SpringBoot

引包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

更改配置文件

spring: 
  rabbitmq: 
    host: IP地址
    port: 5672 
    username: guest 
    password: 123456 
    virtual-host: /
    # 来保证消息的可靠性
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认

代码

// 生产者
@Autowired  
private RabbitTemplate rabbitTemplate;
  
public void testSend() {  
    rabbitTemplate.convertAndSend("交换机","路由键","消息内容");  
}  
// 消费者 (durable 是否持久化)
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名字", durable = "true"),
            exchange = @Exchange(value = "交换机"),
            key = {"路由键"}
))
public void process(String dateString,Message message,Channel channel) {
    log.info("消息内容:"+ dateString);
}

配置类

生产者保障消息的可靠性

生产者 - 保障消息是否发送到队列或者交换机

@Component
public class MQAckConfig 
    implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 确认消息是否发送到交换机
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机 - 成功!数据:" + correlationData);
        } else {
            log.info("消息发送到交换机 - 失败!数据:" + correlationData + " 原因:" + cause);
        }
    }

    /**
     * 确认消息是否发送到队列 - 只有发送失败的时候才会调用该方法
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
    }

}

消费者保障消息的可靠性

消费者 - 保障消息真的收到,改为手动确认ACK

spring:
  rabbitmq:
    ...
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认

消费者手动ACK

@RabbitListener( 
    // 设置绑定关系
    bindings = @QueueBinding(
        // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
        value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
        // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
        exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
        // 配置路由键信息
        key = {ROUTING_KEY}
))
public void processMessage2(String dataString, Message message, Channel channel) throws IOException {
    // 注意: 重置消息,需要考虑幂等性
    long tag = message.getMessageProperties().getDeliveryTag();
    try {
        log.info("消费者 - 接收消息:" + dataString);
        // System.out.println(10 / 0);  // 手动制造出异常,让消息回到队列中
        channel.basicAck(tag, false);
    } catch (Exception e) {
        // 获取信息,查看此消息是否曾经被投递过
        Boolean redelivered = message.getMessageProperties().getRedelivered();
        if (!redelivered) {
            // 没有被投递过,那就重新放回队列,重新投递,再试一次
            channel.basicNack(tag, false, true);
        } else {
            // 已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝此消息,不会回到MQ队列中
            channel.basicReject(tag, false);
        }
    }
}

消费者-限流

大量消息进入队列中,消息队列中消息有1万,设置每次最多从队列取回1000个消息,并发能力只能处理1000个请求,消费端-最多只处理1000个请求;

限流配置

spring:
  rabbitmq:
    ...
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认
        prefetch: 10  # 设置消费者,每次最多从消息队列服务器取回多少消息,并不是一下把队列中的消息全部取走

从队列中Ready,变成所设置的值,从此起到了限流的作用,消息者处理ACK个数也下降。

在这里插入图片描述

消息超时

可在创建队列的时候,设置参数:x-message-ttl = 3000 (毫秒值),当我们生产者发送消息到队列,队列里的消息没有被消费者消费时,可通过队列里的消息超时时间,进行丢弃消息。

@Autowired
private RabbitTemplate rabbitTemplate;

MessagePostProcessor processor = (Message message) -> {
    // 设定超时时间,单位 (毫秒)
    message.getMessageProperties().setExpiration("7000");
    return message;
};
rabbitTemplate.convertAndSend("交换机名", "路由键", "生产者发送消息 - 消息超时 - " + processor);

死信

一个消息无法被消费,会变成死信;产生的原因:消费者拒绝:basicNackbasicReject这两个方法不把消息重新放回队列、队列溢出:队列里的消息达到数量的限制、消息超时:超时时间未被消费;

解决:丢弃(一般不重要数据)、入库(记录日志、后续处理)、监听(进入死信队列,监听死信队列进行处理)

前提准备:

创建死信交换机、死信队列、死信路由键,互相绑定;

创建正常交换机、队列、路由键,互相绑定;
在这里插入图片描述

// 前提把 死信、正常都创建好,绑定好
// 监听正常队列
@RabbitListener(queues = {‘正常队列’})
public void processMessageNormal(String dateString, Message message, Channel channel) throws IOException {
    // 监听正常队列,但是拒绝消息,进行消息拒绝
    log.info("【正常】接受到消息:" + dateString);;
    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
// 监听死信队列
@RabbitListener(queues = {‘死信队列’})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
    // 监听死信队列
    log.info("【死信】接受到消息 = " + dataString);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

延迟队列

方案一:

生产者发送消息,到队列中,该队列配置消息超时时间,并没有消费者进行监听该队列(进行监听消费),超时进入死信队列,(监听死信队列)就是延迟队列的一种配置。

方案二:

安装插件,默认消息存放最多2天

地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

下载插件的网址:https://www.rabbitmq.com/community-plugins.html

事务

该事务处理仅仅在java层面,生产者1发送消息,生产者2发送消息,保障其中有一个发送失败,都要失败,需要添加配置类:

@Configuration
@Data
public class RabbitConfig {

    @Bean
    public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
}

惰性队列

一般队列创建是默认,并不是惰性队列,惰性队列适用场景:在非常长的队列(百万条消息),生产者的速度超过消费者,消费者处理慢,使用惰性队列;它把消息放在队列中,并不是马上进行持久化操作,是在有空闲时,当队列达到百分之多少时,再进行数据持久化操作。

// x-queue-mode 参数,可在插件安装配置
@Queue(value = 队列名字, durable = "true", autoDelete = "false", arguments = {
	@Argument(name = "x-queue-mode", value = "lazy")
})

队列消息优先级

创建队列,如果这个值是0,代表优先级无效,设置的值优先级不能超过该值,优先级越高占用内存资源越多。
在这里插入图片描述

生产者配置

@Resource
private RabbitTemplate rabbitTemplate;

// 生产者发送消息 1  - 优先级是1
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:1", message -> {
    message.getMessageProperties().setPriority(1);
    return message;
});
// 生产者发送消息 2  - 优先级是2
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:2", message -> {
    message.getMessageProperties().setPriority(2);
    return message;
});
// 生产者发送消息 3  - 优先级是3
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:3", message -> {
    message.getMessageProperties().setPriority(3);
    return message;
});
// 生产者发送消息1、消息2、消息3的顺序(先进先出)
// 但消费者,消费优先消费:消息3、消息2、消息1 (改变了消费的顺序)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1862790.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

故障诊断 | HO-VMD-TCN河马优化算法优化变分模态分解时间卷积神经网络故障诊断模型

效果一览 文章概述 故障诊断 | HO-VMD-TCN河马优化算法优化变分模态分解时间卷积神经网络故障诊断模型&#xff01;河马优化算法&#xff08;Hippopotamus optimization algorithm&#xff0c;HO&#xff09;由Amiri等人于2024年提出&#xff0c;该算法模拟了河马在河流或池塘中…

突破架构瓶颈:克服软件系统中的漂移和侵蚀

一种常见但不完美的比喻是将软件系统中的架构漂移和侵蚀与物理建筑的架构相比。虽然这个比喻很直观&#xff0c;但它存在一个根本性的误解&#xff0c;这也常常引发软件开发中的架构问题。 试想一下&#xff0c;一个设计良好的摩天大楼或房屋建成后&#xff0c;我们期望它基本保…

数字革命的先锋:揭示Facebook的技术创新

在当今数字化飞速发展的时代&#xff0c;技术创新不仅改变了人们的生活方式&#xff0c;也深刻影响着社会的发展和文化的演变。作为全球最大的社交网络平台&#xff0c;Facebook不仅扮演着连接人与人之间的桥梁角色&#xff0c;更是技术创新的领军者之一。本文将深入探讨Facebo…

【STM32入门学习】学习嵌入式实时操作系统(RTOS)移植uc/OS到stm32F103上

目录 一、建立STM32HAL库工程 1.1实时操作系统 1.2基于HAL库创建工程 二、获取uC/OS-III源码 三、移植准备 3.1复制uC/OS-III文件到工程文件夹 3.2添加工程组件和头文件路径 四、移植修改代码 &#xff14;.1.启动文件修改&#xff1a; &#xff14;.2.app_cfg.h &a…

【PA交易】BackTrader的交易管理

前言 本主要讨论BackTrader中的Broker定制化。如果你已经对于BackTrader的交易管理非常熟悉&#xff0c;并且自己有了成熟的适配方案&#xff0c;那么并不需要看这篇文章。但是如果你还没有深入研究过&#xff0c;那么这篇文章可能会给到你启发。 背景与需求 网上现存大量资…

【数据结构】(C语言):链表

链表&#xff1a; 基本单位是节点。节点至少两部分&#xff1a;数据&#xff0c;下一个数据的地址。头指针head&#xff0c;始终指向链表的第一个节点。若没有节点&#xff0c;则headNULL。链表在内存中是非连续的。不能使用索引&#xff08;下标&#xff09;查找元素。只能从…

关于application/x-www-form-urlencoded跟application/json请求的区别

当你的java方法是这样定义的 PostMapping("/rePushMedicalRecord") public String rePushMedicalRecord(RequestParam("topicId") String topicId){ } 参数是RequestParam接收&#xff0c;则请求时需要用application/x-www-form-urlencoded请求 如果是R…

【ARMv8/v9 GIC 系列 2.3 -- GIC SPI 中断的 GICD_CLRSPI_NSR寄存器】

文章目录 GICD_CLRSPIN_NSR寄存器功能INTID 位 [12:0]中断触发类型的影响小结 GICD_CLRSPIN_NSR 在 ARMv9 架构下&#xff0c;GIC&#xff08;Generic Interrupt Controller&#xff09;是负责中断管理的关键组件&#xff0c;它支持复杂的中断处理需求&#xff0c;包括多处理器…

OS中断机制-嵌套和竞争

对于FreeRTOS最好不去用中断嵌套,中断嵌套会增加堆栈空间的使用,因为每个中断服务程序都需要保存和恢复寄存器状态,这可能会耗尽有限的堆栈空间,从而导致系统故障。以及中断嵌套时,不同的中断服务程序可能会竞争访问共享资源,从而增加死锁的风险。这可能会导致系统出现故…

第二节课 6月13日 ssh密钥登陆方式

centos和ubuntu openssh服务的初始安装 一、实验&#xff1a;ubuntu系统激活root用户 ubuntu系统如何激活root用户&#xff0c;允许root用户ssh登陆&#xff1f; 1、ubuntu默认root用户未设置密码&#xff0c;未激活 激活root用户&#xff0c;设置root密码 sudo passwd roo…

从零开始做题:修猫

修猫 1 题目 2 解题 2.1 使用Stegslove分析图片 (base) ┌──(holyeyes㉿kali2023)-[~/Misc/tool-misc] └─$ java -jar Stegsolve.jar 2.2 analyse -frame browser 2.3 得到flag DASCTF{818ca3a840e768da7d5fcdeaedd5012f}

37岁,被裁员,失业三个月,被面试官嫌弃“太水”:就这也叫10年以上工作经验?

今年部门要招两个自动化测试&#xff0c;这几个月我面试了几十位候选人。发现一个很奇怪的现象&#xff0c;面试中一问到元素定位、框架api、脚本编写之类的&#xff0c;很多候选人都对答如流。但是一问到实际项目&#xff0c;比如“项目中UI自动化和接口自动化如何搭配使用&am…

前端:HTML、CSS、JavaScript 代码注释 / 注释与代码规范

一、HTML 行内注释 HTML注释是在HTML代码中添加说明和解释的一种方法&#xff0c;这些注释不会被浏览器渲染或显示在页面上&#xff0c;而是被浏览器忽略。HTML注释对于代码的可读性、可维护性和团队协作非常重要。 1.1、HTML注释的语法 HTML注释的语法是以<!--开始&…

【算法与数据结构】【字符串篇】【String的常见函数】

系列文章 本人系列文章-CSDN博客https://blog.csdn.net/handsomethefirst/article/details/138226266?spm1001.2014.3001.5502 1.string基本概念 string是C风格的字符串&#xff0c;而string本质上是一个类。 string和char * 区别&#xff1a; char * 是一个指针 string是一…

OpenAI 收购桌面实时协作公司 Multi;iOS 18 开放 iPhone 镜像测试丨RTE 开发者日报 Vol.231

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE&#xff08;Real-Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

[深度学习] 卷积神经网络CNN

卷积神经网络&#xff08;Convolutional Neural Network, CNN&#xff09;是一种专门用于处理数据具有类似网格结构的神经网络&#xff0c;最常用于图像数据处理。 一、CNN的详细过程&#xff1a; 1. 输入层 输入层接收原始数据&#xff0c;例如一张图像&#xff0c;它可以被…

图片怎么加水印?快来试试这6个图片加水印方法(2024年新)

图片怎么加水印&#xff1f;作为打工人在日常的工作生活中总会遇到各种各样的工作难题&#xff0c;相信从事电商或者是设计等工作的小伙伴们&#xff0c;遇到最多的问题应该就是给图片添加水印了。为什么要给图片加水印&#xff1f;其实给图片加水印最主要的目的是保护我们的图…

Linux集群自动化维护-Ansible

1.1Ansible概述 自动化运维&#xff1a;批量管理&#xff0c;批量分发&#xff0c;批量执行&#xff0c;维护。。是python写的 批量管理工具&#xff1a; Ansible&#xff08;无客户端&#xff09;&#xff1a;无客户端&#xff0c;基于ssh进行管理与维护 Saltstack &#…

深度之眼(二十五)——研究生学习计划安排

文章目录 一、前言二、结构安排和规划2.1 夯实基础2.2 分方向训练&#xff08;待&#xff09;2.3 进阶训练 三、其他 一、前言 课题组这边是需要对机器视觉有所要求吧&#xff0c;也就是CV方向。这一届研三师兄也都是在大厂拿到30W的年薪了&#xff0c;也是需要拥抱深度学习这…

NineData和华为云在一起!提供一站式智能数据库DevOps平台

以GuassDB数据库为底座 NineData和华为云一起 为企业提供 一站式智能数据库DevOps平台 帮助开发者 高效、安全地完成 数据库SQL审核 访问控制、敏感数据保护等 日常数据库相关开发任务 NineData 智能数据管理平台 NineData 作为新一代的云原生智能数据管理平台&#xf…