【RabbitMQ】

news2024/12/30 1:53:10

一、概念

MQ(消息队列):是指在消息传送过程中保存消息的容器,用于分布式系统之间的通信
生产者:是发送消息的用户应用程序。
队列:是存储消息的缓冲区。
消费者:是接收消息的用户应用程序。

1、优劣势

优势

● 应用解耦:使用MQ使得应用间耦合度降低,提高系统容错性和可维护性
● 异步提速:系统将消息发给MQ之后,就可返回用户信息,后续操作异步执行
● 消峰填谷:并发量高时,将消息全部放到MQ中,限制消费的速度为固定并发量,这样就消掉了高峰期的并发量,这就是消峰;但是因为消息积压,在高峰期过后的一段时间内,消费速度也仍旧保持固定并发量,直到消费完积压的消息,这就是填谷

劣势

● 系统可用性降低:一旦MQ宕机
● 系统复杂度提高:如何保证消息没有重复被消费,消息丢失了怎么办,如何保证消息的顺序等等
● 一致性问题:A系统同时给B、C、D发送消息,BC成功,D失败,如何保证数据一致性

2、工作模式

  1. 简单工作模式:一个生产者对应一个消费者
    在这里插入图片描述

  2. 工作队列模式(Work Queues):一个生产者对应多个消费者,多个消费者之间属于竞争关系,当任务比较重时,可以提高处理速度
    在这里插入图片描述

  3. 订阅模式(Publish/Subscribe):一个生产者对应多个消费者,多个消费者之间不是竞争关系,在这种模式中引入交换机的概念,交换机类型为fanout
    交换机:接收生产者的消息,并将消息推送给队列;交换机必须知道要如何处理他接收到的消息,类型如下:
    ○ direct:定向,将消息交给符合指定routing key的队列
    ○ topic:通配符,将消息交给符合制定routing pattern的队列
    ○ headers:参数匹配
    ○ fanout:广播,将收到的所有消息广播到它知道的所有队列。发送消息时不需要指定routing key
    在这里插入图片描述

  4. 路由模式(Routing):需要设计交换机类型为 direct,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
    ● 队列与交换机的绑定不再是随意绑定,而是指定要routing key;
    ● 发送方发送消息时,需要指定routing key;
    ● 只有队列的routing key与消息的routing key一致,才能接收到消息
    在这里插入图片描述

  5. 通配符模式(Topics):需要设计交换机类型为Topics,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
    ● *(星号)只能代替一个词。
    ● # (hash) 可以替代零个或多个单词。
    在这里插入图片描述

二、SpringBoot 整合RabbitMQ

1、引入依赖

implementation 'org.springframework.boot:spring-boot-starter-amqp'

2、配置文件

spring:
  rabbitmq:
    host: 192.168.252.206
    port: 5672
    username: admin
    password: admin

3、配置类

@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";

    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}

4、生产者发送消息

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}

5、消费者接收消息

@Component
public class RabbitMqListener {
    
    @RabbitListener(queues = "test_queue")
    public void testListener(Message message) {
        System.out.println(message);
    }
}

结果如下:

(Body:'hello world!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.kk, deliveryTag=1, consumerTag=amq.ctag-nYz1fLxW0ezTLEcO3W1rVw, consumerQueue=test_queue])

三、特性

1、消息的可靠投递

RabbitMQ为我们提供了两种控制消息可靠性的模式

confirm 确认模式

1、开启确认模式
spring:
rabbitmq:
publisher-confirm-type: correlated

  • NONE 禁用发布确认模式,是默认值
  • CORRELATED 发布消息成功到交换器后会触发回调方法
  • SIMPLE 经测试有两种效果:
    • 其一效果和 CORRELATED 值一样会触发回调方法,
    • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

2、设置ConfirmCallback

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息接收成功!");
            } else {
                System.out.println("消息接收失败!" + cause);
            }
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}

return 退回模式

当消息发送给Exchange时,Exchange路由到Queue失败,才会执行RetureCallback
1、开启回退模式

spring:
  rabbitmq:
  	publisher-returns: true

