目录
- rpc
- RPC调用
- net/rpc
- RPC over HTTP 和 RESTful
- server
- client
- RPC over TCP 和 RESTful
- server
- client
- 序列化/反序列化协议
- json序列化
- server
- client
- python调用rpc
- RPC原理
- rpc框架比较
- grpc
- why gRpc
- gRPC与Protobuf介绍
- 安装gRPC和Protobuf
- 检查
- gRPC的开发方式
- 编写proto代码
- 编写Server端Go代码
- 编写Client端Go代码
- gRPC跨语言调用
- python
- 生成Python代码
- 流式RPC
- 单向流式server
- 单向流式client
- 双向流式
rpc
-
远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。
该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。 -
它允许像调用本地服务一样调用远程服务。
-
RPC是一种服务器-客户端(Client/Server)模式,经典实现是一个通过发送请求-接受回应进行信息交互的系统。
4. 从上图可以看出, RPC 本身是 client-server模型,也是一种 request-response 协议。
RPC调用
-
使用 RPC 的目的是让我们调用远程方法像调用本地方法一样无差别。并且RESTful API通常是基于HTTP协议,传输数据采用JSON等文本协议,相较于RPC 直接使用TCP协议,传输数据多采用二进制协议来说,RPC通常相比RESTful API性能会更好。
-
RESTful API多用于前后端之间的数据传输,而目前微服务架构下各个微服务之间多采用RPC调用。
net/rpc
-
RPC 的消息传输可以通过 TCP、UDP 或者 HTTP等,所以有时候我们称之为 RPC over TCP、 RPC over HTTP(RPC 通过HTTP 传输消息的时候和 RESTful的架构是类似的,但是也有不同)。
-
Go语言的 rpc 包提供对通过网络或其他 i/o 连接导出的对象方法的访问,服务器注册一个对象,并把它作为服务对外可见(服务名称就是类型名称)。注册后,对象的导出方法将支持远程访问。服务器可以注册不同类型的多个对象(服务) ,但是不支持注册同一类型的多个对象。
RPC over HTTP 和 RESTful
-
RPC 的客户端和服务器端是紧耦合的,客户端需要知道调用的过程的名字,过程的参数以及它们的类型、顺序等。一旦服务器更改了过程的实现,客户端的实现很容易出问题。RESTful基于 http的语义操作资源,参数的顺序一般没有关系,也很容易的通过代理转换链接和资源位置,从这一点上来说,RESTful 更灵活。
-
它们操作的对象不一样。 RPC 操作的是方法和过程,它要操作的是方法对象。RESTful 操作的是资源(resource),而不是方法。
-
RESTful执行的是对资源的操作,增加、查找、修改和删除等,主要是CURD,所以如果要实现一个特定目的的操作,比如为名字姓张的学生的数学成绩都加上10这样的操作,RESTful的API设计起来就不是那么直观或者有意义。在这种情况下, RPC的实现更有意义,它可以实现一个Student.Increment(Name, Score) 的方法供客户端调用。
server
package main
import (
"log"
"net"
"net/http"
"net/rpc"
)
type Args struct {
X, Y int
}
// ServiceA 自定义一个结构体类型
type ServiceA struct{}
// Add 为ServiceA类型增加一个可导出的Add方法
func (s *ServiceA) Add(args *Args, reply *int) error {
*reply = args.X + args.Y
return nil
}
func main() {
service := new(ServiceA)
rpc.Register(service) // 注册RPC服务
rpc.HandleHTTP() // 基于HTTP协议
l, e := net.Listen("tcp", ":9091")
if e != nil {
log.Fatal("listen error:", e)
}
http.Serve(l, nil)
}
client
package main
import (
"fmt"
"log"
"net/rpc"
)
func main() {
// 建立HTTP连接
client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
// 同步调用
args := &Args{10, 20}
var reply int
err = client.Call("ServiceA.Add", args, &reply)
if err != nil {
log.Fatal("ServiceA.Add error:", err)
}
fmt.Printf("ServiceA.Add: %d+%d=%d\n", args.X, args.Y, reply)
// 异步调用
var reply2 int
divCall := client.Go("ServiceA.Add", args, &reply2, nil)
replyCall := <-divCall.Done // 接收调用结果
fmt.Println(replyCall.Error)
fmt.Println(reply2)
}
RPC over TCP 和 RESTful
-
如果直接使用socket实现 RPC,除了上面的不同外,可以获得性能上的优势。
-
RPC over TCP可以通过长连接减少连接的建立所产生的花费,在调用次数非常巨大的时候(这是目前互联网公司经常遇到的情况,大并发的情况下),这个花费影响是非常巨大的。 RESTful 也可以通过 keep-alive 实现长连接, 但是它最大的一个问题是它的request-response模型是阻塞的 (http1.0和 http1.1, http 2.0没这个问题),发送一个请求后只有等到response返回才能发送第二个请求 (有些http server实现了pipeling的功能,但不是标配), RPC的实现没有这个限制。
server
package main
import (
"log"
"net"
"net/rpc"
)
func main() {
service := new(ServiceA)
rpc.Register(service) // 注册RPC服务
l, e := net.Listen("tcp", ":9091")
if e != nil {
log.Fatal("listen error:", e)
}
for {
conn, _ := l.Accept()
rpc.ServeConn(conn)
}
}
client
package main
import (
"fmt"
"log"
"net/rpc"
)
func main() {
// 建立TCP连接
client, err := rpc.Dial("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
// 同步调用
args := &Args{10, 20}
var reply int
err = client.Call("ServiceA.Add", args, &reply)
if err != nil {
log.Fatal("ServiceA.Add error:", err)
}
fmt.Printf("ServiceA.Add: %d+%d=%d\n", args.X, args.Y, reply)
// 异步调用
var reply2 int
divCall := client.Go("ServiceA.Add", args, &reply2, nil)
replyCall := <-divCall.Done // 接收调用结果
fmt.Println(replyCall.Error)
fmt.Println(reply2)
}
序列化/反序列化协议
rpc 包默认使用的是 gob 协议对传输数据进行序列化/反序列化,比较有局限性。
json序列化
server
package main
import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func main() {
service := new(ServiceA)
rpc.Register(service) // 注册RPC服务
l, e := net.Listen("tcp", ":9091")
if e != nil {
log.Fatal("listen error:", e)
}
for {
conn, _ := l.Accept()
// 使用JSON协议
rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
client
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func main() {
// 建立TCP连接
conn, err := net.Dial("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
// 使用JSON协议
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
// 同步调用
args := &Args{10, 20}
var reply int
err = client.Call("ServiceA.Add", args, &reply)
if err != nil {
log.Fatal("ServiceA.Add error:", err)
}
fmt.Printf("ServiceA.Add: %d+%d=%d\n", args.X, args.Y, reply)
// 异步调用
var reply2 int
divCall := client.Go("ServiceA.Add", args, &reply2, nil)
replyCall := <-divCall.Done // 接收调用结果
fmt.Println(replyCall.Error)
fmt.Println(reply2)
}
python调用rpc
import socket
import json
request = {
"id": 0,
"params": [{"x":10, "y":20}], # 参数要对应上Args结构体
"method": "ServiceA.Add"
}
client = socket.create_connection(("127.0.0.1", 9091),5)
client.sendall(json.dumps(request).encode())
rsp = client.recv(1024)
rsp = json.loads(rsp.decode())
print(rsp)
RPC原理
RPC 让远程调用就像本地调用一样,其调用过程可拆解为以下步骤:
① 服务调用方(client)以本地调用方式调用服务;
② client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
③ client stub找到服务地址,并将消息发送到服务端;
④ server 端接收到消息;
⑤ server stub收到消息后进行解码;
⑥ server stub根据解码结果调用本地的服务;
⑦ 本地服务执行并将结果返回给server stub;
⑧ server stub将返回结果打包成能够进行网络传输的消息体;
⑨ 按地址将消息发送至调用方;
⑩ client 端接收到消息;
⑪ client stub收到消息并进行解码;
⑫ 调用方得到最终结果。
使用RPC框架的目标是只需要关心第1步和最后1步,中间的其他步骤统统封装起来,让使用者无需关心。各式RPC框架(grpc、thrift等)就是为了让RPC调用更方便。
rpc框架比较
-
Dubbo 是阿里巴巴公司开源的一个Java高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。不过,略有遗憾的是,据说在淘宝内部,dubbo由于跟淘宝另一个类似的框架HSF(非开源)有竞争关系,导致dubbo团队已经解散。
-
Motan是新浪微博开源的一个Java 框架。它诞生的比较晚,起于2013年,2016年5月开源。Motan 在微博平台中已经广泛应用,每天为数百个服务完成近千亿次的调用。
-
rpcx是Go语言生态圈的Dubbo, 比Dubbo更轻量,实现了Dubbo的许多特性,借助于Go语言优秀的并发特性和简洁语法,可以使用较少的代码实现分布式的RPC服务。
-
gRPC是Google开发的高性能、通用的开源RPC框架,其由Google主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言。本身它不是分布式的,所以要实现上面的框架的功能需要进一步的开发。
-
thrift是Apache的一个跨语言的高性能的服务框架,也得到了广泛的应用。
Dubbo | Montan | rpcx | gRPC | Thrift | |
---|---|---|---|---|---|
开发语言 | Java | Java | Go | 跨语言 | 跨语言 |
分布式(服务治理) | √ | √ | √ | × | × |
多序列化框架支持 | √ | √ (当前支持Hessian2、Json,可扩展) | √ | × (只支持protobuf) | × (thrift格式) |
多种注册中心 | √ | √ | √ | × | × |
管理中心 | √ | √ | √ | × | × |
跨编程语言 | × | × (支持php client和C server) | × | √ | √ |
grpc
-
gRPC由google开发,是一款语言中立、平台中立、开源的远程过程调用系统。
-
gRPC是一种现代化开源的高性能RPC框架,能够运行于任意环境之中。最初由谷歌进行开发。它使用HTTP/2作为传输协议。
why gRpc
- 使用gRPC, 可以一次性的在一个
.proto
文件中定义服务并使用任何支持它的语言去实现客户端和服务端。 - 使用protocol buffers还能获得其他好处,包括高效的序列化,简单的IDL以及容易进行接口更新。
IDL:IDL(Interface description language)是指接口描述语言,是用来描述软件组件接口的一种计算机语言,是跨平台开发的基础。IDL通过一种中立的方式来描述接口,使得在不同平台上运行的对象和用不同语言编写的程序可以相互通信交流;比如,一个组件用C++写成,另一个组件用Go写成。
gRPC与Protobuf介绍
- 微服务架构中,由于每个服务对应的代码库是独立运行的,无法直接调用,彼此间的通信就是个大问题。
- gRPC可以实现微服务,将大的项目拆分为多个小且独立的业务模块,也就是服务,各服务间使用高效的protobuf协议进行RPC调用,gRPC默认使用protocol buffers,这是
google开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如JSON) - 可以用proto files创建gRPC服务,用message类型来定义方法参数和返回类型。
安装gRPC和Protobuf
-
安装grpc:
go get google.golang.org/grpc
-
安装Protocol Buffers v3:下载适合平台的预编译好的二进制文件,将下载得到的可执行文件
protoc
所在的 bin 目录加到电脑的环境变量中。- 适用Windows 64位protoc-3.20.1-win64.zip
- 适用于Mac Intel 64位protoc-3.20.1-osx-x86_64.zip
- 适用于Mac ARM 64位protoc-3.20.1-osx-aarch_64.zip
- 适用于Linux 64位protoc-3.20.1-linux-x86_64.zip
-
安装go语言protobuf插件:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
该插件会根据.proto文件生成一个后缀为.pb.go的文件,包含所有.proto文件中定义的类型及其序列化方法。 -
安装grpc插件:
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
,该插件会生成一个后缀为_grpc.pb.go的文件,其中包含:一种接口类型(或存根) ,供客户端调用的服务方法,服务器要实现的接口类型。
检查
❯ protoc --version
libprotoc 3.20.1
❯ protoc-gen-go --version
protoc-gen-go v1.28.0
❯ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.2.0
gRPC的开发方式
像许多 RPC 系统一样,gRPC 基于定义服务的思想,指定可以通过参数和返回类型远程调用的方法。默认情况下,gRPC 使用 protocol buffers作为接口定义语言(IDL)来描述服务接口和有效负载消息的结构。可以根据需要使用其他的IDL代替。
编写proto代码
Protocol Buffers是一种与语言无关,平台无关的可扩展机制,用于序列化结构化数据。使用Protocol Buffers可以一次定义结构化的数据,然后可以使用特殊生成的源代码轻松地在各种数据流中使用各种语言编写和读取结构化数据。
syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本
option go_package = "xx"; // 指定生成的Go代码在你项目中的导入路径
package pb; // 包名
// 定义服务
service Greeter {
// SayHello 方法
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// 请求消息
message HelloRequest {
string name = 1;
}
// 响应消息
message HelloResponse {
string reply = 1;
}
编写Server端Go代码
修改proto文件
// ...
option go_package = "hello_server/pb";
// ...
目录结构:
hello_server
├── go.mod
├── go.sum
├── main.go
└── pb
└── hello.proto
生成pb文件:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/hello.proto
目录结构:
hello_server
├── go.mod
├── go.sum
├── main.go
└── pb
├── hello.pb.go
├── hello.proto
└── hello_grpc.pb.go
server:
package main
import (
"context"
"fmt"
"hello_server/pb"
"net"
"google.golang.org/grpc"
)
// hello server
type server struct {
pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
return &pb.HelloResponse{Reply: "Hello " + in.Name}, nil
}
func main() {
// 监听本地的8972端口
lis, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
s := grpc.NewServer() // 创建gRPC服务器
pb.RegisterGreeterServer(s, &server{}) // 在gRPC服务端注册服务
// 启动服务
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
编写Client端Go代码
修改proto文件:
// ...
option go_package = "hello_client/pb";
// ...
生成pb文件:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative pb/hello.proto
目录结构:
http_client
├── go.mod
├── go.sum
├── main.go
└── pb
├── hello.pb.go
├── hello.proto
└── hello_grpc.pb.go
client:
package main
import (
"context"
"flag"
"log"
"time"
"hello_client/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// hello_client
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "127.0.0.1:8972", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// 连接到server端,此处禁用安全传输
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// 执行RPC调用并打印收到的响应数据
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetReply())
}
gRPC跨语言调用
python
# grpc
python -m pip install grpcio
# tool
python -m pip install grpcio-tools
生成Python代码
- 新建一个py_client目录,将hello.proto文件保存到py_client/pb/目录下。
- 在py_client目录下执行以下命令,生成python源码文件。
python3 -m grpc_tools.protoc -Ipb --python_out=. --grpc_python_out=. pb/hello.proto
py_client
├── client.py
├── hello_pb2.py
├── hello_pb2_grpc.py
└── pb
└── hello.proto
-----------------------------------------
import logging
import grpc
import hello_pb2
import hello_pb2_grpc
def run():
# NOTE(gRPC Python Team): .close() is possible on a channel and should be
# used in circumstances in which the with statement does not fit the needs
# of the code.
with grpc.insecure_channel('127.0.0.1:8972') as channel:
stub = hello_pb2_grpc.GreeterStub(channel)
resp = stub.SayHello(hello_pb2.HelloRequest(name='q1mi'))
print("Greeter client received: " + resp.reply)
if __name__ == '__main__':
logging.basicConfig()
run()
流式RPC
客户端发起了一个RPC请求到服务端,服务端进行业务处理并返回响应给客户端,这是gRPC最基本的一种工作方式(Unary RPC)。除此之外,依托于HTTP2,gRPC还支持流式RPC(Streaming RPC)。
单向流式server
客户端发出一个RPC请求,服务端与客户端之间建立一个单向的流,服务端可以向流中写入多个响应消息,最后主动关闭流;而客户端需要监听这个流,不断获取响应直到流关闭。
- 定义服务(修改.proto文件后,需要重新使用 protocol buffers编译器生成客户端和服务端代码。)
// 服务端返回流式数据
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
- 实现 LotsOfReplies 方法。
// LotsOfReplies 返回使用多种语言打招呼
func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {
words := []string{
"你好",
"hello",
"fuck you",
"안녕하세요",
}
for _, word := range words {
data := &pb.HelloResponse{
Reply: word + in.GetName(),
}
// 使用Send方法返回多个数据
if err := stream.Send(data); err != nil {
return err
}
}
return nil
}
- 客户端调用LotsOfReplies 并将收到的数据依次打印出来
func runLotsOfReplies(c pb.GreeterClient) {
// server端流式RPC
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
stream, err := c.LotsOfReplies(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("c.LotsOfReplies failed, err: %v", err)
}
for {
// 接收服务端返回的流式数据,当收到io.EOF或错误时退出
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("c.LotsOfReplies failed, err: %v", err)
}
log.Printf("got reply: %q\n", res.GetReply())
}
}
单向流式client
- 定义服务
// 客户端发送流式数据
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
- 服务端实现LotsOfGreetings方法
// LotsOfGreetings 接收流式数据
func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error {
reply := "你好:"
for {
// 接收客户端发来的流式数据
res, err := stream.Recv()
if err == io.EOF {
// 最终统一回复
return stream.SendAndClose(&pb.HelloResponse{
Reply: reply,
})
}
if err != nil {
return err
}
reply += res.GetName()
}
}
- 客户端调用LotsOfGreetings方法,向服务端发送流式请求数据,接收返回值并打印
func runLotsOfGreeting(c pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 客户端流式RPC
stream, err := c.LotsOfGreetings(ctx)
if err != nil {
log.Fatalf("c.LotsOfGreetings failed, err: %v", err)
}
names := []string{"", "i", ""}
for _, name := range names {
// 发送流式数据
err := stream.Send(&pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("c.LotsOfGreetings stream.Send(%v) failed, err: %v", name, err)
}
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("c.LotsOfGreetings failed: %v", err)
}
log.Printf("got reply: %v", res.GetReply())
}
双向流式
双向流式RPC即客户端和服务端均为流式的RPC,能发送多个请求对象也能接收到多个响应对象。典型应用示例:聊天应用等。
- 定义服务
// 双向流式数据
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
- 服务端实现BidiHello方法。
// BidiHello 双向流式打招呼
func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error {
for {
// 接收流式请求
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := magic(in.GetName()) // 对收到的数据做些处理
// 返回流式响应
if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil {
return err
}
}
}
- 客户端调用BidiHello方法,一边从终端获取输入的请求数据发送至服务端,一边从服务端接收流式响应。
func runBidiHello(c pb.GreeterClient) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// 双向流模式
stream, err := c.BidiHello(ctx)
if err != nil {
log.Fatalf("c.BidiHello failed, err: %v", err)
}
waitc := make(chan struct{})
go func() {
for {
// 接收服务端返回的响应
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("c.BidiHello stream.Recv() failed, err: %v", err)
}
fmt.Printf("AI:%s\n", in.GetReply())
}
}()
// 从标准输入获取用户输入
reader := bufio.NewReader(os.Stdin) // 从标准输入生成读对象
for {
cmd, _ := reader.ReadString('\n') // 读到换行
cmd = strings.TrimSpace(cmd)
if len(cmd) == 0 {
continue
}
if strings.ToUpper(cmd) == "QUIT" {
break
}
// 将获取到的数据发送至服务端
if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil {
log.Fatalf("c.BidiHello stream.Send(%v) failed: %v", cmd, err)
}
}
stream.CloseSend()
<-waitc
}