真的好用吗?鲜有人提的 RabbitMQ-RPC模式

news2024/12/25 13:19:27

RabbitMQ系列文章

手把手教你,本地RabbitMQ服务搭建(windows)
消息队列选型——为什么选择RabbitMQ
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ 能保证消息可靠性吗
推或拉? RabbitMQ 消费模式该如何选择
死信是什么,如何运用RabbitMQ的死信机制?


真的好用吗?鲜有人提的 RabbitMQ-RPC 模式

  • RabbitMQ系列文章
  • 前言
  • 一、RPC 及 RabbitMQ-RPC模型
    • 1. RPC概念
    • 2. RabbitMQ-RPC模型
  • 二、RPC模式的Demo
    • 1. 调用方代码
    • 2. 服务方代码
  • 三、RabbitMQ-RPC 的利与弊


前言

我们之前介绍了RabbitMQ的五种模型(详见上方系列文章《RabbitMQ灵活运用,怎么理解五种消息模型》),即简单、轮询、主题、发布/订阅、路由、主题五种模式。除此之外,rabbitMQ还提供了一种 RPC 模式,这种模式是怎么回事?真的好用吗?一起来了解下


一、RPC 及 RabbitMQ-RPC模型

1. RPC概念

RPC是指远程过程调用(Remote Procedure Call),是一种计算机通信协议,用于将一个计算机程序的执行过程转移至另一台计算机上,但对用户而言,它就像是在本地运行一样,RPC通常用于分布式系统或组件化架构中,可以隐藏分布式系统背后的复杂性在这里插入图片描述

2. RabbitMQ-RPC模型

我们照例先看官方流程图

RPC 工作模式如下:

  1. 当客户端启动时,它会创建一个匿名独占回调队列。
  2. 对于 RPC 请求,客户端发送具有两个属性的消息:reply_to,设置为回调队列,correlation_id,设置为每个请求的唯一值,请求将发送到rpc_queue队列。
  3. RPC 工作线程(又名:服务器)正在等待该队列上的请求。当请求出现时,它会执行作业,并使用 reply_to 字段中的队列将包含结果的消息发送回客户端。
  4. 客户端等待回调队列中的数据。出现消息时,它会检查 correlation_id 属性。如果它与请求中的值匹配,则它将响应返回到应用程序。

可以看出,所谓RPC模式其实是两台机器互为消息生产者和消费者,同时利用两条队列,一个负责传递参数,一个负责传递结果

二、RPC模式的Demo

在进一步探讨前,我们先实际运行一段demo

1. 调用方代码

在这个示例中,我们创建了一个RPCClient类,并在构造函数中建立了RabbitMQ连接并创建了一个通道。我们还声明了一个请求队列名称requestQueueName和一个回复队列名称replyQueueName。

在call方法中,我们将请求消息发布到请求队列,并使用replyTo属性将回复队列名称包括在消息中。接着我们定义一个response阻塞队列,它将用于存储来自回复队列的响应。最后,我们使用BlockingQueue.take()方法等待响应内容并返回结果。

调用方代码如下(示例):

package com.example.seeu.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {
    private final Connection connection;
    private final Channel channel;
    private final String requestQueueName = "rpc_queue";
    private String replyQueueName;
    final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
        replyQueueName = channel.queueDeclare().getQueue();
        channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
                response.offer(new String(delivery.getBody(), "UTF-8"));
        }, consumerTag -> {
        });
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        String result = response.take();

        return result;
    }

    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 1; i <= 30; i++) {
                System.out.println(" [x] 请求 fib("+ i +")");
                String response = fibonacciRpc.call(String.valueOf(i));
                System.out.println(" [.] fib("+ i +")的结果为 '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2. 服务方代码

在被调用方,声明了一个名为RPC_QUEUE_NAME的请求队列,并使用queueDeclare方法在RabbitMQ中声明它。我们还设置了basicQos,以确保每次只接收一个请求。

在main方法中,我们定义了一个DeliverCallback,它将处理每个传入的消息并生成响应。对于每个传入的消息,我们提取出correlationId和请求消息。我们执行一些计算,然后将结果作为字符串回复到名为replyTo的消息队列中。最后,我们使用basicAck确认我们已收到消息。

我们使用basicConsume在请求队列上注册一个消费者,并将DeliverCallback传递给它。我们还使用synchronized块和wait()方法来等待每个传入的消息处理完成并立即发送回复。

被调用方代码:

package com.example.seeu.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();

            // 当成功接受到消息时,进行的处理逻辑
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        // 返回参数中需要附带请求的id
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    // delivery.getProperties().getReplyTo() 即为返回的队列,将结果发送给该该队列
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    // 接受确认,通知RabbitMQ 服务器自己已经消费掉此条消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            // 推模式,监听获取消息
            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {
            }));

            // 主线程活跃
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

