.netcore grpc双向流方法详解

news2024/11/24 20:51:29

一、双向流处理概述

  1. 简单来讲客户端可以向服务端发送消息流,服务端也可以向客户端传输响应流,即客户端和服务端可以互相通讯
  2. 客户端无需发送消息即可开始双向流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync 发送消息。 使用 ResponseStream.MoveNext() 或 ResponseStream.ReadAllAsync() 可访问从服务流式处理的消息。ResponseStream 没有更多消息时,双向流式处理调用完成。

二、案例简介

  1. 客户端发送请求流通过equestStream.WriteAsync传入到服务端
  2. 服务端响应到客户端的流通过ResponseStream.WriteAsync写入到客户端
  3. 服务端使用System.Threading.Channels保证线程安全交互

三、服务端配置(注意:grpc相关配置参考我之前的文章)

  1. 配置.proto文件
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法

//定义messages.proto文件令需要注意项目文件中的特性GrpcServices=None;

syntax = "proto3";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;


// 消息推送/接收实体
message ExampleMessage
{
	string msg = 1;
}


// 双向流文件twowaystream.proto

syntax = "proto3";

import "Protos/messages.proto";

option csharp_namespace = "GrpcProject";

package grpc.serviceing;

service BothWaysRpc{
	// 双向流
	rpc StreamingBothWays(stream ExampleMessage) returns (stream ExampleMessage);
}
  1. 1 服务接口实现
    /// <summary>
    /// 双向流服务
    /// </summary>
    public class BothWaysService : BothWaysRpc.BothWaysRpcBase
    {
        /// <summary>
        /// 自动重置事件
        /// </summary>
        private readonly ManualResetEventSlim _event;
        public BothWaysService()
        {
            _event = new ManualResetEventSlim(false);
        }

        public override async Task StreamingBothWays(IAsyncStreamReader<ExampleMessage> requestStream,
                                               IServerStreamWriter<ExampleMessage> responseStream,
                                               ServerCallContext context)
        {

            // 创建线程安全的有限容量通道
            var channel = Channel.CreateBounded<ExampleMessage>(new BoundedChannelOptions(capacity: 5));

            var task = Task.Run(async () =>
            {
                await foreach (var message in requestStream.ReadAllAsync())
                {
                    // 读取消息 写入通道
                    if (!string.IsNullOrWhiteSpace(message.Msg))
                    {
                        await Console.Out.WriteLineAsync($"记录客户端传入消息:{message.Msg}");
                        // todo 消息处理
                        await channel.Writer.WriteAsync(message, context.CancellationToken);
                    }
                }
            }, context.CancellationToken);


            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                // 打印通道接收的消息
                await Console.Out.WriteLineAsync($"通道传入消息:{message.Msg}");

                // 写入响应流
                ExampleMessage exampleMessage = new ExampleMessage() { Msg = $"我已经接收到消息:{message.Msg}" };

                await responseStream.WriteAsync(exampleMessage);

                if (message.Msg.ToLower() == "exit")
                {
                    break;
                }
            }

            // 完结写入通道
            channel.Writer.Complete();
            await task;
        }
    }
  1. 2 Program注入
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);
            builder.Services.AddGrpc();
            var app = builder.Build();
            // 一元方法
            //app.MapGrpcService<DollarService>();
            // 客户端流
            //app.MapGrpcService<ClientStreamService>();
            // 服务端流
            //app.MapGrpcService<ServerStreamService>();
            // 双向流
            app.MapGrpcService<BothWaysService>();
            app.Run();
        }
    }

四、客户端配置

  1. 引用proto文件,配置为客户端类型
  2. 根据编译生成的函数进行传参调用
  3. 创建WPF测试客户端

button按钮触发grpc

 

    /// <summary>
    /// BothWaysClient.xaml 的交互逻辑
    /// </summary>
    public partial class BothWaysClient : Window
    {
        public BothWaysClient()
        {
            InitializeComponent();
        }

        private async void Excute_Click(object sender, RoutedEventArgs e)
        {
            Action<string> action = str => { txtValue.Text += $"{str}\r\n"; };

            await WpfClient.Show(action);


            txtValue.Text += "\r\n\r\n";
        }
    }

