RabbitMQ (4)

news2024/11/19 16:32:16

RabbitMQ (4)

文章目录

  • 1. 死信的概念
  • 2. 死信的来源
  • 3. 死信代码案例
    • 3.1 TTL 过期时间
    • 3.2 超过队列最大长度
    • 3.3 拒绝消息

前言

上文我们已经学习完 交换机 ,知道了几个交换机的使用 ,下面我们来学习一下 死信队列

1. 死信的概念


先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

2. 死信的来源

  1. 消息 TTL 过期 : TTL 是 Time To Live 的缩写, TTL 就是 生存时间
  2. 队列达到最长长度 : 队列满了 , 无法添加数据到 MQ 中
  3. 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue = false

3. 死信代码案例

这里 创建一个 direct 交换机 ,两个消费者 , 一个生产者 , 两个 队列 (一个为 消息队列 , 一个为死信队列)


图:

在这里插入图片描述


代码 :

3.1 TTL 过期时间

生产者:

package org.example.seven;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 生产者
public class Producer {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10s
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
        }
    }
}

消费者 c1 (启动之后关闭该消费者, 模拟其接受不到消息)

package org.example.seven;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class Consumer01 {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    // 死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_change";

    // 普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";

    // 死刑队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明死信和普通交换机 , 类型为 direct (直接交换机)

        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明普通队列
        Map<String, Object> arguments = new HashMap<>();

        // 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);

        // 正常的队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);

        // 设置死信 路由键 (routingKey)
        arguments.put("x-dead-letter-routing-key", "lisi");

        // 声明队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        // 申明死刑队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // 绑定普通的交换机与队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        // 绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接受消息");

        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {
        });
    }
}

先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息

在这里插入图片描述


c1 看完,我们在来写 c2 消费者 ,将进入到死信队列的消息 进行消费.


消费者c2

package org.example.seven;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer02 {

    // 死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        System.out.println("等待接受死信消息......");

        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});
    }
}


图:

在这里插入图片描述


看完 消息过期后 ,消息转发到 死信队列 被 c2 消费,下面我们来 尝试使用 死信最大长度 (队列满了,将多的消息转发到死信队列中)

3.2 超过队列最大长度

消息生产者代码 去掉 TTL 属性 , 将 basicPublish 的第三个参数改为 null


生产者:

package org.example.seven;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 生产者
public class Producer {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 设置消息的过期时间 (TTL) 单位是 ms --> 设置消息的过期时间为 10s
//        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
//            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
        }
    }
}


c1 消费者 (启动之后关闭该消费者 模拟其接收不到消息)

package org.example.seven;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class Consumer01 {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    // 死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_change";

    // 普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";

    // 死刑队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明死信和普通交换机 , 类型为 direct (直接交换机)

        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明普通队列
        Map<String, Object> arguments = new HashMap<>();

        // 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);

        // 正常的队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);

        // 设置死信 路由键 (routingKey)
        arguments.put("x-dead-letter-routing-key", "lisi");

        // 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信
        arguments.put("x-max-length", 6);

        // 声明队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        // 申明死刑队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // 绑定普通的交换机与队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        // 绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接受消息");

        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("C1 接收到的消息为: " + new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {
        });
    }
}


注意:

这参数改变了(没有设置 ttl 时间,新增了 队列的 最大长度限制 为 6) ,所以 需要把原来队列删除


消费者c2 代码不变

package org.example.seven;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer02 {

    // 死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        System.out.println("等待接受死信消息......");

        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("C2 接收到的消息: " + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback,(tag)->{});
    }
}


效果:

在这里插入图片描述


这里 之所以要启动 c1 后在关闭,是为了展示 6个消息放到 普通队列 ,4个消息放到死信队列, 如果不这么做,发送的 10个消息 都会被 c1 消费 ,(消息发送到 队列后 , 立马 转发给 c1 导致 队列就不会达到 6 个 ,队列不会满 ,也就不会将消息转化给 死信队列).

在这里插入图片描述

3.3 拒绝消息


消息生产者 和 消费者 c2 与上面的代码一样

这里我们 拒绝 info7 消息 ,想要 拒绝 info7 消息,我们可以采用手动应答.


消费者c1

