【RabbitMQ】Producer之mandatory、alternative exchange、TTL - 基于AMQP 0-9-1(一)

news2025/1/10 1:41:32

RabbitMQ系列文章,前几篇介绍了基础概念,AMQP 0-9-1 协议,和服务端安装,准备工作都完成后,就开始着手开发了。这篇文章主要介绍RabbitMQ生产者的开发,包括Producer、Message常见的参数,读完这篇文章,基本掌握了Procuder端日常开发和使用。

这篇文章的所有代码实例都是基于AMQP 0-9-1,需要了解AMQP 0-9-1 协议的详细内容的小伙伴们,请参考AMQP 0-9-1 协议介绍 这篇文章。

这篇文章中设计的代码会列出关键部分,如果想看完整的Producer客户端开发的代码,可以参考RabbitMQ简单使用这篇文章,只需要在此基础上,加上这篇文章给出的关键代码,稍作修改就可以实现相关的功能。

mandatory参数

Producer发布的消息并不能百分百保证准确无误的路由到队列,当交换器无法将消息路由到队列时,消息就有可能丢失。mandatory参数可以解决消息丢失的问题,只需要把mandatory设置为true,当出现上述问题时,RabbitMQ会将消息返回给Producer,让Producer自己根据实际业务处理。如果设置为false,出现上述问题,消息就会被丢弃。

具体实现方式很简单,只需要在发布消息是将mandatory参数设置为true,并给channel添加ReturnListener监听器。代码实现如下。

// 添加监听器,当消息无法路由时,就会监听到RabbitMQ返回的消息
channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(new String(body));
            }
        });

String message = "mandatory message";
// 发布消息时,第三个参数mandatory设为true
channel.basicPublish(exchange, "aaaaaaaa", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

备份Exchange

当Producer发布的消息无法路由到队列,如果mandatory设置false,消息会丢失,mandatory设置true,则会增加编程复杂度。备份队列可以解决这个问题,业务队列绑定备份队列后,当消息无法路由到队列时,此消息经过备份交换器,路由到备份队列,消息不会丢失,在合适的时候再处理备份队列中的消息,这种方式让程序更简单且更具灵活性。

下面是备份交换器的代码实现。

// 普通交换器
String normalExchange = "normal_exchange";
// 普通队列
String normalQueue = "normal_queue";
// 普通路由键
String normalRoutingKey = "normal_routing_key";
// 备份交换器
String aeExchange = "ae_exchange";
// 备份队列
String aeQueue = "ae_queue";

// 设置参数,申明备份交换器
Map<String, Object> args = new HashMap<>();
args.put("alternate-exchange", aeExchange);

// 申请普通交换器,将参数设置到普通交换器上
channel.exchangeDeclare(normalExchange, BuiltinExchangeType.DIRECT, false, false, args);
// 申明备份交换器(备份交换器也需要创建)
channel.exchangeDeclare(aeExchange, BuiltinExchangeType.FANOUT, false, false, null);

// 申明普通队列
channel.queueDeclare(normalQueue, false, false, false, null);
// 申明备份队列
channel.queueDeclare(aeQueue, false, false, false, null);

// 绑定
channel.queueBind(normalQueue, normalExchange, normalRoutingKey);
channel.queueBind(aeQueue, aeExchange, "");

只有发布消息时的路由键是normal_routing_key,消息才会路由到 normal_queue,当发布消息的路由键不是normal_routing_key 时,找不到队列,此时消息会进入ae_queue。下面列出一张图帮助理解。

bf72edde1bc44a1a93e78b5a9aef8385.png

这里的备份交换器类型是fanout,也可以设置为direct或者topic,但是需要注意,此时备份交换器上的绑定键需要和普通交换器上的绑定键一致,消息才能进入备份队列,以上面的代码举例:

普通路由键是normal_routing_key,备份交换器时 ae_routing_key,当Producer发布的消息,带的路由键是other_routing_key,无法路由到普通队列,就会进入备份交换器,但是备份交换器上的绑定键 ae_routing_key,无法和发布消息携带的 other_routing_key 匹配,消息还是被丢弃。

所以,建议备份交换器的类型设置为fanout,确实需要使用direct或者topic的话,注意路由键。如果不了解交换器类型的小伙伴们,可以参考AMQP 0-9-1 协议介绍 或者RabbitMQ相关概念及代码示例 这两篇文章。

消息过期时间(TTL)

 给消息设置TTL,在超过TTL值后,消息就会变成dead message(死信),订阅此队列的消费者无法消费(也不是绝地的,后续文章会介绍解决办法)。只需要在申明队列的时候,设置x-message-ttl 值即可。下面是代码实现。

HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5 * 1000);
channel.queueDeclare(queue, true, false, false, args);

