Rust gRPC---Tonic实战

news2024/11/17 23:36:21

API

一个API做了两件事

  • 客户端发起请求Request
  • 服务端作出响应Response

REST是什么

REST(Representational State Transfer):表现层状态传输,是一种设计风格,通常将 HTTP API 称为 RESTful API、RESTful 服务或 REST 服务

  • 资源由URL决定
  • 通过GET、POST、PUT、DELETE、PATCH、OPTIONS、HEAD、TRACE方法来操作资源
  • 操作资源的表现形式是XML、HTML、JSON等格式
  • 由客户端保存状态

RPC是什么

RPC(Remote Procedure Call):远程过程调用,像调用本地函数一样调用远程函数,grpc是RPC的一种实现

Tonic是什么

tonic基于HTTP/2构建,grpc的Rust实现

在这里插入图片描述

Protocol Buffers

Protocol Buffers协议缓冲区

服务方法

允许定义四种服务方法

  • 一元RPC:客户端向服务端发送单个请求并得到单个响应,就像调用普通函数一样
  • 服务端流式RPC:客户端向服务端发送请求并获取流以读取一系列消息,客户端从返回的流中读取直到没有消息
  • 客户端流式RPC:客户端使用流编写一系列消息发送到服务端,等待服务端做出响应
  • 双向流式RPC:双方使用读写流发送一系列消息,两个流独立运行

Protocol Buffers语法

  • 文件名以.proto结尾
  • 类似于json,体积更小、速度更快,会生成本机语言绑定

message:定义数据传递格式

  • 一个proto文件可以定义多个消息,消息可以嵌套

  • required:proto2必填、proto3不需要填

  • optional:可选字段

  • repeate:可重复字段

  • 标识号:每个字段必须要有一个唯一的标识号,范围1~2^29 -1(19000-19999保留标识号不能用)
    service:RPC服务接口

认证

  • SSL/TLS认证
    • TLS(Transport layerSecurity)安全传输层,建立在TCP协议上,前身是SSL(Secure Socket Layer)安全套接字层,将应用层的报文进行加密后再交由TCP传输
  • 基于Token认证
  • 自定义认证

安装protobuf

https://github.com/protocolbuffers/protobuf/releases/tag/v28.2

在这里插入图片描述

解压后将路径加入PATH

在这里插入图片描述

验证

protoc --version

Tonic实战

  • 需要安装protobuf
  • 项目地址:https://github.com/VCCICCV/tonic-demo

创建项目

cargo new tonic-server

cargo new tonic-client

两个项目都添加依赖

[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] }
# grpc编码器
prost = "0.13.2"
# 我们只在构建的时候需要它
[build-dependencies]
tonic-build = "0.12.2"

在两个项目同级新建proto\order.proto

syntax = "proto3";// 指定proto版本
package order;// 指定包名
// 定义服务
service OrderService {
    rpc GetOrder (OrderRequest) returns (OrderResponse);
    rpc SetOrder (Order) returns (OrderResponse);
}
// 定义message类型
message OrderRequest {
    int32 id = 1;
}

message OrderResponse {
    int32 id = 1;
    string description = 2;
    double price = 3;
}

message Order {
    int32 id = 1;
    string description = 2;
    double price = 3;
}

两个项目根目录新建build.rs,用于编译客户端和服务端的代码

// 告诉tonic-build,需要编译哪些文件
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("../proto/order.proto")?;// 注意导入的是项目外也就是项目同级的文件夹
    Ok(())
}

此时两个项目编译

cargo build

生成的rust代码在target\debug\build\tonic-server-****\out\order.rs

server项目新建src\order_service.rs

use tonic::{ Request, Response, Status };
use order::order_service_server::{ OrderService, OrderServiceServer };
use order::{ OrderRequest, OrderResponse, Order };

pub mod order {
    tonic::include_proto!("order");// 使用宏引入proto文件
}

#[derive(Debug, Default)] // 实现打印、默认值
pub struct MyOrderService;

#[tonic::async_trait] // 异步方法
impl OrderService for MyOrderService {
    // 获取订单
    async fn get_order(
        &self,
        request: Request<OrderRequest>
    ) -> Result<Response<OrderResponse>, Status> {
        let req = request.into_inner();
        let response = OrderResponse {
            id: req.id,
            description: format!("Order {}", req.id),
            price: 100.0,
        };
        Ok(Response::new(response))
    }
    // 创建订单
    async fn set_order(&self, request: Request<Order>) -> Result<Response<OrderResponse>, Status> {
        let order = request.into_inner();
        let response = OrderResponse {
            id: order.id,
            description: order.description,
            price: order.price,
        };
        Ok(Response::new(response))
    }
}
// 启动grpc服务
pub fn create_server() -> OrderServiceServer<MyOrderService> {
    OrderServiceServer::new(MyOrderService::default())
}