grpc客户端接口调用

        /// <summary>
        /// 双向流
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public static async Task Show(Action<string> action)
        {
            var messages = new List<string>()
            {
                "test",
                "one",
                "two",
                "three",
                "false",
                "four",
                "Oooo",
                "dddd",
                "vvvfff",
                "exit"
            };


            Random rnd = new Random(20);


            var channel = GrpcChannel.ForAddress("https://localhost:7188");

            var client = new GrpcProject.BothWaysRpc.BothWaysRpcClient(channel);

            var bothWays = client.StreamingBothWays();

            var requestTask = Task.Run(async () =>
              {
                  while (true)
                  {
                      var index = rnd.Next(messages.Count);
                      var msg = messages[index];
                      await bothWays.RequestStream.WriteAsync(new ExampleMessage { Msg = msg });
                      if (msg == "exit")
                      {
                          break;
                      }
                  }
              });
            await foreach (var item in bothWays.ResponseStream.ReadAllAsync())
            {
                action(item.Msg);
                if (item.Msg == "我已经接收到消息:exit")
                {
                    break;
                }
            }

            await requestTask;
        }

五、执行结果

服务端:

 客户端:

 六、源码地址

链接:https://pan.baidu.com/s/1uCirfbexPJ7C-AujBVtkCQ 
提取码:sd4y

七、后续进阶简介

  1. 接下来会讲解客户端工厂,优化客户端请求地址使用依赖注入提取各个服务
  2. proto文件各个字段详细介绍
  3. token认证
  4. 截止时间(中止请求)和请求取消
  5. AOP切面策略
  6. 重试策略(policy)
  7. 负载均衡策略(grpc本身提供的策略及nginx代理)
  8. 日志记录
  9. 健康检查
  10. 后续有更多特色功能会持续补充

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/879302.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

new BigDecimal(double val)注意事项 / JWT解析BigDecimal类型数据

前言&#xff1a; 公司项目中有一个板块需要解析JWT令牌获取载荷里面封装的数据&#xff0c;遇到要解析一个BigDecimal类型的数据 问题发现过程&#xff1a; 正常来说&#xff0c;我们解析一个JWT令牌的步骤如下&#xff1a; public static Claims getDataFromToken(String tok…

python 多个字符替换为一个字符(简洁代码)

在windows系统当中的文件命名&#xff0c;有些特殊字符是不能存在&#xff0c;下面我们来看一下哪些字符不能存在。 文件名称中不能包含\ / : * ? " < > |一共9个特殊字符 一开始想用replace()替换&#xff0c;但是要处理多个字符&#xff0c;写起来代码不整洁 每次…

k8s认证详解 k8s证书详解 2023推荐

推荐阅读 https://www.yii666.com/blog/478731.html?actiononAll 在 Kube-apiserver 中提供了很多认证方式&#xff0c;其中最常用的就是 TLS 认证&#xff0c;当然也有 BootstrapToken&#xff0c;BasicAuth 认证等&#xff0c;只要有一个认证通过&#xff0c;那么 Kube-api…

Jupyter并发测试以后出现EOFError marshal data too short

Jupyter 并发测试以后出现EOFError: marshal data too short 背景 由于项目需求需要用户能进行网页在线运行python代码程序&#xff0c;调研后决定使用Jupyter的服务接口实现此功能&#xff0c;目前使用docker进行容器化部署&#xff0c;测试针对次服务进行并发测试。测试并发…

tkinter的Frame控件

文章目录 Frame和LabelFrame控件Frame参数LabelFrame参数 tkinter系列&#xff1a; GUI初步&#x1f48e;布局&#x1f48e;绑定变量&#x1f48e;绑定事件&#x1f48e;消息框&#x1f48e;文件对话框Frame控件&#x1f48e;PanedWindow和notebook控件扫雷小游戏&#x1f48e…

K8S系列二:实战入门

I. 配置kubectl 1.1 什么是kubectl&#xff1f; 官方文档中介绍kubectl是&#xff1a; Kubectl 是一个命令行接口&#xff0c;用于对 Kubernetes 集群运行命令。Kubectl的配置文件在$HOME/.kube目录。我们可以通过设置KUBECONFIG环境变量或设置命令参数–kubeconfig来指定其他…

Android布局【LinearLayout】

文章目录 常见属性orientation的选择项解释项目结构主要代码 常见属性 orientation&#xff1a;布局中组件的排列方式gravity&#xff1a;控制组件所包含的子元素的对齐方式&#xff0c;可多个组合layout_gravity&#xff1a;控制该组件在父容器里的对齐方式background&#x…

Verdi_如何dump信号的驱动强度

Verdi_如何dump信号的驱动强度 需求背景 在Verilog语法标准中&#xff0c;0和1各自被分成了8个强度等级&#xff1b; Strength NameStrength NameStrength Levelsupply 0supply 17strong 0strong 16pull 0pull 15large 0large 14weak 0weak 13medium 0medium 12small 0small…

k8s 自身原理 5

我们知道容器是通过 pod 来承载的&#xff0c;我们在 k8s 中&#xff0c;服务都是跑在 pod 里面的&#xff0c;pod 里面可以跑 1 个容器&#xff0c;或者跑多个容器&#xff0c;那么咱们 pod 里面跑 1 个服务容器&#xff0c;咱真的就以为里面就只有这样个容器吗&#xff1f; …

