gRPC Hello World
protoc 是 Protobuf 的核心工具,用于编写 .proto
文件并生成 protobuf 代码。在这里,以 Go 语言代码为例,进行 gRPC 相关代码编写。
- 下载 protoc 工具:https://github.com/protocolbuffers/protobuf/releases,并添加到环境变量。
- 下载依赖包:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
- 定义目录结构,在 proto 目录下编写
helloworld.proto
文件:
syntax="proto3";
package proto;
option go_package = "./;proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
运行 protoc -I . helloworld.proto --go_out=:. --go-grpc_out=require_unimplemented_servers=false:.
命令用于生成与 Protocol Buffers 和 gRPC 服务相关的代码。
- 分别在 server 目录和 client 目录编写服务端和客户端代码,启动 sever 和 client,client 端可看到运行结果:Hello: Alex。
// server.go
package main
import (
"context"
"net"
"google.golang.org/grpc"
"your_moudle/proto"
)
type Server struct {}
func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {
return &proto.HelloReply{Message: "Hello " + request.GetName()}, nil
}
func main() {
lis, err := net.Listen("tcp", "0.0.0.0:8000")
if err != nil {
panic("failed to listen: " + err.Error())
}
g := grpc.NewServer()
proto.RegisterGreeterServer(g, &Server{})
err = g.Serve(lis)
if err != nil {
panic("failed to start grpc: " + err.Error())
}
}
// client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"your_moudle/proto"
)
func main() {
conn, err := grpc.Dial("127.0.0.1:8000", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer conn.Close()
c := proto.NewGreeterClient(conn)
r, err := c.SayHello(context.Background(), &proto.HelloRequest{Name: "Alex"})
if err != nil {
panic(err)
}
fmt.Println(r.Message)
}
从上面例子中可以看出,一个完整的客户端与服务端进行通信的流程如下:
服务端:
- 服务端实现 proto 文件中定义的接口:
func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {
return &proto.HelloReply{Message: "Hello " + request.GetName()}, nil
}
- 监听 TCP 端口:
lis, _ := net.Listen("tcp", "0.0.0.0:8000")
- 注册和启动服务:
g := grpc.NewServer()
proto.RegisterGreeterServer(g, &Server{})
_ = g.Serve(lis)
客户端:
- 建立 TCP 连接:
conn, _ := grpc.Dial("127.0.0.1:8000", grpc.WithTransportCredentials(insecure.NewCredentials()))
- 创建 Client:
c := proto.NewGreeterClient(conn)
- 执行 RPC 调用,并接收信息:
r, _ := c.SayHello(context.Background(), &proto.HelloRequest{Name: "Alex"})
gRPC 内部怎样实现方法调用
服务端
通过 NewServer()
方法初始化一个 gRPC server,用于后续注册服务以及接受请求
创建带默认值的 gRPC server 结构体对象,初始化描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,类似于 http Headers。
func NewServer(opt ...ServerOption) *Server {
// 描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,类似于 http Headers
opts := defaultServerOptions
for _, o := range globalServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}
// 创建带默认值的 gRPC server 结构体对象
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
// 配置拦截器
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)
s.cv = sync.NewCond(&s.mu)
// 配置简单的链路工具
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}
if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
channelz.Info(logger, s.channelzID, "Server created")
return s
}
通过 RegisterService
方法来实现服务注册
使用 proto.RegisterGreeterServer(g, &Server{})
来注册服务时,其在 xxx_grpc.pb.go
中通过 RegisterService
方法来实现服务注册。
func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
// 第二个参数为我们自定义实现了相应接口的实现类。
s.RegisterService(&Greeter_ServiceDesc, srv)
}
// Greeter_ServiceDesc 是 greter 服务的 grpc.ServiceDesc,不能被自省或修改(即使作为副本)。
var Greeter_ServiceDesc = grpc.ServiceDesc{
// 声明了名称、路由、方法及其他元数据属性
ServiceName: "proto.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "helloworld.proto",
}
RegisterService
方法的具体实现如下:
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
// 判断 ServiceServer 是否实现 ServiceDesc 中描述的 HandlerType,如果实现了则调用 s.register 方法注册
if ss != nil {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
}
s.register(sd, ss)
}
首先做一些前置判断,接着 register 根据 sd 中的 Method 创建对应的 map,并将名称作为键,方法描述(指针)作为值,添加到相应的 map 中,最后按照服务名为 key,将 serviceInfo 信息注入到 Server 的 services map 中。
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
// 一些前置性判断,注册服务必须要在 server() 方法之前调用
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
// 在 Server 结构体中,services 字段存放的是 {service name -> service info} map
// serviceInfo 中有两个重要的属性:methods 和 streams
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
// register 根据 sd 中的 Method 创建对应的 map,并将名称作为键,方法描述(指针)作为值,添加到相应的 map 中
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
// 按照服务名为 key,将 serviceInfo 信息注入到 Server 的 services map 中
s.services[sd.ServiceName] = info
}
从 register
方法可以看出,其实对于不同的 RPC 请求,根据 services 中不同的 serviceName 去 services map 中取出不同的 handler 进行处理.
通过 Serve()
启动服务
通过死循环的方式在某一个端口实现监听,然后 client 对这个端口发起连接请求,握手成功后建立连接,然后 server 处理 client 发送过来的请求数据,根据请求参数,调用不同的 handler 进行处理,回写响应数据。
func (s *Server) Serve(lis net.Listener) error {
// ...
for {
rawConn, err := lis.Accept()
// ...
s.serveWG.Add(1)
go func() {
// gRPC 是基于 HTTP2.0 实现
// handleRawConn 实现了 http 的 handshake
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
//...
}
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// 。。。
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)
rawConn.SetDeadline(time.Time{})
// 。。。
go func() {
// 继续调用 serveStreams 方法
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
}
func (s *Server) serveStreams(st transport.ServerTransport) {
// 。。。
st.HandleStreams(func(stream *transport.Stream) {
// 。。。
go func() {
defer wg.Done()
// 根据 serviceName 取 server 中的 services map
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
// 。。。
})
wg.Wait()
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
// ...
service := sm[:pos]
method := sm[pos+1:]
// ...
// 取出 handler 进行处理
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
// 在该方法中实现 handler 对 rpc 的处理,以及处理后的 response
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
// ...
}
客户端
通过拨号 Dial()
建立连接
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
调用 DialContext()
方法主要返回一个初始化的 ClientConn{}
结构体对象:
// 压缩解压缩、是否需要认证、超时时间、是否重试等信息
cc := &ClientConn{
target: target,
// 连接的状态管理器,每个连接具有 "IDLE"、"CONNECTING"、"READY"、"TRANSIENT_FAILURE"、"SHUTDOW N"、"Invalid-State" 这几种状态
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
// 监测 server 和 channel 的状态
czData: new(channelzData),
// 。。。
}
通过 NewGreeterClient
创建客户端
通过 proto.NewGreeterClient(conn)
创建 Client 对象,其在 xxx_grpc.pb.go
中通过 NewGreeterClient
创建客户端
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}
调用 Invoke
发起 RPC 调用
执行 c.SayHello(context.Background(), &proto.HelloRequest{Name: "Alex"})
调用,并接收信息,SayHello
方法通过调用 Invoke
发起 RPC 调用
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
// Invoke 中调用 invoke
err := c.cc.Invoke(ctx, "/proto.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// 。。。
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
// 。。。
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
cs.SendMsg(req)
方法中,首先准备数据:
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
接着调用 csAttempt
这个结构体中的 sendMsg
方法:
op := func(a *csAttempt) error {
// 这个 sendMsg 方法中通过 a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}) 发出的数据写操作
return a.sendMsg(m, hdr, payload, data)
}
cs.RecvMsg(reply)
方法中调用 csAttempt
的 recvMsg
方法:
func (cs *clientStream) RecvMsg(m interface{}) error {
// 。。。
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
// 。。。
}
通过层层调用,最终是通过一个 io.Reader 类型的 p.r 来读取数据。
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
// ...
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
// ...
}
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
// 通过 recvAndDecompress 方法,接收数据并解压缩
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
// 。。。
}
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
// 。。。
}
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
// 。。。
}
代理模式
定义:提供一个代理对象,并由代理对象控制对原对象的引用。
代理模式下存在三类角色:
- 抽象角色 Subject:通过接口或抽象类声明业务方法,需要代理角色和真实角色来实现。
- 代理角色 Proxy:暴露给客户端,是真实角色的代理,通过真实角色的业务逻辑方法来实现抽象方法,并可以附加额外方法。
- 真实角色 RealSubject:实现真实角色的业务逻辑,供代理角色调用。
如上图所示,Client 客户端通过 toRequest 发起请求。抽象角色 Subject 作为一个接口,其中有一个Request方法。下面还有两个实现类,RealSubject 是实际的实现类,Proxy 是一个代理类,它内部是直接调用 RealSubject 的 request方法。当然,在 Proxy 中,它可以增加自己的 preRequest 和 afterRequest 处理逻辑。
不用代理模式的情况下,Client 就会直接请求 RealSubject。而使用代理模式,Client 直接调用的 Proxy,由 Proxy 先处理一遍,再由 Proxy调用 RealSubject,最后再返回给 Client。整个过程,Client 看到的只有 Proxy,对它来说,Proxy 就是真实的 Subject 实现类。而RealSubject 原来要服务很多的Client,但是现在只需要暴露给 Proxy,它的风险就小很多了,因为只需要信任 Proxy 就够了。
综上,代理模式起到中介隔离效果,避免直接暴露 RealSubject,同时这还符合开闭原则,对扩展开放,对修改关闭。Proxy 就是一个扩展,有新的需求可以在 Proxy 中实现,从而减少对 RealSubject 的修改。RealSubject 从而可以更加聚焦自己的核心能力,把一些边缘性的经常变化的扩展性需求放在 Proxy 中来实现。
作为补充,常见的 gRPC Proxy 原理:
- 启动一个 gRPC 代理服务端;
- 拦截 gRPC 的服务,转发到代理的一个函数中执行;
- 接收客户端的请求,完成相关处理后转发给服务端;
- 接收服务端的响应,完成相关处理后转发给客户端;