RabbitMQ 2023面试5题(一)

news2024/11/23 19:26:11

一、RabbitMQ延时队列可以用于哪些场景

RabbitMQ延时队列可以用于以下场景:

  1. 订单处理:在电商网站中,订单处理是一个常见的业务流程。如果订单需要立即处理,可以使用RabbitMQ的延时队列来实现延迟处理。例如,可以将订单发送到一个延时队列中,并设置一个延迟时间(例如30分钟),然后在延迟时间到达后,将订单从队列中取出并进行处理。

  2. 消息推送:在移动应用或Web应用程序中,可以使用RabbitMQ的延时队列来实现消息推送。例如,可以将用户订阅的消息发送到一个延时队列中,并设置一个延迟时间(例如1小时),然后在延迟时间到达后,将消息从队列中取出并推送给用户。

  3. 定时任务:在分布式系统中,可以使用RabbitMQ的延时队列来实现定时任务。例如,可以将需要定期执行的任务发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将任务从队列中取出并执行。

  4. 数据备份:在数据库中,可以使用RabbitMQ的延时队列来实现数据备份。例如,可以将需要备份的数据发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将数据从队列中取出并进行备份。

  5. 优惠券发放:您可以设置一个延时队列,将优惠券发放任务添加到队列中,设置一定的延时时间,以保证优惠券在特定时间后才能被消费。

  6. 动态路由:您可以使用延时队列来实现动态路由的功能,将消息发送到延时队列中,并设置一定的路由规则,以实现消息在特定时间后被路由到不同的目标队列中。

二、RabbitMQ延时队列,如何实现定时任务

场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。

常用解决方案:
spring的 schedule 定时任务轮询数据库

缺点:
消耗系统内存、增加了数据库的压力、存在较大的时间误差

解决:
rabbitmq的消息TTL和死信Exchange结合

示例:

  1. 首先,需要安装并启动RabbitMQ服务器。
  2. 在Java代码中,使用RabbitMQ的Java客户端库(如amqp-client)连接到RabbitMQ服务器。
  3. 创建一个交换机(exchange),并设置其类型为“x-delayed-message”。
  4. 将需要延迟发送的消息发送到该交换机上,并指定一个唯一的键值(key)。
  5. 在发送消息时,将消息的优先级设置为较低的值(例如0或1)。
  6. RabbitMQ会根据消息的优先级和队列中的其他消息,将该消息放入一个名为“x-delayed-messages”的队列中。
  7. 在Java代码中,使用ScheduledExecutorService来定期执行任务。在每个任务中,从“x-delayed-messages”队列中取出消息并进行处理。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

public class RabbitMQDelayedQueueExample {

    private static final String EXCHANGE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "delayed_routing_key";
    private static final int DELAY_SECONDS = 60; // 延迟1分钟
    private static final AtomicInteger messageCount = new AtomicInteger(0);    

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();        
        // 声明交换机与队列
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false);
        channel.queueDeclare(ROUTING_KEY, true, false, false, null);        
        // 绑定队列 到交换机
        channel.queueBind(ROUTING_KEY, EXCHANGE_NAME, ROUTING_KEY);        

        // 发送 (0 or 1)
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
            if (i % DELAY_SECONDS == 0) { 
                messageCount.incrementAndGet();
            } else {
                messageCount.incrementAndGet();
            }
        }  

        // 定时任务
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
                Queue queue = channel.queueDeclare("x-delayed-messages", true, false, false, null).getQueue();
                GetResponse response = channel.basicGet(queue, true); 
                if (response != null && response.getMessageProperties() != null && response.getMessageProperties().getDeliveryMode() == DeliveryMode.PERSISTENT) { 
                    byte[] body = response.getBody();
                    String message = new String(body); 
                    System.out.println("Processed message: " + message); 
                } else { 
                    System.out.println("No messages available in the queue at this moment"); 
                }
            } catch (IOException | TimeoutException e) { 
                e.printStackTrace(); 
            } finally { 
                scheduler.shutdownNow(); 
            }
        }, DELAY_SECONDS * TimeUnit.SECONDS, DELAY_SECONDS * TimeUnit.SECONDS, TimeUnit.SECONDS); 

三、RabbitMQ如何设置消息的TTL

消息的TTL就是消息的存活时间。

RabbitMQ可以对队列和消息分别设置TTL。

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的
    设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

  • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队
    列中,这个消息死亡的时间有可能不一样(不同的队列设置)。

  • 这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

在声明队列时设置 TTL:

Map<String, Object> args = new HashMap<>();  
args.put("x-message-ttl", 5000); // 设置 TTL 为 5 秒  
Queue queue = channel.queueDeclare("my_queue", true, false, false, args).getQueue();