如果不设置消息的ttl,消息不会过期;如果ttl设置为0,除非可以直接投递给消费者,否则消息会被丢弃。

每条消息可以设置不同的TTL,所以每条消息在被投递到消费者之前,才会判断消息是否过期,这样就会存在一种情况,后面的消息比前面的先过期,但是消费者依然不能消费到后面的消息,必须前面的消息先被投递到消费者,RabbitMQ就是采用这种方案的。下面用一张图帮助理解。

daa2c5fe05c6428d9823cab0884606b5.png

 

队列过期时间(TTL)

RabbitMQ不仅支持消息的TTL,还支持队列级别的TTL,可以通过x-expires 参数控制在队列删除之前处于未使用状态的时间,比如设置为1000,表示队列在1s之内,没有被使用,就会被删除。注意,队列级别的TTL不能设置为0。下面是代码实现。

HashMap<String, Object> args = new HashMap<>();
args.put("x-expires", 20000);
channel.queueDeclare(queue, true, false,false, args);

队列级别的TTL和消息级别的TTL不一样,因为不用考虑每条消息的TTL,只要队列到了TTL,就可以被删除。

好了,以上就是基于AMQP 0-9-1 协议,关于Producer的常用API使用第一部分的分享。

RabbitMQ系列文章会陆续更新,欢迎各位小伙伴关注后面的技术分享。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383723.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

20.hadoop系列之Yarn资源调度器

Yarn是一个资源调度平台&#xff0c;负责为运算程序提供服务器运算资源&#xff0c;相当于一个分布式的操作系统&#xff0c;而MapReduce等运算程序则相当于运行于操作系统之上的应用程序 1.Yarn基础架构 Yarn主要由ResourceManager、NodeManager、ApplicationMaster和Contai…

【MySQL】基础操作

数据库的操作 查看所有数据库&#xff1a; SHOW DATABASES;&#xff08;注意这里是 databases&#xff0c;多了个 s&#xff09; mysql 不区分大小写&#xff0c;所以 show databases; 是一样的。 另外&#xff0c;在命令行窗口操作时&#xff0c;可以使用上下方向键调用前面已…

CentOS编译安装Apache

1、下载Apache、apr和apr-util源码包&#xff1a; 2、解压文件&#xff1a; tar -zxvf httpd-2.4.55.tar.gz tar -zxvf apr-util-1.6.3.tar.gz tar -zxvf apr-util-1.6.3.tar.gz mv apr-1.7.2 httpd-2.4.55/srclib/apr mv apr-util-1.6.3 httpd-2.4.55/srclib/apr-util 说明&…

计讯物联5G数采仪助力打造化工园区企业工况监测系统

项目背景 随着我国化工行业的快速发展&#xff0c;化工园区已成为化工行业发展的重要阵地&#xff0c;化工企业聚集&#xff0c;危险化学品安全风险集中&#xff0c;安全规范问题逐渐成为行业关注的焦点。然而&#xff0c;我国化工园区发展水平发展参差不齐&#xff0c;尤其是…

Redis过期删除策略和内存淘汰策略

目录一、面试题二、Redis内存满了怎么办2.1 结论三、redis里写的数据如何删除的&#xff1f;3.1 三种不同的删除策略3.1.1 立即删除3.1.2 惰性删除3.1.3 定期删除3.3.4 总结四、redis缓存淘汰策略4.1 有哪些4.2 你平时用哪一种五、总结一、面试题 生产上你们你们的redis内存设…

第三章-OpenCV基础-8-绘图函数

前置内容 这篇内容不是本书内容,但后续用的到&#xff0c;特做记录。 使用OpenCV中不可避免需要用到各种绘图功能,比如绘制人脸库、显示人脸识别信息,那就需要用到OpenCV的绘图函数&#xff0c;这些函数包括cv2.line()&#xff0c; cv2.circle()&#xff0c;cv2.rectangle()…

Lazada如何做好店铺运营?产品定价是关键

1.东南亚各国状况一览&#xff08;对比中国&#xff09; 2.东南亚消费水平真的很低&#xff1f; 精准定价的意义&#xff1a;定价过高&#xff0c;失去核心竞争力&#xff1b;定价过低&#xff0c;亏本对市场失去信心&#xff1b;价格改动&#xff0c;流量下降 定价公式&#…

MySQL实战解析底层---日志系统:一条SQL更新语句是如何执行的

目录 前言 重要的日志模块&#xff1a;redo log 重要的日志模块&#xff1a;binlog 两阶段提交 前言 MySQL 可以恢复到半个月内任意一秒的状态&#xff0c;这是怎样做到的呢&#xff1f;从一个表的一条更新语句说起&#xff0c;下面是这个表的创建语句&#xff0c;这个表有…

