一、服务端流式处理概述
- 客户端向服务端发送请求,服务端可以将多个消息流式传输回调用方
- 和客户端流相反,客户端流发出请求,服务端可以传输一批消息给客户端,直至本次请求响应完全结束。
- 针对文件分段传输下载,该方式非常有用。
二、案例介绍
- 提供一个一元方法查询文件
- 提供一个文件流传输的服务端流式方法,进行文件流推送
三、服务端配置(注意:grpc相关配置参考我之前的文章)
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法
syntax = "proto3";
option csharp_namespace = "GrpcProject";
package grpc.serviceing;
// 服务端流对应的请求流和响应流
message FileInfoRequest
{
string fileName = 1;
}
message FileInfoResponse
{
string fileName = 1;
int64 fileSize = 2;
string extension = 3;
}
message ProgressBarResponse
{
FileInfoResponse fileMessage = 1;
bytes fileBytes = 2;
}
// serverstream.proto定义service方法
syntax = "proto3";
import "google/protobuf/empty.proto";
import "Protos/messages.proto";
option csharp_namespace = "GrpcProject";
package grpc.serviceing;
service ServerStreamRpc{
// 一元文件获取展示
rpc GetFileMessage(google.protobuf.Empty) returns (FileInfoResponse);
// 服务端文件流处理
rpc StreamingFromServer (FileInfoRequest) returns (stream ProgressBarResponse);
}
服务接口实现:
public class ServerStreamService : ServerStreamRpc.ServerStreamRpcBase
{
/// <summary>
/// 获取文件信息
/// </summary>
/// <param name="request">空请求</param>
/// <param name="context">服务调用上下文</param>
/// <returns></returns>
public override Task<FileInfoResponse> GetFileMessage(Empty request, ServerCallContext context)
{
var filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Files", "cn-Liang-y.rar");
FileInfo fileInfo = new FileInfo(filePath);
FileInfoResponse fileInfoResponse = new FileInfoResponse();
fileInfoResponse.FileName = fileInfo.Name;
fileInfoResponse.FileSize = fileInfo.Length;
fileInfoResponse.Extension = fileInfo.Extension;
return Task.FromResult(fileInfoResponse);
}
public override async Task StreamingFromServer(FileInfoRequest request,
IServerStreamWriter<ProgressBarResponse> responseStream,
ServerCallContext context)
{
var filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Files", request.FileName);
if (!File.Exists(filePath))
{
throw new FileNotFoundException(nameof(filePath));
}
// 进度条按照 100百分比进行划分
FileInfo fileInfo = new FileInfo(filePath);
using var fileStream = fileInfo.OpenRead();
// 插入固定长度
int fixedLength = (int)fileStream.Length / 100;
byte[] fileBytes = new byte[fixedLength];
int len;
while ((len = fileStream.Read(fileBytes, 0, fixedLength)) > 0)
{
await Console.Out.WriteLineAsync($"打印字节长度:{len}");
var response = new ProgressBarResponse();
response.FileMessage = new FileInfoResponse
{
FileName = fileInfo.Name,
FileSize = fileInfo.Length,
Extension = fileInfo.Extension
};
response.FileBytes = ByteString.CopyFrom(fileBytes);
await responseStream.WriteAsync(response);
}
}
}
Program注入:
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
var app = builder.Build();
// 一元方法
//app.MapGrpcService<DollarService>();
// 客户端流
//app.MapGrpcService<ClientStreamService>();
// 服务端流
app.MapGrpcService<ServerStreamService>();
app.Run();
}
}
四、客户端配置
- 引用proto文件,配置为客户端类型
- 根据编译生成的函数进行传参调用
- 创建WPF客户端提供控制条显示
button按钮触发grpc
private async void download_Click(object sender, RoutedEventArgs e)
{
Action<int> action = async i =>
{
progressBar.Value = i;
await Task.Delay(100);
};
await WpfClient.Show(action);
}
grpc客户端接口调用
public class WpfClient
{
public static async Task Show(Action<int> action)
{
var channel = GrpcChannel.ForAddress("https://localhost:7188");
var client = new GrpcProject.ServerStreamRpc.ServerStreamRpcClient(channel);
var fileMessage = await client.GetFileMessageAsync(new Google.Protobuf.WellKnownTypes.Empty());
FileInfoRequest request = new FileInfoRequest();
request.FileName = fileMessage.FileName;
var streaming = client.StreamingFromServer(request);
var path = Path.Combine(Directory.GetCurrentDirectory(), "test.rar");
using var stream = new FileStream(path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
int i = 0;
await foreach (var item in streaming.ResponseStream.ReadAllAsync())
{
stream.Write(item.FileBytes.Span);
action(i++);
}
stream.Flush();
stream.Close();
}
}
五、执行结果
在文件根目录可以看到下载的文件
六、源码地址
链接:https://pan.baidu.com/s/13_AEFHLLJS5qN8aIby8IsA
提取码:72x0