文章目录
- 1)前言
- 2)Client interface 客户接口
- 3)Callback queue回调队列
- 4)Correlation Id 关联Id
- 5)Summary总结
- 6)综合以上代码
- 准备工作
- 服务端
- 客户端
- 结果验证
官网参考链接: https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
其他人的翻译版参考: https://www.cnblogs.com/grayguo/p/5606886.html
以下工作是本人在参考官网的教程下,结合自己的理解做的代码流程,更深刻的理解还需要参考官网进行学习哦
1)前言
在Work Queues文章中, 我们学习了如何使用Work Queues在多个消费者之间分发耗时任务。但是如果我们需要在远程电脑上运行一个方法并等待其执行结果呢?这种模式就是我们一般讲的RPC(远程过程调用),类似客户端与服务端的发送请求的过程。
在这篇文章当中我们将会使用RabbitMQ构建一个简单的RPC系统,一个客户端和一个可扩展的 RPC 服务器,由于我们没有耗时任务需要分发,因此我们创建一个假的RPC服务返回斐波那契数字。
2)Client interface 客户接口
为了说明RPC服务是怎样被使用的,我们创建一个了简单的Client类,该类有一个Call的方法用来发送RPC请求然后阻塞直到请求结果的返回。
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
注意:
虽然RPC在计算处理中是一种非常常见的模型,但是经常有非常多的争议,当编程人员不注意一个方法调用是本地的还是较慢的RPC调用就会出现问题。这样的困惑会导致系统不可预测,在调试时也增加了不必要的复杂性。错用RPC并不会简化软件而且可能导致一堆不可维护的屎山代码
请将以下建议记在心里:
- 确保清楚哪个函数是本地的哪个函数是远程的。
- 文档化你的系统,确保组件之间的依赖更加清晰。
- 处理异常场景,当远程RPC服务长时间中断时,客户端应该怎么处理。
当存在疑问时避免使用RPC,而应该使用异步管道来代替RPC的功能–如阻塞,可以将结果异步地带入到下一个计算阶段。
3)Callback queue回调队列
通常情况下,在RabbitMQ上进行RPC调用非常简单。客户端发起一个请求服务端响应一个消息。为了能够接收一个响应消息,我们需将一个回调队列地址带着请求一起发送给服务器。
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
Message properties
AMQP 0-9-1约定 预定义了14个属性可以随消息一起发送,其中大多数的属性很少使用,除了下面几个
Persistent
:为true时使消息持久化,为其他值时随程序结束而消失DeliveryMode
:熟悉该协议的用户可以选择使用此属性而不是Persistent
。他们控制着同一件事contentType
:用来描述编码的的mime-type,例如,对于常用的 JSON 编码,最好将此属性设置为:application/json
replyTo
:通常被用来命名一个回调队列correlationId
:用于将 RPC 响应与请求相关联
4)Correlation Id 关联Id
在之前呈现的方法我们建议为每一个RPC请求都创建了一个回调队列,但这是非常低效的,更好的方式是为每一个客户端创建一个回调队列。这样就带来了一个新的问题,当接收到一个结果的时候我们不知道该结果对应于哪个RPC请求,这就是使用correlationId 属性的原因。我们将给每个RPC请求设定一个唯一值,然后当我们从回调队列当中接收到消息的时,可以根据该属性值将响应与请求匹配上。如果我们发现一个未知的correlationId值可以将其忽略掉,因为它不属于我们的请求。
你可能会问为什么我们要忽略回调队列中的未知消息,而不是产生一个error?这是由于可能的竞争机制,虽然这种情况是非常少见的,但是存在这种可能:RPC服务刚刚把计算结果放入回调队列就挂了,但这时还没有来的及进行对Request进行Ack确认,这种情况下重启的RPC服务器会把该条消息再处理一次。这就是为什么客户端需要平滑的处理重复的correlationId结果,并且 RPC服务在理想情况下是幂等的。
5)Summary总结
我们的RPC系统将会这样工作:
- 当客户端启动的时候它会创建一个匿名的排他回调队列
- 对于RPC请求,客户端在发送消息上添加两个属性,replyTo–用作回调队列,correlationId–每一个请求的唯一值。
- 请求被发送到rpc_queue 队列
- RPC服务端等待rpc_queue 上面的请求,当请求出现时,它处理请求然后把结果发送到replyTo标示的回调队列上去。
- 客户端在回调队列上等待结果,当消息出现时,它先会检查correlationId是否正确,如果与请求中的值匹配成功,就会将响应消息返回给应用程序。
6)综合以上代码
准备工作
新建一个netcore6的控制台项目,添加RabbitMQ的包依赖
NuGet\Install-Package RabbitMQ.Client -Version 6.4.0
新建一个类MainClass,注释掉program.cs的代码,使MainClass中的tatic void Main(string[] args)作为程序的入口
按照此方法新建2个netcore6的控制台项目,分别代表服务端,客户端。
其中斐波那契数列 函数(递归写法)
private static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
注:此代码是在VisualStudio上运行的,所以与官网代码略有不同。
运行逻辑:
先运行服务端,服务端会开始监听请求;再运行客户端,客户端会发送请求,即数字30以及请求参数props(包括请求的唯一标识props.CorrelationId
,自己生成的replyQueueName
存入props.ReplyTo
。
与此同时服务端接收到了请求,根据接收消息的ea.BasicProperties
获取请求的相关参数props(包括唯一标识props.CorrelationId以及props.ReplyTo)计算f(30),props.ReplyTo
作为的routingKey
,新的replyProps
(replyProps.CorrelationId=请求的props.CorrelationId)作为basicProperties
,f(30)作为body进行发布。
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); //将响应的消息发回客户端
客户端这边一直等待f(30)的返回,如果收到了消息会根据ea.BasicProperties.CorrelationId判断是不是自己刚刚发送的请求,如果是则会放入respQueue队列,respQueue队列有值就会返回结果,打印到控制台上。
服务端
- 像往常一样,我们首先建立连接、通道并声明队列
- 我们可能希望运行多个服务器进程。为了将负载平均分布到多个服务器上,我们需要在设置channel.basicQos时设置预取计数prefetchCount
- 我们使用 BasicConsume 来访问队列。然后我们注册一个传递处理程序,在其中执行工作并将响应返回
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace RPCService
{
public class MainClass
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); //声明队列
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //负载均分
var consumer = new EventingBasicConsumer(channel); //新建消费者,接收客户端发过来的请求
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
Console.WriteLine("[Service] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
int n = -1;
var body = ea.Body.ToArray();
var props = ea.BasicProperties; //获取请求的参数
var replyProps = channel.CreateBasicProperties(); //新建响应的参数
replyProps.CorrelationId = props.CorrelationId;
Console.WriteLine("[Service] Processing requests...");
try
{
var message = Encoding.UTF8.GetString(body);
n = int.Parse(message); //取出请求消息中的数字
Console.WriteLine("[Service] getting fib ({0})", message);
response = fib(n).ToString(); //计算fib(n)作为响应消息
Console.WriteLine("[Service] fib ({0}) = {1}", message, response);
}
catch (Exception ex)
{
Console.WriteLine("[Service] " + ex.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); //将响应的消息发回客户端
Console.WriteLine("[Service] Already sent fib({0})={1} to {2}", n, response, props.ReplyTo);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //回传确认消息,表示服务端已收到
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
///
/// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
///
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
}
客户端
- 创建连接、通道,然后创建一个用来响应的排他的回调消息队列
- 我们订阅这个回调的队列,以便收到RPC的响应信息
- call方法发起真实的 RPC请求
- 我们先生成一个唯一的correlationId 并将其保存,以便与它对应的响应在到达时能够被识别
- 接下来我们发布一个请求,其中包括replyTo 和correlationId属性
- 这时我们可以等待直到对应的响应消息到达
- 对于每条响应消息,客户端都会检查相关 ID 是否是我们正在寻找的 CorrelationId,如果是我们需要保存这个结果
- 最后我们把响应结果返回给用户
新建MainClass类写入如下代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace PRCClient
{
public class MainClass
{
static void Main()
{
var rpcClient = new RpcClient();
Console.WriteLine(" [Client] Requesting fib(30)");
var response = rpcClient.Call("30"); //f(30)
Console.WriteLine(" [Client] Got '{0}'", response);
rpcClient.Close();
}
}
}
新建RpcClient类,写入如下代码
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace PRCClient
{
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient() //构造方法初始化属性
{
var factory = new ConnectionFactory() { HostName = "localhost",UserName="lyh",Password="1211" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
//props随着请求一起发送给服务器
replyQueueName = channel.QueueDeclare().QueueName; //回调队列名
props = channel.CreateBasicProperties(); //生成基本属性
var correlationId = Guid.NewGuid().ToString(); //生成唯一的correlationId
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName; //将接收方设置为该回调队列名
consumer = new EventingBasicConsumer(channel); //接收到服务端的响应值后进行消费
consumer.Received += (model, ea) => //接收消息
{
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId) //判断响应消息的correlationId是否与自己发出的相同
{
respQueue.Add(response);//相同则将返回值计入respQueue,只要队列有值就会作为call的返回结果
}
};
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish( //发送 30 到服务端
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
Console.WriteLine(" [Client] already sent '{0}' to Service", message);
return respQueue.Take();
}
public void Close()
{
connection.Close();
}
}
}
结果验证
运行结果与准备工作的过程描述一致。