【个人总结】Mongodb安装下载

【个人总结】MongDB安装下载1、下载2、安装3、创建数据库文件的存放位置4、配置5、创建文件6、安装mongodb服务7、测试1、下载 官网下载地址&#xff1a;https://www.mongodb.com/try/download/community-kubernetes-operator&#xff0c;选择对应版本&#xff0c;我这里是3.6…

大数据之Hudi数据湖_大数据治理_简介_发展历史_特性_应用场景---大数据之Hudi数据湖工作笔记0001

支持hive spark flink 美国公司开发的~ 都在使用,这些企业都在用 支持hadoop的,更新,插入,删除 和数据增量处理 支持流式数据处理. hive是离线数仓 hive不支持事物 insert overwrite 底层后来通过这种方式支持了事物 insert overwrite处理数据很低效,因为更新是基于覆盖实现…

2023年,软件测试怎么样?

2022年因为各种不可抗力原因&#xff0c;大厂裁员&#xff0c;失业等等频频受到关注。 不解释&#xff0c;确实存在&#xff0c;各行各业都很难&#xff0c;但是&#xff0c;说软件测试行业不吃香&#xff0c;我还真不认同&#xff08;不是为培训机构说好话&#xff0c;大环境…

计算神经网络参数量Params、计算量FLOPs(亲测有效的3种方法)

1.stat&#xff08;cpu统计&#xff09; pip install torchstat from torchstat import statstat(model, (3, 32, 32)) #统计模型的参数量和FLOPs&#xff0c;&#xff08;3,32,32&#xff09;是输入图像的size 结果&#xff1a; 问题&#xff1a;当网络中有自定义参数时&am…

龙蜥LoongArch架构研发全揭秘,龙芯开辟龙腾计划技术合作新范式

编者按&#xff1a;在开源新基建加快建设的背景下&#xff0c;越来越多的企业选择加入龙蜥社区&#xff0c;当前社区生态合作伙伴已突破 300 家。于是&#xff0c;龙蜥社区能为加入的企业提供哪些支持成为越多伙伴们更加关注的话题。本文将以龙蜥社区和龙芯中科联合研发龙蜥 Lo…

Webpack-好文

webpack是一个前端资源加载/打包工具&#xff0c;会根据模块的依赖关系进行静态分析&#xff0c;然后将这些模块按照指定的规则生成对应的静态资源Webpack打包js文件创建一个文件夹&#xff0c;cmd进入到终端&#xff0c;运行npm install -g webpack webpack-cli安装webpack we…

三、work queues(多进程消费一个队列)

1、轮训分发消息 在这个案例中我们会启动两个工作线程&#xff0c;一个消息发送线程&#xff0c;我们来看看他们两个工作线程是如何工作的。 1.1 抽取工具类 public class RabbitMqUtils {//得到一个连接的channelpublic static Channel getChannel() throws Exception {//创…

探寻世界:用Python获取照片的地理定位信息

目录 步骤&#xff1a; 源代码&#xff1a; 代码说明&#xff1a; 报错1&#xff1a; 解决方法1&#xff1a; 报错2&#xff1a; 解决方法2&#xff1a; 效果如下所示&#xff1a; 验证效果如下&#xff1a; 一、步骤&#xff1a; 要从 JPEG 图像中获取经纬度信息&…

投票页面制作线上投票活动制作网络投票制作关注投票制作

现在来说&#xff0c;公司、企业、学校更多的想借助短视频推广自己。通过微信投票小程序&#xff0c;网友们就可以通过手机拍视频上传视频参加活动&#xff0c;而短视频微信投票评选活动既可以给用户发挥的空间激发参与的热情&#xff0c;又可以让商家和企业实现推广的目的&…

2023上海国际电商物流包装产业展览会相约上海

2023年7月5-7日 | 上海新国际博览中心 同期举办&#xff1a;2023上海国际快递物流产业博览会 指导单位&#xff1a;上海市邮政管理局 中国快递协会 主办单位&#xff1a;上海市快递行业协会 上海市仓储与配送行业协会 上海市物流协会 承办单位&#xff1a;上海信世展览服务有…

【NLP相关】GPT-X合集:GPT类模型介绍(附相关论文和Github项目地址)

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️&#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

哈希一致性算法(分布式服务器落点算法)

场景预设和一般hash算法&#xff1a; 先预设一个场景&#xff0c;有10000份文件&#xff0c;需要缓存到五台缓存服务器之上 那么按照最常规&#xff0c;每个服务器平均分配2000份文件 那么用一个取余操作就可以完成 比如说是第2513的图片&#xff0c;那么用一个公式 需要缓…