RabbitMQ-核心特性

news2024/11/30 15:38:51

已经不需要为RabbitMQ交换机的离去而感到伤心了,接下来登场的是RabbitMQ-核心特性!!!

文章目录

  • 核心特性
    • 消息过期机制
    • 消息确认机制
    • 死信队列

核心特性

消息过期机制

官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了
适用场景:清理过期数据

1)给队列中的所有消息指定过期时间

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);

如果在过期时间内,还没有消费者取消息,消息才会过期
注意,如果消息已经接收到,但是没确认,是不会过期的
消费者中给队列中所有消息设置过期时间:

public class TtlConsumer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建队列,指定消息过期参数
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-message-ttl", 5000);
        // args 指定参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, args);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定义了如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 消费消息,会持续阻塞
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

2)给某条消息指定过期时间

// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .expiration("1000")
        .build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));

生产者给某条消息指定过期时间

public class TtlProducer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
//        factory.setUsername();
//        factory.setPassword();
//        factory.setPort();

        // 建立连接、创建频道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 发送消息
            String message = "Hello World!";

            // 给消息指定过期时间
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("1000")
                    .build();
            channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消息确认机制

官方文档:https://www.rabbitmq.com/confirms.html
为保证消息成功被消费,rabbitmq提供了消息确认机制,当消费者收到消息后要给一个成功反馈:
●ack:消费成功
●nack:消费失败
●reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。
image.png

        channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {
        });

一般情况,建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:

image.png

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

第二个参数 multiple 批量确认:是指是否要一次性确认所有的历史消息直到当前这条消息
指定拒绝某条消息:
image.png

channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

第 3 个参数表示是否重新入队,可用于重试

死信队列

官方文档:https://www.rabbitmq.com/dlx.html
为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失败的消息怎么处理?
死信:过期的消息,拒收的消息,消息队列满了,处理失败的消息的统称
死信队列:专门处理死信的队列
死信交换机:专门给死信队列转发消息的交换机
示例场景:
image.png
实现:
1)创建死信交换机和死信队列,并且绑定关系
2)给失败之后需要容错处理的队列绑定死信交换机
3)可以给要容错的队列指定死信之后的转发规则,死信应该再转发到哪个死信队列
4)可以通过程序来读取死信队列中的消息,从而进行处理
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.Scanner;

public class DlxDirectProducer {
    //死信交换机
    private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";
    //工作交换机
    private static final String WORK_EXCHANGE_NAME = "direct2-exchange";


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明死信交换机
            channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");

            // 创建laoban死信队列
            String queueName = "laoban_dlx_queue";
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");
            //创建waibao死信队列
            String queueName2 = "waibao_dlx_queue";
            channel.queueDeclare(queueName2, true, false, false, null);
            channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");

            DeliverCallback laobanDeliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 拒绝消息
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                System.out.println(" [laoban] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };

            DeliverCallback waibaoDeliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                // 拒绝消息
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                System.out.println(" [waibao] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };

            channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag -> {
            });
            channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag -> {
            });


            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String userInput = scanner.nextLine();
                String[] strings = userInput.split(" ");
                if (strings.length < 1) {
                    continue;
                }
                String message = strings[0];
                String routingKey = strings[1];

                channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
            }

        }
    }
}

消费者代码:

在这里插入代码片package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

public class DlxDirectConsumer {

    private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";

    private static final String WORK_EXCHANGE_NAME = "direct2-exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");

        //小狗的死信要转发到waibao这个死信队列
        // 指定死信队列参数
        Map<String, Object> args = new HashMap<>();
        // 要绑定到哪个交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 指定死信要转发到哪个死信队列
        args.put("x-dead-letter-routing-key", "waibao");

        // 创建队列,随机分配一个队列名称
        String queueName = "xiaodog_queue";
        channel.queueDeclare(queueName, true, false, false, args);
        channel.queueBind(queueName, WORK_EXCHANGE_NAME, "xiaodog");
        //小猫的死信要转发到laoban这个死信队列
        Map<String, Object> args2 = new HashMap<>();
        args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        args2.put("x-dead-letter-routing-key", "laoban");

        // 创建队列,随机分配一个队列名称
        String queueName2 = "xiaocat_queue";
        channel.queueDeclare(queueName2, true, false, false, args2);
        channel.queueBind(queueName2, WORK_EXCHANGE_NAME, "xiaocat");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            // 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            System.out.println(" [xiaodog] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            // 拒绝消息
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            System.out.println(" [xiaocat] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag -> {
        });
        channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag -> {
        });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1602792.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