server项目main.rs

mod order_service;
use order_service::create_server;
use tonic::transport::Server;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 监听端口
    let addr = "[::1]:50051".parse()?;
    // 创建grpc服务
    let order_service = create_server();
    println!("OrderService listening on {}", addr);
    // 启动服务
    Server::builder().add_service(order_service).serve(addr).await?;
    Ok(())
}

此时启动server项目

cargo run

使用Apifox创建grpc请求

在这里插入图片描述

设置端口[::1]:50051

在这里插入图片描述

导入proto文件

在这里插入图片描述

此时调用即可响应请求

{
    "id": 0,
    "description": "Order 0",
    "price": 100
}

client项目main.rs

use order::order_service_client::OrderServiceClient;
use order::{ OrderRequest, Order };

pub mod order {
    tonic::include_proto!("order");
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到gRPC服务端
    let mut client = OrderServiceClient::connect("http://[::1]:50051").await?;
    // 创建请求
    let request = tonic::Request::new(OrderRequest { id: 1 });
    // 发送请求并等待响应
    let response = client.get_order(request).await?;
    println!("RESPONSE={:?}", response);

    let order = Order {
        id: 1,
        description: "New Order".to_string(),
        price: 150.0,
    };
    // 创建请求
    let request = tonic::Request::new(order);
    // 发送请求并等待响应
    let response = client.set_order(request).await?;
    println!("RESPONSE={:?}", response);

    Ok(())
}

运行即可调用服务端的方法

cargo run

响应数据

RESPONSE=Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderResponse { id: 1, description: "Order 1", price: 100.0 }, extensions: Extensions }
RESPONSE=Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Fri, 20 Sep 2024 tent-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderResponse { id: 1, description: "Order 1", price: 100.0 }, extensions: Extensions }
RESPONSE=Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderResponions: Extensions }
RESPONSE=Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderRespontent-type": "application/grpc", "date": "Fri, 20 Sep 2024 17:35:55 GMT", "grpc-status": "0"} }, message: OrderResponse { id: 1, description: "New Order", price: 150.0 }, extensions: Extensions }

一元RPC、服务端流式RPC、客户端流式RPC、双向流式RPC

example.proto

syntax = "proto3";

package example;

service ExampleService {
  // Unary RPC
  rpc UnaryCall(RequestMessage) returns (ResponseMessage);

  // Server-side streaming RPC
  rpc ServerStream(RequestMessage) returns (stream ResponseMessage);

  // Client-side streaming RPC
  rpc ClientStream(stream RequestMessage) returns (ResponseMessage);

  // Bidirectional streaming RPC
  rpc BidiStream(stream RequestMessage) returns (stream ResponseMessage);
}

message RequestMessage {
  string message = 1;
}

message ResponseMessage {
  string message = 1;
}

server项目build.rs

// 告诉tonic-build,需要编译哪些文件
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("../proto/example.proto")?;// 注意导入的是项目外也就是项目同级的文件夹
    Ok(())
}

server项目cargo.toml

[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["full"] }
prost = "0.13.2"
futures-util = "0.3.30"
tokio-stream = "0.1.14"
tonic-reflection = "0.12.2"
[build-dependencies]
tonic-build = "0.12.2"

server项目main.rs

use tonic::{ transport::Server, Request, Response, Status };
use futures_util::Stream; // 使用 futures_util 提供的 Stream trait
use tokio_stream::wrappers::ReceiverStream; // 引入 tokio_stream
use tokio::sync::mpsc;
use std::pin::Pin;
use example::example_service_server::{ ExampleService, ExampleServiceServer };
use example::{ RequestMessage, ResponseMessage };

pub mod example {
    tonic::include_proto!("example");
}

#[derive(Default)]
pub struct MyExampleService {}

#[tonic::async_trait]
impl ExampleService for MyExampleService {
    //    1. 一元 RPC 调用
    async fn unary_call(
        &self,
        request: Request<RequestMessage>
    ) -> Result<Response<ResponseMessage>, Status> {
        println!("一元调用");
        let message = request.into_inner().message;
        Ok(
            Response::new(ResponseMessage {
                message: format!("Hello from Unary: {}", message),
            })
        )
    }

