一、双向流处理概述
- 简单来讲客户端可以向服务端发送消息流,服务端也可以向客户端传输响应流,即客户端和服务端可以互相通讯
- 客户端无需发送消息即可开始双向流式处理调用 。 客户端可选择使用
RequestStream.WriteAsync
发送消息。 使用ResponseStream.MoveNext()
或ResponseStream.ReadAllAsync()
可访问从服务流式处理的消息。ResponseStream
没有更多消息时,双向流式处理调用完成。
二、案例简介
- 客户端发送请求流通过
equestStream.WriteAsync
传入到服务端 - 服务端响应到客户端的流通过
ResponseStream.WriteAsync写入到客户端
- 服务端使用System.Threading.Channels保证线程安全交互
三、服务端配置(注意:grpc相关配置参考我之前的文章)
- 配置.proto文件
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法
//定义messages.proto文件令需要注意项目文件中的特性GrpcServices=None;
syntax = "proto3";
option csharp_namespace = "GrpcProject";
package grpc.serviceing;
// 消息推送/接收实体
message ExampleMessage
{
string msg = 1;
}
// 双向流文件twowaystream.proto
syntax = "proto3";
import "Protos/messages.proto";
option csharp_namespace = "GrpcProject";
package grpc.serviceing;
service BothWaysRpc{
// 双向流
rpc StreamingBothWays(stream ExampleMessage) returns (stream ExampleMessage);
}
- 1 服务接口实现
/// <summary>
/// 双向流服务
/// </summary>
public class BothWaysService : BothWaysRpc.BothWaysRpcBase
{
/// <summary>
/// 自动重置事件
/// </summary>
private readonly ManualResetEventSlim _event;
public BothWaysService()
{
_event = new ManualResetEventSlim(false);
}
public override async Task StreamingBothWays(IAsyncStreamReader<ExampleMessage> requestStream,
IServerStreamWriter<ExampleMessage> responseStream,
ServerCallContext context)
{
// 创建线程安全的有限容量通道
var channel = Channel.CreateBounded<ExampleMessage>(new BoundedChannelOptions(capacity: 5));
var task = Task.Run(async () =>
{
await foreach (var message in requestStream.ReadAllAsync())
{
// 读取消息 写入通道
if (!string.IsNullOrWhiteSpace(message.Msg))
{
await Console.Out.WriteLineAsync($"记录客户端传入消息:{message.Msg}");
// todo 消息处理
await channel.Writer.WriteAsync(message, context.CancellationToken);
}
}
}, context.CancellationToken);
await foreach (var message in channel.Reader.ReadAllAsync())
{
// 打印通道接收的消息
await Console.Out.WriteLineAsync($"通道传入消息:{message.Msg}");
// 写入响应流
ExampleMessage exampleMessage = new ExampleMessage() { Msg = $"我已经接收到消息:{message.Msg}" };
await responseStream.WriteAsync(exampleMessage);
if (message.Msg.ToLower() == "exit")
{
break;
}
}
// 完结写入通道
channel.Writer.Complete();
await task;
}
}
- 2 Program注入
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
var app = builder.Build();
// 一元方法
//app.MapGrpcService<DollarService>();
// 客户端流
//app.MapGrpcService<ClientStreamService>();
// 服务端流
//app.MapGrpcService<ServerStreamService>();
// 双向流
app.MapGrpcService<BothWaysService>();
app.Run();
}
}
四、客户端配置
- 引用proto文件,配置为客户端类型
- 根据编译生成的函数进行传参调用
- 创建WPF测试客户端
button按钮触发grpc
/// <summary>
/// BothWaysClient.xaml 的交互逻辑
/// </summary>
public partial class BothWaysClient : Window
{
public BothWaysClient()
{
InitializeComponent();
}
private async void Excute_Click(object sender, RoutedEventArgs e)
{
Action<string> action = str => { txtValue.Text += $"{str}\r\n"; };
await WpfClient.Show(action);
txtValue.Text += "\r\n\r\n";
}
}
grpc客户端接口调用
/// <summary>
/// 双向流
/// </summary>
/// <param name="action"></param>
/// <returns></returns>
public static async Task Show(Action<string> action)
{
var messages = new List<string>()
{
"test",
"one",
"two",
"three",
"false",
"four",
"Oooo",
"dddd",
"vvvfff",
"exit"
};
Random rnd = new Random(20);
var channel = GrpcChannel.ForAddress("https://localhost:7188");
var client = new GrpcProject.BothWaysRpc.BothWaysRpcClient(channel);
var bothWays = client.StreamingBothWays();
var requestTask = Task.Run(async () =>
{
while (true)
{
var index = rnd.Next(messages.Count);
var msg = messages[index];
await bothWays.RequestStream.WriteAsync(new ExampleMessage { Msg = msg });
if (msg == "exit")
{
break;
}
}
});
await foreach (var item in bothWays.ResponseStream.ReadAllAsync())
{
action(item.Msg);
if (item.Msg == "我已经接收到消息:exit")
{
break;
}
}
await requestTask;
}
五、执行结果
服务端:
客户端:
六、源码地址
链接:https://pan.baidu.com/s/1uCirfbexPJ7C-AujBVtkCQ
提取码:sd4y
七、后续进阶简介
- 接下来会讲解客户端工厂,优化客户端请求地址使用依赖注入提取各个服务
- proto文件各个字段详细介绍
- token认证
- 截止时间(中止请求)和请求取消
- AOP切面策略
- 重试策略(policy)
- 负载均衡策略(grpc本身提供的策略及nginx代理)
- 日志记录
- 健康检查
- 后续有更多特色功能会持续补充