富文本在线编辑器 - tinymce

tinymce 项目是一个比较好的富文本编辑器. 这里有个小demo, 下载下来尝试一下, 需要配置个本地服务器才能够访问, 我这里使用的nginx, 下面是我的整个操作过程: git clone gitgitee.com:chick1993/layui-tinymce.git cd layui-tinymcewget http://nginx.org/download/nginx-1.…

监控系统泛滥:CTO 面临的隐形成本危机

在信息技术飞速发展的今天&#xff0c;构建和维护现代化的数字系统变得日益复杂和关键&#xff1b;在这样的背景下&#xff0c;监控系统的作用变得尤为突出。正如业界广泛流传的一句经验之谈“无监控&#xff0c;不运维”所揭示的道理一样&#xff0c;对于任何具有一定复杂性的…

Redis(Windows版本下载安装和使用)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

lua 光速入门

文章目录 安装注释字符串变量逻辑运算条件判断循环函数Table (表)常用全局函数模块化 首先明确 lua 和 js Python一样是动态解释性语言&#xff0c;需要解释器执行。并且不同于 Python 的强类型与 js 的弱类型&#xff0c;它有点居中&#xff0c;倾向于强类型。 安装 下载解释…

AI预测福彩3D第38弹【2024年4月17日预测--第8套算法开始计算第6次测试】

今天咱们继续测试第8套算法和模型&#xff0c;今天是第5次测试&#xff0c;目前的测试只是为了记录和验证&#xff0c;为后续的模型修改和参数调整做铺垫&#xff0c;所以暂时不建议大家盲目跟买~废话不多说了&#xff0c;直接上结果&#xff01; 2024年4月17日3D的七码预测结果…

vivado 与 VIO 核输出探针进行交互

