rabbitmq+netcore6 【6】RPC:远程过程调用

news2025/1/10 14:25:09

文章目录

    • 1)前言
    • 2)Client interface 客户接口
    • 3)Callback queue回调队列
    • 4)Correlation Id 关联Id
    • 5)Summary总结
    • 6)综合以上代码
      • 准备工作
      • 服务端
      • 客户端
      • 结果验证

官网参考链接: https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
其他人的翻译版参考: https://www.cnblogs.com/grayguo/p/5606886.html
以下工作是本人在参考官网的教程下,结合自己的理解做的代码流程,更深刻的理解还需要参考官网进行学习哦

1)前言

在Work Queues文章中, 我们学习了如何使用Work Queues在多个消费者之间分发耗时任务。但是如果我们需要在远程电脑上运行一个方法并等待其执行结果呢?这种模式就是我们一般讲的RPC(远程过程调用),类似客户端与服务端的发送请求的过程。

在这篇文章当中我们将会使用RabbitMQ构建一个简单的RPC系统,一个客户端和一个可扩展的 RPC 服务器,由于我们没有耗时任务需要分发,因此我们创建一个假的RPC服务返回斐波那契数字。

2)Client interface 客户接口

为了说明RPC服务是怎样被使用的,我们创建一个了简单的Client类,该类有一个Call的方法用来发送RPC请求然后阻塞直到请求结果的返回。

var rpcClient = new RPCClient();

Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);

rpcClient.Close();

注意:
虽然RPC在计算处理中是一种非常常见的模型,但是经常有非常多的争议,当编程人员不注意一个方法调用是本地的还是较慢的RPC调用就会出现问题。这样的困惑会导致系统不可预测,在调试时也增加了不必要的复杂性。错用RPC并不会简化软件而且可能导致一堆不可维护的屎山代码

请将以下建议记在心里:

  • 确保清楚哪个函数是本地的哪个函数是远程的。
  • 文档化你的系统,确保组件之间的依赖更加清晰。
  • 处理异常场景,当远程RPC服务长时间中断时,客户端应该怎么处理。

当存在疑问时避免使用RPC,而应该使用异步管道来代替RPC的功能–如阻塞,可以将结果异步地带入到下一个计算阶段。

3)Callback queue回调队列

通常情况下,在RabbitMQ上进行RPC调用非常简单。客户端发起一个请求服务端响应一个消息。为了能够接收一个响应消息,我们需将一个回调队列地址带着请求一起发送给服务器。

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

Message properties
AMQP 0-9-1约定 预定义了14个属性可以随消息一起发送,其中大多数的属性很少使用,除了下面几个

  • Persistent:为true时使消息持久化,为其他值时随程序结束而消失
  • DeliveryMode:熟悉该协议的用户可以选择使用此属性而不是Persistent。他们控制着同一件事
  • contentType:用来描述编码的的mime-type,例如,对于常用的 JSON 编码,最好将此属性设置为:application/json
  • replyTo:通常被用来命名一个回调队列
  • correlationId:用于将 RPC 响应与请求相关联

4)Correlation Id 关联Id

在之前呈现的方法我们建议为每一个RPC请求都创建了一个回调队列,但这是非常低效的,更好的方式是为每一个客户端创建一个回调队列。这样就带来了一个新的问题,当接收到一个结果的时候我们不知道该结果对应于哪个RPC请求,这就是使用correlationId 属性的原因。我们将给每个RPC请求设定一个唯一值,然后当我们从回调队列当中接收到消息的时,可以根据该属性值将响应与请求匹配上。如果我们发现一个未知的correlationId值可以将其忽略掉,因为它不属于我们的请求。

你可能会问为什么我们要忽略回调队列中的未知消息,而不是产生一个error?这是由于可能的竞争机制,虽然这种情况是非常少见的,但是存在这种可能:RPC服务刚刚把计算结果放入回调队列就挂了,但这时还没有来的及进行对Request进行Ack确认,这种情况下重启的RPC服务器会把该条消息再处理一次。这就是为什么客户端需要平滑的处理重复的correlationId结果,并且 RPC服务在理想情况下是幂等的。

5)Summary总结

在这里插入图片描述
我们的RPC系统将会这样工作:

  • 当客户端启动的时候它会创建一个匿名的排他回调队列
  • 对于RPC请求,客户端在发送消息上添加两个属性,replyTo–用作回调队列,correlationId–每一个请求的唯一值。
  • 请求被发送到rpc_queue 队列
  • RPC服务端等待rpc_queue 上面的请求,当请求出现时,它处理请求然后把结果发送到replyTo标示的回调队列上去。
  • 客户端在回调队列上等待结果,当消息出现时,它先会检查correlationId是否正确,如果与请求中的值匹配成功,就会将响应消息返回给应用程序。

