RabbitMQ七种工作模式之 RPC通信模式, 发布确认模式

news2024/12/27 1:23:01

文章目录

  • 六. RPC(RPC通信模式)
    • 客户端
    • 服务端
  • 七. Publisher Confirms(发布确认模式)
    • 1. Publishing Messages Individually(单独确认)
    • 2. Publishing Messages in Batches(批量确认)
    • 3. Handling Publisher Confirms Asynchronously(异步确认)

六. RPC(RPC通信模式)

在这里插入图片描述
在这里插入图片描述

  1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, ⽤于接收服务端的响应.
  2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
  3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应

公共代码:

 	public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端

客户端需要完成两件事, 发送请求, 接收响应
发送请求:

//发送请求:
        //声明队列
        channel.queueDeclare(Common.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Common.RPC_RESPONSE_QUEUE, true, false, false, null);
        //定义回调队列
        String replyQueueName = Common.RPC_RESPONSE_QUEUE;
        //本次请求的唯一标识
        String corrId = UUID.randomUUID().toString();
        //生成发送消息的属性
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        //通过内置交换机, 发送消息
        String message = "hello, rpc......";
        channel.basicPublish("", Common.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));

接收响应:

 //接收响应:
        //使用阻塞队列来存储回调结果, 避免了客户端反复访问队列
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        
        //接收服务器的响应
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
                //如果唯一标识正确, 放在阻塞队列中
                if(properties.getCorrelationId().equals(corrId)){
                    response.offer(new String(body));
                }
            }
        };
        channel.basicConsume(replyQueueName, true, consumer);
        //获取回调的结果
        String result = response.take();
        System.out.println("[RPCClient] Result: " + result);
       

服务端

在这里插入图片描述

		//设置同时最多只能获取一个消息
        channel.basicQos(1);
        //接收消息, 并对消息进行应答
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                String message = new String(body);
                String response = "接收到消息 request: " + message;
                channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
                //对消息进行应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Common.RPC_REQUEST_QUEUE, false, consumer);//自动应答设置成false, 在成功回调后, 再进行手动应答

在这里插入图片描述

七. Publisher Confirms(发布确认模式)

在这里插入图片描述

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.
    通过Publisher Confirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收, 从⽽避免消息丢失的问题.
    适⽤场景: 对数据安全性要求较⾼的场景. ⽐如⾦融交易, 订单处理

⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
在这里插入图片描述

使⽤发送确认机制, 必须要信道设置成confirm(确认)模式

发布确认有3种策略:

1. Publishing Messages Individually(单独确认)

public static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
        try(Connection connection = createConnection()){
            //创建channel
            Channel channel = connection.createChannel();
            //开启信道确认模式
            channel.confirmSelect();
            channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            Long start = System.currentTimeMillis();
            for(int i = 0; i < MESSAGE_COUNT; i++){
                String body = "消息" + i;
                channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE1, null, body.getBytes(StandardCharsets.UTF_8));
                //等待确认消息, 只要消息被确认, 这个方法就会被返回
                //如果超时过期, 则抛出TimeoutException, 如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException
                channel.waitForConfirmsOrDie();

            }
            Long end = System.currentTimeMillis(5000);
            System.out.printf("Published %d message individually in %d ms", MESSAGE_COUNT, end - start);
        }

    }

在这里插入图片描述