与 VIO 核输出探针进行交互 VIO 核输出探针用于将值写入实际硬件中的 FPGA 或 ACAP 中运行的设计。 VIO 输出探针通常用作为待测设计的低带 宽控制信号。 VIO 调试探针需手动添加到 VIO 仪表板的“ VIO 探针 (VIO Probes) ”窗口中。请参阅“在‘调试探针 (Debug Pr…

中国12.5米DEM地形瓦片数据免费领取!

之前向大家公开了中国34个省12.5米DEM地形瓦片数据的免费领取链接&#xff0c;大家对12.5米DEM数据的使用需求很强烈&#xff0c;领取也很积极&#xff0c;也有不少读者反馈能否提供全国范围的12.5米DEM地形瓦片数据&#xff0c;因为分省级地形瓦片数据想要合并成全国数据&…

CUDA 以及MPI并行矩阵乘连接服务器运算vscode配置

一、CUDA Vscode配置 &#xff08;一&#xff09;扩展安装 本地安装 服务器端安装 &#xff08;二&#xff09; CUDA 配置 .vscode c_cpp_properties.json {"configurations": [{"name": "Linux","includePath": ["${workspa…

【AI】DeepStream(01)介绍

1、简介 DeepStream 本质是 GStreamer 的插件,基于GStreamer的管道,实现高效的视频流分析。 DeepStream 将来自 USB/CSI 摄像头的流数据、来自文件的视频或通过 RTSP 的流作为输入,并使用人工智能和计算机视觉从像素中生成AI结果。 DeepStream SDK 可以成为许多视频分析解…

C++ queue priority_queuestack 详解及模拟实现

1. stack的介绍和使用 1.1 stack的介绍 1. stack是一种容器适配器&#xff0c;专门用在具有后进先出操作的上下文环境中&#xff0c;其删除只能从容器的一端进行元素的插入与提取操作。 2. stack是作为容器适配器被实现的&#xff0c;容器适配器即是对特定类封装作为其底层的容…

【华为笔试题汇总】2024-04-17-华为春招笔试题-三语言题解(Python/Java/Cpp)

&#x1f36d; 大家好这里是KK爱Coding &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为近期的春秋招笔试题汇总&#xff5e; &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f…

吴恩达2022机器学习专项课程(一) 第二周课程实验:特征工程和多项式回归(Lab_04)

目标 探索特征工程和多项式回归&#xff0c;使用线性回归来拟合非常复杂甚至非线性的函数。 1.为什么线性回归能拟合非线性函数&#xff1f; fxw*xb&#xff0c;属于线性回归的扩展&#xff0c;这个公式在数学中不属于线性&#xff0c;因为有x&#xff0c;而在机器学习中属于…

【云计算】云数据中心网络(六):私网连接

云数据中心网络&#xff08;六&#xff09;&#xff1a;私网连接 1.什么是私网连接2.私网连接的组成3.私网连接的优势4.私网连接的主要应用场景 前面讲到 VPC 网络具有隔离性&#xff0c;VPC 之间无法通信。当一个 VPC 中的终端需要访问部署在另一个 VPC 中的服务时&#xff0c…

江苏开放大学2024年春《测试技术 060245》作业3参考答案

答案&#xff1a;更多答案&#xff0c;请关注【电大搜题】微信公众号 答案&#xff1a;更多答案&#xff0c;请关注【电大搜题】微信公众号 答案&#xff1a;更多答案&#xff0c;请关注【电大搜题】微信公众号 单选题 1相关系数ρxy的取值范围处于( )之间。 A-1和0 B…

OpenCV基本图像处理操作(十一)——图像特征Sift算法

图像尺度空间 在一定的范围内&#xff0c;无论物体是大还是小&#xff0c;人眼都可以分辨出来&#xff0c;然而计算机要有相同的能力却很难&#xff0c;所以要让机器能够对物体在不同尺度下有一个统一的认知&#xff0c;就需要考虑图像在不同的尺度下都存在的特点。 尺度空间的…

WebGl/Three 粒子系统 人物破碎及还原运动

粒子 首先&#xff0c;加载模型&#xff0c;这是万千粒子的前身&#xff0c;模型对象由很多面构成&#xff0c;这些面又是由各个点构成的&#xff0c;所以可以将模型的几何体对象geometry赋给粒子对象&#xff0c;粒子物体用Points方式渲染 bloader.load("obj/female02/Fe…

斯坦福发布法律指令数据集LawInstruct,统一17个辖区24种语言

在法律领域&#xff0c;语言模型&#xff08;Language Models, LLMs&#xff09;的发展一直面临着独特的挑战。法律文本的复杂性、专业术语的广泛使用以及对精确性和可靠性的极高要求&#xff0c;使得法律领域的自然语言处理&#xff08;Natural Language Processing, NLP&…

新版周易运势风水测算 宝宝起名改名 公司吉凶测名 八字姻缘爱情算命预测 易经塔罗牌占卜

源码简介&#xff1a; 系统包含八字合婚、流年运势、宝宝起名、塔罗占卜、姓名配对、命中注定、星座运势、八字精批、桃花运姻缘、测终生运、十年大运详批、犯太岁化解、紫薇财运精批、取名改名、算姻缘、婚前合婚、算事业、算财运、姓名详批、塔罗爱情占卜&#xff08;三个月…

Python接口自动化 —— Web接口(2)

1.2.3 HTTP HTTP概念 Hyper Text Transfer Protocal超文本传输协议&#xff0c;基于tcp/ip通信协议来传递数据&#xff0c;属于应用层协议主要特点: 无连接: 每次连接只处理一个请求&#xff0c;服务器处理完请求并受到客户端应答后就断开连接媒体独立。 只要服务器和客…

web项目中jsp页面不识别el表达式

如果使用el表达式出现下图问题 ** 解决办法 ** 这是因为maven创建项目时&#xff0c;web.xml头部声明默认是2.3&#xff0c;这个默认jsp关闭el表达式 修改web.xml文件开头的web-app的版本 <?xml version"1.0" encoding"UTF-8"?> <web-app x…