package org.example.seven;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class Consumer01 {

    // 普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    // 死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_change";

    // 普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";

    // 死刑队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明死信和普通交换机 , 类型为 direct (直接交换机)

        // 普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 声明普通队列
        Map<String, Object> arguments = new HashMap<>();

        // 过期时间 10s 由生产者指定 更加灵活
//        arguments.put("x-message-ttl", 10000);

        // 正常的队列设置死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);

        // 设置死信 路由键 (routingKey)
        arguments.put("x-dead-letter-routing-key", "lisi");

        // 设置队列的限制 , 例如 发送 10 个消息 , 6 个为正常 , 4 个为死信
//        arguments.put("x-max-length", 6);

        // 声明队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        // 申明死刑队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // 绑定普通的交换机与队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        // 绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println("等待接受消息");

        DeliverCallback deliverCallback = (tag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if (msg.equals("info7")) {
                System.out.println("C1 接收到消息为: " + msg + " 此消息被 C1 拒绝");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("C1 接收到的消息为: " + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
//        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag) -> {});
        // 开启 手动应答
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (tag) -> {
        });
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1138133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

契约锁助力电子检测报告应用,杜绝假证书、出证更便捷

国家市场监管总局发布的最新数据显示&#xff1a;2022年&#xff0c;全国5.2万家检验检测机构出具检验检测报告共6.5亿份。按照一份纸质报告3-5页、成本约15元计算&#xff0c;“电子检测报告”的应用可以帮助检验检测行业一年节省约27亿张纸、97.5亿元的成本费。 引入电子签章…

【算法小课堂】深入理解前缀和算法

前缀和是指某序列的前n项和&#xff0c;可以把它理解为数学上的数列的前n项和&#xff0c;而差分可以看成前缀和的逆运算。合理的使用前缀和与差分&#xff0c;可以将某些复杂的问题简单化。 我们通过一个例子来理解前缀和算法的优势&#xff1a; 一维前缀和&#xff1a; ww…

10.26课上)计数排序,分割字符串

课上 计数排序 思路就是用数组下标对应元素&#xff0c;记录完后从头遍历&#xff0c;填到新数组里 和为零的最长子段 子段必须是要在原序列的基础上取出来的&#xff0c;相对顺序不变&#xff0c;而且没有间隔 用前缀和&#xff0c;如果一个子序列的和为0&#xff0c;那么…

【python海洋专题三十】画南海115°E的温度剖面图

【python海洋专题三十】画南海115E的温度剖面图 【python海洋专题一】查看数据nc文件的属性并输出属性到txt文件 【python海洋专题二】读取水深nc文件并水深地形图 【python海洋专题三】图像修饰之画布和坐标轴 【Python海洋专题四】之水深地图图像修饰 【Python海洋专题五】…

FL Studio2024重磅更新 包含FL水果21.1破解版安装包下载

FL Studio是一款非常好用方便的音频媒体制作工具&#xff0c;它的功能是非常的强大全面的&#xff0c;想必那些喜欢音乐创作的朋友们应该都知道这款软件是多么的好用吧&#xff0c;它还能够给用户们带来更多的创作灵感&#xff0c;进一步加强提升我们的音乐制作能力。该软件还有…

c语言进制的转换16进制转换10进制

c语言进制的转换16进制转换10进制与16转10 c语言的进制的转换 c语言进制的转换16进制转换10进制与16转10一、16进制的介绍二、16进制转换10进制方法 一、16进制的介绍 十六进制&#xff1a; 十六进制逢十六进一&#xff0c;所有的数组是0到9和A到F组成&#xff0c;其中A代表10…

RLHF系统设计关键问答及案例

目录 RLHF介绍RLHF是什么RLHF适用于哪些任务RLHF和其他构建奖励模型的方法相比有何优劣什么样的人类反馈才是好的反馈RLHF算法有哪些类别&#xff0c;各有什么优缺点RLHF采用人类反馈会带来哪些局限如何降低人类反馈带来的负面影响案例 RLHF介绍 RLHF&#xff08;Reinforcemen…

Linux创建逻辑卷并扩容(超详细)

目录 ​编辑 一、概念解析 1、LV逻辑卷 2、PV物理卷 3、VG卷组 二、扩容前准备 三、创建逻辑卷并扩容 1、打开虚拟机 2、进入root用户 3、查看新加入的硬盘 4、创建主分区 5、创建物理卷 6、打包为一个卷组 7、创建逻辑卷 8、格式化逻辑卷 9、挂载逻辑卷--开机自…

企业如何安全跨国传输30T文件数据

对于一些对数据敏感性比较高的企业&#xff0c;如IT企业和国企等&#xff0c;跨国数据传输是当今企业面临的一个重要挑战&#xff0c;尤其是当数据量达到30T这样的规模时&#xff0c;如何保证数据的速度、安全和合规性&#xff0c;就成为了企业必须考虑的问题。本文将从以下几个…

pytorch-fastrcnn识别王者荣耀敌方英雄血条

文章目录 前言效果如下实现训练数据获得训练数据和测试数据yaml文件训练py画框文件的修改py测试py 前言 最近看王者荣耀视频看到了一个别人提供的一个百里自动设计解决方案,使用一个外设放在百里的二技能上,然后拖动外设在屏幕上滑动,当外设检测到有敌方英雄时外设自动松开百里…

为什么企业都在建立指标体系,有什么用途?

什么是指标体系 指标是指企业从不同角度梳理日常业务活动&#xff0c;把积累的庞大数据提炼成不同的业务指标&#xff0c;然后反过来用指标来指代具体的业务活动。 指标体系则是把这些从不同部门、业务、人员中提炼出的业务指标融合汇总到一起&#xff0c;形成一个指标系统&a…

JavaScript进阶知识汇总~

JavaScript 进阶 给大家推荐一个实用面试题库 1、前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;web前端面试题库 1.原型链入门 1) 构造函数 当我们自定义一个函数时(箭头函数与生成器函数除外)&#xff0c;这个函…

PyQt5写一个Python代码执行器

# Author : 小红牛 # 微信公众号&#xff1a;WdPython import sys from PyQt5.QtWidgets import QApplication, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidgetdef execute_code():# 获取输入的代码code code_input.text()# 执行代码exec(code)# 创建应用程序和窗口 a…

数智化推送助力用户精准分层,MobPush是如何实现用户价值变现的

随着移动设备普及&#xff0c;移动应用市场日益趋于饱和&#xff0c;传统的拉新促活、提升APP渗透率&#xff0c;利用庞大的用户流量带来的广告收入、第三方合作等方式实现价值变现的路径已越来越窄&#xff0c;拉新促活成本的高企不下进一步限制了这种价值增长方式的可行性。因…

Rookit系列二【文件隐藏】【支持Win7 x32/x64 ~ Win10 x32/x64平台的NTFS文件系统】

文章目录 前言探究代码演示 前言 文件隐藏的方法有很多&#xff0c;这里分享的是一种通过内核文件重定向的方式动态规避检测的方法。举例&#xff1a;假设有一个安全软件A&#xff0c;A要扫描文件B&#xff0c;B是我们想要隐藏的文件。那么我们在内核中将A打开文件B的操作重定…

Qt 实现侧边栏滑出菜单效果

1.效果图 2.实现原理 这里做了两个widget&#xff0c;一个是 展示底图widget&#xff0c;一个是 展示动画widget。 这两个widget需要重合。动画widget需要设置属性叠加到底图widget上面&#xff0c;设置如下属性&#xff1a; setWindowFlags(Qt::FramelessWindowHint | Qt::…

2023/10/26MySQL学习

事务 询问当前是什么提交方式 1代表默认提交,0代表手动提交 将事务设为手动提交 将事务设置为手动提交后,mysql语句只会执行,但不会对原本表中数据进行更改, 只有执行以下两个语句之一,才会继续进行 commit完成原本操作,更改数据 rollback取消原来事务,不会进行任何更改 如…

【嵌入式Linux】编译应用和ko内核模块Makefile使用记录

文章目录 一、常用的语法1.1 , :, , ?的区别1.2 命名模式&#xff1a;target-objs 和 target-y 的区别 二、编译KO2.1 难度0&#xff1a;一个.c文件编译成一个.ko文件2.1.1 改进一下Makefile使得编译命令只需要make就可以 2.2 难度1&#xff1a;多个.c,.h文件编译成一个.ko文件…

在Java中使用FileReader.read()进行读取文件时,为什么乱码?两个方法解决

public class FileReader_ {public static void main(String[] args) {}Testpublic void m1() {String filePath "e:\\hello.txt";FileReader fileReader null;try {fileReader new FileReader(filePath);//循环读取 使用readwhile (fileReader.read()!-1){System…

使用dlib,OpenCV和Python进行人脸识别—人眼瞌睡识别

前期文章我们分享了如何使用python与dlib来进行人脸识别,以及来进行人脸部分的识别, 如下图,dlib人脸数据把人脸分成了68个数据点,从图片可以看出,人脸识别主要是识别:人眉,人眼,人鼻,人嘴以及人脸下颚边框,每个人脸的部位都有不同的数据标签从1-68 当我们识别出人脸…