    // 2. 服务端流式 RPC 调用
    type ServerStreamStream = Pin<Box<dyn Stream<Item = Result<ResponseMessage, Status>> + Send>>;

    async fn server_stream(
        &self,
        request: Request<RequestMessage>
    ) -> Result<Response<Self::ServerStreamStream>, Status> {
        println!("服务端流式调用");
        let message = request.into_inner().message;

        let (tx, rx) = mpsc::channel(4);
        tokio::spawn(async move {
            for i in 1..=5 {
                tx.send(
                    Ok(ResponseMessage {
                        message: format!("Stream {}: {}", i, message),
                    })
                ).await.unwrap();
            }
            println!("流结束");
        });

        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
    }

    // 3. 客户端流式 RPC 调用
    async fn client_stream(
        &self,
        request: Request<tonic::Streaming<RequestMessage>>
    ) -> Result<Response<ResponseMessage>, Status> {
        println!("客户端流式调用");
        let mut stream = request.into_inner();
        let mut messages = vec![];

        while let Some(req) = stream.message().await? {
            messages.push(req.message);
        }

        Ok(
            Response::new(ResponseMessage {
                message: format!("Received: {:?}", messages),
            })
        )
    }

    // 4. 双向流式 RPC 调用
    type BidiStreamStream = Pin<Box<dyn Stream<Item = Result<ResponseMessage, Status>> + Send>>;

    async fn bidi_stream(
        &self,
        request: Request<tonic::Streaming<RequestMessage>>
    ) -> Result<Response<Self::BidiStreamStream>, Status> {
        println!("双向流式调用");
        let mut stream = request.into_inner();
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            while let Some(req) = stream.message().await.unwrap() {
                tx.send(
                    Ok(ResponseMessage {
                        message: format!("Echo: {}", req.message),
                    })
                ).await.unwrap();
            }
            println!("流结束");
        });

        Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let example_service = MyExampleService::default();

    println!("Server listening on {}", addr);

    Server::builder().add_service(ExampleServiceServer::new(example_service)).serve(addr).await?;

    Ok(())
}

此时cargo buildcargo run即可用apifox发送rpc请求

注意我们更改了proto文件,需要重新指定

一元rpc

在这里插入图片描述

服务端流式 RPC

在这里插入图片描述

客户端流式 RPC

在这里插入图片描述

双向流式 RPC 调用

在这里插入图片描述

client项目main.rs

use tonic::Request;
use example::example_service_client::ExampleServiceClient;
use example::{ RequestMessage };

pub mod example {
    tonic::include_proto!("example");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到服务端
    let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;

    // 1. 一元 RPC 调用
    let request = Request::new(RequestMessage {
        message: String::from("Unary Hello"),
    });
    let response = client.unary_call(request).await?;
    println!("Unary Response: {:?}", response.into_inner());

    // 2. 服务器流式 RPC 调用
    let request = Request::new(RequestMessage {
        message: String::from("Stream Hello"),
    });
    let mut stream = client.server_stream(request).await?.into_inner();
    while let Some(res) = stream.message().await? {
        println!("Server Stream Response: {:?}", res);
    }
    Ok(())
}

由于proto生成的rust方法在不同的位置,例如server生成的代码在server/target***下,客户端无法调用server提供的方法,我们需要将代码生成的目录设置为两个项目的公共目录

改造为gRPC微服务通信

  • 反射(Reflection):允许客户端在运行时发现 gRPC 服务所提供的方法、消息类型等信息,而无需在编译时就知道这些详细信息,但是它只提供了一种动态发现服务的能力,你可以用postman、apifox的反射调用而不需要导入proto文件,在编写客户端代码时仍然需要导入生成后的代码

反射仅仅是为了让客户端在运行时查询服务定义,而不是为了消除客户端对服务定义的依赖

项目结构
在这里插入图片描述

创建公共目录proto
proto\example.proto

syntax = "proto3";

package example;

service ExampleService {
  // Unary RPC
  rpc UnaryCall(RequestMessage) returns (ResponseMessage);

  // Server-side streaming RPC
  rpc ServerStream(RequestMessage) returns (stream ResponseMessage);

  // Client-side streaming RPC
  rpc ClientStream(stream RequestMessage) returns (ResponseMessage);

  // Bidirectional streaming RPC
  rpc BidiStream(stream RequestMessage) returns (stream ResponseMessage);
}

