API
An API does two things:
- The client sends a Request
- The server returns a Response
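What is REST
REST (Representational State Transfer) is an architectural design style. HTTP APIs that follow it are commonly called RESTful APIs, RESTful services, or REST services.
- Resources are identified by URLs
- Resources are operated on with the GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD, and TRACE methods
- Resources are represented in formats such as XML, HTML, and JSON
- State is kept by the client (the server is stateless)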
What is RPC
RPC (Remote Procedure Call) means calling a remote function as if it were a local one. gRPC is one implementation of RPC.
What is Tonic
tonic is a Rust implementation of gRPC, built on HTTP/2.
Protocol Buffers
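Protocol Buffers (protobuf) is the format gRPC uses to define services and serialize messages.
Service methods
gRPC lets you define four kinds of service methods:
- Unary RPC: the client sends a single request to the server and gets a single response back, just like a normal function call
- Server-streaming RPC: the client sends one request and gets back a stream, then reads messages from it until there are none left
- Client-streaming RPC: the client writes a sequence of messages to a stream, then waits for the server to respond
- Bidirectional-streaming RPC: both sides send a sequence of messages over read-write streams, and the two streams operate independently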
Protocol Buffers syntax
- File names end in .proto
- Similar in purpose to JSON, but smaller and faster; native language bindings are generated from the definitions
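To make the "smaller than JSON" point concrete, here is a minimal sketch of how a single integer field is laid out on the protobuf wire. It is hand-rolled for illustration only (real code would use the prost-generated types), and the function names encode_varint and encode_int_field are hypothetical.

```rust
/// Wire type 0 means "varint" in the protobuf encoding.
const WIRE_TYPE_VARINT: u64 = 0;

/// Encode an unsigned integer as a base-128 varint:
/// 7 payload bits per byte, high bit set on every byte except the last.
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(byte);
            return out;
        }
        out.push(byte | 0x80);
    }
}

/// Encode a non-negative integer field as key + value,
/// where key = (field_number << 3) | wire_type.
fn encode_int_field(field_number: u32, value: u64) -> Vec<u8> {
    let mut out = encode_varint(((field_number as u64) << 3) | WIRE_TYPE_VARINT);
    out.extend(encode_varint(value));
    out
}

fn main() {
    // Field number 1 with the value 150, e.g. `int32 id = 1;`
    let bytes = encode_int_field(1, 150);
    let json = r#"{"id":150}"#;
    println!("protobuf bytes: {:02x?}", bytes);
    println!("protobuf: {} bytes, JSON: {} bytes", bytes.len(), json.len());
}
```

The field name never appears in the payload; only the field number does, which is one reason the numbers in a .proto file matter.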
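message: defines the format of the data being exchanged
- A proto file can define multiple messages, and messages can be nested
- required: mandatory field in proto2; no longer supported in proto3
- optional: optional field
- repeated: repeatable field
- Field number: every field must have a number that is unique within its message, in the range 1 to 2^29 - 1 (19000 to 19999 are reserved and cannot be used)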
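The field-number rule above can be written down as a small check. This is only a sketch of the rule as stated in the list; protoc enforces it for you, and the helper name is_usable_field_number is made up for this example.

```rust
/// Largest allowed field number: 2^29 - 1.
const MAX_FIELD_NUMBER: u32 = (1 << 29) - 1;
/// Range reserved for the protobuf implementation itself.
const RESERVED_START: u32 = 19_000;
const RESERVED_END: u32 = 19_999;

/// Returns true if `n` may be used as a field number in a message.
fn is_usable_field_number(n: u32) -> bool {
    let in_range = (1..=MAX_FIELD_NUMBER).contains(&n);
    let reserved = (RESERVED_START..=RESERVED_END).contains(&n);
    in_range && !reserved
}

fn main() {
    for n in [0, 1, 18_999, 19_000, 19_999, 20_000, MAX_FIELD_NUMBER] {
        println!("{n}: usable = {}", is_usable_field_number(n));
    }
}
```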
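service: defines the RPC service interface
Authentication
- SSL/TLS authentication
- TLS (Transport Layer Security) is built on top of TCP; its predecessor is SSL (Secure Sockets Layer). It encrypts application-layer messages before handing them to TCP for transmission
- Token-based authentication
- Custom authentication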
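As a rough illustration of token-based authentication, the sketch below checks an authorization header value of the form "Bearer <token>". It uses only the standard library. In tonic such a check would usually sit in an interceptor that reads the authorization metadata entry, which is not shown here; the function names and the token value are assumptions made up for this example.

```rust
/// Extract the token from a header value such as "Bearer abc123".
fn extract_bearer_token(header_value: &str) -> Option<&str> {
    let token = header_value.strip_prefix("Bearer ")?.trim();
    if token.is_empty() {
        None
    } else {
        Some(token)
    }
}

/// Accept the request only if a bearer token is present and matches.
/// Note: a production check should compare secrets in constant time.
fn is_authorized(header_value: Option<&str>, expected_token: &str) -> bool {
    header_value.and_then(extract_bearer_token) == Some(expected_token)
}

fn main() {
    let expected = "my-secret-token"; // hypothetical placeholder token
    println!("{}", is_authorized(Some("Bearer my-secret-token"), expected));
    println!("{}", is_authorized(Some("Bearer wrong"), expected));
    println!("{}", is_authorized(None, expected));
}
```

A server that rejects the call would typically answer with the gRPC status Unauthenticated.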
Installing protobuf
https://github.com/protocolbuffers/protobuf/releases/tag/v28.2
After extracting the archive, add the directory that contains protoc to PATH
Verify the installation
protoc --version
Tonic in practice
- protobuf must be installed
- Project repository: https://github.com/VCCICCV/tonic-demo
Create the projects
cargo new tonic-server
cargo new tonic-client
Add the dependencies to both projects
[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] }
# Protocol Buffers encoding and decoding
prost = "0.13.2"
# only needed at build time
[build-dependencies]
tonic-build = "0.12.2"
Create proto\order.proto in the directory that contains both projects
syntax = "proto3"; // proto version
package order; // package name
// service definition
service OrderService {
rpc GetOrder (OrderRequest) returns (OrderResponse);
rpc SetOrder (Order) returns (OrderResponse);
}
// message types
message OrderRequest {
int32 id = 1;
}
message OrderResponse {
int32 id = 1;
string description = 2;
double price = 3;
}
message Order {
int32 id = 1;
string description = 2;
double price = 3;
}
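Create build.rs in the root of both projects; it compiles the proto file into client and server code
// Tell tonic-build which files to compile
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/order.proto")?; // note: the path points outside the project, to the sibling proto folder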
Ok(())
}
Now build both projects
cargo build
The generated Rust code is in target\debug\build\tonic-server-****\out\order.rs
In the server project, create src\order_service.rs
use tonic::{ Request, Response, Status };
use order::order_service_server::{ OrderService, OrderServiceServer };
use order::{ OrderRequest, OrderResponse, Order };
pub mod order {
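tonic::include_proto!("order"); // macro that pulls in the code generated from the proto file
}
#[derive(Debug, Default)] // derive Debug printing and a default value
pub struct MyOrderService;
#[tonic::async_trait] // allows async methods in the trait impl
impl OrderService for MyOrderService {
// Get an order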
async fn get_order(
&self,
request: Request<OrderRequest>
) -> Result<Response<OrderResponse>, Status> {
let req = request.into_inner();
let response = OrderResponse {
id: req.id,
description: format!("Order {}", req.id),
price: 100.0,
};
Ok(Response::new(response))
}
// Create an order
async fn set_order(&self, request: Request<Order>) -> Result<Response<OrderResponse>, Status> {
let order = request.into_inner();
let response = OrderResponse {
id: order.id,
description: order.description,
price: order.price,
};
Ok(Response::new(response))
}
}
// Build the gRPC service (it is started in main.rs)
pub fn create_server() -> OrderServiceServer<MyOrderService> {
OrderServiceServer::new(MyOrderService::default())
}
server project main.rs
mod order_service;
use order_service::create_server;
use tonic::transport::Server;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Listen address
let addr = "[::1]:50051".parse()?;
// Create the gRPC service
let order_service = create_server();
println!("OrderService listening on {}", addr);
// Start the server
Server::builder().add_service(order_service).serve(addr).await?;
Ok(())
}
Now start the server project
cargo run
Create a gRPC request in Apifox
Set the address to [::1]:50051
Import the proto file
Calling the method now returns a response
{
"id": 0,
"description": "Order 0",
"price": 100
}
client project main.rs
use order::order_service_client::OrderServiceClient;
use order::{ OrderRequest, Order };
pub mod order {
tonic::include_proto!("order");
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to the gRPC server
let mut client = OrderServiceClient::connect("http://[::1]:50051").await?;
// Build the request
let request = tonic::Request::new(OrderRequest { id: 1 });
// Send the request and wait for the response
let response = client.get_order(request).await?;
println!("RESPONSE={:?}", response);
let order = Order {
id: 1,
description: "New Order".to_string(),
price: 150.0,
};
// Build the request
let request = tonic::Request::new(order);
// Send the request and wait for the response
let response = client.set_order(request).await?;
println!("RESPONSE={:?}", response);
Ok(())
}
Run it to call the server's methods
cargo run
Response data
RESPONSE=Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderResponse { id: 1, description: "Order 1", price: 100.0 }, extensions: Extensions }
RESPONSE=Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderResponse { id: 1, description: "New Order", price: 150.0 }, extensions: Extensions }
Unary, server-streaming, client-streaming, and bidirectional-streaming RPC
example.proto
syntax = "proto3";
package example;
service ExampleService {
// Unary RPC
rpc UnaryCall(RequestMessage) returns (ResponseMessage);
// Server-side streaming RPC
rpc ServerStream(RequestMessage) returns (stream ResponseMessage);
// Client-side streaming RPC
rpc ClientStream(stream RequestMessage) returns (ResponseMessage);
// Bidirectional streaming RPC
rpc BidiStream(stream RequestMessage) returns (stream ResponseMessage);
}
message RequestMessage {
string message = 1;
}
message ResponseMessage {
string message = 1;
}
server project build.rs
// Tell tonic-build which files to compile
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/example.proto")?; // note: the path points outside the project, to the sibling proto folder
Ok(())
}
server project Cargo.toml
[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["full"] }
prost = "0.13.2"
futures-util = "0.3.30"
tokio-stream = "0.1.14"
tonic-reflection = "0.12.2"
[build-dependencies]
tonic-build = "0.12.2"
server project main.rs
use tonic::{ transport::Server, Request, Response, Status };
use futures_util::Stream; // the Stream trait from futures_util
use tokio_stream::wrappers::ReceiverStream; // adapts an mpsc receiver into a Stream
use tokio::sync::mpsc;
use std::pin::Pin;
use example::example_service_server::{ ExampleService, ExampleServiceServer };
use example::{ RequestMessage, ResponseMessage };
pub mod example {
tonic::include_proto!("example");
}
#[derive(Default)]
pub struct MyExampleService {}
#[tonic::async_trait]
impl ExampleService for MyExampleService {
// 1. Unary RPC
async fn unary_call(
&self,
request: Request<RequestMessage>
) -> Result<Response<ResponseMessage>, Status> {
println!("Unary call");
let message = request.into_inner().message;
Ok(
Response::new(ResponseMessage {
message: format!("Hello from Unary: {}", message),
})
)
}
// 2. Server-streaming RPC
type ServerStreamStream = Pin<Box<dyn Stream<Item = Result<ResponseMessage, Status>> + Send>>;
async fn server_stream(
&self,
request: Request<RequestMessage>
) -> Result<Response<Self::ServerStreamStream>, Status> {
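println!("Server-streaming call");
let message = request.into_inner().message;
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
for i in 1..=5 {
let reply = Ok(ResponseMessage {
message: format!("Stream {}: {}", i, message),
});
// send fails once the client has dropped the stream; stop instead of panicking
if tx.send(reply).await.is_err() {
break;
}
}
println!("Stream finished");
});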
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
// 3. Client-streaming RPC
async fn client_stream(
&self,
request: Request<tonic::Streaming<RequestMessage>>
) -> Result<Response<ResponseMessage>, Status> {
println!("Client-streaming call");
let mut stream = request.into_inner();
let mut messages = vec![];
while let Some(req) = stream.message().await? {
messages.push(req.message);
}
Ok(
Response::new(ResponseMessage {
message: format!("Received: {:?}", messages),
})
)
}
// 4. Bidirectional-streaming RPC
type BidiStreamStream = Pin<Box<dyn Stream<Item = Result<ResponseMessage, Status>> + Send>>;
async fn bidi_stream(
&self,
request: Request<tonic::Streaming<RequestMessage>>
) -> Result<Response<Self::BidiStreamStream>, Status> {
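println!("Bidirectional-streaming call");
let mut stream = request.into_inner();
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
// Stop at the end of the stream or on a receive error instead of panicking
while let Ok(Some(req)) = stream.message().await {
let reply = Ok(ResponseMessage {
message: format!("Echo: {}", req.message),
});
if tx.send(reply).await.is_err() {
break; // the client is no longer reading
}
}
println!("Stream finished");
});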
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let example_service = MyExampleService::default();
println!("Server listening on {}", addr);
Server::builder().add_service(ExampleServiceServer::new(example_service)).serve(addr).await?;
Ok(())
}
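Now run cargo build and cargo run, and you can send RPC requests from Apifox
Note that the proto file has changed, so it must be imported again
Unary RPC
Server-streaming RPC
Client-streaming RPC
Bidirectional-streaming RPC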
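The streaming handlers above share one pattern: a spawned task pushes responses into a bounded channel, and the receiving end is handed back as the response stream. The sketch below reproduces that pattern with std threads and std::sync::mpsc::sync_channel instead of tokio, so it runs without any dependencies. It is an analogy, not tonic code, and stream_messages is a name made up for this example. It shows that a failed send is the signal that the reader has gone away.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Produce up to `total` messages through a bounded channel while the
/// consumer reads only `consume` of them and then hangs up.
/// Returns the messages received and how many sends succeeded.
fn stream_messages(total: u32, consume: usize) -> (Vec<String>, u32) {
    // Capacity 4, like mpsc::channel(4) in the handlers above.
    let (tx, rx) = sync_channel::<String>(4);

    let producer = thread::spawn(move || {
        let mut sent = 0;
        for i in 1..=total {
            // send returns Err once the receiver has been dropped.
            if tx.send(format!("Stream {i}")).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });

    let received: Vec<String> = rx.iter().take(consume).collect();
    drop(rx); // simulates the client disconnecting
    let sent = producer.join().expect("producer thread panicked");
    (received, sent)
}

fn main() {
    let (received, sent) = stream_messages(5, 5);
    println!("full read: received {} of {} sent", received.len(), sent);

    let (received, sent) = stream_messages(100, 2);
    println!("early hang-up: received {} of {} sent", received.len(), sent);
}
```

With an early hang-up the producer stops after a handful of sends rather than working through all 100 messages, which is the behaviour you want from a handler whose client has disconnected.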
client project main.rs
use tonic::Request;
use example::example_service_client::ExampleServiceClient;
use example::{ RequestMessage };
pub mod example {
tonic::include_proto!("example");
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to the server
let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
// 1. Unary RPC
let request = Request::new(RequestMessage {
message: String::from("Unary Hello"),
});
let response = client.unary_call(request).await?;
println!("Unary Response: {:?}", response.into_inner());
// 2. Server-streaming RPC
let request = Request::new(RequestMessage {
message: String::from("Stream Hello"),
});
let mut stream = client.server_stream(request).await?.into_inner();
while let Some(res) = stream.message().await? {
println!("Server Stream Response: {:?}", res);
}
Ok(())
}
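Because each project generates its Rust code from the proto file in its own location (for example, the server's generated code is under server/target***), the client cannot use the code generated for the server. So we set an explicit output directory for the generated code and keep the proto file in a directory shared by both projects
Refactoring into gRPC microservice communication
- Reflection: lets a client discover at runtime the methods and message types a gRPC service exposes, without knowing them at compile time. It only provides dynamic discovery: tools such as Postman and Apifox can call the service through reflection without importing the proto file, but client code you write still has to import the generated code
Reflection exists so that clients can query the service definition at runtime, not to remove the client's dependency on the service definition
Project structure
Create the shared directory proto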
proto\example.proto
syntax = "proto3";
package example;
service ExampleService {
// Unary RPC
rpc UnaryCall(RequestMessage) returns (ResponseMessage);
// Server-side streaming RPC
rpc ServerStream(RequestMessage) returns (stream ResponseMessage);
// Client-side streaming RPC
rpc ClientStream(stream RequestMessage) returns (ResponseMessage);
// Bidirectional streaming RPC
rpc BidiStream(stream RequestMessage) returns (stream ResponseMessage);
}
message RequestMessage {
string message = 1;
}
message ResponseMessage {
string message = 1;
}
Cargo.toml for both projects
[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["full"] }
prost = "0.13.3"
tokio-stream = "0.1.14"
tonic-reflection = "0.12.2"
[build-dependencies]
tonic-build = "0.12.2"
server\build.rs
use std::path::PathBuf;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from("../proto");
// let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); // would place the output under target
tonic_build
::configure()
.build_server(true) // whether to generate server code
.out_dir("src/generated") // output directory for the generated Rust file
.file_descriptor_set_path(out_dir.join("example_descriptor.bin")) // emit the file descriptor set
.compile(&["../proto/example.proto"], &["../proto"])
.unwrap();
Ok(())
}
client\build.rs
use std::path::PathBuf;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from("../proto");
// let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); // would place the output under target
tonic_build
::configure()
.build_client(true) // whether to generate client code
.out_dir("src/generated") // output directory for the generated Rust file
.file_descriptor_set_path(out_dir.join("example_descriptor.bin")) // emit the file descriptor set
.compile(&["../proto/example.proto"], &["../proto"])
.unwrap();
Ok(())
}
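- .build_client(true): generate the client Rust code
- .build_server(true): generate the server Rust code
- .out_dir("src/generated"): the directory for the generated Rust code. After cargo build you will find that the generated example.rs is identical in both projects: build_client and build_server both default to true, so each project generates the client and the server code from the same proto file
- .file_descriptor_set_path: emit the file descriptor set, which is used to create the reflection service
- .compile(&["../proto/example.proto"], &["../proto"]): the proto files to compile and the directories to search for them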
server\src\main.rs
mod server_lib;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
server_lib::start_server().await?;
Ok(())
}
server\src\server_lib.rs
Here we add reflection
use tonic::{ Request, Response, Status, Streaming };
use tokio_stream::{ wrappers::ReceiverStream, StreamExt };
use std::pin::Pin;
use tokio::sync::mpsc;
pub mod example {
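// 1. Embed the file descriptor set
pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../proto/example_descriptor.bin");
// tonic::include_proto!("example"); // this macro includes the file from the build output directory given by the OUT_DIR environment variable; since we chose our own output directory for the generated Rust file, we include it directly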
include!("./generated/example.rs");
}
use example::example_service_server::{ ExampleService, ExampleServiceServer };
use example::{ RequestMessage, ResponseMessage };
#[derive(Debug, Default)]
pub struct MyExampleService;
#[tonic::async_trait]
impl ExampleService for MyExampleService {
// Unary RPC
async fn unary_call(
&self,
request: Request<RequestMessage>
) -> Result<Response<ResponseMessage>, Status> {
let response = ResponseMessage {
message: format!("Unary response to {}", request.into_inner().message),
};
Ok(Response::new(response))
}
type ServerStreamStream = Pin<
Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
>;
// Server-streaming RPC
async fn server_stream(
&self,
request: Request<RequestMessage>
) -> Result<Response<Self::ServerStreamStream>, Status> {
let message = request.into_inner().message;
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
for i in 0..3 {
let response = ResponseMessage {
message: format!("Stream response {} to {}", i, message),
};
if tx.send(Ok(response)).await.is_err() { break; } // the client is no longer reading
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::ServerStreamStream))
}
// Client-streaming RPC
async fn client_stream(
&self,
request: Request<Streaming<RequestMessage>>
) -> Result<Response<ResponseMessage>, Status> {
let mut stream = request.into_inner();
let mut messages = Vec::new();
while let Some(req) = stream.next().await {
let req = req?;
messages.push(req.message);
}
let response = ResponseMessage {
message: format!("Received messages: {:?}", messages.join(", ")),
};
Ok(Response::new(response))
}
type BidiStreamStream = Pin<
Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
>;
// Bidirectional-streaming RPC
async fn bidi_stream(
&self,
request: Request<Streaming<RequestMessage>>
) -> Result<Response<Self::BidiStreamStream>, Status> {
let mut stream = request.into_inner();
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
while let Some(req) = stream.next().await {
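let req = match req {
Ok(req) => req,
Err(_) => break, // the client stream failed; stop echoing
};
let response = ResponseMessage {
message: format!("Echo: {}", req.message),
};
if tx.send(Ok(response)).await.is_err() {
break; // the client is no longer reading
}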
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::BidiStreamStream))
}
}
pub async fn start_server() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let example_service = MyExampleService::default();
println!("Server listening on {}", addr);
// 2. Create the reflection service
let reflection_service = tonic_reflection::server::Builder
::configure()
.register_encoded_file_descriptor_set(example::FILE_DESCRIPTOR_SET)
.build_v1()?;
tonic::transport::Server
::builder()
.add_service(ExampleServiceServer::new(example_service))
.add_service(reflection_service) // 3. Register the reflection service
.serve(addr).await?;
Ok(())
}
tonic-client\src\main.rs
You can look in the generated client\src\generated\example.rs to see the methods available on the client side
use tonic::transport::Channel;
use example::example_service_client::ExampleServiceClient;
use example::RequestMessage;
use tokio_stream::StreamExt;
pub mod example {
// tonic::include_proto!("example");
include!("./generated/example.rs");
}
// Unary RPC
async fn unary_call(
client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
let request = tonic::Request::new(RequestMessage {
message: "Hello from unary".into(),
});
let response = client.unary_call(request).await?;
println!("Unary response: {:?}", response.into_inner());
Ok(())
}
// Server-streaming RPC
async fn server_stream(
client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
let request = tonic::Request::new(RequestMessage {
message: "Hello from stream".into(),
});
let mut response = client.server_stream(request).await?.into_inner();
while let Some(res) = response.next().await {
println!("Server stream response: {:?}", res?);
}
Ok(())
}
// Client-streaming RPC
async fn client_stream(
client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
let request = tokio_stream::iter(
vec![
RequestMessage { message: "Hello".into() },
RequestMessage { message: "from".into() },
RequestMessage { message: "client".into() }
]
);
let response = client.client_stream(request).await?;
println!("Client stream response: {:?}", response.into_inner());
Ok(())
}
// Bidirectional-streaming RPC
async fn bidi_stream(
client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
let request = tokio_stream::iter(
vec![
RequestMessage { message: "Hello".into() },
RequestMessage { message: "from".into() },
RequestMessage { message: "bidi".into() }
]
);
let mut response = client.bidi_stream(request).await?.into_inner();
while let Some(res) = response.next().await {
println!("Bidi stream response: {:?}", res?);
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to the gRPC server
let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
unary_call(&mut client).await?;
server_stream(&mut client).await?;
client_stream(&mut client).await?;
bidi_stream(&mut client).await?;
Ok(())
}
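Now cargo run both services to see the responses; you can also send requests with Postman or Apifox
For timeouts and authentication in tonic, the official examples are sufficient: https://github.com/hyperium/tonic/tree/master/examples/src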
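Implementing client/server as bin crates (optional reading)
- bin: a binary (executable) crate
- lib: a library crate
Normally there are two separate services that talk to each other over RPC; inside a single bin crate you can simply call the functions directly. Many tutorials use bin crates, and so do the official examples.
The example here starts both the server and the client from main; perhaps you are using gRPC in an embedded setting?
If you want to use gRPC with bin targets, add [[bin]] sections to Cargo.toml to name each target; every bin target is compiled into its own binary:
[[bin]]
name = "client"
path = "src/example_client.rs"
[[bin]]
name = "server"
path = "src/example_server.rs"
Write a main function in both the client and the server, then run
cargo run --bin server
and cargo run --bin client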
Cargo.toml
[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["full"] }
prost = "0.13.3"
tokio-stream = "0.1.14"
tonic-reflection = "0.12.2"
[build-dependencies]
tonic-build = "0.12.2"
../proto/example.proto
syntax = "proto3";
package example;
service ExampleService {
// Unary RPC
rpc UnaryCall(RequestMessage) returns (ResponseMessage);
// Server-side streaming RPC
rpc ServerStream(RequestMessage) returns (stream ResponseMessage);
// Client-side streaming RPC
rpc ClientStream(stream RequestMessage) returns (ResponseMessage);
// Bidirectional streaming RPC
rpc BidiStream(stream RequestMessage) returns (stream ResponseMessage);
}
message RequestMessage {
string message = 1;
}
message ResponseMessage {
string message = 1;
}
build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build
::configure()
.compile(&["../proto/example.proto"], &["../proto"])
.unwrap();
Ok(())
}
src/example_client.rs
use example::example_service_client::ExampleServiceClient;
use example::RequestMessage;
use tokio_stream::StreamExt;
pub mod example {
tonic::include_proto!("example");
}
// Unary RPC
pub async fn unary_call() -> Result<(), Box<dyn std::error::Error>> {
let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
let request = tonic::Request::new(RequestMessage {
message: "Hello from unary".into(),
});
let response = client.unary_call(request).await?;
println!("Unary response: {:?}", response.into_inner());
Ok(())
}
// Server-streaming RPC
pub async fn server_stream() -> Result<(), Box<dyn std::error::Error>> {
let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
let request = tonic::Request::new(RequestMessage {
message: "Hello from stream".into(),
});
let mut response = client.server_stream(request).await?.into_inner();
while let Some(res) = response.next().await {
println!("Server stream response: {:?}", res?);
}
Ok(())
}
// Client-streaming RPC
pub async fn client_stream() -> Result<(), Box<dyn std::error::Error>> {
let mut client_stream = ExampleServiceClient::connect("http://[::1]:50051").await?;
let request = tokio_stream::iter(
vec![
RequestMessage { message: "Hello".into() },
RequestMessage { message: "from".into() },
RequestMessage { message: "client".into() }
]
);
let response = client_stream.client_stream(request).await?;
println!("Client stream response: {:?}", response.into_inner());
Ok(())
}
// Bidirectional-streaming RPC
pub async fn bidi_stream() -> Result<(), Box<dyn std::error::Error>> {
let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
let request = tokio_stream::iter(
vec![
RequestMessage { message: "Hello".into() },
RequestMessage { message: "from".into() },
RequestMessage { message: "bidi".into() }
]
);
let mut response = client.bidi_stream(request).await?.into_inner();
while let Some(res) = response.next().await {
println!("Bidi stream response: {:?}", res?);
}
Ok(())
}
src/example_server.rs
use tonic::{ transport::Server, Request, Response, Status, Streaming };
use tokio_stream::{ wrappers::ReceiverStream, StreamExt };
use std::pin::Pin;
use tokio::sync::mpsc;
pub mod example {
tonic::include_proto!("example");
}
use example::example_service_server::{ ExampleService, ExampleServiceServer };
use example::{ RequestMessage, ResponseMessage };
#[derive(Debug, Default)]
pub struct MyExampleService;
#[tonic::async_trait]
impl ExampleService for MyExampleService {
async fn unary_call(
&self,
request: Request<RequestMessage>
) -> Result<Response<ResponseMessage>, Status> {
let response = ResponseMessage {
message: format!("Unary response to {}", request.into_inner().message),
};
Ok(Response::new(response))
}
type ServerStreamStream = Pin<
Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
>;
async fn server_stream(
&self,
request: Request<RequestMessage>
) -> Result<Response<Self::ServerStreamStream>, Status> {
let message = request.into_inner().message;
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
for i in 0..3 {
let response = ResponseMessage {
message: format!("Stream response {} to {}", i, message),
};
if tx.send(Ok(response)).await.is_err() { break; } // the client is no longer reading
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::ServerStreamStream))
}
async fn client_stream(
&self,
request: Request<Streaming<RequestMessage>>
) -> Result<Response<ResponseMessage>, Status> {
let mut stream = request.into_inner();
let mut messages = Vec::new();
while let Some(req) = stream.next().await {
let req = req?;
messages.push(req.message);
}
let response = ResponseMessage {
message: format!("Received messages: {:?}", messages.join(", ")),
};
Ok(Response::new(response))
}
type BidiStreamStream = Pin<
Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
>;
async fn bidi_stream(
&self,
request: Request<Streaming<RequestMessage>>
) -> Result<Response<Self::BidiStreamStream>, Status> {
let mut stream = request.into_inner();
let (tx, rx) = mpsc::channel(4);
tokio::spawn(async move {
while let Some(req) = stream.next().await {
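let req = match req {
Ok(req) => req,
Err(_) => break, // the client stream failed; stop echoing
};
let response = ResponseMessage {
message: format!("Echo: {}", req.message),
};
if tx.send(Ok(response)).await.is_err() {
break; // the client is no longer reading
}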
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::BidiStreamStream))
}
}
pub async fn start_server() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let example_service = MyExampleService::default();
println!("Server listening on {}", addr);
Server::builder().add_service(ExampleServiceServer::new(example_service)).serve(addr).await?;
Ok(())
}
main.rs
use tokio::task;
mod example_client;
mod example_server;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Start the server in a background task
task::spawn(async {
example_server::start_server().await.unwrap();
});
// Give the server a moment to start listening
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// Call the client functions
example_client::unary_call().await?;
example_client::server_stream().await?;
example_client::client_stream().await?;
example_client::bidi_stream().await?;
Ok(())
}
Run cargo build, then cargo run, to see the results of the calls
Unary response: ResponseMessage { message: "Unary response to Hello from unary" }
Server stream response: ResponseMessage { message: "Stream response 0 to Hello from stream" }
Server stream response: ResponseMessage { message: "Stream response 1 to Hello from stream" }
Server stream response: ResponseMessage { message: "Stream response 2 to Hello from stream" }
Client stream response: ResponseMessage { message: "Received messages: \"Hello, from, client\"" }
Bidi stream response: ResponseMessage { message: "Echo: Hello" }
Bidi stream response: ResponseMessage { message: "Echo: from" }
Bidi stream response: ResponseMessage { message: "Echo: bidi" }
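The main.rs above waits a fixed one second before calling the client. A possible alternative, sketched here with blocking std calls only, is to poll until the port accepts connections. The helper wait_for_port and its timeouts are assumptions for illustration; inside an async main you would want the tokio equivalents (or run this on a blocking thread) so the runtime is not stalled.

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread;
use std::time::{Duration, Instant};

/// Poll `addr` until a TCP connection succeeds or `timeout` elapses.
/// Returns true if the port became reachable in time.
fn wait_for_port(addr: SocketAddr, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        // Each attempt gives up after 100 ms so the loop stays responsive.
        if TcpStream::connect_timeout(&addr, Duration::from_millis(100)).is_ok() {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(Duration::from_millis(50));
    }
}

fn main() {
    // Same address the server in this article listens on.
    let addr: SocketAddr = "[::1]:50051".parse().expect("valid socket address");
    let ready = wait_for_port(addr, Duration::from_secs(2));
    println!("server ready: {ready}");
}
```

Compared with a fixed sleep, this returns as soon as the server is listening and reports a clear failure if it never comes up.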