首先什么了解什么是RPC?
不同于 TCP 和 HTTP,TCP 和 HTTP 是网络传输协议,而 RPC 是一种设计、实现框架,通讯协议只是其中一部分,RPC 不仅要解决协议通讯的问题,还有序列化和反序列化,以及消息通知。
一个完整的 RPC 架构里面包含了四个核心的组件,分别是:Client, Server, ClientOptions, ServerOptions,这个 Options 就是RPC需要设计实现的东西。
- 客户端(Client):服务的调用方
- 服务端(Server):真正的服务提供方
- 客户端存根(ClientOption): Socket 管理,网络收发包的序列化
- 服务端存根(ServerOption): Socket 管理,提醒 server 层 rpc 方法调用,以及网络收发包的序列化
RPC 逻辑图
gRPC
什么是 gRPC?
RPC 的一种,它使用 Protocol Buffer(简称 Protobuf)作为序列化格式,Protocol Buffer 是来自 google 的序列化框架。
官方地址:https://grpc.io/
gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持。
比 JSON 更加轻便高效,gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特性。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。
在gRPC中,客户端应用程序可以直接在另一台计算机上的服务器应用程序上调用方法,就好像它是本地对象一样,从而使您更轻松地创建分布式应用程序和服务。
调用模型
- 客户端(gRPC Stub)调用 A 方法,发起 RPC 调用
- 对请求信息使用 Protobuf 进行对象序列化压缩(IDL),Proto Request 压缩后发送请求信息到服务端
- 服务端(gRPC Server)接收到请求后,解码请求体,进行业务逻辑处理并返回
- 对响应结果使用 Protobuf 进行对象序列化压缩(IDL),Proto Response 压缩后返回信息到客户端
- 客户端接受到服务端响应,解码请求体。回调被调用的 A 方法,唤醒正在等待响应(阻塞)的客户端并返回响应结果。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T9NBsmyE-1672275629314)(null)]
适用场景
- 分布式场景:gRPC设计为低延迟和高吞吐量通信,非常适用于效率至关重要的轻型微服务。
- 点对点实时通信:gRPC对双向流媒体提供出色的支持,可以实时推送消息而无需轮循。
- 多语言混合开发:支持主流的开发语言,使gRPC成为多语言开发环境的理想选择。
- 网络受限环境:使用 Protobuf 序列化 gRPC消息。gRPC消息始终小于等效的 JSON 消息。
通信模式
在 RPC 中,服务端会开启服务供客户端调用,每一次 RPC 调用都是一次客户端发送请求到服务端获得相应结果的过程,中间过程被封装了,看起来像一次本地调用一样,一次 RPC 调用也就是一次通讯过程。
RPC调用根据双端是否为流式交互,有着不同的通信模式。
分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Clientstreaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。
它们主要有以下特点:
服务类型 | 描述 |
---|---|
简单 RPC(Unary RPC) | 一般的 RPC 调用,传入一个请求对象,返回一个返回对象 |
服务端流式 RPC(Server streaming RPC) | 传入一个请求对象,服务端可以返回多个结果对象 |
客户端流式 RPC(Clientstreaming RPC) | 客户端传入多个请求对象,服务端返回一个结果对象 |
双向流式 RPC(Bi-directional streaming RPC) | 结合客户端流式RPC和服务端流式RPC,可以传⼊多个请求对象,返回多个结果对 |
简单 RPC(Unary RPC)
又称一元(没有媒体流),最简单的rpc调用,一个请求对象对应一个返回对象。客户端发起一次请求客户端相应一个数据,即标准的RPC通信。
服务端流式 RPC(Server streaming RPC)
客户端传⼊⼀个请求对象,服务端可以返回多个结果对象
服务端流式 RPC 下,客户端发出一个请求,不会立即得到一个响应,而是在服务端与客户端之间建立了一个单身的流,服务端可以随时向流中写入响应数据,最后由服务端主动关闭流,客户端需要监听这个流,不断获取响应数据流直到流关闭。
应用场景示例:
客户端向服务端发送一个股票代码,服务端就实时把数据源源不断的返回给客户端,客户端收到后实时渲染。
客户端流式 RPC(Clientstreaming RPC)
客户端流式 RPC,由客户端传入多个请求对象,服务端只返回一个响应结果。
应用场景示例:
如物联网终端向服务器报送数据
双向流式 RPC(Bi-directional streaming RPC)
双向流式 RPC 结合客户端流式 rpc和服务端流式 rpc, 可以传入多个对象,返回多个响应对象。
总结
gRPC设计为低延迟和⾼吞吐量通信。gRPC⾮常适⽤于效率⾄关重要的轻型微服务、点对点实时通信 - gRPC对双向流媒体提供出⾊的⽀持。
gRPC服务可以实时推送消息⽽⽆需轮询。
多语⾔混合开发环境 - gRPC⼯具⽀持所有流⾏的开发语⾔,使gRPC成为多语⾔开发环境的理想选择。
⽹络受限环境 - 使⽤Protobuf(⼀种轻量级消息格式)序列化gRPC消息。gRPC消息始终⼩于等效的JSON消息
gRPC框架代码实现
安装必备依赖
golang
版本的grpc
要求go
版本要在1.6
以上
安装 gRPC
go get -u google.golang.org/grpc
安装有问题请切换代理源。
安装 protobuf
go get -u github.com/golang/protobuf/protoc-gen-go
protobuf简介
Protocol Buffers(protobuf)
:与编程语言无关,与程序运行平台无关的数据序列化协议以及接口定义语言(IDL: interface definition language)。
要使用protobuf
需要先理解几个概念:
-
protobuf
编译器protoc
,用于编译.proto
文件,github 地址:https://github.com/protocolbuffers/protobuf -
编程语言的
protobuf
插件,搭配protoc
编译器,根据.proto
文件生成对应编程语言的代码。 -
protobuf runtime library
:每个编程语言有各自的protobuf runtime
,用于实现各自语言的protobuf
协议。
生成 pb 文件
编写grpc
接口,在.proto
文件定义接口通信数据格式和接口信息,然后通过protoc
自动生成对应的go
代码
protoc 工具
protoc -I helloworld/ helloworld/helloworld.proto --go_out=plugins=grpc:helloworld // 根据 .proto 文件生成对应的.go文件
- path 指定路径
--go_out
:指定输出go
代码plugins=grpc
:.proto
中的service
是grpc
扩展的功能,需要使用grpc
插件进行解析才能生成对应的接口定义代码。
.proto 文件
proto 文件基础规范:
Message命名采用驼峰命名方式,字段命名采用小写字母加下划线分隔方式。
Enums类型名采用驼峰命名方式,字段命名采用大写字母加下划线分隔方式,即便业务上不需要参数也必须指定一个请求消息,一般会定义一个空message。
示例:rpc.proto
//proto3标准
syntax = "proto3";
//包名
option go_package = "./;pb";
// 请求数据体
message RequestMessage {
string name = 1;
string message = 2;
}
// 返回数据体
message ResponseMessage {
string name = 1;
string message = 2;
}
//定义rpc接口
service Greets {
// 简单 RPC(Unary RPC)
rpc sayUnaryHello(RequestMessage) returns (ResponseMessage) {}
// 服务端流式 RPC(Server streaming RPC)
rpc sayServerStreamHello (RequestMessage) returns (stream ResponseMessage) {}
// 客户端流式 RPC(Client streaming RPC)
rpc SayClientStreamHello (stream RequestMessage) returns (ResponseMessage) {}
// 双向流式 RPC(Bi-directional streaming RPC)
rpc SayBiDirectionalStreamHello (stream RequestMessage) returns (stream ResponseMessage) {}
}
根据接口描述文件生成源码
$ protoc rpc.proto --go_out=plugins=grpc:.
gRPC 代码示例
完整代码地址:git@github.com:oscar-liu/gRPC-learn-demo.git
rpc.proto 文件
//proto3标准
syntax = "proto3";
//包名
option go_package = "./;pb";
// 请求数据体
message RequestMessage {
string name = 1;
string message = 2;
}
// 返回数据体
message ResponseMessage {
string name = 1;
string message = 2;
}
//定义rpc接口
service Greets {
// 简单 RPC(Unary RPC)
rpc sayUnaryHello(RequestMessage) returns (ResponseMessage) {}
// 服务端流式 RPC(Server streaming RPC)
rpc sayServerStreamHello (RequestMessage) returns (stream ResponseMessage) {}
// 客户端流式 RPC(Client streaming RPC)
rpc SayClientStreamHello (stream RequestMessage) returns (ResponseMessage) {}
// 双向流式 RPC(Bi-directional streaming RPC)
rpc SayBiDirectionalStreamHello (stream RequestMessage) returns (stream ResponseMessage) {}
}
服务端 Server.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"grpc/pb"
"io"
"log"
"net"
)
type Server struct{}
// SayUnaryHello 简单 RPC(Unary RPC)
// 一般的 RPC 调用,传入一个请求对象,返回一个返回对象
func (s *Server) SayUnaryHello(ctx context.Context, req *pb.RequestMessage) (*pb.ResponseMessage, error) {
log.Println(req.Name, req.Message)
return &pb.ResponseMessage{Name: "服务器大爷", Message: "收到,吱"}, nil
}
// SayServerStreamHello 服务端流式 RPC(Server streaming RPC)
// 传入一个请求对象,服务端可以返回多个结果对象
func (s *Server) SayServerStreamHello(request *pb.RequestMessage, server pb.Greets_SayServerStreamHelloServer) error {
fmt.Println("request: ", request)
var err error
menu := [2]string{"红烧肉", "黄花鱼"}
for i := 0; i < 2; i++ {
err = server.Send(&pb.ResponseMessage{Name: "服务器大爷", Message: "晚上吃:" + " " + menu[i]})
if err != nil {
log.Println(err)
return err
}
}
return nil
}
// SayClientStreamHello 客户端流式 RPC(Clientstreaming RPC)
// 客户端传入多个请求对象,服务端返回一个结果对象
func (s *Server) SayClientStreamHello(server pb.Greets_SayClientStreamHelloServer) error {
for {
recv, err := server.Recv()
// 接收完所有数据之后再响应
if err == io.EOF {
err := server.SendAndClose(&pb.ResponseMessage{Name: "服务器大爷", Message: "太哆嗦了,结束"})
if err != nil {
log.Println(err)
return err
}
return nil
} else if err != nil {
return err
}
fmt.Println("recv: ", recv)
}
}
// SayBiDirectionalStreamHello 双向流式 RPC(Bi-directional streaming RPC)
// 结合客户端流式RPC和服务端流式RPC,可以传⼊多个请求对象,返回多个结果对
func (s *Server) SayBiDirectionalStreamHello(server pb.Greets_SayBiDirectionalStreamHelloServer) error {
for {
recv, err := server.Recv()
// 接收完所有数据之后再响应
if err == io.EOF {
fmt.Println("数据接收完毕!")
// 返回两个菜
menu := [2]string{"红烧肉", "黄花鱼"}
err = server.Send(&pb.ResponseMessage{Name: "服务器大爷", Message: "晚上吃:" + " " + menu[0]})
err = server.Send(&pb.ResponseMessage{Name: "服务器大爷", Message: "晚上吃:" + " " + menu[1]})
if err != nil {
log.Println(err)
return err
}
return nil
} else if err != nil {
return err
}
fmt.Println("recv: ", recv)
}
}
func main() {
listen, err := net.Listen("tcp", ":8082")
if err != nil {
fmt.Println("Error listening", err)
return
}
// 定义一个 rpc 的 server
server := grpc.NewServer()
// 注册服务,注册 sayHello 接口
pb.RegisterGreetsServer(server, &Server{})
// 映射绑定
reflection.Register(server)
// 启动服务
err = server.Serve(listen)
if err != nil {
fmt.Println("Error serving", err)
return
}
}
客户端 client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc/pb"
"io"
"log"
"time"
)
func sayUnaryClient() {
// 创建一个 grpc 连接
conn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
// 创建 grpc 客户端
client := pb.NewGreetsClient(conn)
// 设置超时时间
_, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用方法
sendMessage := &pb.RequestMessage{
Name: "client1", Message: "server 大爷,收到我信息了没有,收到了请吱一声",
}
log.Println("client send:", sendMessage)
reply, err := client.SayUnaryHello(context.Background(), sendMessage)
if err != nil {
log.Fatalf("couldn not greet: %v", err)
}
log.Printf("serve reply:name: %s, message: %s\n", reply.Name, reply.Message)
}
func sayStreamClient() {
// 创建一个 grpc 连接
conn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
// 创建 grpc 客户端
client := pb.NewGreetsClient(conn)
// 设置超时时间
_, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 调用方法
sendMessage := &pb.RequestMessage{
Name: "client1", Message: "server 大爷,晚上吃什么?",
}
log.Println("client send:", sendMessage)
reply, err := client.SayServerStreamHello(context.Background(), sendMessage)
if err != nil {
log.Fatalf("couldn not greet: %v", err)
}
for {
recv, err := reply.Recv()
if err != nil {
log.Fatalf("couldn not greet: %v", err)
break
}
fmt.Println(recv)
}
}
func sayClientStreamHello() {
// 创建一个 grpc 连接
conn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
// 创建 grpc 客户端
client := pb.NewGreetsClient(conn)
// 设置超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
//调用rpc方法,得到一个客户端用于循环发送数据
gclient, err := client.SayClientStreamHello(ctx)
requestNumber := 2 // 请求2次
for i := 0; i < requestNumber; i++ {
// 调用方法
sendMessage := &pb.RequestMessage{
Name: "client1", Message: "大爷你好?",
}
sendMessage2 := &pb.RequestMessage{
Name: "client1", Message: "大爷在吗?",
}
if i == 0 {
err = gclient.Send(sendMessage)
log.Println("client send:", sendMessage)
} else {
err = gclient.Send(sendMessage2)
log.Println("client send:", sendMessage2)
}
if err != nil {
log.Fatalf("couldn not greet: %v", err)
return
}
if i >= 1 {
res, err := gclient.CloseAndRecv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(res)
break
}
}
}
func sayBiDirectionalStreamHello() {
// 创建一个 grpc 连接
conn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()
// 创建 grpc 客户端
client := pb.NewGreetsClient(conn)
// 设置超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
//调用rpc方法,得到一个客户端用于循环发送数据
gclient, err := client.SayBiDirectionalStreamHello(ctx)
requestNumber := 2 // 请求2次
for i := 0; i < requestNumber; i++ {
// 调用方法
sendMessage := &pb.RequestMessage{
Name: "client1", Message: "大爷你好?晚上吃什么?",
}
sendMessage2 := &pb.RequestMessage{
Name: "client1", Message: "大爷在吗?晚上吃什么?",
}
if i == 0 {
err = gclient.Send(sendMessage)
log.Println("client send:", sendMessage)
} else {
err = gclient.Send(sendMessage2)
log.Println("client send:", sendMessage2)
}
if err != nil {
log.Fatalf("couldn not greet: %v", err)
return
}
if i >= 1 {
err := gclient.CloseSend()
if err != nil {
fmt.Println(err)
break
}
break
}
}
// 接收
for {
recv, err := gclient.Recv()
if err == io.EOF {
log.Println("数据接收完毕")
break
}
fmt.Println(recv)
}
}
func main() {
sayUnaryClient()
sayStreamClient()
sayClientStreamHello()
sayBiDirectionalStreamHello()
}