2、设置RetureCallback
3、设置Exchange消息处理模式

  • 如果消息没有路由到Queue,则丢弃消息
  • 如果消息没有路由到Queue,将消息返回给发送方
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        // 设置交换机失败处理模式,true:返回消息给发送方;默认为false,即丢弃消息
        rabbitTemplate.setMandatory(true);
        // 设置RetureCallback
        rabbitTemplate.setReturnsCallback((returned) -> {
            System.out.println("return 执行了");
            System.out.println(returned);
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test1.kk", "hello world!");
    }
}

2、Consumer ack

1、设置模式

spring:
  rabbitmq:
  	listener:
      direct:
        acknowledge-mode: manual
  • none:自动确认
  • manual:手动确认

2、设置监听器

  • 如果在消费端没有出现异常,就调用basicAck()方法签收消息
  • 如果在消费端出现异常,就调用basicNack()方法拒绝消息,让mq重新发送
@Component
public class AckListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

3、消费端限流

1、这是Consumer ack的模式为手动确认

spring:
  rabbitmq:
  	listener:
      direct:
        acknowledge-mode: manual
		prefetch: 2
  • none:自动确认
  • manual:手动确认
  • prefetch:表示消费端每次从mq中拉取多少条消息,直到手动确认消费完,才会拉取下一条消息

2、设置监听器

@Component
public class AckListener implements ChannelAwareMessageListener {

    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(message.getBody().toString());
            System.out.println("处理业务逻辑");
            //channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

4、生产者

class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}

4、TTL

TTL全程 time to live,也就是存活时间/过期时间;当消息到达存活时间后,还没有被消费,会被清除;RabbitMQ可以对消息设置存活时间也可以对队列设置存活时间
对队列统一设置:是对 x-message-ttl 参数设置
对消息单独设置:是对 expiration 参数设置
如果两者都设置了,以时间短的为准

设置队列的存活时间

1、配置队列,将ttl设置为10秒

@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";

    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
    }

    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}

2、发送mq

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}

3、去RabbitMQ界面查看,会发现该队列的ready在10秒之后会置0
在这里插入图片描述
在这里插入图片描述

设置消息的存活时间

1、生产者发送MQ:只需在发送消息时加上messagePostProcessor即可

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!",messagePostProcessor);
    }
}

5、死信队列

消息成为死信的条件:

  • 消息队列长度达到限制
  • 消费者拒收消息:basicNack/basicReject,并且不把消息放回原目标队列:requeue=false
  • 原队列存在消息过期设置,消息达到过期时间未被消费

队列绑定死信交换机

  • x-dead-letter-exchange
  • x-dead-letter-routing-key

1、配置

spring:
  rabbitmq:
    host: 192.168.252.206
    port: 5672
    username: admin
    password: admin

2、配置类

@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";

    public static String DEAD_EXCHANGE_NAME = "dead_test_exchange";
    public static String DEAD_QUEUE_NAME = "dead_test_queue";

    /**
     * 1、死信交换机
     *
     * @return
     */
    @Bean(name = "deadTestExchange")
    public Exchange deadExchanger() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    /**
     * 2、死信队列
     *
     * @return
     */
    @Bean(name = "deadTestQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    /**
     * 3、绑定死信交换机和死信队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding bindingDead(@Qualifier(value = "deadTestExchange") Exchange exchange,
                               @Qualifier(value = "deadTestQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.kk.#").noargs();
    }

    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .ttl(10000)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("test.kk")
                .maxLength(3)
                .build();
    }

    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}

3、测试类

  • 超时情况
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}
  • 超出长度
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}
  • 消费端拒收

生产端

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}

消费端

@Component
public class DeadListener implements ChannelAwareMessageListener {

    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            int i = 3 / 0;
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

6、延迟队列

延迟队列:消息进入消费端之后,不会立马被消费,会在指定时间达到后,才会消费。RabbitMQ通过TTL和死信队列实现延迟队列

