rabbitMQ死信队列快速编写记录

news2025/1/16 9:03:42

文章目录

  • 1.介绍
    • 1.1 什么是死信队列
    • 1.2 死信队列有什么用
  • 2. 如何编码
    • 2.1 架构分析
    • 2.2 maven坐标
    • 2.3 工具类编写
    • 2.4 consumer1编写
    • 2.5 consumer2编写
    • 2.6 producer编写
  • 3.整合springboot
    • 3.1 架构图
    • 3.2 maven坐标
    • 3.3 构建配置类,创建exchange,queue,并绑定
    • 3.4 编写生产者(controller的一个方法)
    • 3.5 编写消费者(一个类, 方法上加上@RabbitListenner,表明需要监听的queue)
  • 4. 常见参数汇总

1.介绍

1.1 什么是死信队列

再rabbitMQ中,有两个重要的组件。exchange(交换机),queue(队列)。交换机用于路由消息,简单来说就是接收客户端传递的消息转发到queue中。队列做的事情就是存储消息
但消息并不会一只存储在队列中。当存在一下三种情况,消息就会死掉

  • 队列存储不了过多的消息
  • 消息本身存在过期时间

当遇到死掉的消息时,我们通常会将这些死信转发到新的交换机中,这个交换机就叫做死信交换机,而配合死信交换机存储信息的队列,叫做死信队列

1.2 死信队列有什么用

死信队列在构建延迟队列时,有巨大作用。比如用户购票订单,30min不支付就过期。在rabbitMQ中可以这样实现

  • 1 存储购票信息到exchange-queue
  • 2 设置消息过期时间为30min
  • 3 如果超过30min消息未被消费(消息过期,成为死信),存储死信队列,通知服务取消订单

2. 如何编码

2.1 架构分析

先上绑定架构图
在这里插入图片描述

整个流程中,出现了三方。在编写代码时,我们可以分三个大类,分别是producer,consumer1,consumer2。其消息传递顺序如下

  • producer -> normal_exchange
  • normal_queue -> consumer1
  • dead_queue-> consumer2

我们可以在编写consumer1的时候,完成exchange和queue的创建与绑定

2.2 maven坐标

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>

2.3 工具类编写

rabbitMQ整体的编写流程如下

  • 创建Connection工厂
  • 配置工厂
  • 创建链接
  • 获取channel
  • 通过channel创建exchange, queue, 关系绑定, 监听消息, 发送消息

因此,我们可以先创建工具类,帮我们获取channel,以此减少开发代码量

public class MQUtils {
    public static Channel getChannel() throws Exception {
        // 创建工厂链接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置工厂
        factory.setHost("your_ip");
        factory.setUsername("your_username");
        factory.setPassword("your_password");
        factory.setVirtualHost("/"); // 基本都是/
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2.4 consumer1编写

public class Consumer1 {
    static String EXCHANGE_NAME = "normal_exchange";
    static String QUEUE_NAME = "normal_queue";
    static String DEAD_EXCHANGE_NAME = "dead_exchange";
    static String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = MQUtils.getChannel();
        // 普通交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, null);
        // 普通队列
        // 配置死信交换机参数
        HashMap<String, Object> map = new HashMap<>();
        // 配置normal_queue连接的dead_exchange
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 设置normal_queue的消息过期时间
        map.put("x-message-ttl", 10000);
        // 设置路由到死信交换机的路由key: lisi
        map.put("x-dead-letter-routing-key", "lisi");
        // map.put("x-max-length", 6); // 设置队列最大长度
        channel.queueDeclare(QUEUE_NAME, false, false, true, map);
        // 绑定普通交换机和普通队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "zhangsan");
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, true, false, null);
        // 死信队列
        channel.queueDeclare(DEAD_QUEUE_NAME, false, false, true, null);
        // 绑定死信交换机和死信队列
        channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi");
        // 监听
        channel.basicConsume(QUEUE_NAME, false, (consumerTag, message) -> {
            System.out.println("监听普通队列: " + new String(message.getBody()));
        }, consumerTag -> {});
    }
}

2.5 consumer2编写

public class Consumer2{
    static String DEAD_QUEUE_NAME = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = MQUtils.getChannel();

        // 监听
        channel.basicConsume(DEAD_QUEUE_NAME, true, (consumerTag, message) -> {
            System.out.println("监听死信队列: " + new String(message.getBody()));
        }, consumerTag -> {});
    }
}

2.6 producer编写

public class Producer {
    static String EXCHANGE_NAME = "normal_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = MQUtils.getChannel();
        for (int i = 0; i < 10; i++) {
            String msg = i + "";
            channel.basicPublish(EXCHANGE_NAME, "zhangsan",
                    null,
                    msg.getBytes());
        }
    }
}

3.整合springboot

3.1 架构图

在这里插入图片描述

3.2 maven坐标

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.9.RELEASE</version>
        </dependency>

3.3 构建配置类,创建exchange,queue,并绑定

