用最少的代码模拟gRPC四种消息交换模式

news2024/11/19 12:14:17

我们知道,建立在HTTP2/3之上的gRPC具有四种基本的通信模式或者消息交换模式(MEP: Message Exchange Pattern),即Unary、Server Stream、Client Stream和Bidirectional Stream。本篇文章通过4个简单的实例演示它们在.NET平台上的实现原理,源代码从这里查看。

目录
一、定义ProtoBuf消息
二、请求/响应的读写
三、Unary
四、Server Stream
五、Client Stream
六、Bidirectional Stream

一、定义ProtoBuf消息

我们选择简单的“Hello World”场景进行演示:客户端请求指定一个或者多个名字,回复以“Hello, {Name}!”。为此我们在一个ASP.NET Core应用中定义了如下两个ProtoBuf消息HelloRequest和HelloReply,生成两个同名的消息类型。

syntax = "proto3";

message HelloRequest {
  string names = 1;
}

message HelloReply {
  string message = 1;
}

二、请求/响应的读写

gRPC框架的核心莫过于在服务端针对请求消息的读取和对响应消息的写入;以及在客户端针对请求消息的写入和对响应消息的读取。这四个核心功能被实现在如下这两个扩展方法中。如下面的代码片段所示,扩展方法WriteMessageAsync将指定的ProtoBuf消息写入PipeWriter对象中。为了确保消息能够被准确的读取,我们利用前置的四个字节存储了消息的字节数。

public static class ReadWriteExtensions
{
    public static ValueTask<FlushResult> WriteMessageAsync(this PipeWriter writer, IMessage message)
    {
        var length = message.CalculateSize();
        var span = writer.GetSpan(4+length);
        BitConverter.GetBytes(length).CopyTo(span);
        message.WriteTo(span.Slice(4, length));
        writer.Advance(4 + length);
        return writer.FlushAsync();
    }

    public static async Task ReadAndProcessAsync<TMessage>(this PipeReader reader, MessageParser<TMessage> parser, Func<TMessage, Task> handler) 
        where TMessage:IMessage<TMessage>
    {
        while(true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;
            while (TryReadMessage(ref buffer, out var message))
            {
                await handler(message!);
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
            if(result.IsCompleted)
            {
                break;
            }
        }


        bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out TMessage? message)
        {
            if(buffer.Length < 4)
            {
                message = default;
                return false;
            }

            Span<byte> lengthBytes = stackalloc byte[4];
            buffer.Slice(0,4).CopyTo(lengthBytes);
            var length = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes);
            if (buffer.Length < length + 4)
            {
                message = default;
                return false;
            }

            message = parser.ParseFrom(buffer.Slice(4, length));
            buffer = buffer.Slice(length + 4);
            return true;
        }
    }
}

ReadAndProcessAsync扩展方法从指定的PipeReader对象中读取指定类型的ProtoBuf消息,并利用指定处理器(一个Func<TMessage, Task>委托)对它进行处理。由于写入时指定了消息的字节数,所以我们可以将承载消息的字节“精准地”读出来,并利用指定的MessageParser<TMessage>对其进行序列化。

三、Unary

我们知道正常的gRPC开发需要将包含一个或者多个操作的服务定义在ProtoBuf文件中,并利用它生成一个基类,我们通过继承这个基类并重写操作对应方法。对于ASP.NET Core gRPC来说,服务操作对应的方法最终会转换成对应的终结点并以路由的形式进行注册。这个过程其实并不复杂,但不是本篇文章关注的终结点。本文会直接注册四个对应的路由终结点来演示四个基本的消息交换模式。

Unary调用最为简单,就是简单的Request/Reply模式。在如下的代码中,我们注册了一个针对请求路径“/unary”的路由,对应的处理方法为如下所示的HandleUnaryCallAsync。该方法直接调用上面定义的ReadAndProcessAsync扩展方法将请求消息(HelloRequest)从请求的BodyReader中读取出来,并生成一个对应的HelloReply消息予以应答。后者利用上面的WriteMessageAsync扩展方法写入响应的BodyWriter。

 
 

using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); await app.StartAsync();

await UnaryCallAsync();