6)综合以上代码

准备工作

新建一个netcore6的控制台项目,添加RabbitMQ的包依赖

NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

在这里插入图片描述
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建2个netcore6的控制台项目,分别代表服务端,客户端。

其中斐波那契数列 函数(递归写法)

private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

注:此代码是在VisualStudio上运行的,所以与官网代码略有不同。

运行逻辑
先运行服务端,服务端会开始监听请求;再运行客户端,客户端会发送请求,即数字30以及请求参数props(包括请求的唯一标识props.CorrelationId,自己生成的replyQueueName存入props.ReplyTo
在这里插入图片描述
与此同时服务端接收到了请求,根据接收消息的ea.BasicProperties获取请求的相关参数props(包括唯一标识props.CorrelationId以及props.ReplyTo)计算f(30),props.ReplyTo作为的routingKey,新的replyProps(replyProps.CorrelationId=请求的props.CorrelationId)作为basicProperties,f(30)作为body进行发布。

channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); //将响应的消息发回客户端

客户端这边一直等待f(30)的返回,如果收到了消息会根据ea.BasicProperties.CorrelationId判断是不是自己刚刚发送的请求,如果是则会放入respQueue队列,respQueue队列有值就会返回结果,打印到控制台上。

服务端

  1. 像往常一样,我们首先建立连接、通道并声明队列
  2. 我们可能希望运行多个服务器进程。为了将负载平均分布到多个服务器上,我们需要在设置channel.basicQos时设置预取计数prefetchCount
  3. 我们使用 BasicConsume 来访问队列。然后我们注册一个传递处理程序,在其中执行工作并将响应返回
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RPCService
{
    public class MainClass
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); //声明队列
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //负载均分

                var consumer = new EventingBasicConsumer(channel); //新建消费者,接收客户端发过来的请求
                channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
                Console.WriteLine("[Service] Awaiting RPC requests");
                consumer.Received += (model, ea) =>
                {
                    string response = null;
                    int n = -1;
                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties; //获取请求的参数
                    var replyProps = channel.CreateBasicProperties(); //新建响应的参数
                    replyProps.CorrelationId = props.CorrelationId;
                    Console.WriteLine("[Service] Processing requests...");
                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        n = int.Parse(message); //取出请求消息中的数字
                        Console.WriteLine("[Service] getting fib ({0})", message);
                        response = fib(n).ToString(); //计算fib(n)作为响应消息
                        Console.WriteLine("[Service]  fib ({0}) = {1}", message, response);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("[Service] " + ex.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); //将响应的消息发回客户端
                        Console.WriteLine("[Service] Already sent fib({0})={1} to {2}", n, response, props.ReplyTo);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //回传确认消息,表示服务端已收到
                    }
                };
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

        /// 

        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers, and it's
        /// probably the slowest recursive implementation possible.
        /// 

        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            return fib(n - 1) + fib(n - 2);
        }

    }
}

客户端

  1. 创建连接、通道,然后创建一个用来响应的排他的回调消息队列
  2. 我们订阅这个回调的队列,以便收到RPC的响应信息
  3. call方法发起真实的 RPC请求
  4. 我们先生成一个唯一的correlationId 并将其保存,以便与它对应的响应在到达时能够被识别
  5. 接下来我们发布一个请求,其中包括replyTo 和correlationId属性
  6. 这时我们可以等待直到对应的响应消息到达
  7. 对于每条响应消息,客户端都会检查相关 ID 是否是我们正在寻找的 CorrelationId,如果是我们需要保存这个结果
  8. 最后我们把响应结果返回给用户

新建MainClass类写入如下代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace PRCClient
{
    public class MainClass
    {
        static void Main()
        {
            var rpcClient = new RpcClient();

            Console.WriteLine(" [Client] Requesting fib(30)");
            var response = rpcClient.Call("30"); //f(30)

            Console.WriteLine(" [Client] Got '{0}'", response);
            rpcClient.Close();
        }
    }

}