message RequestMessage {
  string message = 1;
}

message ResponseMessage {
  string message = 1;
}

两个项目的cargo.toml

[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["full"] }
prost = "0.13.3"
tokio-stream = "0.1.14"
tonic-reflection = "0.12.2"
[build-dependencies]
tonic-build = "0.12.2"

server\build.rs

use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from("../proto");
    // let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); // 指定输出目录在target下
    tonic_build
        ::configure()
        .build_server(true)// 指定是否生成服务端代码
        .out_dir("src/generated")// 指定rust文件输出目录
        .file_descriptor_set_path(out_dir.join("example_descriptor.bin")) // 生成文件描述符集
        .compile(&["../proto/example.proto"], &["../proto"])
        .unwrap();
    Ok(())
}

client\build.rs

use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from("../proto");
    // let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); // 指定输出目录在target下
    tonic_build
        ::configure()
        .build_client(true)// 指定是否生成客户端端代码
        .out_dir("src/generated")// 指定rust文件输出目录
        .file_descriptor_set_path(out_dir.join("example_descriptor.bin")) // 生成文件描述符集
        .compile(&["../proto/example.proto"], &["../proto"])
        .unwrap();
    Ok(())
}
  • .build_client(true):生成客户端Rust代码
  • .build_server(true):生成服务端Rust代码
  • .out_dir(“src/generated”):指定生成的Rust代码的目录,cargo build你会发现生成的example.rs内容是一样的,因为我们定义的请求与响应的proto文件内容是一样的
  • .file_descriptor_set_path:生成文件描述符集,用于创建反射服务
  • .compile(&[“…/proto/example.proto”], &[“…/proto”]):指定编译的proto文件及其目录

server\src\main.rs

mod server_lib;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    server_lib::start_server().await?;
    Ok(())
}

server\src\server_lib.rs 这里我们添加了反射

use tonic::{ Request, Response, Status, Streaming };
use tokio_stream::{ wrappers::ReceiverStream, StreamExt };
use std::pin::Pin;
use tokio::sync::mpsc;

pub mod example {
    // 1. 导入文件描述符集
    pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../proto/example_descriptor.bin");
    // tonic::include_proto!("example");// 你可以ctrl点击查看源码,它是一个宏,添加了env的编译目录,我们这里指定生成的rust目录,所以直接导入即可
    include!("./generated/example.rs");
}

use example::example_service_server::{ ExampleService, ExampleServiceServer };
use example::{ RequestMessage, ResponseMessage };

#[derive(Debug, Default)]
pub struct MyExampleService;

#[tonic::async_trait]
impl ExampleService for MyExampleService {
    // 一元RPC调用
    async fn unary_call(
        &self,
        request: Request<RequestMessage>
    ) -> Result<Response<ResponseMessage>, Status> {
        let response = ResponseMessage {
            message: format!("Unary response to {}", request.into_inner().message),
        };
        Ok(Response::new(response))
    }

    type ServerStreamStream = Pin<
        Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
    >;
    // 服务端流式RPC调用
    async fn server_stream(
        &self,
        request: Request<RequestMessage>
    ) -> Result<Response<Self::ServerStreamStream>, Status> {
        let message = request.into_inner().message;
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            for i in 0..3 {
                let response = ResponseMessage {
                    message: format!("Stream response {} to {}", i, message),
                };
                tx.send(Ok(response)).await.unwrap();
            }
        });

        Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::ServerStreamStream))
    }
    // 客户端流式RPC调用
    async fn client_stream(
        &self,
        request: Request<Streaming<RequestMessage>>
    ) -> Result<Response<ResponseMessage>, Status> {
        let mut stream = request.into_inner();
        let mut messages = Vec::new();

        while let Some(req) = stream.next().await {
            let req = req?;
            messages.push(req.message);
        }

        let response = ResponseMessage {
            message: format!("Received messages: {:?}", messages.join(", ")),
        };
        Ok(Response::new(response))
    }

    type BidiStreamStream = Pin<
        Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
    >;
    // 双向流式RPC调用
    async fn bidi_stream(
        &self,
        request: Request<Streaming<RequestMessage>>
    ) -> Result<Response<Self::BidiStreamStream>, Status> {
        let mut stream = request.into_inner();
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            while let Some(req) = stream.next().await {
                let req = req.unwrap();
                let response = ResponseMessage {
                    message: format!("Echo: {}", req.message),
                };
                tx.send(Ok(response)).await.unwrap();
            }
        });

        Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::BidiStreamStream))
    }
}