  • 只需设置队列或者消息过期时间,当消息过期后即可进入死信队列
  • 消费端监听队列要监听死信队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/717606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端开发在公司中的位置以及日常工作内容

导读 俗话说的好&#xff0c;不谋全局者不足谋一域。 上一篇文章我们介绍了计算机相关的各种不同方向&#xff0c;相信大家心里也有自己所喜欢的职业&#xff0c;那么今天我们继续讲讲在一个公司中前端开发处于什么样的地位&#xff0c;以及前端的一天都干些什么 普通公司的…

太卷了,阿里一面试官把多年总结的Java八股文完全开源了.......

Java越来越卷了&#xff0c;都快卷成韭菜花了&#xff0c;最近又赶上跳槽的高峰期&#xff0c;好多粉丝&#xff0c;都问我要有没有最新面试题&#xff0c;索性&#xff0c;前一阵子偶然得到一份阿里面试官整理的Java八股文&#xff0c;答案都整理好&#xff0c;整理的《互联网…

【Java基础教程】(二)入门介绍篇 · 下:从JDK下载安装到第一个“Hello World!”程序,解析PATH和CLASSPATH环境变量的妙用~

Java基础教程之入门介绍 下 本节学习目标1️⃣ JDK安装与配置2️⃣ 第一个Java程序&#xff1a;“Hello World!”3️⃣ 环境变量 CLASSPATH&#x1f33e; 总结 本节学习目标 JDK 安装与配置&#xff1b;理解环境变量PATH和CLASSPATH的主要作用&#xff1b;运行第一个Java程序…

Spark(11):RDD持久化

目录 0. 相关文章链接 1. RDD Cache 缓存 2. RDD CheckPoint 检查点 3. 缓存和检查点区别 0. 相关文章链接 Spark文章汇总 1. RDD Cache 缓存 RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存&#xff0c;默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这…

关于Spring Boot的若干个重要问题

Spring Boot 1.什么是springboot 用来简化spring应用的初始搭建以及开发过程 使用特定的方式来进行配置&#xff08;properties或yml文件&#xff09; 创建独立的spring引用程序 main方法运行 嵌入的Tomcat 无需部署war文件 简化maven配置 自动配置spring添加对应功能starter…

nodejs-pm2管理js并发/自动重启/恢复等

目录 一、nodejs安装二、启动运行js三、实用功能1-pm2对进程名起别名四、实用功能2-pm2启动多个进程五、实用功能3-pm2内存限制自动重启六、实用功能4-服务器宕机前保存记录恢复进程 一、nodejs安装 nodejs安装使用nohup后台启动项目&#xff0c;倒是解决了控制台问题&#xf…

用Python从文件中读取学生成绩,并计算最高分/最低分/平均分

目录标题 前言环境使用:涉及知识点代码展示尾语 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 今天咱们试试用Python从文件中读取学生成绩&#xff0c;并计算最高分/最低分/平均分。 环境使用: Python 3.8 解释器 Pycharm 编辑器 涉及知识点 文件读写 基础语法 字…

QQ邮箱第三方POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务授权码

参考QQ邮箱&#xff1a; 什么是授权码&#xff0c;它又是如何设置&#xff1f; 设置入口&#xff1a; 选择 账户 下拉找到POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务就好了。我这边已经开通&#xff0c;开通流程挺简单的&#xff0c;手机号绑定然后输入验证码就好了。

华安联大:基于北斗RTK+蓝牙AOA、UWB定位技术为智慧港口提供多元化解决方案

深圳华安联大创新科技有限公司的商场室内导航系统方案&#xff0c;解决传统购物中心用户体验差的缺点&#xff0c;可实现3大类功能应用: (1)实现顾客在商场内自主导航&#xff0c;室内位置实时分享&#xff0c;目的地商铺自主导航&#xff0c;路径规划等功能: (2)停车场反向寻…

CISA在三星和D-Link设备中发现8个被积极利用的漏洞

美国网络安全和基础设施安全局&#xff08;CISA&#xff09;根据已有的证据&#xff0c;将8个被积极利用的漏洞列入已知的漏洞&#xff08;KEV&#xff09;目录中。 这8个被积极利用的漏洞包括影响三星智能手机的六个漏洞和影响D-Link设备的两个漏洞。以下是这八个漏洞&#x…

Java jsp 实战

1.JSP执行过程&#xff08;原理&#xff09; 步骤1&#xff1a;翻译&#xff08;jsp-->java&#xff09; 步骤2&#xff1a;编译&#xff08;java-->class文件&#xff09; 步骤3&#xff1a;执行&#xff08;执行class(字节码)文件&#xff09; 2.JSP实战 步骤1&…

UE4中创建的瞄准偏移或者混合空间无法拖入动画

UE4系列文章目录 文章目录 UE4系列文章目录前言一、解决办法 前言 UE4 AimOffset(瞄准偏移)动画融合时&#xff0c;AimOffse动画拖入不了融合框的解决办法&#xff0c;你会发现动画无法拖入到融合框&#xff0c;ue4编辑器提示“Invalid Additive animation Type”&#xff0c;…

Android Studio最新好用的插件----Gson转Java实体类/Kotlin Data

1.Java 安装好插件之后&#xff0c;把Gson/Json数据复制一下&#xff0c;eg: { "Chrome": "UA-66061856-6", "ChromePro": "UA-66061856-9", "Opera": "UA-66061856-7", "Edge": "UA-66061856-8&q…

K8s 为什么要弃用 Docker

K8s 为什么要弃用 Docker 最近在学习容器技术的过程中&#xff0c;看到有关于Kubernetes“弃用 Docker”的事情&#xff0c;担心现在学 Docker 是否还有价值&#xff0c;是否现在就应该切换到 containerd 或者是其他 runtime。 随着深入了解&#xff0c;这些疑虑的确是有些道理…

python实现语音识别(讯飞开放平台)

文章目录 讯飞平台使用python实现讯飞接口的语音识别第一步&#xff1a;导入需要的依赖库第二步&#xff1a;初始化讯飞接口对象第三步&#xff1a;收到websocket建立连接后的处理函数第四步&#xff1a;收到websocket消息的处理函数第五步&#xff1a;整合运行各函数 讯飞平台…

linux内核TCP源码浅析

目录 数据接收流程驱动层网络层ip_local_deliverip_local_deliver_finish 传输层tcp_v4_rcvtcp_v4_do_rcvtcp_rcv_establishedtcp_recvmsg linux内核源码下载&#xff1a;https://cdn.kernel.org/pub/linux/kernel/ 我下载的是&#xff1a;linux-5.11.1.tar.gz 数据接收流程 …

服务器数据库被360后缀勒索病毒攻击怎么办?360勒索病毒的加密形式

近日&#xff0c;我们收到很多企业的求助&#xff0c;企业服务器内的多种数据库被.360后缀的勒索病毒加密&#xff0c;导致企业许多工作无法正常运转&#xff0c;也给企业带来了严重的经济损失。360后缀勒索病毒是一种恶意软件&#xff0c;它属于BeijingCrypt勒索病毒家族&…

Server - 通过 AutoSSH 建立服务器端口转发用于访问网页

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/131536508 AutoSSH 是一款用于创建和维护持久的SSH隧道的工具&#xff0c;可以自动检测和恢复断开的连接&#xff0c;从而保证隧道的稳定性。Auto…

VS2022集成代码规范组件StyleCop.Analyzers应用于解决方案

背景 项目团队刚刚组件&#xff0c;每个人的代码编写习惯都不一样&#xff0c;希望用一款代码规范的检查插件来规范团队成员编写代码的习惯&#xff0c;在网上找了一遍之后&#xff0c;感觉StyleCop.Analyzers比较适用。 集成方法 1.NuGet搜索StyleCop.Analyzers进行安装到指…

docker-compose启动redis:Can‘t open the append-only file: permission deined

一、问题 docker-compose启动redis&#xff1a;Can‘t open the append-only file: permission deined //1-进入docker文件夹下 cd /docker//2-使用docker-comopse启动Mysql和redis docker-compose up -d mysql redisCan‘t open the append-only file: permission deined这里…