package com.xhf.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLConfig {
    /*-----交换机名称-----*/
    private static final String EXCHANGE_NAME = "X";
    private static final String DEAD_EXCHANGE_NAME = "Y";
    /*-----队列名称-----*/
    private static final String QUEUE_NAME1 = "QA";
    private static final String QUEUE_NAME2 = "QB";
    private static final String DEAD_QUEUE_NAME = "QD";

    // 注册交换机
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME)
                .autoDelete()
                .build();
//        return new DirectExchange(EXCHANGE_NAME);
    }

    // 注册死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE_NAME)
                .autoDelete()
                .build();
    }

    // 注册队列QA
    @Bean("queueA")
    public Queue queueA() {
        return QueueBuilder
                .nonDurable(QUEUE_NAME1)
                .withArgument("x-message-ttl", 10000)
                .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", "YD")
                .build();
    }

    // 注册队列QB
    @Bean("queueB")
    public Queue queueB() {
        return QueueBuilder
                .nonDurable(QUEUE_NAME2)
                .withArgument("x-message-ttl", 40000)
                .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", "YD")
                .build();
    }

    // 注册队列QD
    @Bean("queueD")
    public Queue queueD() {
        return QueueBuilder
                .nonDurable(DEAD_QUEUE_NAME)
                .build();
    }

    // 绑定普通交换机, 队列
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") Exchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA").and(null);
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") Exchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB").and(null);
    }

    // 绑定死信交换机, 队列
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("deadExchange") Exchange deadExchange) {
        return BindingBuilder.bind(queueD).to(deadExchange).with("YD").and(null);
    }
}

3.4 编写生产者(controller的一个方法)

   @GetMapping("/sendToQA")
    public void sendToQA() {
    	// 向X交换机发送消息, 消息通过"XA"路由到队列
        rabbitTemplate.convertAndSend("X", "XA", "hello".getBytes());
    }

3.5 编写消费者(一个类, 方法上加上@RabbitListenner,表明需要监听的queue)

@Component
public class Customer {
    @RabbitListener(queues = "QD")
    public void customer(Message message, Channel channel) {
        byte[] body = message.getBody();
        System.out.println(new String(body));
    }
}

4. 常见参数汇总

  • x-dead-letter-exchange :死信交换机名称
  • x-message-ttl:消息time to live时间(过期时间)
  • x-dead-letter-routing-key:死信交换机路由key
  • x-max-length:队列最大长度

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1052438.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV查找和绘制轮廓:findContours和drawContours

1 任务描述&#xff1a; 绘制图中粗线矩形的2个边界&#xff0c;并找到其边界的中心线 图1 原始图像 2.函数原型 findContours( InputOutputArray image, OutputArrayOfArrays contours, OutputArray hierarchy, int mode, …

知识总结 1

传输层重点协议----TCP 1、TCP① TCP协议段格式② TCP原理Ⅰ 确认应答机制&#xff08;可靠性&#xff09;Ⅱ 超时重传机制&#xff08;可靠性&#xff09;Ⅲ 连接管理机制&#xff08;可靠性&#xff09;Ⅳ 滑动窗口机制&#xff08;效率&#xff09;Ⅴ 流量控制机制&#xff…

内存函数的介绍和模拟实现

目录 1.memcpy的使用(内存拷贝) 2.memcpy的实现 3.memmove的使用&#xff08;内存拷贝&#xff09; 4.memmove的实现 5.memset 的使用&#xff08;内存设置&#xff09; 6.memcmp的使用&#xff08;内存比较&#xff09; 1.memcpy的使用(内存拷贝) void * memcpy ( void * …

【C++】C++11------线程库

目录 线程库接口线程接口使用lock_guard与unique_lockmutex(互斥锁)lock_guardunique_lock 原子性操作库条件变量(condition_variable) 线程库接口 在C11之前&#xff0c;涉及到多线程问题&#xff0c;都是和平台相关的&#xff0c;比如windows和linux下各有自己的接口&#x…

PMSM——转子位置估算基于QPLL

文章目录 前言仿真模型观测器速度观测位置观测转矩波形电流波形 前言 今后是电机控制方向的研究生的啦&#xff0c;期待有同行互相交流。 仿真模型 观测器 速度观测 位置观测 转矩波形 电流波形

Fake Maxpooling 二维滑动窗口

先对每一行求一遍滑动窗口&#xff0c;列数变为(列数-k1) 再对每一列求一遍滑动窗口&#xff0c;行数变为(行数-k1) 剩下的就是每一个窗口里的最大值啦 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing nam…

【图论C++】链式前向星(图(树)的存储)