pub async fn start_server() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let example_service = MyExampleService::default();

    println!("Server listening on {}", addr);
    // 2. 创建反射服务
    let reflection_service = tonic_reflection::server::Builder
        ::configure()
        .register_encoded_file_descriptor_set(example::FILE_DESCRIPTOR_SET)
        .build_v1()?;

    tonic::transport::Server
        ::builder()
        .add_service(ExampleServiceServer::new(example_service))
        .add_service(reflection_service) // 3. 添加反射服务
        .serve(addr).await?;

    Ok(())
}

tonic-client\src\main.rs你可以在生成的client\src\generated\example.rs查看client端提供的方法

use tonic::transport::Channel;
use example::example_service_client::ExampleServiceClient;
use example::RequestMessage;
use tokio_stream::StreamExt;

pub mod example {
    // tonic::include_proto!("example");
    include!("./generated/example.rs");
}
// 一元RPC调用
async fn unary_call(
    client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tonic::Request::new(RequestMessage {
        message: "Hello from unary".into(),
    });
    let response = client.unary_call(request).await?;
    println!("Unary response: {:?}", response.into_inner());
    Ok(())
}
// 服务端流式RPC调用
async fn server_stream(
    client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tonic::Request::new(RequestMessage {
        message: "Hello from stream".into(),
    });
    let mut response = client.server_stream(request).await?.into_inner();
    while let Some(res) = response.next().await {
        println!("Server stream response: {:?}", res?);
    }
    Ok(())
}
// 客户端流式RPC调用
async fn client_stream(
    client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tokio_stream::iter(
        vec![
            RequestMessage { message: "Hello".into() },
            RequestMessage { message: "from".into() },
            RequestMessage { message: "client".into() }
        ]
    );
    let response = client.client_stream(request).await?;
    println!("Client stream response: {:?}", response.into_inner());
    Ok(())
}
// 双向流式RPC调用
async fn bidi_stream(
    client: &mut ExampleServiceClient<Channel>
) -> Result<(), Box<dyn std::error::Error>> {
    let request = tokio_stream::iter(
        vec![
            RequestMessage { message: "Hello".into() },
            RequestMessage { message: "from".into() },
            RequestMessage { message: "bidi".into() }
        ]
    );
    let mut response = client.bidi_stream(request).await?.into_inner();
    while let Some(res) = response.next().await {
        println!("Bidi stream response: {:?}", res?);
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到gRPC服务器
    let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;

    unary_call(&mut client).await?;
    server_stream(&mut client).await?;
    client_stream(&mut client).await?;
    bidi_stream(&mut client).await?;

    Ok(())
}

此时cargo run两个服务即可看到响应信息,你也可以用postman、apifox请求

tonic超时处理、认证看官方例子就可以了https://github.com/hyperium/tonic/tree/master/examples/src

bin crate实现client\server(可选了解)

  • bin:二进制可执行文件
  • lib:库crate
    一般来说是用两个服务,两个服务间通信用rpc,bin crate的方法直接调用就好了,很多教程都是用的bin,包括官方例子也是用的bin

这里给出的是在main中启动服务端和客户端,可能你在嵌入式中使用gRPC?

如果你想作为bin使用gRPC,你可以在cargo.toml里添加[[bin]]指定crate,bin crate会编译为单独的二进制文件

[[bin]]
name="client"
path="src/example_client.rs"
[[bin]]
name="server"
path="src/example_server.rs"

在client、server编写main函数

然后cargo run --bin servercargo run --bin client

cargo.toml

[dependencies]
tonic = "0.12.2"
tokio = { version = "1.40.0", features = ["full"] }
prost = "0.13.3"
tokio-stream = "0.1.14"
tonic-reflection = "0.12.2"
[build-dependencies]
tonic-build = "0.12.2"

../proto/example.proto

syntax = "proto3";

package example;

service ExampleService {
  // Unary RPC
  rpc UnaryCall(RequestMessage) returns (ResponseMessage);

  // Server-side streaming RPC
  rpc ServerStream(RequestMessage) returns (stream ResponseMessage);

  // Client-side streaming RPC
  rpc ClientStream(stream RequestMessage) returns (ResponseMessage);

  // Bidirectional streaming RPC
  rpc BidiStream(stream RequestMessage) returns (stream ResponseMessage);
}

message RequestMessage {
  string message = 1;
}

message ResponseMessage {
  string message = 1;
}

build.rs

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build
        ::configure()
        .compile(&["../proto/example.proto"], &["../proto"])
        .unwrap();
    Ok(())
}