在发送消息时设置 TTL:

MessageProperties properties = new MessageProperties();  
properties.setExpiration("10000"); // 设置 TTL 为 10 秒  
byte[] body = "Hello, world!".getBytes();  
channel.basicPublish("", "my_queue", properties, body);

四、RabbitMQ死信交换机与死信队列

1、概念

RabbitMQ中的死信交换机(Dead Letter Exchange,简称DLX)和死信队列(Dead Letter Queue,简称DLQ)是用于处理无法被消费者处理的消息的机制。当消息无法被消费时,可以将消息转发到DLX和DLQ中,以便后续处理。

  • DLX是一个特殊的交换机,它用于接收被其它交换机拒绝或超时等无法被消费者处理的消息。当消息被转发到DLX后,可以根据消息的路由键将其路由到相应的队列中进行处理。

  • DLQ是一个特殊的队列,它用于存储被消费者处理失败的消息。当消息无法被消费者处理时,可以将消息转发到DLX,然后再将消息存储到DLQ中。消费者可以在DLQ中处理这些消息,或者将它们重新发送到其它队列中尝试进行处理。

使用DLX和DLQ可以增强RabbitMQ的可靠性和容错性,确保消息能够被正确地处理或者在处理失败时得到适当的处理。在RabbitMQ中,可以通过声明DLX和DLQ来使用这个机制。

2、示例

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.DeliverCallback;  
import com.rabbitmq.client.MessageProperties;  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
public class DeadLetterExample {  
    private final static String QUEUE_NAME = "my_queue";  
    private final static String EXCHANGE_NAME = "my_exchange";  
    private final static String ROUTING_KEY = "my_routing_key";  
    private final static String MESSAGE_BODY = "Hello, world!";  
    private final static String DLQ_NAME = "my_dlq";  
  
    public static void main(String[] args) throws IOException, TimeoutException {  
        // 创建连接和通道  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
  
        // 声明交换机和队列  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);  
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);  
        channel.queueDeclare(DLQ_NAME, true, false, false, null);  
        channel.queueBind(DLQ_NAME, EXCHANGE_NAME, ROUTING_KEY);  
  
        // 发送消息  
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE_BODY.getBytes());  
  
        // 接收消息  
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            System.out.println("Received message: " + new String(delivery.getBody(), "UTF-8"));  
        };  
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });  
  
        // 模拟消费失败,将消息转发到DLX和DLQ中  
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE_BODY.getBytes());  
  
        // 等待一段时间后关闭连接和通道  
        Thread.sleep(5000); // 等待5秒  
        channel.close();  
        connection.close();  
    }  
}

在这个示例中,我们首先创建了一个交换机和一个队列,并将它们绑定在一起。然后,我们发送了一条消息到这个队列中,接着模拟了一个消费失败的场景,将消息转发到DLX和DLQ中。最后,我们等待5秒钟后关闭连接和通道。

五、RabbitMQ使用延时队列来关闭单据

1. 简单流程

  1. 首先,生产者在订单生成。
  2. 给 user.order.delay.exchange发送消息,路由键是order_delay
  3. 交换机把消息再发送给user.order.delay.queue,路由键是order_delay,这个队列设置三个参数:

x-dead-letter-exchange: user.order.exchange //死信队列
x-dead-letter-routing-key: order //死信路由键
x-message-ttl: 60000 //消息存活1分钟

  1. 1分钟后,交给 user.order.exchange死信交换机
  2. user.order.exchange再把消息存入到 user.order.queue死信队列
  3. 判断,如果没有支付,则关闭单据

在这里插入图片描述

2. 优化流程

将上面的进行优化,使用同一个交换机,就是将user.order.delay.exchange与user.order.exchange进行合并
如图:
在这里插入图片描述

  1. 首先,生产者在订单生成。
  2. 给 order-event- exchange发送消息,路由键是order.create.order
  3. 交换机把消息再发送给order.delay.queue,路由键是order.create.order,这个队列设置三个参数:

x-dead-letter-exchange: order-event-exchange
x-dead-letter-routing-key: order.release.order
x-message-ttl: 60000 //消息存活1分钟

  1. 1分钟后,交还给 order-event- exchange交换机,路由键order.release.order
  2. order-event- exchange再把消息存入到order.release.order. queue死信队列,路由键order.release.order
  3. 判断,如果没有支付,则关闭单据

3、优化流程代码实现

3.1 配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * 创建队列,交换机,延时队列,绑定关系 的configuration
 * 1.Broker中的Queue、Exchange、Binding不存在的情况下,会自动创建(在RabbitMQ),不会重复创建覆盖
 * 2.懒加载,只有第一次使用的时候才会创建(例如监听队列)
 */