新建RpcClient类,写入如下代码

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace PRCClient
{
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RpcClient() //构造方法初始化属性
        {
            var factory = new ConnectionFactory() { HostName = "localhost",UserName="lyh",Password="1211" };

            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //props随着请求一起发送给服务器
            replyQueueName = channel.QueueDeclare().QueueName; //回调队列名
            props = channel.CreateBasicProperties(); //生成基本属性
            var correlationId = Guid.NewGuid().ToString(); //生成唯一的correlationId
            props.CorrelationId = correlationId;  
            props.ReplyTo = replyQueueName; //将接收方设置为该回调队列名

            consumer = new EventingBasicConsumer(channel); //接收到服务端的响应值后进行消费
            consumer.Received += (model, ea) => //接收消息
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId) //判断响应消息的correlationId是否与自己发出的相同
                {
                    respQueue.Add(response);//相同则将返回值计入respQueue,只要队列有值就会作为call的返回结果
                }
            };

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish( //发送 30 到服务端
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
            Console.WriteLine(" [Client] already sent '{0}' to Service", message);
            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }
}

结果验证

在这里插入图片描述
运行结果与准备工作的过程描述一致。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/167219.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

372. 超级次方

372. 超级次方题目算法设计&#xff1a;迭代算法设计&#xff1a;递归题目 传送门&#xff1a;https://leetcode.cn/problems/super-pow/ 题目不难懂&#xff0c;问题在于 b 是一个非常非常大的数&#xff0c;会溢出。 迭代和递归&#xff0c;各有解决方法&#xff0c;记录在…

Elasticsearch入门——kibanna和postman操作Elasticsearch索引示例

目录一、使用kibanna操作Elasticsearch索引示例二、使用postman操作Elasticsearch索引示例三、kibanna和postman操作Elasticsearch的总结一、使用kibanna操作Elasticsearch索引示例 启动Elasticsearch和kibanna服务&#xff0c;浏览器访问http://localhost:5601/,进入Dev Tools…

week11

T1汤姆斯的天堂梦 题目描述 汤姆斯生活在一个等级为 000 的星球上。那里的环境极其恶劣&#xff0c;每天 121212 小时的工作和成堆的垃圾让人忍无可忍。他向往着等级为 NNN 的星球上天堂般的生活。 有一些航班将人从低等级的星球送上高一级的星球&#xff0c;有时需要向驾驶…

【C语言】数据结构基础(每日小细节025),有三数之和哦

算法好题初阶&#xff08;一共14回已经更新完毕&#xff09;&#xff0c;从今天开始就是基础的数据结构题目 1.只出现一次的数字 如果不额外开辟任何空间的话一定要想到位运算符 异或^ :两个整数异或&#xff0c;遵循相同为0&#xff0c;相异为1的二进制位运算规则 &#x…

【Nginx 基础】

Nginx 的安装 Nginx 的静态网站部署 理解 Nginx 的反向代理与负载均衡&#xff0c;能够配置反向代理与负载均衡 一、 Nginx 概述 Nginx 是一款高性能的 HTTP 服务器/反向代理服务器及电子邮件&#xff08;IMAP/POP3&#xff09;代理服务器&#xff0c;由俄罗斯的程序工程师伊戈…

spring学习系列

Spring_三种方式的依赖注入1.第一种&#xff0c;set方式&#xff0c;property2.构造器注入&#xff08;构造方法&#xff09;3.p命名空间注入4、注入各种数据类型//老师类 public class Teacher {private String name;private int age; }//课程类 public class Course {private…

云原生技术学习笔记(基础版)

一、容器基本概念容器运行时&#xff0c;多种虚拟化技术&#xff0c;runC、kata、gVisor等。containerd -shim不是个lib&#xff0c;是个守护进程&#xff0c;管理容器生命周期,可被containerd动态接管。&#xff08;可以从containerd中脱离出来&#xff0c;插件化管理&#xf…

jvm系列(1)--JVM和Java体系架构

目录Java-跨平台的语言JVM-跨语言的平台多语言混合编程虚拟机虚拟机概念Java虚拟机JVM的位置JVM的整体结构Java代码执行流程JVM的架构模型基于栈的指令集架构基于寄存器的指令级架构两种架构的举例JVM架构总结JVM的生命周期虚拟机的启动虚拟机的执行虚拟机的退出Java-跨平台的语…

VTK-vtkSelectPolyDataFilter

前言&#xff1a;本博文主要记录vtkSelectPolyDataFilter接口的应用&#xff0c;实现原理&#xff0c;以及与其近似的vtkClipPolyData&vtkImplicitSelectionLoop的应用相比较&#xff0c;帮助小伙伴理解vtkSelectPolyDataFilter接口的实现原理&#xff0c;并且与其它接口进…