src/example_client.rs

use example::example_service_client::ExampleServiceClient;
use example::RequestMessage;
use tokio_stream::StreamExt;

pub mod example {
    tonic::include_proto!("example");
}
// 一元RPC
pub async fn unary_call() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
    let request = tonic::Request::new(RequestMessage {
        message: "Hello from unary".into(),
    });
    let response = client.unary_call(request).await?;
    println!("Unary response: {:?}", response.into_inner());
    Ok(())
}
// 服务端流式RPC
pub async fn server_stream() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
    let request = tonic::Request::new(RequestMessage {
        message: "Hello from stream".into(),
    });
    let mut response = client.server_stream(request).await?.into_inner();
    while let Some(res) = response.next().await {
        println!("Server stream response: {:?}", res?);
    }
    Ok(())
}
// 客户端流式RPC
pub async fn client_stream() -> Result<(), Box<dyn std::error::Error>> {
    let mut client_stream = ExampleServiceClient::connect("http://[::1]:50051").await?;
    let request = tokio_stream::iter(
        vec![
            RequestMessage { message: "Hello".into() }, 
            RequestMessage { message: "from".into() },
            RequestMessage { message: "client".into() }
        ]
    );
    let response = client_stream.client_stream(request).await?;
    println!("Client stream response: {:?}", response.into_inner());
    Ok(())
}
// 双向流式RPC
pub async fn bidi_stream() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = ExampleServiceClient::connect("http://[::1]:50051").await?;
    let request = tokio_stream::iter(
        vec![
            RequestMessage { message: "Hello".into() },
            RequestMessage { message: "from".into() },
            RequestMessage { message: "bidi".into() }
        ]
    );
    let mut response = client.bidi_stream(request).await?.into_inner();
    while let Some(res) = response.next().await {
        println!("Bidi stream response: {:?}", res?);
    }
    Ok(())
}

src/example_server.rs

use tonic::{ transport::Server, Request, Response, Status, Streaming };
use tokio_stream::{ wrappers::ReceiverStream, StreamExt };
use std::pin::Pin;
use tokio::sync::mpsc;

pub mod example {
    tonic::include_proto!("example");
}

use example::example_service_server::{ ExampleService, ExampleServiceServer };
use example::{ RequestMessage, ResponseMessage };

#[derive(Debug, Default)]
pub struct MyExampleService;

#[tonic::async_trait]
impl ExampleService for MyExampleService {
    async fn unary_call(
        &self,
        request: Request<RequestMessage>
    ) -> Result<Response<ResponseMessage>, Status> {
        let response = ResponseMessage {
            message: format!("Unary response to {}", request.into_inner().message),
        };
        Ok(Response::new(response))
    }

    type ServerStreamStream = Pin<
        Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
    >;

    async fn server_stream(
        &self,
        request: Request<RequestMessage>
    ) -> Result<Response<Self::ServerStreamStream>, Status> {
        let message = request.into_inner().message;
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            for i in 0..3 {
                let response = ResponseMessage {
                    message: format!("Stream response {} to {}", i, message),
                };
                tx.send(Ok(response)).await.unwrap();
            }
        });

        Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::ServerStreamStream))
    }

    async fn client_stream(
        &self,
        request: Request<Streaming<RequestMessage>>
    ) -> Result<Response<ResponseMessage>, Status> {
        let mut stream = request.into_inner();
        let mut messages = Vec::new();

        while let Some(req) = stream.next().await {
            let req = req?;
            messages.push(req.message);
        }

        let response = ResponseMessage {
            message: format!("Received messages: {:?}", messages.join(", ")),
        };
        Ok(Response::new(response))
    }

    type BidiStreamStream = Pin<
        Box<dyn tokio_stream::Stream<Item = Result<ResponseMessage, Status>> + Send>
    >;

    async fn bidi_stream(
        &self,
        request: Request<Streaming<RequestMessage>>
    ) -> Result<Response<Self::BidiStreamStream>, Status> {
        let mut stream = request.into_inner();
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            while let Some(req) = stream.next().await {
                let req = req.unwrap();
                let response = ResponseMessage {
                    message: format!("Echo: {}", req.message),
                };
                tx.send(Ok(response)).await.unwrap();
            }
        });

        Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::BidiStreamStream))
    }
}