@Configuration
public class MyRabbitMQConfig {

    /**
     * 延时队列
     */
    @Bean
    public Queue orderDelayQueue() {
        /**
         * Queue(String name,  队列名字
         *       boolean durable,  是否持久化
         *       boolean exclusive,  是否排他
         *       boolean autoDelete, 是否自动删除
         *       Map<String, Object> arguments) 属性【TTL、死信路由、死信路由键】
         */
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");// 死信路由
        arguments.put("x-dead-letter-routing-key", "order.release.order");// 死信路由键
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        return new Queue("order.delay.queue", true, false, false, arguments);
    }

    /**
     * 交换机(死信路由)
     */
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

    /**
     * 绑定:交换机与订单解锁延迟队列
     */
    @Bean
    public Binding orderCreateBinding() {
        /**
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         **/
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    /**
     * 绑定:交换机与订单解锁死信队列
     */
    @Bean
    public Binding orderReleaseBinding() {
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

    /**
     * 绑定:交换机与库存解锁
     */
    @Bean
    public Binding orderReleaseOtherBinding() {
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    }
}

3.2 发送消息

@ResponseBody
    @GetMapping(value = "/test/createOrder")
    public String createOrderTest() {
        //订单下单成功
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setOrderSn(UUID.randomUUID().toString());
        orderEntity.setModifyTime(new Date());
        //给MQ发送消息
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
        return "ok";
    }

3.3 监听消息

@Service
public class DeadQueueListener {

    /**
     * queues:声明需要监听的队列
     * channel:当前传输数据的通道
     * 获取实际消息内容有两种方式:
     *  方式一:在方法参数列表中直接声明出来
     *  方式二:从请求体中取出消息的二进制形式,然后通过JSON反序列化即可
     */
    @RabbitListener(queues = {"order.release.order.queue"})
    public void revieveMessage(Message message, OrderEntity entity, Channel channel) throws IOException {
        System.out.println("接受到的消息内容" + entity);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/685568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot + Vue前后端分离项目实战 || 三:Spring Boot后端与Vue前端连接

文章目录 前后端对接前端接口修改对接后端后端总体配置后端编写登录登出业务代码 测试后端所有代码 前后端对接 前端接口修改对接后端 src\api\user.js中修改请求地址&#xff0c;与后端保持一致 记录下前端的src\utils\request.js中的X-Token字段 改变开发环境中的请求地…

Golang每日一练(leetDay0108) 灯泡开关I\II Bulb Switcher

目录 319. 灯泡开关 Bulb Switcher &#x1f31f;&#x1f31f; 672. 灯泡开关II Bulb Switcher ii &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每…

基于Java校园快递一站式服务系统设计实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a; ✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精…

【八大排序(十)】八大排序效率与稳定性分析

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:八大排序专栏⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习排序知识   &#x1f51d;&#x1f51d; 八大排序总结 1. 前言2. 什么是排序算法的…

基于多模态变分对抗主动学习的下游医学图像分析任务

文章目录 M-VAAL: Multimodal Variational Adversarial Active Learning for Downstream Medical Image Analysis Tasks摘要本文方法实验结果 M-VAAL: Multimodal Variational Adversarial Active Learning for Downstream Medical Image Analysis Tasks 摘要 在医学领域&…

chatgpt赋能python:Python自动执行某个软件

Python自动执行某个软件 Python是一种简单易用且非常流行的编程语言&#xff0c;常用于自动化和数据分析。如果你想自动执行某个软件&#xff0c;那么Python将是一个非常好的选择。在本文中&#xff0c;我们将介绍如何使用Python自动执行某个软件&#xff0c;以及如何做好SEO优…

第 351 场LeetCode周赛

A 美丽下标对的数目 模拟 class Solution { public:int countBeautifulPairs(vector<int> &nums) {int n nums.size();int res 0;for (int i 0; i < n; i)for (int j i 1; j < n; j)if (gcd(to_string(nums[i])[0] - 0, to_string(nums[j]).back() - 0) …

人工智能技术与GIS结合的发展

个人本是GIS专业出身&#xff0c;不知名985高校本硕。工作几年后先后积累了国土空间规划、cesium开发、地理信息数据采集、地理大数据处理&#xff0c;遥感影像处理、人工智能识别&#xff0c;做过十多个500万以上的相关项目&#xff0c;有一些浅薄的经验&#xff0c;想和大家分…

tinykv project4总结

主要目标 实现mvcc和2pc, Percolator partA 将存储分为三个独立的部分&#xff0c;lock&#xff08;管理锁记录&#xff09; default(存储数据)&#xff0c;write(提交的记录)&#xff0c;提高并行性 对于lock存储&#xff0c;只要存储一份&#xff08;因为一个行同时只能有…

chatgpt赋能python:用Python编写聊天机器人:打造AI智能助手

用Python编写聊天机器人&#xff1a;打造AI智能助手 简介 聊天机器人在现代生活中越来越受欢迎。一个好的聊天机器人能够回答我们的问题、执行任务、提供娱乐&#xff0c;甚至成为我们的朋友。Python是一种强大的编程语言&#xff0c;其模块化和易学的特性使其成为开发聊天机…

GPT-4在药物发现中的作用|景联文科技

GPT-4是一种生成式AI模型&#xff0c;可以响应文本和图像&#xff0c;它代表了生成式AI可能实现的重大进步。 药物发现的最早任务之一是检索和观察与靶蛋白结合的已知分子。这可能会导致一种基于知识的筛选方法&#xff0c;人们试图通过仅检查这些分子来进行筛选。我们让GPT-4…

【MySQL数据库的表连接语句】

目录 一、连接查询1、inner join(内连接)2、left join(左连接)3、right join(右连接) 二、CREATE VIEW三、UNION取非交集的值 五、CASE六、空值(NULL) 和 无值() 的区别 一、连接查询 A表 B表 UPDATE store_info SET store_nameWashington WHERE sales300;#修改一下表里面的…

chatgpt赋能Python-python自动化办公真的有用吗_知乎

简介 如今&#xff0c;Python作为一种必学的编程语言&#xff0c;已经走进了各行各业的办公场景。Python自动化办公也逐渐成为了一个热门话题&#xff0c;很多人开始使用Python来进行一些机械化、重复性的办公工作&#xff0c;例如数据清洗、文本处理、文件管理、自动发送邮件…

专业CPU信息检测工具:CPU-Z

今天小编为大家测试了一款轻量级的CPU处理器的测试工具&#xff0c;可以查看CPU的详细信息&#xff0c;以供各位同学们学习。 一、简单介绍 CPU-Z是一款非常流行的CPU检测软件&#xff0c;被广大用户所熟知。它是目前最受欢迎的CPU检测软件之一&#xff0c;除了Intel和AMD自带…

chatgpt赋能python:Python自动化填表:省时省力的数据录入方式

Python自动化填表&#xff1a;省时省力的数据录入方式 现代社会&#xff0c;数据填写是我们日常工作中不可避免的一项任务。但手动填写数据不仅费时费力&#xff0c;还容易出错。那么有没有一种方法可以既省时又省力呢&#xff1f;答案是有的&#xff0c;那就是Python自动化填…

Swagger与knife4j接口文档组件详解

swagger介绍 相信无论是前端还是后端开发&#xff0c;都或多或少地被接口文档折磨过。前端经常抱怨后端给的接口文档与实际情况不一致。后端又觉得编写及维护接口文档会耗费不少精力&#xff0c;经常来不及更新。其实无论是前端调用后端&#xff0c;还是后端调用后端&#xff…

【数据挖掘】——常见算法对比和选择

&#x1f935;‍♂️ 个人主页&#xff1a;Lingxw_w的个人主页 ✍&#x1f3fb;作者简介&#xff1a;计算机科学与技术研究生在读 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01; &#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4…

2022(一等奖)D678基于改进结构函数法的大气气溶胶遥感反演

作品介绍 1 应用背景 大气气溶胶是大气中重要的成分之一&#xff0c;是悬浮于大气中的固体和液体微粒与它们的气体载体共同组成的多相体系&#xff0c;其尺度大约在10-3到102 μm之间。大气气溶胶的特性对空气质量具有良好的指示作用&#xff0c;气溶胶的研究对空气质量的监测…

读发布!设计与部署稳定的分布式系统(2版)笔记12_超时模式

1. “模式采用量”绝不是好的质量指标 1.1. 应该形成一种“面向恢复”的思维模式 1.2. 良好的模式能为开发工程师提供架构和设计方面的指导&#xff0c;从而减少、消除或缓解系统中的裂纹产生的影响 1.2.1. 在新发布软件后&#xff0c;它们能让你睡个安稳觉 2. 超时 2.1. …

车载网络测试 - CANCANFD - 基础篇_04

9、CAN报文包含帧格式 1&#xff09;数据帧 2&#xff09;远程帧 3&#xff09;错误帧 4&#xff09;过载帧 SOF&#xff1a;Start Of Frame&#xff0c;帧起始位&#xff1b;一个显性位&#xff0c;表明一帧的开始 RTR&#xff1a;Remote Transmission Request&#xff0c;远…