结果:

以上示例仅为最简单的 1 对 1 同步调用的示例,当调用方和被调用方是多对多的关系时,调用方还得考虑获取的结果乱序的问题,必须为每一个请求找到其对应的结果

三、RabbitMQ-RPC 的利与弊

尽管我们现在知道了 RabbitMQ 有RPC 的用法,但是其跟真正的 RPC 框架(如duubo) 等进行对比,又有什么优劣呢?

优势:

  • Dubbo 的跨语言支持相对较差,主要支持 Java 、Golang及部分其他语言;RabbitMQ 支持的语言更多

劣势:

  • RabbitMQ周边支持不完善,如序列化问题,同步异步选择,服务治理,都需要开发者自行设计与编码
  • 调用方和被调用方多对多时,结果集可能乱序,需开发者为每个结果寻找其请求线程并返回值
  • 高度依赖MQ集群的可用性和稳定性,一旦MQ故障,RPC即无法进行

总结:和真正的RPC框架比起来,优势不足,缺陷太多,主要是需要开发者进行大量代码编写才能实现较完善的RPC功能,有这时间,不如直接使用现成的RPC框架了,所以虽然RabbitMQ 官方提供了 RPC 模式,但使用者寥寥。除非是少量的异步调用,或跨语言问题无法解决,否则笔者亦不建议在生产上使用RabbitMQ 作为 RPC 框架


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/697465.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年3季度广州/深圳DAMA-CDGA/CDGP数据治理认证招生

DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职业…

【C语言%的多种用法】

C语言%的多种用法 C语言中%号那些事儿1、%之取余运算符的使用2、%之作格式符使用2.1、C语言格式符 3、%之搭配 * 的使用4、%之搭配 # 的使用5、客套话 C语言中%号那些事儿 前言&#xff1a; 众所周知&#xff1a;%百分号&#xff0c;在九年义务教育中&#xff0c;表示分数的分…

图片识别文字怎么做?这几种方法轻松解决

在现代社会&#xff0c;图片已经成为人们日常生活和工作中不可或缺的一部分。但是&#xff0c;由于图片中往往包含了大量的文字信息&#xff0c;如果要手动输入这些文字&#xff0c;不仅费时费力&#xff0c;而且容易出错。因此&#xff0c;将图片中的文字进行识别已经成为我们…

应用打包部署k8s (包括dockerfile打包及仓库上传)

文章目录 一、docker run 运行测试1.数据库测试2.接口测试3.前端界面测试 二、编写dockerfile1.前端2.后端 三、推送镜像&#xff08;后面k8s就从这里获取镜像了&#xff09;四.编写 docker-compose.yml五.K8s部署1.yaml文件编写及部署houduan-deployment.yamlhouduan-service.…

【STM32】F103 时钟树

STM32F103是一款基于ARM Cortex-M3内核的32位微控制器&#xff0c;它具有丰富的外设资源和灵活的时钟配置。本文将从以下几个方面介绍STM32F103的时钟树&#xff1a; 时钟树的概念和作用时钟树的组成和分类时钟树的配置方法和步骤时钟树的应用实例 一、时钟树的概念和作用二、时…

如何通过Nacos获取当前服务注册的IP信息

一台机器可能存在多个网卡也就同时存在多个IP地址,如果我想知道我这个服务在向Nacos注册的时候使用的哪一个IP该怎么获取呢? 非常简单,你可以通过这种方式获取 import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import org.springframework.boot.CommandLineRunne…

Angular 调试 —— 一个真实的多重循环导致的Bug

导致性能问题的原因可能很复杂&#xff0c;也可能很简单&#xff0c;今天让我们来看一个现实的例子。一个多重循环导致列表卡死。 startDemo() {this.processing true// 创建复杂数据结构const data [];for (let i 0; i < 5000; i) {const innerArray [];for (let j …

【数据压缩】LZ77算法原理及实现

1. 引言 LZ77算法是采用字典做数据压缩的算法,由以色列的两位大神Jacob Ziv与Abraham Lempel在1977年发表的论文《A Universal Algorithm for Sequential Data Compression》中提出。 基于统计的数据压缩编码,比如Huffman编码,需要得到先验知识——信源的字符频率,然后进…