pub async fn start_server() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let example_service = MyExampleService::default();

    println!("Server listening on {}", addr);

    Server::builder().add_service(ExampleServiceServer::new(example_service)).serve(addr).await?;

    Ok(())
}

main.rs

use tokio::task;
mod example_client;
mod example_server;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 启动服务端 
    task::spawn(async {
        example_server::start_server().await.unwrap();
    });

    // 延时
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    // 调用客户端
    example_client::unary_call().await?;
    example_client::server_stream().await?;
    example_client::client_stream().await?;
    example_client::bidi_stream().await?;

    Ok(())
}

cargo build然后cargo run即可看到调用结果

Unary response: ResponseMessage { message: "Unary response to Hello from unary" }
Server stream response: ResponseMessage { message: "Stream response 0 to Hello from stream" }
Server stream response: ResponseMessage { message: "Stream response 1 to Hello from stream" }
Server stream response: ResponseMessage { message: "Stream response 2 to Hello from stream" }
Client stream response: ResponseMessage { message: "Received messages: \"Hello, from, client\"" }
Bidi stream response: ResponseMessage { message: "Echo: Hello" }       
Bidi stream response: ResponseMessage { message: "Echo: from" }        
Bidi stream response: ResponseMessage { message: "Echo: bidi" } 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2169655.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI大模型生态暨算力大会今日举行,20位大咖领衔共探「AI NATIVE,生成未来」

出品&#xff5c;AI大模型工场 9月25日消息&#xff0c;由AI大模型工场主办AI大模型生态暨算力大会今日举行。作为国内最具影响力与最懂大模型的AI生态大会&#xff0c;大会讨论了AI大模型的最新进展和未来发展趋势。 2024年被业内称为大模型应用落地元年&#xff0c;大模型产…

【入门01】arcgis api 4.x 创建地图、添加图层、添加指北针、比例尺、图例、卷帘、图层控制、家控件(附完整源码)

1.效果 2.代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title></title><link rel"s…

JSP+Servlet+Mybatis实现列表显示和批量删除等功能

前言 使用JSP回显用户列表&#xff0c;可以进行批量删除&#xff08;有删除确认步骤&#xff09;&#xff0c;和修改用户数据&#xff08;用户数据回显步骤&#xff09;使用servlet处理传递进来的请求参数&#xff0c;并调用dao处理数据并返回使用mybatis&#xff0c;书写dao层…

828华为云征文|华为云Flexus云服务器X实例的网络性能测试

828华为云征文&#xff5c;华为云Flexus云服务器X实例的网络性能测试 前言一、Flexus云服务器X实例介绍1.1 Flexus云服务器X实例简介1.2 Flexus云服务器X实例特点1.3 Flexus云服务器X实例使用场景 二、iperf3工具介绍2.1 iperf3简介2.2 iperf3特点 三、本次实践介绍3.1 本次实践…

专利如何有效维持?

专利的获得并非一劳永逸&#xff0c;其法律效力的持续存在依赖于有效的专利维持工作。专利维持&#xff0c;即专利权人在专利有效期内&#xff0c;按照法定程序缴纳年费、提交必要的文件&#xff0c;以保持专利权的有效状态。这一过程对于确保创新成果持续闪耀、维护企业竞争优…

DSP——从入门到放弃系列——多核导航器(持续更新)

1、概述 C6678中的数据移动非常复杂&#xff0c;多核导航器是C6678中协助完成在器件内高速数据包移动的外围设备。 2、组成 多核导航器由1个队列管理子系统&#xff08;QMSS&#xff09;1个包DMA&#xff08;Packet DMA PKTDMA&#xff09;来控制和实现器件内的高速数据包移…

Arthas mc(Memory Compiler/内存编译器 )

文章目录 二、命令列表2.2 class/classloader相关命令2.2.2 mc &#xff08;Memory Compiler/内存编译器 &#xff09;举例1&#xff1a;可以通过-d命令指定输出目录&#xff1a;mc -d /temporary/tmp /temporary/tmp/AccountController.java举例2&#xff1a;通过--classLoade…

SpringBoot-全局处理异常,时间格式,跨域,拦截器,监听器

