rabbitmq交换机,死信队列的简单例子

news2025/1/18 6:45:29

       假设我们有一个场景,生产者有消息发到某个直连交换机,这个交换机上有两个队列分别存储两种类型的消息,但是与这两个队列相连的消费者太不争气了,处理消息有点慢,我们想5秒钟这个消息在队列中还没有被消费的话,就给它丢进死信队列里得了(我们平时听到的延时队列其实就可按此方法实现,故意让它过期然后延时处理),后续再处理,但是这俩队列明显存储的消息不一样,我们又不好意思将它都扔到同一个死信队列里去,如果我们想要俩死信队列分别装这两个消费者漏掉的消息,那我们怎么做呢?

        下面就是一个简单的例子,如果用spring boot之类的去做也是类似,原理差不多,感兴趣的可以自己改造。

        预处理:我们先创建一个工具类用来连接rabbitmq,注意你需要去创建对应的虚拟主机,以及对应的登录账号和密码。

工具类如下: 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        //VirtualHost(虚拟主机)是一个逻辑上独立的RabbitMQ服务实例。每个VirtualHost都有自己的队列、交换机、绑定等对象,并且它们之间是相互隔离的,即exchange、queue、message不能互通。
        factory.setVirtualHost("myVirtualHost");
        factory.setUsername("mytest");
        factory.setPassword("mytest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

        

现在我们有一个直连交换机test_exchange_direct(直连交换机即根据设置的固定键直接路由到对应的队列,注意与主题topic队列的区分),我们往这个交换机里每300毫秒分别发送键为good和bad的数据各30条。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendToExchange {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        
        //这里要注意,如果你没有响应的队列的话即交换机还没有绑定队列,发送消息到交换机这些消息会丢失。
        for (int i = 0; i < 30; i++) {
            // 消息内容
            String message = "good " + i;
            //会路由到good对应的队列上
            channel.basicPublish(EXCHANGE_NAME, "good", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(300);
        }

        for (int i = 0; i < 30; i++) {
            // 消息内容
            String message = "bad " + i;
            //会路由到bad对应的队列上
            channel.basicPublish(EXCHANGE_NAME, "bad", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(300);
        }

        channel.close();
        connection.close();
    }
}

        我们再创建一个直连死信交换机dead_exchange_direct,和连接到此私信交换机上的两个队列dead_queue,dead_queue1,对应的键分别为dead-good和dead-bad。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class DeadExchange {

    private final static String EXCHANGE_NAME = "dead_exchange_direct";

    private final static String QUEUE_NAME = "dead_queue";

    private final static String QUEUE_NAME1 = "dead_queue1";

    public static void main(String[] argv) throws Exception {
        channel1();
        channel2();
    }

    public static void channel1() throws Exception{
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 绑定队列到交换机  死信路由键为dead
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");


        channel.close();
        connection.close();
    }

    public static void channel2() throws Exception{
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);

        // 绑定队列到交换机  死信路由键为dead
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "dead-bad");


        channel.close();
        connection.close();
    }

第一个不争气消费者RecvFromExchange,这个消费者对应的队列是good_queue队列,它800毫秒能处理一条消息,给他设置读取队列消息过期时间为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-good

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import java.util.HashMap;
import java.util.Map;

public class RecvFromExchange {

    private final static String QUEUE_NAME = "good_queue";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","dead_exchange_direct");
        args.put("x-dead-letter-routing-key","dead-good"); // 死信路由键dead 路由到键为dead的死信队列

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(800);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

第二个不争气消费者RecvFromExchange2,这个消费者对应的队列是bad_queue队列,它1秒能处理一条消息,它虽然慢一些但是我就是一视同仁给他设置读取队列消息过期时间也为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-bad

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import java.util.HashMap;
import java.util.Map;

public class RecvFromExchange2 {

    private final static String QUEUE_NAME = "bad_queue";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();


        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","dead_exchange_direct");
        args.put("x-dead-letter-routing-key","dead-bad"); // 死信路由键dead 路由到键为dead的死信队列

        // 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
        // 队列名称,是否持久化,是否排他,是否自动删除,自定义属性
        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bad");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

按顺序先启动DeadExchange,SendToExchange,RecvFromExchange,RecvFromExchange2。然后再次启动SendToExchange,重新发数据观察发现,这两个不争气的消费者漏掉的数据最后被死信队列接收了。

 

接下来我们对我们喜欢的绑定键dead-good的好队列给它兜底擦屁股。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class FuckDeadQueue {

    private final static String EXCHANGE_NAME = "dead_exchange_direct";

    private final static String QUEUE_NAME = "dead_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 绑定队列到交换机  死信路由键为dead
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [Recv] Received '" + message + "'");
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }

}

 执行后发现,死信队列里的消息被我们消费掉了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1674593.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初识java——javaSE(4)类与对象

文章目录 前言一 类与对象1.1 面向过程与面向对象思想的区别&#xff1a;1.2 类的定义1.3 类的实例化——对象通过创建对象&#xff0c;调用对象中的成员变量与方法 1.4 this关键字this的作用一&#xff1a;this 的作用二构造方法&#xff1a;对象创建的两步方法的重载 this的作…

基础ArkTS组件:输入框,开关,评分条(HarmonyOS学习第三课【3.3】)

输入框组件 ArkUI开发框架提供了 2 种类型的输入框&#xff1a; TextInput 和 TextArea &#xff0c;前者只支持单行输入&#xff0c;后者支持多行输入&#xff0c;下面我们分别做下介绍。 TextInput 子组件 无 接口 TextInput(value?:{placeholder?: ResourceStr, tex…

Echarts结课之小杨总结版

Echarts结课之小杨总结版 前言基础回顾框架sale框架代码&#xff1a; user框架基础代码&#xff1a; inventory框架基础代码&#xff1a; total框架基础代码&#xff1a; 基础设置1.标题(Title)2.图例(Legend)实现 3.工具提示(Tooltip)实现 4.X轴(X Axis) 和 Y轴(Y Axis)5.数据…

架构设计入门(Redis架构模式分析)

目录 架构为啥要设计Redis 支持的四种架构模式单机模式性能分析优点缺点 主从复制&#xff08;读写分离&#xff09;结构性能分析优点缺点适用场景 哨兵模式结构优点缺点应用场景 集群模式可用性和可扩展性分析单机模式主从模式哨兵模式集群模式 总结 本文主要以 Redis 为例&am…

【Python |基础入门】入门必备知识(基础各方面全覆盖)

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; &#x1f388;丠丠64-CSDN博客&#x1f388; ✨✨ 帅哥美女们&#xff0c;我们共同加油&#xff01;一起…

Linux —— 线程控制

Linux —— 线程控制 创建多个线程线程的优缺点优点缺点 pthread_self进程和线程的关系pthread_exit 线程等待pthread_ join线程的返回值线程分离pthread_detach 线程取消pthread_cancel pthread_t 的理解 我们今天接着来学习线程&#xff1a; 创建多个线程 我们可以结合以前…

图搜索算法-最短路径算法-贝尔曼-福特算法

相关文章&#xff1a; 数据结构–图的概念 图搜索算法 - 深度优先搜索法&#xff08;DFS&#xff09; 图搜索算法 - 广度优先搜索法&#xff08;BFS&#xff09; 图搜索算法 - 拓扑排序 图搜索算法-最短路径算法-戴克斯特拉算法 贝尔曼-福特算法&#xff08;Bellman-Ford&#…

树莓派 4B putty远程连接登录显示拒绝访问,密码修改

putty显示拒绝访问 可能是树莓派的ip没有找到正确的 在下载系统镜像的时候&#xff0c;会提示设置wifi 这里设置的WiFi和密码需记住&#xff0c;主机名也需记住 可以在手机打开热点&#xff08;将热点的账号和密码改为跟你设置的wifi一样的&#xff09; 可以在手机后台查看…

Linux系统的第六天

昨天&#xff0c;学习了vim编辑工具&#xff0c;今天学习Linux系统的目录结构、补充命令和配置网络。 一、目录结果 1.1目录的特点 Windows和Linux&#xff1a; Windows中c、d、e盘&#xff0c;每个都是一个根系统【多根系统】&#xff1b;Linux中只有一个根【单根系统…

Java数据类型:基本数据类型

Java是一种强类型语言&#xff0c;定义变量时&#xff0c;必须指定数据类型。 // 变量必须指定数据类型 private String username;初学者不免有个疑问&#xff1a;在实际编写代码的过程中&#xff0c;该如何选择数据类型呢&#xff1f; 回答这个问题之前&#xff0c;先来解决…

vulhub靶机struts2环境下的s2-032(CVE-2016-3081)(远程命令执行漏洞)

影响范围 Struts 2.3.19至2.3.20.2、2.3.21至2.3.24.1和2.3.25至2.3.28 当用户提交表单数据并验证失败时&#xff0c;后端会将用户之前提交的参数值使用OGNL表达式%{value}进行解析&#xff0c;然后重新填充到对应的表单数据中。 漏洞搭建 没有特殊要求&#xff0c;请看 (3…

给定两点所能得到的数学关系

给定两点所能得到的数学关系 正文 正文 这里介绍一个基础问题&#xff0c;如果给定平面上的两个点的坐标&#xff0c;那么它们之间能够得到什么数学关系呢&#xff1f; ω arctan ⁡ y 1 − y 0 x 1 − x 0 x 1 − x 0 d cos ⁡ ω y 1 − y 0 d cos ⁡ ω d ( x 1 − x…

干部谈话考察:精准洞悉,助推成长

在组织人事管理的精细布局中&#xff0c;干部谈话考察扮演着举足轻重的角色。它不仅是组织深度了解干部、精准评价其表现的重要窗口&#xff0c;更是推动干部个人成长、优化组织人才配置的关键一环。通过深入的谈话考察&#xff0c;我们能够全面把握干部的思想脉搏、工作能力、…

AngularJS指令

指令分类&#xff1a; 1&#xff09;装饰器型指令 装饰器指令的作用是为DOM添加行为&#xff0c;使其具有某种能力。在AngularS中&#xff0c;大多数内置指令属于装饰器型指令&#xff0c;例如ng-click(单击事件)、ng-hide/ng-show(控制DOM元素的显示和隐藏)等 2&#xff09;组…

uniapp 生成安卓证书没有md5指纹怎么办?

由于最新的jdk版本对应的keystore工具无法查看到md5指纹信息 但是不代表它没有md5指纹信息&#xff0c;只是看不到而已 解决方案&#xff1a; 登录uniapp开发者后台生成安卓云端证书

SSM【Spring SpringMVC Mybatis】—— Spring(二)

如果对于Spring的一些基础理论感兴趣可见&#x1f447; SSM【Spring SpringMVC Mybatis】—— Spring&#xff08;一&#xff09; 目录 1、Spring中bean的作用域 1.1 语法 1.2 四个作用域 2、Spring中bean的生命周期 2.1 bean的生命周期 2.2 bean的后置处理器 2.3 添加后…

【Vue】Vue指令与生命周期以及组件化编码

目录 常用内置指令v-text与v-htmlv-text : 更新元素的 textContentv-html : 更新元素的 innerHTML注意&#xff1a;v-html有安全性问题&#xff01;&#xff01;&#xff01;&#xff01; v-once与v-prev-oncev-pre ref与v-cloakrefv-cloak 自定义指令案例定义语法配置对象中常…

一键批量合并视频:掌握视频剪辑技巧解析,轻松创作完美影片

在数字时代的浪潮下&#xff0c;视频已成为人们记录和分享生活的重要工具。然而&#xff0c;对于许多非专业视频编辑者来说&#xff0c;将多个视频片段合并成一个完整的影片却是一项复杂且耗时的任务。幸运的是&#xff0c;云炫AI智剪一键批量合并视频功能的出现&#xff0c;让…

QT切换控件布局

1、切换前垂直布局 2、切换后水平布局 3、关键代码 qDebug() << "开始切换布局";QWidget *widget centralWidget();QLayout *layout widget->layout();if(layout){while(layout->count()){QLayoutItem *item layout->takeAt(0);if(item->layout…

自动化神器Autolt,让你不再重复工作!

随着互联网不断发展&#xff0c;它给我们带来便利的同时&#xff0c;也带来了枯燥、重复、机械的重复工作。今天&#xff0c;我要和大家分享一款老牌实用的自动化工具&#xff1a;AutoIt&#xff0c;它能够让你告别繁琐的重复性工作&#xff0c;提高工作效率。 这里透露一下&am…