static async Task HandleUnaryCallAsync(HttpContext httpContext) { var reader = httpContext.Request.BodyReader; var write = httpContext.Response.BodyWriter; await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello => { var reply = new HelloReply { Message = $"Hello, {hello.Names}!" }; await write.WriteMessageAsync(reply); }); } static async Task UnaryCallAsync() { using (var httpClient = new HttpClient()) { var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new MessageContent(new HelloRequest { Names = "foobar" }) }; var reply = await httpClient.SendAsync(request); await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply => { Console.WriteLine(reply.Message); return Task.CompletedTask; }); } }

UnaryCallAsync模拟了客户端针对Unary服务操作的调用,具体的调用由我们熟悉的HttpClient对象完成。如代码片段所示,我们针对路由地址创建了一个HttpRequestMessage对象,并对其HTTP版本进行了设置(2.0),代表请求主体内容的HttpContent是一个MessageContent对象,具体的定义如下。MessageContent将代表ProtoBuf消息的IMessage对象作为主体内容,在重写的SerializeToStreamAsync,我们调用上面定义的WriteMessageAsync扩展方法将指定的IMessage对象写入输出流中。

public class MessageContent : HttpContent
{
    private readonly IMessage _message;
    public MessageContent(IMessage message) => _message = message;
    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
    =>await PipeWriter.Create(stream).WriteMessageAsync(_message);
    protected override bool TryComputeLength(out long length)
    {
        length = -1;
        return false;
    }
}

创建的HttpRequestMessage对象利用HttpClient发送出去后,我们得到对应的HttpResponseMessage对象,并调用ReadAndProcessAsync扩展方法将主体内容读取出来并反序列化成HelloReply对象,其承载的问候消息将以如下的形式输出到控制台上。

四、Server Stream

Server Stream这种消息交换模式意味着服务端可以将内容以流的形式响应给客户端。作为模拟,客户端会携带一个名字列表(“foo,bar,baz,qux”),服务端以流的形式针对每个名字回复一个问候消息,具体的实现体现在针对请求路径“/serverstream”的路由处理方法HandleServerStreamCallAsync上。和上面一样,HandleServerStreamCallAsync方法利用我们定义的ReadAndProcessAsync方法读取作为请求的HelloRequest对象,并针对其携带的每一个名气生成一个HelloReply对象,后者最终通过我们定义的WriteMessageAsync方法予以响应。为了体验“流”的效果,我们添加了1秒的时间间隔。

 
 

using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); await app.StartAsync();

await ServerStreamCallAsync();

static async Task HandleServerStreamCallAsync(HttpContext httpContext) { var reader = httpContext.Request.BodyReader; var write = httpContext.Response.BodyWriter; await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello => { var names = hello.Names.Split(','); foreach (var name in names) { var reply = new HelloReply { Message = $"Hello, {name}!" }; await write.WriteMessageAsync(reply); await Task.Delay(1000); } }); }