2. Publishing Messages in Batches(批量确认)

    private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
        try(Connection connection = createConnection()){
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //批量个数
            int batchSize = 100;
            int outstandingMessageCount = 0;
            long start = System.currentTimeMillis();
            for(int i = 0; i < MESSAGE_COUNT; i++){
                String body = "消息" + i;
                channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE2, null, body.getBytes(StandardCharsets.UTF_8));
                outstandingMessageCount++;
                if(outstandingMessageCount == batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if(outstandingMessageCount > 0){
                channel.waitForConfirms(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Published %d message batch in %d ms", MESSAGE_COUNT, end - start);
        }
    }

在这里插入图片描述
相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量, 当消息经常丢失时,批量确认的性能应该是不升反降的

3. Handling Publisher Confirms Asynchronously(异步确认)

异步confirm⽅法的编程实现最为复杂. Channel 接⼝提供了⼀个⽅法addConfirmListener. 这个⽅法可以添加ConfirmListener 回调接⼝.
ConfirmListener 接⼝中包含两个⽅法: handleAck(long deliveryTag, boolean multiple) 和 handleNack(long deliveryTag, boolean multiple) , 分别对应处理RabbitMQ发送给⽣产者的ack和nack.
deliveryTag 表⽰发送消息的序号. multiple 表⽰是否批量确认.
我们需要为每⼀个Channel 维护⼀个已发送消息的序号集合. 当收到RabbitMQ的confirm 回调时, 从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带⼀个从1开始递增的deliveryTag序号. 我们可以使⽤SortedSet 的有序性来维护这个已发消息的集合.

  1. 当收到ack时, 从序列中删除该消息的序号. 如果为批量确认消息, 表⽰⼩于等于当前序号deliveryTag的消息都收到了, 则清除对应集合
  2. 当收到nack时, 处理逻辑类似, 不过需要结合具体的业务情况, 进⾏消息重发等操作
private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException{
        try(Connection connection = createConnection()){
            Channel channel = connection.createChannel();
            channel.queueDeclare(Common.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            channel.confirmSelect();

            //有序集合, 元素按照自然顺序进行排序, 存储未confirm消息序号
            SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple){
                        //批量
                        confirmSet.headSet(deliveryTag + 1).clear();
                    }else{
                        confirmSet.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple){
                        //批量
                        confirmSet.headSet(deliveryTag + 1).clear();
                    }else{
                        confirmSet.remove(deliveryTag);
                    }
                    //如果处理失败, 需要有消息重发的环节, 此处省略
                }
            });
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "消息" + i;
                long nextPublishSeqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", Common.PUBLISHER_CONFIRMS_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));
                confirmSet.add(nextPublishSeqNo);
            }
            while(!confirmSet.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Published %d message ConfirmsAsynchronously in %d ms", MESSAGE_COUNT, end - start);
        }
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2257567.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ArcGIS字符串补零与去零

我们有时候需要 对属性表中字符串的补零与去零操作 我们下面直接视频教学 下面看视频教学 ArcGIS字符串去零与补零 推荐学习 ArcGIS全系列实战视频教程——9个单一课程组合 ArcGIS10.X入门实战视频教程&#xff08;GIS思维&#xff09; ArcGIS之模型构建器&#xff08;Mod…

前端面试如何出彩

1、原型链和作用域链说不太清&#xff0c;主要表现在寄生组合继承和extends继承的区别和new做了什么。2、推荐我的两篇文章&#xff1a;若川&#xff1a;面试官问&#xff1a;能否模拟实现JS的new操作符、若川&#xff1a;面试官问&#xff1a;JS的继承 3、数组构造函数上有哪些…

大模型应用编排工具Dify之构建专属FQA应用

1.前言 ​ 通过 dify可以基于开源大模型的能力&#xff0c;并结合业务知识库、工具API和自定义代码等构建特定场景、行业的专属大模型应用。本文通过 dify工作室的聊天助手-工作流编排构建了一个基于历史工作日志回答问题的助手&#xff0c;相比原始的大模型答复&#xff0c;通…

前端node环境安装:nvm安装详细教程(安装nvm、node、npm、cnpm、yarn及环境变量配置)

需求&#xff1a;在做前端开发的时候&#xff0c;有的时候 这个项目需要 node 14 那个项目需要 node 16&#xff0c;我们也不能卸载 安装 。这岂不是很麻烦。这个时候 就需要 一个工具 来管理我们的 node 版本和 npm 版本。 下面就分享一个 nvm 工具 用来管理 node 版本。 这个…

c基础加堆练习题

1】思维导图&#xff1a; 2】在堆区空间连续申请5个int类型大小空间&#xff0c;用来存放从终端输入的5个学生成绩&#xff0c;然后显示5个学生成绩&#xff0c;再将学生成绩升序排序&#xff0c;排序后&#xff0c;再次显示学生成绩。显示和排序分别用函数完成 要求&#xff…

嵌入式Linux 设备树 GPIO详解 示例分析 三星 NXP RK

GPIO设备树用于在Linux内核中定义与GPIO相关的硬件资源&#xff0c;它使操作系统可以识别、配置和使用GPIO引脚。设备树中通常会指定GPIO控制器的基地址、GPIO引脚的中断配置、时钟和其他相关信息。 目录 RK相关案例代码 NXP相关案例代码 三星相关案例代码 在设备树中&…

【日记】不想随礼欸(926 字)

正文 今天忙了一天。感觉从早上就开始在救火。客户经理迎接检查&#xff0c;要补资料&#xff0c;找我们问这样要那样&#xff0c;我自己的事情几乎完全开展不了。虽说也没什么大事就是了。 晚上行长还让我重装系统…… 难绷。看来这个爹味新行长懂得还挺多。 中午趁着不多的休…

Spring 源码学习(七)——注解后处理器

通过之前对注解式配置的解析&#xff08;Spring 源码学习&#xff08;三&#xff09;—— 注解式配置解析_spring源码学习-CSDN博客&#xff09;可以发现其使用 AnnotationConfigUtils 类的 registerAnnotationConfigProcessors 静态方法对象注解后处理器对象进行注册&#xff…