1.全局异常处理 使用ControllerAdvice与ExceptionHandler注解 /*** 全局异常处理程序** author * date */ ControllerAdvice ResponseBody public class GlobalExceptionHandler {ExceptionHandler(Exception.class)public JsonResult handleException(Exception e) {e.print…

Vue3 中 this 一分钟了解

Vue3 中 this 在Vue3的开发过程中&#xff0c;this的使用方式和Vue2有着显著的不同&#xff0c;特别是在组合式API&#xff08;Composition API&#xff09;的引入后。本文将深入探讨Vue3中this的使用&#xff0c;解析其底层源码&#xff0c;并探讨这种设计背后的原因&#xff…

Heart Animated Realistic 心脏运动模型素材带动画

Realistic Heart具有两个多边形质量的网格,具有详细的解剖结构,并配有高清纹理2048x2048,在高低多边形网格上具有高清法线贴图,可在教育、游戏和虚拟现实场景中获得更好、更真实的效果。 还具有完整的心动周期。 下载:​​Unity资源商店链接资源下载链接 效果图:

51单片机如何判断浮点数nan

使用这个函数进行判断 帮助信息内的描述如下 _chkfloat_ #include <intrins.h> unsigned char _chkfloat_ (float val); /* number for error checking */ Description: The _chkfloat_ function checks the status of a floating-point number. Return Value: The…

短视频去水印解析api接口使用文档

短视频去水印解析api接口&#xff0c;支持各大平台短视频和图集。 请求示例&#xff1a;https://www.dspqsy.vip/spapi?key密钥&url短视频链接 返回数据格式&#xff1a;JSON 请求方式&#xff1a;GET/POST 请求参数&#xff1a;url (短视频分享的URL) PHP 源码&…

C语言数组探秘:数据操控的艺术【下】

承接上篇&#xff0c;我们继续讲数组的内容。 八.二维数组的使用 当我们掌握了二维数组的创建和初始化&#xff0c;那我们怎么使用二维数组呢&#xff1f;其实二维数组访问也是使用下标的形式的&#xff0c;二维数组是有行和列的&#xff0c;只要锁定了行和列就能唯一锁定数组中…

Race Karts Pack 全管线 卡丁车赛车模型素材

是8辆高细节、可定制的赛车,内部有纹理。经过优化,可在手机游戏中使用。Unity车辆系统已实施-准备驾驶。 此套装包含8种不同的车辆,每种车辆有8-10种颜色变化,总共有75种车辆变化! 技术细节: -每辆卡丁车模型使用4种材料(车身、玻璃、车轮和BrakeFlare) 纹理大小: -车…

屏幕活动保存到NAS

目录 一、套件选择 二、员工机准备 1、下载安装ffmpeg 2、安装运行rtsp-simple-server 3、生成桌面流 4、接收查看桌面变化 三、NAS端配置 1、安装套件 2、配置Surveillence Station 3、实时监看 4、历史记录查看 5、录像文件操作 四、总结 朋友的朋友找上我,说到…

网络安全专业,在校大学生如何赚外快,实现财富自由?零基础入门到精通,收藏这一篇就够了

如今&#xff0c;计算机行业内卷严重&#xff0c;我们不找点赚外快的路子这么行呢&#xff1f; 今天就来说说网络安全专业平时都怎么赚外快。 一、安全众测 国内有很多成熟的src众测平台&#xff0c;如漏洞盒子、火线众测、补天、CNVD、漏洞银行等。一些大厂也有自己的src&a…

大厂必问 · 如何防止订单重复?

在电商系统或任何涉及订单操作的场景中&#xff0c;用户多次点击“提交订单”按钮可能会导致重复订单提交&#xff0c;造成数据冗余和业务逻辑错误&#xff0c;导致库存问题、用户体验下降或财务上的错误。因此&#xff0c;防止订单重复提交是一个常见需求。 常见的重复提交场…

Dapper介绍及特性

一、Dapper介绍及特性 Dapper是一个.NET平台上的轻量级对象关系映射&#xff08;ORM&#xff09;工具&#xff0c;它通过扩展IDbConnection接口&#xff0c;提供了一系列的扩展方法来执行SQL查询并将结果映射到.NET对象中。Dapper以其高性能和简单易用著称&#xff0c;特别适合…

springboot中有哪些方式可以解决跨域问题

文章目录 什么是跨域解决方案CrossOrigin注解实现WebMvcConfigurer接口CorsFilter过滤器如何选择&#xff1f; 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 Talk is cheap &#xff0…

Keepalived+Nginx 高可用集群(双主模式)

1.基础环境配置 [rootlb1 ~]# systemctl stop firewalld # 关闭防火墙 [rootlb1 ~]# sed -i s/^SELINUX.*/SELINUXdisabled/ /etc/sysconfig/selinux # 关闭selinux&#xff0c;重启生效 [rootlb1 ~]# setenforce 0          …