static async Task ServerStreamCallAsync() { using (var httpClient = new HttpClient()) { var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/serverstream") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new MessageContent(new HelloRequest { Names = "foo,bar,baz,qux" }) }; var reply = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply => { Console.WriteLine($"[{DateTimeOffset.Now}]{reply.Message}"); return Task.CompletedTask; }); } }

模拟客户端调用的ServerStreamCallAsync方法在生成一个携带多个名字的HttpRequestMessage对象,并利用HttpClient将其发送出去。由于服务端是以流的形式对请求进行响应的,所以我们在调用SendAsync方法是将HttpCompletionOption.ResponseHeadersRead枚举作为第二个参数,这样我们才能在收到响应头部之后得到代表响应消息的HttpResponseMessage对象。这样的响应将会携带4个问候消息,我们同样利用ReadAndProcessAsync方法将读取并以如下的形式输出到控制台上。

五、Client Stream

Client Stream与Server Stream正好相反,客户端会以流的形式将请求内容提交给服务端进行处理。由于我们以HttpClient来模拟客户端,所以我们只能从HttpRequestMessage上作文章。具体来说,我们需要自定义一个HttpContent类型,让它以“客户端流”的形式相对方发送内容。这个自定义的HttpContent就是如下这个ClientStreamContent<TMessage>类型。如代码片段所示,ClientStreamContent<TMessage>是对一个ClientStreamWriter<TMessage>对象的封装,客户端程序利用后者以流的形式向服务端输出TMessage对象承载的内容。对于ClientStreamWriter<TMessage>方法来说,作为输出流的Stream对象是在ClientStreamContent<TMessage>重写的SerializeToStreamAsync方法中指定的。WriteAsync方法利用我们定义的WriteMessageAsync扩展方法实现了针对ProtoBuf消息的输出。客户端通过调用Complete方法决定客户端流是否终结,ClientStreamContent<TMessage>重写的SerializeToStreamAsync通过WaitAsync进行等待。

 
 

public class ClientStreamContent<TMessage> : HttpContent where TMessage:IMessage<TMessage> { private readonly ClientStreamWriter<TMessage> _writer; public ClientStreamContent(ClientStreamWriter<TMessage> writer)=> _writer = writer; protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) => _writer.SetOutputStream(stream).WaitAsync(); protected override bool TryComputeLength(out long length) => (length = -1) != -1; }

public class ClientStreamWriter<TMessage> where TMessage: IMessage<TMessage> { private readonly TaskCompletionSource<Stream> _streamSetSource = new(); private readonly TaskCompletionSource _streamEndSuource = new(); public ClientStreamWriter<TMessage> SetOutputStream(Stream outputStream) { _streamSetSource.SetResult(outputStream); return this; } public async Task WriteAsync(TMessage message) { var stream = await _streamSetSource.Task; await PipeWriter.Create(stream).WriteMessageAsync(message); } public void Complete()=> _streamEndSuource.SetResult(); public Task WaitAsync() => _streamEndSuource.Task; }

针对Client Stream的模拟体现在针对路径“/clientstream”的路由处理方法HandleClientStreamCallAsync。这个方法没有什么特别之处,它进行时调用ReadAndProcessAsync方法将HelloRequest消息读取出来,并将生成的问候语直接输出到本地(服务端)控制台上而已。

 
 

using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync();

await ClientStreamCallAsync(); static async Task HandleClientStreamCallAsync(HttpContext httpContext) { var reader = httpContext.Request.BodyReader; var write = httpContext.Response.BodyWriter; await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello => { var names = hello.Names.Split(','); foreach (var name in names) { Console.WriteLine($"[{DateTimeOffset.Now}]Hello, {name}!"); } }); } static async Task ClientStreamCallAsync() { using (var httpClient = new HttpClient()) { var writer = new ClientStreamWriter<HelloRequest>(); var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/clientstream") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new ClientStreamContent<HelloRequest>(writer) }; _ = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); foreach (var name in new string[] {"foo","bar","baz","qux" }) { await writer.WriteAsync(new HelloRequest { Names = name}); await Task.Delay(1000); } writer.Complete(); } }

在用于模拟Client Stream调用的ClientStreamCallAsync方法中,我们首先创建了一个ClientStreamWriter<HelloRequest>对象,并利用它创建了对应的ClientStreamContent<HelloRequest>对象,后者将作为HttpRequestMessage消息的主体内容。在调用HttpClient的SendAsync方法后,我们并没有作任何等待(否则程序将卡在这里),而是利用ClientStreamWriter<HelloRequest>对象以流的形式发送了四个请求。服务端在接收到每个请求后,会将对应的问候语以如下的形式输出到控制台上。

六、Bidirectional Stream

Bidirectional Stream将连接作为真正的“双工通道”。这次我们不再注册额外的路由,而是直接利用前面模拟Unary的路由终结点来演示双向通信。在如下所示的客户端模拟方法BidirectionalStreamCallAsync中,我们采用上面的方式以流的形式发送了4个HelloRequest。

 
 

using GrpcService; using System.IO.Pipelines; using System.Net; var app = WebApplication.Create(); app.MapPost("/unary", HandleUnaryCallAsync); app.MapPost("/serverstream", HandleServerStreamCallAsync); app.MapPost("/clientstream", HandleClientStreamCallAsync); await app.StartAsync();

await BidirectionalStreamCallAsync(); static async Task BidirectionalStreamCallAsync() { using (var httpClient = new HttpClient()) { var writer = new ClientStreamWriter<HelloRequest>(); var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary") { Version = HttpVersion.Version20, VersionPolicy = HttpVersionPolicy.RequestVersionExact, Content = new ClientStreamContent<HelloRequest>(writer) }; var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); _ = Task.Run(async () => { var response = await task; await PipeReader.Create(await response.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply => { Console.WriteLine($"[{DateTimeOffset.Now}]{reply.Message}"); return Task.CompletedTask; }); }); foreach (var name in new string[] { "foo", "bar", "baz", "qux" }) { await writer.WriteAsync(new HelloRequest { Names = name }); await Task.Delay(1000); } writer.Complete(); } }

于此同时,我们在得到表示响应消息的HttpResponseMessage后,调用ReadAndProcessAsync方法将作为响应的问候语以如下的方式输出到控制台上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/29243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML+CSS大作业 格林蛋糕(7个页面) 餐饮美食网页设计与实现

&#x1f380; 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

编写第一个Qt程序和分析第一个Qt程序

文章目录编写第一个Qt程序新建一个项目项目的文件组成和管理项目的编译、调试与运行分析第一个Qt程序创建项目1) main.cpp2) mainwindow.h和mainwindow.cpp编码实现简易的窗口界面编写第一个Qt程序 已剪辑自: http://c.biancheng.net/view/1817.html 学习一种编程语言或编程环…

最漂亮:yWorks yFiles Diagramming SDK 5.4.0.2

yWorks yfiles Diagramming SDK 5.4.0.2 卓越的 绘图 SDK 使用 yFiles 自动布局的图表 您的数据可视化软件开发套件 yFiles是行业领先的 图表软件库。20 多年来&#xff0c;公司和机构一直在使用此 SDK 来 描述、编辑和分析他们的连接数据。 准备好探索自动布局和交互式图形组…

Qt-OpenCV学习笔记--人脸识别--基于Haar特征的cascade分类器

概述 基于Haar特征的cascade分类器(classifiers) 是Paul Viola和 Michael Jone在2001年&#xff0c;论文”Rapid Object Detection using a Boosted Cascade of Simple Features”中提出的一种有效的物品检测(object detect)方法。它是一种机器学习方法&#xff0c;通过许多正…

G1D21-作业-AttacKGSVMkg_book偷懒哈哈哈

唔~咖啡泡出来好好看呀&#xff01; 一、写作业 第一件事是将昨天读的NER综述补充到作业之中~大概30min 50分钟&#xff0c;补充完了思维导图和文档&#xff0c;明确了下一步论文的阅读方向——NER的综述/网安NER具体技术类文章&#xff08;找找最新的叭&#xff09;。 二、…

2022 IDEA大会引领科技创新趋势 沈向洋团队重磅发布低空经济白皮书

11月22日&#xff0c;2022 IDEA大会在深圳顺利开幕。大会由深圳市科技创新委员会、深圳市人才工作局和深圳市福田区人民政府指导&#xff0c;粤港澳大湾区数字经济研究院&#xff08;International Digital Economy Academy&#xff0c;简称“IDEA研究院”&#xff09;主办。 …

leetcode 216. 组合总和 III

文章目录题目思考代码和注释总结题目 找出所有相加之和为 n 的 k 个数的组合&#xff0c;且满足下列条件&#xff1a; 只使用数字1到9 每个数字 最多使用一次 返回 所有可能的有效组合的列表 。该列表不能包含相同的组合两次&#xff0c;组合可以以任何顺序返回。 来源&…

【图像分类】Efficientnet的学习

文章目录1. Efficientnet的学习1.1 网络模型1.2 MBConv卷积块1.3 模型规模1.4 模型训练方式2. Efficientnet-pytorch代码3.参考1. Efficientnet的学习 论文&#xff1a;https://arxiv.org/abs/1905.11946 1.1 网络模型 主要结构&#xff1a; 基线模型EfficientNet-B0Effici…

ImmunoChemistry艾美捷Annexin DNA损伤ELISA试剂盒方案

使用ImmunoChemistry艾美捷DNA损伤&#xff08;8-OHdG&#xff09;ELISA试剂盒定量尿液、细胞培养物、血浆和其他样品基质中的8-OHdG。该试剂盒提供了快速的培养时间、稳定的试剂和用户友好的方案。使用吸光度板读取器分析结果。 8-羟基-2-脱氧鸟苷&#xff08;8-OHdG&#xff…

开放式激光振镜运动控制器:C++ 快速调用图形库应用

今天&#xff0c;正运动小助手给大家分享一下开放式激光振镜运动控制器&#xff1a;C快速调用图形库应用&#xff0c;本文以二维码打标、文本打标、矢量图形打标为例&#xff0c;解决用户在激光打标时需要进行各种复杂的操作和函数库调用时容易出现的错误问题。 01 ZMC408SCAN…

牛客网语法篇练习复合类型(一)

1.试计算在区间1 到n 的所有整数中&#xff0c;数字x&#xff08;0 ≤ x ≤ 9&#xff09;共出现了多少次&#xff1f; 例如&#xff0c;在1到11 中&#xff0c;即在1、2、3、4、5、6、7、8、9、10、11 中&#xff0c;数字1 出现了4 次。 n,x list(map(int,input().split())) …

CSS的两种渐变

线性渐变和径向渐变 几个常见的例子效果 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><style>.a{font-size: 20px;width: 100%;height: 50px;margin: 10px;background-image: linear-gradient(r…

应急监管双重预防机制数字化管理解决方案

新《安全生产法》&#xff0c;将组织建立并落实双重预防工作机制写入生产经营单位主要负责人职责中&#xff0c;双重预防机制建设已上升到法律的高度。2021 年 12 月 31 日国务院安委会印发《全国危险化学品安全风险集中治理方案》&#xff0c;将推进基于信息化的危险化学品企业…

大二Web课程设计——海贼王中乔巴专题漫画(可以很好的应付老师的作业)HTML+CSS

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 精彩专栏推荐&#x1f4…

Baklib帮助中心|如何设置好客户服务帮助您的客户?

在如今这个高度智能化的时代&#xff0c;很多人已经习惯了靠自己解决问题&#xff0c;所以当人们浏览网页、使用某件商品时&#xff0c;首先想到的不是客户服务&#xff0c;而是服务中心。 那么&#xff0c;您如何设计帮助中心帮助您的客户&#xff1f;这是大多数公司希望解决…

工程项目管理的主要内容都是什么?

工程项目资金管理组织工作主要就文本是什么&#xff1f; 1&#xff0e;物业公司的资金管理组织工作&#xff08;工程建设建筑工程&#xff09; 物业公司的资金管理组织工作是全过程的&#xff0c;主要就包括工程项目重大决策和实行期的全过程&#xff0c;也即从基本建设工程项…

【图文教程】Centos 7下安装Hadoop

环境说明&#xff1a; 系统&#xff1a;Centos7 在VM中安装的 hadoop版本&#xff1a;2.7.7 JDK&#xff1a;1.8 注意&#xff1a;Hadoop需要Java环境的。记得安装Java环境 PS&#xff1a;Centos JDK安装 mkdir /data1&#xff1a;上传jdk的tar.解压 2&#xff1a;修改/e…

vulntarget-b靶场详细通关记录

vulntarget-b靶场详细通关记录 前言 这个靶场打了好几天才打下来&#xff0c;在上线msf和免杀过火绒还有psexec横向移动卡中了很久。而且这个靶场的通关资料较少&#xff0c;吐槽一下网上的相关文章很多关键步骤都不写而且复现不成功。以下将记录个人通关vulntarget-b靶场的详…

简述二进制码、十进制码、BCD码、十六进制码转换的算法

进制转换简述二进制码、十进制码、BCD码、十六进制码转换的算法把四字节 BCD 码 5287 转换为十六进制码 14A7H简述二进制码、十进制码、BCD码、十六进制码转换的算法 ①二进制转十进制&#xff1a;把二进制的“1”&#xff0c;从右边第一个开始按2的1次方&#xff0c;2的2次方…

如何使用轻量应用服务器自带的Cloudreve应用镜像搭建属于自己的云网盘?

Cloudreve是一款开源的网盘软件&#xff0c;支持服务器本机、腾讯云COS等多种存储方式&#xff0c;提供离线下载、拖拽上传、在线预览等功能&#xff0c;可以帮助用户快速搭建个人或多人使用的网盘系统。腾讯云轻量应用服务器 Cloudreve应用镜像集合了Cloudreve、Nginx、MariaD…