如何避免缓存击穿?超融合常驻缓存和多存储池方案对比

作者&#xff1a;SmartX 解决方案专家 钟锦锌 很多运维人员都知道&#xff0c;混合存储介质配置可能会带来“缓存击穿”的问题&#xff0c;尤其是大数据分析、数据仓库等需要频繁访问“冷数据”的应用场景&#xff0c;缓存击穿可能会更频繁地出现&#xff0c;影响业务运行。除…

Scala的正则表达式二

验证用户名是否合法 规则 1.长度在6-12之间 2.不能数字开头 3.只能包含数字&#xff0c;大小写字母&#xff0c;下划线def main(args: Array[String]): Unit {val name1 "1admin"//不合法&#xff0c;是数字开头val name2 "admin123"//合法val name3 &quo…

【CKA】Kubernetes(k8s)认证之CKA考题讲解

CKA考题讲解 0.考试101 0.1 kubectl命令⾃动补全 在 bash 中设置当前 shell 的⾃动补全&#xff0c;要先安装 bash-completion 包。 echo "source <(kubectl completion bash)" >> ~/.bashrc还可以在补全时为 kubectl 使⽤⼀个速记别名&#xff1a; al…

导入kotlin

android studio 导入kotlin项目 android studio kotlin教程 或者直接拿一个kt文件进来&#xff0c;在顶部会显示一个config&#xff0c;然后设置version&#xff0c;点击OK就可以了自动导了

《CSS 知识点》大屏卡片布局思路:弹性布局 flex-grow

思路 大屏左右两侧高宽一致&#xff0c;内部卡片可按比例设置&#xff01; 使用弹性布局和属性 flex-grow 设置比例&#xff1b;间隔使用 margin-bottom 设置&#xff0c;最后一个卡片不设置&#xff1b; 效果如图 代码说明 CSS代码 26 - 30&#xff0c;左右两侧设置弹性布…

责任链模式的理解和实践

责任链模式&#xff08;Chain of Responsibility&#xff09;是行为型设计模式之一&#xff0c;它通过将多个对象连成一条链&#xff0c;并沿着这条链传递请求&#xff0c;直到有对象处理它为止。这个模式的主要目的是将请求的发送者和接收者解耦&#xff0c;使请求沿着处理链传…

如何在 Ubuntu 上安装开源监控工具 Uptime Kuma

简介 Uptime Kuma&#xff08;或简称 Kuma&#xff09;是一个开源监控工具&#xff0c;用于监控 HTTP、HTTPS、DNS 等协议的服务。Uptime Kuma 提供多种功能&#xff0c;如多语言支持、多个状态页面、代理支持等。 接下来&#xff0c;我将一步一步教大家如何进行安装和部署&am…

go语言zero框架对接阿里云消息队列MQ的rabbit的配置与调用

在 Go 语言中对接阿里云消息队列&#xff08;MQ&#xff09;的 RabbitMQ 配置与调用&#xff0c;首先需要安装和配置相关的 Go 库&#xff0c;并了解如何通过 RabbitMQ 与阿里云消息队列进行交互。 ### 步骤一&#xff1a;安装 RabbitMQ Go 客户端库 阿里云的消息队列&#x…

AttributeError: module ‘cv2.dnn‘ has no attribute ‘DictValue‘如何解决?

AttributeError: module cv2.dnn has no attribute DictValue如何解决&#xff1f; 出现场景出错原因解决方案 出现场景 当在代码中导入opencv的时候&#xff1a;import cv2&#xff0c;出现&#xff1a; 出错原因 查看大家出现的错误&#xff0c;发现是因为opencv版本问题…

京东e卡 h5st 4.96

声明: 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 有相关问题请第一时间头像私信联系我删…

《探索视频数字人:开启未来视界的钥匙》

一、引言 1.1视频数字人技术的崛起 在当今科技飞速发展的时代&#xff0c;视频数字人技术如一颗璀璨的新星&#xff0c;正逐渐成为各领域瞩目的焦点。它的出现&#xff0c;犹如一场科技风暴&#xff0c;彻底改变了传统的视频制作方式&#xff0c;为各个行业带来了前所未有的机…

畅阅读微信小程序+ssm

摘 要 随着社会的发展&#xff0c;社会的方方面面都在利用信息化时代的优势。互联网的优势和普及使得各种系统的开发成为必需。 本文以实际运用为开发背景&#xff0c;运用软件工程原理和开发方法&#xff0c;它主要是采用java语言技术和mysql数据库来完成对系统的设计。整个…