/*** file * author jUicE_g2R(qq:3406291309)————彬(bin-必应)* 一个某双流一大学通信与信息专业大二在读 * * brief 一直在竞赛算法学习的路上* * copyright 2023.9* COPYRIGHT 原创技术笔记&#xff1a;转载需获得博主本人…

idea debug 重启弹窗提示窗口询问是否关闭运行着的服务器

目录 方法121版本的IDEA idea重新启动服务器时会有一个提示窗口询问是否关闭运行着的服务器&#xff0c;&#xff0c;这个窗口不小心点了不再提示.重新打开弹窗方法 方法1 idea编辑器由于勾选了不再提示选项导致的弹窗无法继续弹出&#xff1a;解决方案 1.打开项目没提示&…

Rust 使用Cargo

Rust 使用技巧 Rust 使用crates 假设你正在编写一个 Rust 程序&#xff0c;要使用一个名为 rand 的第三方库来生成随机数。首先&#xff0c;你需要在 Cargo.toml 文件中添加以下依赖项&#xff1a; toml [dependencies] rand "0.7.3" 然后运行 cargo build&…

Mybatis 二级缓存(使用Ehcache作为二级缓存)

上一篇我们介绍了mybatis中二级缓存的使用&#xff0c;本篇我们在此基础上介绍Mybatis中如何使用Ehcache作为二级缓存。 如果您对mybatis中二级缓存的使用不太了解&#xff0c;建议您先进行了解后再阅读本篇&#xff0c;可以参考&#xff1a; Mybatis 二级缓存https://blog.c…

VisionTransformer(ViT)详细架构图

这是原版的架构图&#xff0c;少了很多东西。 这是我根据源码总结出来的详细版 有几点需要说明的&#xff0c;看架构图能看懂就不用看注释了。 &#xff08;1&#xff09;输入图片必须是 224x224x3 的&#xff0c;如果不是就把它缩放到这个尺寸。 &#xff08;2&#xff09;T…

文本嵌入层

1、代码演示 embedding nn.Embedding(10,3) print(embedding) input torch.LongTensor([[1,2,3,4],[4,3,2,9]]) embedding(input) 2、构建Embeddings类来实现文本嵌入层 # 构建Embedding类来实现文本嵌入层 class Embeddings(nn.Module):def __init__(self,d_model,vocab):…

uboot启动流程-涉及_main汇编函数

一. uboot启动流程涉及函数 本文简单分析一下 save_boot_params_ret调用的函数&#xff1a;_main汇编函数。 本文继之前文章的学习&#xff0c;地址如下&#xff1a; uboot启动流程-涉及s_init汇编函数_凌肖战的博客-CSDN博客 二. uboot启动流程涉及的 _main汇编函数 经过之…

微信公众号

title: “微信公众号” createTime: 2022-01-05T10:14:2008:00 updateTime: 2022-01-05T10:14:2008:00 draft: false author: “name” tags: [“杂”] categories: [“software”] description: “测试的” 公众号发布文章 文章目录 title: "微信公众号" createTim…

数据结构与算法基础-(5)---栈的应用-(1)括号匹配

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

UG\NX二次开发 通过点云生成曲面 UF_MODL_create_surf_from_cloud

文章作者:里海 来源网站:《里海NX二次开发3000例专栏》 感谢粉丝订阅 感谢 Rlgun 订阅本专栏,非常感谢。 简介 有网友想做一个通过点云生成曲面的程序,我们也试一下 效果 代码 #include "me.hpp" /*HEAD CREATE_SURF_FROM_CLOUD CCC UFUN */

小谈设计模式(6)—依赖倒转原则

小谈设计模式&#xff08;6&#xff09;—依赖倒转原则 专栏介绍专栏地址专栏介绍 依赖倒转原则核心思想关键点分析abc 优缺点分析优点降低模块间的耦合度提高代码的可扩展性便于进行单元测试 缺点增加代码的复杂性需要额外的设计和开发工作 Java代码实现示例分析 总结 专栏介绍…

python编写修改sqlmap进行_WAF绕过

WAF绕过 文章目录 WAF绕过1 waf机制了解1.1 waf防火墙识别工具1.2 WAF机制及绕过方法总结: [绕waf参考总结地址](https://www.freebuf.com/articles/web/229982.html)1.3 绕过waf&#xff08;安全狗&#xff09;方式 2 绕过分析 -替换格式3 编写py脚本绕过安全狗3.1启动编好的脚…

Bug:elementUI样式不起作用、Vue引入组件报错not found等(Vue+ElementUI问题汇总)

前端问题合集&#xff1a;VueElementUI 1. Vue引用Element-UI时&#xff0c;组件无效果解决方案 前提&#xff1a; 已经安装好elementUI依赖 //安装依赖 npm install element-ui //main.js中导入依赖并在全局中使用 import ElementUI from element-ui Vue.use(ElementUI)如果此…

VBA技术资料MF62:创建形状添加文本及设置颜色

【分享成果&#xff0c;随喜正能量】须知往生净土&#xff0c;全仗信、愿。有信、愿&#xff0c;即未得三昧、未得一心不乱&#xff0c;亦可往生。且莫只以一心不乱&#xff0c;及得念佛三昧为志事&#xff0c;不复以信、愿、净念为事。。 我给VBA的定义&#xff1a;VBA是个人…