2023新生个人训练赛第08场解题报告

问题 A: Candies 题目描述 We have a 2N grid. We will denote the square at the i-th row and j-th column (1≤i≤2, 1≤j≤N) as (i,j). You are initially in the top-left square, (1,1). You will travel to the bottom-right square, (2,N), by repeatedly moving ri…

鉴源论坛 · 观通丨轨交系统安全性设计

作者 | 刘艳青 上海控安安全测评中心安全测评部测试经理 版块 | 鉴源论坛 观通 引语&#xff1a;第一篇对轨交信号系统从铁路系统分类和组成、城市轨交系统分类和组成、城市轨交系统功能、城市轨交系统发展方面做了介绍&#xff0c;第二篇从信号基础出发&#xff0c;讲述了信…

【蓝桥杯算法 1】AcWing166.飞行员兄弟

本文已收录专栏 &#x1f332;《蓝桥杯周训练》&#x1f332; “飞行员兄弟”这个游戏&#xff0c;需要玩家顺利的打开一个拥有 16 个把手的冰箱。 已知每个把手可以处于以下两种状态之一&#xff1a;打开或关闭。 只有当所有把手都打开时&#xff0c;冰箱才会打开。 把手可…

支持数位板的远程软件,实现远程使用 Wacom 数位板

现在数位板越来越流行了&#xff0c;影视、动漫、游戏、设计等行业经常需要用到。Wacom 是数位板领域的全球领导者&#xff0c;其设备为创意人员带来了真正的纸感绘图体验。 数位板用户需要远程办公的时候&#xff0c;经常会遇到两个问题&#xff1a;远程软件不支持数位板、远…

(考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例

文章目录一&#xff1a;计算机网络结构分层的必要性&#xff08;1&#xff09;分层思想&#xff08;2&#xff09;计算机网络分层思想①&#xff1a;如何让两台计算机通过网线传输数据②&#xff1a;如何让分组在单个网络内传输③&#xff1a;如何让分组在网络间传输④&#xf…

SpringBoot项目练习

项目名称&#xff1a;旅游网站后台管理一&#xff1a;项目简介旅游网站后台管理,包括如下用户&#xff1a;旅游线路&#xff1a;线路图片&#xff1a;线路分类&#xff1a;旅行社&#xff1a;后台技术&#xff1a;springboot、mybatis、mybatis plus前台&#xff1a;bootstrap、…

测试开发 | 专项测试技术初识Hook

本文节选自霍格沃兹测试学院内部教材Hook 技术需要预先分析目标应用的源代码和逻辑&#xff0c;根据目标测试场景设置目标、逻辑和数据&#xff0c;然后运行时动态的对目标函数参数值、逻辑或者返回值做修改&#xff0c;达到修改现有函数逻辑、实现目标测试场景的目的。Hook的价…

JavaWeb基础(一) Mybatis使用详解

JavaWeb基础——Mybatis 1&#xff0c;配置文件实现CRUD 如上图所示产品原型&#xff0c;里面包含了品牌数据的 查询 、按条件查询、添加、删除、批量删除、修改 等功能&#xff0c;而这些功能其实就是对数据库表中的数据进行CRUD操作。接下来我们就使用Mybatis完成品牌数据的…

3-2存储系统-主存与CPU的连接外部存储器

文章目录一.主存与CPU的连接&#xff08;一&#xff09;连接原理&#xff08;二&#xff09;主存容量的扩展1.位扩展法2.字扩展法3.字位同时扩展法&#xff08;三&#xff09;存储芯片的地址分配和片选1.线选法2.译码片选法二.外部存储器&#xff08;一&#xff09;磁盘储存器1…

JVM-三色标记

一、什么叫三色标记三色也叫三色抽象&#xff0c;它是所有mutator和collector都必须遵守的定律。它把对象标记为三种颜色&#xff1a;白色&#xff1a;对象还未被垃圾收集器访问&#xff0c;在回收的开始阶段所有的对象均为白色&#xff08;当然了这只是指概念上的&#xff0c;…

PaddleNLP开源UTC通用文本分类技术,斩获ZeroCLUE、FewCLUE双榜第一

飞桨PaddlePaddle 2023-01-12 20:02 发表于湖北 针对产业级分类场景中任务多样、数据稀缺、标签迁移难度大等挑战&#xff0c;百度提出了一个大一统的通用文本分类技术UTC&#xff08;Universal Text Classfication&#xff09;。 UTC在ZeroCLUE和FewCLUE两个榜单上均位居榜首…