EasyExcel导出csv文件,用Office Excel打开乱码

1.前言 导出的列里有中文&#xff0c;导出后用Excel打开乱码 2.原因 搜索相关资料&#xff0c;csv和excel的编码不一致&#xff0c;需要在导出csv的时候设置编码GBK 3.验证

多级时间轮定时器

一. 多级时间轮实现框架 ​上图是5个时间轮级联的效果图。中间的大轮是工作轮&#xff0c;只有在它上的任务才会被执行&#xff1b;其他轮上的任务时间到后迁移到下一级轮上&#xff0c;他们最终都会迁移到工作轮上而被调度执行。 多级时间轮的原理也容易理解&#xff1a;就拿时…

实用工具|教你如何使用备份神器 Rclone,手把手保姆级教程

目录 什么是Rclone 功能 特性 支持的提供商 虚拟提供商 使用 安装 配置 语法 命令列表 常用参数 日志 过滤 环境变量 最佳实践 什么是Rclone Rclone是一个用于管理云存储上的文件的命令行程序。它是云供应商web存储接口的一个功能丰富的替代方案。超过40种云存储…

【Java高级编程】Java集合

Java集合 1、Java集合框架概述1.1、集合框架的概述1.2、集合框架 2、Collection接口方法3、Iterator迭代器接口3.1、Iterator迭代器概述3.1、Iterator的使用 4、Conllection子接口一&#xff1a;List4.1、List框架4.2、ArrayList的源码分析4.3、面试题&#xff1a;ArrayList、L…

掌握这些vue内容,让你在提升代码复用上不再纠结!

前端工程化的最终目的都是为了能够更好地维护代码。代码复用是提升效率和可维护性的利器。 vue 中针对不同场景和业务情况&#xff0c;提供了各种方式。全面了解这些内容&#xff0c;可以在开发过程中让你得心应手&#xff01; 方式建议组件主要的构建模块组合式函数侧重于有状…

【单片机】STM32单片机,定时器,多路PWM,TIM1、TIM2、TIM3、TIM4,STM32F103

文章目录 STM32中文参考手册V10.pdfTIM1 的四路PWMTIM2 的四路PWMTIM3 的四路PWMTIM4 的四路PWM STM32中文参考手册V10.pdf 在《STM32中文参考手册V10.pdf》有写&#xff1a; TIM1 的四路PWM TIM1 的PWM是带互补输出的&#xff0c;较为高级和复杂&#xff0c;有兴趣可以参…

Z3Ordering编码及查询c++实现 (GeoMesa翻译)

网上搜了很多Z3-Ordering实现没搜到,通过 sfcurve-master和geomesa-geomesa-3.2.2 得scala代码改编而来, 环境为C, vs2015, 理论上windows和Linux都可以用. 不依赖任何库, 这项自身理解和翻译断断续续进行, 最近终于有一点进展, 本次放出Z3, 待全部实现完毕将直接挂出 详细…

知识图谱实战

一、知识图谱简单介绍 二、知识图谱的构建 三、知识图谱问答方案 NL2SQL:自然语言转为SQL语句 bulid_graph.py """知识图谱""" #三元组&#xff1a;实体-关系-实体 实体-属性-属性值import re,json from py2neo import Graph from collectio…

shell脚本检测进程的CPU内存占用率

使用方法&#xff1a; 把xxx替换为自己进程的名字&#xff0c;然后直接运行该脚本即可在当前目录下产生一个叫做memory_info.txt的文件&#xff0c;记录进程的CPU内存占用率信息。可以用来查看自己进程对系统资源的消耗情况。 #!/bin/bashprocess"xxx" output_file…

C#,数值计算——算术编码压缩技术与方法(Compression by Arithmetic Coding)源代码

算术编码的数据压缩 算术编码是无损和有损数据压缩算法中常用的一种算法。 这是一种熵编码技术&#xff0c;其中常见符号比罕见符号用更少的比特进行编码。与诸如霍夫曼编码之类的众所周知的技术相比&#xff0c;它具有一些优势。本文将详细描述CACM87算术编码的实现&#xf…

Uniapp_分包

前言&#xff1a;由于微信小程序的包只限制压缩不能超过2M&#xff0c;当开发的页面过多就要进行分包操作,tabbar页面不能进行分包其他页面可以 最多5个分包 不超过20M 第一步、找到这个位置 然后把这个代码复制进去 开启分包 "optimization" : {"subPackages&…

Linux系统【VS】Windows系统

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…