替代阿托斯DLKZOR-T/DLHZO-TES直动式伺服阀比例阀

DLKZOR-T/DLKZOR-TES直动式伺服阀比例阀结构&#xff1a; 1&#xff0c;LVDT传感器 2&#xff0c;比例电磁铁 3&#xff0c;阀体 4&#xff0c;阀套 5&#xff0c;阀芯 6&#xff0c;复位弹簧 7&#xff0c;集成数字放大器 8&#xff0c;七芯插头 9&#xff0c;RS232通…

Python学习笔记_基础篇(二)_数据类型之字符串

一.基本数据类型 整数&#xff1a;int 字符串&#xff1a;str(注&#xff1a;\t等于一个tab键) 布尔值&#xff1a; bool 列表&#xff1a;list 列表用[] 元祖&#xff1a;tuple 元祖用&#xff08;&#xff09; 字典&#xff1a;dict 注&#xff1a;所有的数据类型都存在想对应…

synchronized锁膨胀、锁升级、锁优化的过程

参考文章 Java中的偏向锁&#xff0c;轻量级锁&#xff0c; 重量级锁解析_萧萧九宸的博客-CSDN博客 本文是本人对以上文章的整理&#xff0c;建议先去看以上文章。 在Java中&#xff0c;一个锁对象的四种状态: 无锁偏向锁轻量级锁重量级锁 在Java中&#xff0c;一个锁就是一…

推断统计方法(假设检验)

统计方法除了描述统计方法之外还有推断统计&#xff0c;推断统计包括参数估计和假设检验&#xff0c;假设检验的概念就是先假设后检验&#xff0c;运用的是数学上的反证法&#xff1b;假设检验是利用样本数据提供的信息&#xff0c;对未知总体分布的某些方面&#xff08;如总体…

STM32F103C8T6蓝牙OTA教程

一、准备与简介 1. 准备材料 文章使用的软硬件并不局限&#xff0c;下述仅作参考&#xff0c;文章的所有使用的工程可在文末获取&#xff08;百度网盘Github&#xff09; 1&#xff09;STM32F103C8T6核心板 2&#xff09;下载器&#xff08;PWLINK&#xff09; 3&#xff0…

Vscode 常用操作教程

一、语言换成中文 这是我们可以直接点击左边栏第四个图标搜索插件 chinese ,也可以直接ctrlshiftp快捷键也会出来如图所示图标&#xff0c;出来chinese 插件之后选择安装install,安装完成之后重新ctrlshiftp会出现如图所示页面 找到我的鼠标在的地方对应的中文&#xff0c;此时…

【工作中问题解决实践 十二】使用@JsonTypeInfo实现请求数据对象多态

最近在处理接口请求进行数据写入的一个case时&#xff0c;我希望上游只使用我一个写入接口去实现不同类型的数据写入&#xff0c;而上游的数据写入Model是各不相同的&#xff0c;这就要求我接口的一个对象可以应对上游不同类型对象的写入请求。关于Jackson的概念不再赘述&#…

SQL进阶--SQL的常用技巧

一、ORDER BY FIELD() 自定义排序逻辑 排序 ORDER BY 除了可以用 ASC 和 DESC&#xff0c;还可以通过**ORDER BY FIELD(str,str1,...)**自定义字符串/数字来实现排序。这里用 order_diy 表举例&#xff0c;结构以及表数据展示&#xff1a; 二、CASE 表达式 「case when then el…

UI设计师个人工作感悟5篇

UI设计师个人工作感悟一 工作一年了&#xff0c;结合我自身谈谈UI设计的重要性。现在主流的论坛建站程序有两种 Phpwind 和Discuz(Phpwind被阿里巴巴收购 Discuz被腾讯收购这两个论坛程序都是开源免费的)&#xff0c;利用这两种程序我都分别建立过论坛&#xff0c;我第一次用的…

7-15 然后是几点

有时候人们用四位数字表示一个时间&#xff0c;比如 1106 表示 11 点零 6 分。现在&#xff0c;你的程序要根据起始时间和流逝的时间计算出终止时间。 读入两个数字&#xff0c;第一个数字以这样的四位数字表示当前时间&#xff0c;第二个数字表示分钟数&#xff0c;计算当前时…

高效解决在pycharm环境下的UserWarning: loaded more than 1 DLL from .libs这类问题

文章目录 问题解决方案Plan APlan B 解决&#xff01; 问题 这说明因同时存在多个动态链接库而存在冲突&#xff0c;所以需要删除其中一个 解决方案 Plan A Plan B 如果Plan A没用&#xff0c;就重装numpy&#xff0c;因为这个库就是numpy的 pip uninstall numpy pip insta…