.netcore grpc客户端流方法详解

news2025/1/15 20:49:00

一、客户端流式处理概述

  1. 客户端流式处理方法在该方法没有接收消息的情况下启动。 requestStream 参数用于从客户端读取消息。 返回响应消息时,客户端流式处理调用完成。
  2. 客户端可以发送多个消息流到服务端,当所有客户端消息流发送结束,调用请求流完结方法,则标记客户端流消息推送结束,等待服务端执行完成。
  3. 等同于客户端发送批量消息,服务端统一处理。

二、案例介绍

  1. 接下来 会提供三个案例,用于大家理解
  2. 第一个客户端流的基础用法
  3. 第二个客户端流的优化版本
  4. 第三个客户端的文件流式传输

三、服务端配置(注意:grpc相关配置参考我之前的文章

// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法


// 公共messages.proto文件
syntax = "proto3";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;

// 请求体
message ServerRequest{
	string name = 1;
	double height = 2;
	int32 age = 3;
	bool flag = 4;
	float x = 5;
	float y = 6;
	float z= 7;
	repeated string departments = 8;
}

message ServerFileRequest{
	bytes fileBytes = 1;
}

// 响应体
message ServerResponse{
	bool result = 1;
}



// clientstream.proto 定义service方法
syntax = "proto3";

import "google/protobuf/empty.proto";
import "Protos/messages.proto";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;

service ClientStreamRpc{
	// 基础客户端流处理
	rpc StreamingFromClient	(stream ServerRequest) returns (ServerResponse);
	// foreach 客户端流式处理 前提使用C#8 或者更高版本
	rpc StreamingClientForeach(stream ServerRequest) returns (ServerResponse);
	// 文件流传输
	rpc FileStreamFromClient(stream ServerFileRequest) returns (ServerResponse);
}

服务接口实现:

   /// <summary>
    /// 客户端流式处理
    /// </summary>
    public class ClientStreamService : ClientStreamRpc.ClientStreamRpcBase
    {
        /// <summary>
        /// 基础访问流模式
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<ServerResponse> StreamingFromClient(IAsyncStreamReader<ServerRequest> requestStream, ServerCallContext context)
        {
            while (await requestStream.MoveNext())
            {
                await Console.Out.WriteLineAsync("\r\n-------------------------激光射线------------------------------\r\n");
                ServerRequest request = requestStream.Current;
                await Handle(request);
            }
            return new ServerResponse();
        }


        /// <summary>
        /// foreach访问流模式
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<ServerResponse> StreamingClientForeach(IAsyncStreamReader<ServerRequest> requestStream, ServerCallContext context)
        {
            await foreach (var request in requestStream.ReadAllAsync())
            {
                await Console.Out.WriteLineAsync("\r\n-------------------------激光射线------------------------------\r\n");
                await Handle(request);
            }

            return new ServerResponse();
        }

        /// <summary>
        /// 读取文件流 组合成完整的文件。
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<ServerResponse> FileStreamFromClient(IAsyncStreamReader<ServerFileRequest> requestStream, ServerCallContext context)
        {
            ServerResponse serverResponse = new ServerResponse();
            serverResponse.Result = false;
            // 存储流
            MemoryStream ms = new();
            await foreach (var request in requestStream.ReadAllAsync())
            {
                ms.Write(request.FileBytes.Span);
                await Console.Out.WriteLineAsync($"记录字节大小:{ms.Length} bytes");
            }

            string filePath = Path.Combine(Directory.GetCurrentDirectory(), "log.txt");

            using FileStream fileStream = new FileStream(filePath, FileMode.OpenOrCreate, FileAccess.ReadWrite);

            fileStream.Position = 0;
            await fileStream.WriteAsync(ms.ToArray());

            fileStream.Flush();
            fileStream.Close();

            serverResponse.Result = true;


            return serverResponse;
        }


        private async Task Handle(ServerRequest request)
        {
            if (request != null)
            {
                foreach (var prop in request.GetType().GetProperties())
                {
                    if (prop.CanRead && prop.GetValue(request) is not null)
                    {
                        await Console.Out.WriteLineAsync($"property  name:{prop.Name};value:{prop.GetValue(request)}");
                    }
                }
            }
        }
    }

 await foreach (var request in requestStream.ReadAllAsync()) 版本是C#8及以上版本才支持,这个需要注意!

四、客户端配置

  1. 引用proto文件,配置为客户端类型
  2. 根据编译生成的函数进行传参调用
    public partial class ClientStreamForm : Form
    {
        private readonly string _url;

        public ClientStreamForm(IConfiguration configuration)
        {
            InitializeComponent();
            _url = configuration.GetConnectionString("ConnectionString");
        }

        private async void button1_Click(object sender, EventArgs e)
        {
            var channel = GrpcChannel.ForAddress(_url);

            var client = new ClientStreamRpc.ClientStreamRpcClient(channel);

            var streamingFromClient = client.StreamingFromClient();



            for (int i = 0; i < 3; i++)
            {
                ServerRequest request = new();
                request.Age = i;
                request.X = new Random(30).Next(50);
                request.Y = new Random(60).Next(50);
                request.Z = new Random(90).Next(50);
                request.Flag = i % 2 == 0 ? true : false;
                request.Height = 12;
                request.Departments.Add($"{i}");

                await streamingFromClient.RequestStream.WriteAsync(request);
            }
            await streamingFromClient.RequestStream.CompleteAsync();
        }

        private async void btnGrpcOptimize_Click(object sender, EventArgs e)
        {
            var channel = GrpcChannel.ForAddress(_url);

            var client = new ClientStreamRpc.ClientStreamRpcClient(channel);

            var streamingFromClient = client.StreamingClientForeach();

            var departments = new List<string>()
            {
                "one",
                "two",
                "three",
                "four",
                "five",
                "six"
            };
            ServerRequest request = new();

            foreach (var department in departments)
            {
                request.Age = new Random(20).Next(100);
                request.X = new Random(30).Next(50);
                request.Y = new Random(60).Next(50);
                request.Z = new Random(90).Next(50);
                request.Flag = false;
                request.Height = 12;
                request.Departments.Add(department);

                await streamingFromClient.RequestStream.WriteAsync(request);
            }

            await streamingFromClient.RequestStream.CompleteAsync();
        }

        private async void btnFile_Click(object sender, EventArgs e)
        {
            var channel = GrpcChannel.ForAddress(_url);

            var client = new ClientStreamRpc.ClientStreamRpcClient(channel);

            var streamingFromClient = client.FileStreamFromClient();

            var strMessage = "伟大抗日战争的一周年纪念,七月七日,快要到了。全民族的力量团结起来,坚持抗战,坚持统一战线,同敌人作英勇的战争,快一年了。这个战争,在东方历史上是空前的,在世界历史上也将是伟大的,全世界人民都关心这个战争。身受战争灾难、为着自己民族的生存而奋斗的每一个中国人,无日不在渴望战争的胜利。然而战争的过程究竟会要怎么样?能胜利还是不能胜利?能速胜还是不能速胜?很多人都说持久战,但是为什么是持久战?怎样进行持久战?很多人都说最后胜利,但是为什么会有最后胜利?怎样争取最后胜利?这些问题,不是每个人都解决了的,甚至是大多数人至今没有解决的。于是失败主义的亡国论者跑出来向人们说:中国会亡,最后胜利不是中国的。某些性急的朋友们也跑出来向人们说:中国很快就能战胜,无需乎费大气力。这些议论究竟对不对呢?我们一向都说:这些议论是不对的。可是我们说的,还没有为大多数人所了解。一半因为我们的宣传解释工作还不够,一半也因为客观事变的发展还没有完全暴露其固有的性质,还没有将其面貌鲜明地摆在人们之前,使人们无从看出其整个的趋势和前途,因而无从决定自己的整套的方针和做法。现在好了,抗战十个月的经验,尽够击破毫无根据的亡国论,也尽够说服急性朋友们的速胜论了。在这种情形下,很多人要求做个总结性的解释。尤其是对持久战,有亡国论和速胜论的反对意见,也有空洞无物的了解。“卢沟桥事变以来,四万万人一齐努力,最后胜利是中国的。”这样一种公式,在广大的人们中流行着。这个公式是对的,但有加以充实的必要。抗日战争和统一战线之所以能够坚持,是由于许多的因素:全国党派,从共产党到国民党;全国人民,从工人农民到资产阶级;全国军队,从主力军到游击队;国际方面,从社会主义国家到各国爱好正义的人民;敌国方面,从某些国内反战的人民到前线反战的兵士。总而言之,所有这些因素,在我们的抗战中都尽了他们各种程度的努力。每一个有良心的人,都应向他们表示敬意。我们共产党人,同其他抗战党派和全国人民一道,唯一的方向,是努力团结一切力量,战胜万恶的日寇。今年七月一日,是中国共产党建立的十七周年纪念日。为了使每个共产党员在抗日战争中能够尽其更好和更大的努力,也有着重地研究持久战的必要。因此,我的讲演就来研究持久战。和持久战这个题目有关的问题,我都准备说到;但是不能一切都说到,因为一切的东西,不是在一个讲演中完全说得了的。";

            var spanLength = strMessage.Length / 10;
            for (int i = 0; i < 10; i++)
            {
                var startIndex = spanLength * i;
                string spanMessage;
                if (i == 9)
                {
                    spanMessage = strMessage.Substring(startIndex);
                }
                else
                {
                    spanMessage = strMessage.Substring(startIndex, spanLength - 1);
                }
                var strBytes = Encoding.UTF8.GetBytes(spanMessage);
                var byteString = ByteString.CopyFrom(strBytes, 0, strBytes.Length);
                await streamingFromClient.RequestStream.WriteAsync(new ServerFileRequest() { FileBytes = byteString });

                Task.Delay(1000).Wait();
            }
            await streamingFromClient.RequestStream.CompleteAsync();
        }
    }
  1. IConfiguration 接口在program类中进行注入,读取appsettings.json文件
  2. 调用接口查看执行结果
    1. 客户端流基础模式

         2.客户端流foreach模式

           3.文件流处理模式

     

五、源码地址

链接:https://pan.baidu.com/s/1PnLhysfGbVxC1ecpu7XReA 
提取码:l6w0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/858306.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mac 调试 ios safar

1. 打开Mac的 Safari 浏览器的“开发”菜单 运行 Safari 浏览器&#xff0c;然后依次选取“Safari 浏览器”>“偏好设置”&#xff0c;点按“高级”面板&#xff0c;然后勾选“在菜单栏中显示开发菜单”。 2. 开启IPhone的Safari调试模式 启用 Web 检查 功能&#xff0c;打…

『PostgreSQL』在 PostgreSQL中创建只读权限和读写权限的账号

&#x1f4e3;读完这篇文章里你能收获到 理解在 PostgreSQL 数据库中创建账号的重要性以及如何进行账号管理掌握在 PostgreSQL 中创建具有只读权限和读写权限的账号的步骤和方法学会使用 SQL 命令来创建账号、为账号分配适当的权限以及控制账号对数据库的访问级别了解如何确保…

OR36 链表的回文结构 题解

题目描述&#xff1a;链表的回文结构_牛客题霸_牛客网 (nowcoder.com) 对于一个链表&#xff0c;请设计一个时间复杂度为O(n),额外空间复杂度为O(1)的算法&#xff0c;判断其是否为回文结构。 给定一个链表的头指针A&#xff0c;请返回一个bool值&#xff0c;代表其是否为回文结…

QT开发环境下的一些运行框架概述

QT运行框架与MFC的不同&#xff0c;两者结合会学习得更快&#xff01; 送给那些刚入门QT的学生们&#xff01;这里是小白学程序开发&#xff0c;欢迎加入我们跟我一起学习&#xff0c;一起成长&#xff01; 区别一&#xff1a;入口函数 首先执行的是主对话框函数中的构造函数…

Oracle database Linux自建环境备份至远端服务器自定义保留天数

环境准备 linux下安装oracle 请看 oracle12c单节点部署 系统版本: CentOS 7 软件版本&#xff1a; Oracle12c 备份策略与实现方法 此次备份依赖Oracle自带命令exp与linux下crontab命令&#xff08;定时任务&#xff09; exp Oracle中exp命令是一个用于导出数据库数据和对象的…

【Java】常用Stream API

常见 Stream 流表达式 总体结构图 一、两大类型 中间操作(Intermediate Operations) 中间操作是指在Stream上执行的操作, 它们返回一个新的Stream, 允许你链式地进行多个中间操作. 终端操作(Terminal Operations) 对Stream进行最终处理的操作, 当调用终端操作时, Stream会开始执…

老师如何制作学生分班信息查询系统?

即将迎来新学期的开始&#xff01;学校和老师们将忙于为我们可爱的学生做分班准备。如果有一个强大的分班查询系统&#xff0c;学生们就可以提前知道自己被分到哪个班级&#xff0c;有哪些课程&#xff0c;以及班主任是谁&#xff01; 别担心&#xff0c;我将教你如何设计一个…

前端开发常见效果

目录 css实现图像填充文字 css实现手风琴效果 css实现网站变灰色 elementUi的导航栏效果 css实现滚动吸附效果 鼠标经过&#xff0c;元素内部放大 css实现图像填充文字 效果图&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html><head><meta c…

带你彻底了解什么是API接口?

作为一名资深程序员&#xff0c;我知道很多人对API接口这个名词可能还不太了解。今天我要给大家分享一些关于API接口的知识&#xff0c;让你们彻底了解它的概念和作用。一起来看看吧&#xff01; 首先&#xff0c;我们先来解释一下API的全称─Application Programming Interfac…

FPGA应用学习-----FIFO双口ram解决时钟域+asic样机的时钟选通

60m写入异步ram&#xff0c;再用100M从ram中读出 写地址转换为格雷码后&#xff0c;打两拍和读地址判断是否空产生。相反读地址来判断是否满产生。 分割同步模块 asic时钟的门控时钟&#xff0c;fpga是不推荐采用门控时钟的&#xff0c;有很多方法移除fpga的时钟选通。 如果是a…

0基础学C#笔记10:归并排序法

文章目录 前言一、递归的方式二、代码总结前言 将一个大的无序数组有序,我们可以把大的数组分成两个,然后对这两个数组分别进行排序,之后在把这两个数组合并成一个有序的数组。由于两个小的数组都是有序的,所以在合并的时候是很快的。 一、递归的方式 通过递归的方式将大…

研发效能行业工具书来袭!12位专家推荐,文末包邮免费送!

近年来&#xff0c;研发效能度量是一个热点话题。在行业里几乎每家公司的高层都在关注如何有效度量研发效能&#xff0c;合理提升效率、项目质量&#xff0c;降低成本。 尽管这些公司来自互联网、金融、房地产、汽车行业等各行各业&#xff0c;且业务不同、软件研发模式不同&a…

誉天HCIA-CloudService3.0 课程简介

课时数&#xff1a;30 课时 一、云计算概念和价值 1.1 什么是云计算 1.1.1 IT 发展历程及面临的挑战 1.1.2 云计算的定义 1.1.3 云计算的应用场景 1.1.4 云计算技术 1.1.4.1 虚拟化 1.1.4.2 云计算 1.1.4.3 容器 1. 2. 云计算部署形态及商业模式 1.2.1 IaaS 1.2.2 PaaS 1.2.3 S…

BM8 链表中倒数最后k个结点

/*** struct ListNode {* int val;* struct ListNode *next;* ListNode(int x) : val(x), next(nullptr) {}* };*/ class Solution { public:/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可** * param pHead ListNode类 …

Blender如何给fbx模型添加材质贴图并导出带有材质贴图的模型

推荐&#xff1a;使用 NSDT场景编辑器快速助你搭建可二次编辑的3D应用场景 此教程适合新手用户&#xff0c;专业人士直接可直接绕路。 本教程中介绍了利用Blender建模软件&#xff0c;只需要简单几步就可以为模型添加材质贴&#xff0c;图&#xff0c;并且导出带有材质的模型文…

php代码审计,php漏洞详解

文章目录 1、输入验证和输出显示2、命令注入(Command Injection)3、eval 注入(Eval Injection)4、跨网站脚本攻击(Cross Site Scripting, XSS)5、SQL 注入攻击(SQL injection)6、跨网站请求伪造攻击(Cross Site Request Forgeries, CSRF)7、Session 会话劫持(Session Hijacking…

PC端自动化工具pywinauto:如何选择应用程序的窗口?

如何选择需要打开的应用程序的窗口有2种方法&#xff1a; ①通过窗口标题/窗口类名来打开应用程序窗口&#xff0c;第一步就要打开窗口精灵&#xff0c;通过拖动放大镜到应用窗口找到窗口标题和窗口类名&#xff0c;如下图所示&#xff1a; 接下来就可以根据窗口类名和标题选择…

PostgreSQL技术沙龙|PPT合集速来下载

新机遇&#xff0c;新态势&#xff0c;新发展 2023年8月5日&#xff0c;由中国开源软件推进联盟PG分会&#xff08;中国PG分会&#xff09;联合杭州云贝教育共同举办的“PostgreSQL技术沙龙杭州站”圆满举行。本次活动结合当下去O、国产化趋势&#xff0c;邀请社群技术专家围…

LoadRunner 脚本优化之——参数化迭代介绍

在LoadRunner的脚本优化时&#xff0c;有时发送给服务器的请求参数化时&#xff0c;服务器返回的内容也会和参数化的内容相对应&#xff0c;例如发送的请求带有查询key123&#xff0c;则服务器也会返回含有123相关的内容。这时我们在使用检查点检查服务器参数化返回的数据正确性…

威胁性恶意软件,基于LINUX多云环境中的威胁

恶意软件迁移到基于 Linux 的云系统 SC Media 基于 Linux 的威胁经常被忽视。这是一个问题&#xff0c;因为大多数多云环境都是基于 Linux 的。VMware 最近在一份报告和 SC Media 网络广播中强调了这个问题。 这是事实&#xff1a;大多数云在 Linux 上运行。90% 的云由 Linux 操…