RPC
包管理,1.12前;旧版本要设置GO111MODULE=off
;查找gopath/src;goroot/src
;几乎没有包管理;
新版本;go.mod;
module xxx
go version
设置GO111MODULE=on
是什么
远程过程调用;
RPC与本地调用的区别;
远程过程调用的问题;
本地调用的过程:
- 变量压栈
- 变量辅助
- 执行函数体
- 返回结果
远程调用的问题:
- Call的id映射;如何调用远程的哪一个函数;唯一的确定需要被调用的函数;为函数给定唯一的ID;(数字、字符串都可以)
- 序列化与反序列化:网络中只能传递二进制;序列化将对象序列化为2进制串(或json、xml、protobuf),反序列化从二进制重建对象;
- 网络传输:http、tcp、自定义均可;gRPC用http2.0
远程过程调用,实现了序列化与反序列化,两端的编程语言便不重要了;
http协议问题;一次性 http2.0解决了这个问题;
一个简单的例子
一个简单的服务端提供add方法
package main
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
)
func main() {
// http://127.0.0.1:8000/add?a=1&b=2
// 返回json {"data":3}
http.HandleFunc("/add", func(w http.ResponseWriter, r *http.Request) {
_ = r.ParseForm()
fmt.Println("path: ", r.URL.Path) //方法名
a, _ := strconv.Atoi(r.Form["a"][0]) //获取参数
b, _ := strconv.Atoi(r.Form["b"][0])
w.Header().Set("Content-Type", "application/json") //设置返回类型
data, _ := json.Marshal(map[string]int{"data": a + b}) //封装数据并序列化
_, _ = w.Write(data)
})
http.ListenAndServe(":8000", nil)
}
一个client的实现
package main
import (
"encoding/json"
"fmt"
"github.com/kirinlabs/HttpRequest"
)
//rpc 做到与本地调用一样
type ResponseData struct {
Data int `json:"data"`
}
// 封装一个远程调用的方法
func Add(a, b int) int {
// 传输协议:http
req := HttpRequest.NewRequest()
res, _ := req.Get(fmt.Sprintf("http://127.0.0.1:8000/%s?a=%d&b=%d", "add", a, b)) //待解决
body, _ := res.Body()
respData := ResponseData{}
_ = json.Unmarshal(body, &respData)
return respData.Data
}
func main() {
fmt.Println(Add(1, 2))
}
RPC开发四要素
- 客户端
- 客户端存根
- 服务端
- 服务端存根
存根一般动态生成代码;动态代理
内置RPC库
服务器
package main
import (
"net"
"net/rpc"
)
type HelloService struct{}
// 为HelloService实现方法
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request // 传出参数
return nil
}
func main() {
// 1. 实例化一个server
listener, err := net.Listen("tcp", ":8888")
if err != nil {
panic(err)
}
// 2. 注册处理逻辑 handler
_ = rpc.RegisterName("HelloService", new(HelloService)) //注册服务,其中new(HelloService)是服务的具体实现
// 3. 启动服务
conn, err := listener.Accept()
if err != nil {
panic(err)
}
rpc.ServeConn(conn) // 处理连接请求
}
客户端
package main
import (
"fmt"
"log"
"net/rpc"
)
func main() {
client, err := rpc.Dial("tcp", ":8888") //拨号连接远程服务
if err != nil {
log.Fatal(err)
}
var reply string
_ = client.Call("HelloService.Hello", "你好", &reply) //调用远程方法,如何知道远程方法的名字?
if err != nil {
log.Fatal(err)
}
fmt.Println(reply) //打印返回值
}
此时两个程序在不同的机器上也可以调用;
进一步封装
将功能分给三个角色实现:client、server、接口设计者
接口设计
package service
import "net/rpc"
const HelloServiceName = "HelloService"
type HelloServiceInterface = interface {
Hello(request string, reply *string) error
}
func RegisterHelloService(svc HelloServiceInterface) error {
return rpc.RegisterName(HelloServiceName, svc)
}
server不变
client进一步封装
package main
import (
service "RPC/02_inner_rpc/service"
"fmt"
"log"
"net/rpc"
)
type HelloServiceClient struct {
*rpc.Client
}
var _ service.HelloServiceInterface = (*HelloServiceClient)(nil)
func DialHelloService(network, address string) (*HelloServiceClient, error) {
c, err := rpc.Dial(network, address)
if err != nil {
return nil, err
}
return &HelloServiceClient{Client: c}, nil
}
func (p *HelloServiceClient) Hello(request string, reply *string) error {
return p.Client.Call(service.HelloServiceName+".Hello", request, reply)
}
// 以上代码是客户端的代码,客户端需要知道服务端的方法名,这样才能调用服务端的方法。
func main() {
client, err := DialHelloService("tcp", "192.168.120.172:8888")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Hello("hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
问题怎么用以下方式调用远程方法 ,自己封装可以实现,但如何不自己实现实现呢?
client.Hello("world")
go语言的内置RPC序列化是什么协议?Gob,只是go的,也就是说不能跨程序;
JSON RPC
服务器代码
package main
import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
type HelloService struct{}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
func main() {
err := rpc.RegisterName("HelloService", new(HelloService)) // 注册rpc服务
if err != nil {
log.Fatal(err)
}
listen, err := net.Listen("tcp", ":8080") // 监听tcp端口
if err != nil {
log.Fatal(err)
}
for {
conn, err := listen.Accept() // 接收tcp连接
if err != nil {
log.Fatal(err)
}
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) // jsonrpc方式处理连接
}
}
客户端代码:
package main
import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func main() {
//1. 不需要使用rpc拨号了,不用Gob编码了,使用json编码,所以直接使用net.Dial
conn, err := net.Dial("tcp", "192.168.120.172:8080")
if err != nil {
log.Fatal(err)
}
var reply string
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
err = client.Call("HelloService.Hello", "world", &reply) // 2. 调用远程方法;如何发送的呢?包装成json格式发送;具体如何
if err != nil {
log.Fatal(err)
}
log.Println(reply)
}
我们可以通过nc -l 8080
去监听,如何执行客户端可以得到
{"method":"HelloService.Hello","params":["world"],"id":0}
也就是说,实际上客户端传递给服务器的是json格式的请求;
用python来模拟客户端请求服务;
import json
from socket import *
client = socket(AF_INET, SOCK_STREAM) # 创建socket对象,tcp协议
client.connect(("192.168.120.172", 8080)) # 服务器的ip和端口
call_msg = {
"method": "HelloService.Hello",
"params": ["world"],
"id": 0
}
call_msg = json.dumps(call_msg)
client.send(call_msg.encode("utf-8")) # 调用远程方法
msg = client.recv(1024) # 接收返回值
msg = json.loads(msg.decode("utf-8"))
print(msg['result'])
client.close()
实际上,底层是如下数据结构
type ClientRequest struct {
Method string "json:\"method\""
Params [1]interface{} "json:\"params\""
Id uint64 "json:\"id\""
}
type ClientResponse struct {
Id uint64 "json:\"id\""
Result *json.RawMessage "json:\"result\""
Error interface{} "json:\"error\""
}
HTTP RPC
使用HTTP 实现RPC
服务端:
package main
import (
"io"
"log"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
)
type HelloService struct {
}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "Hello, " + request + "!"
return nil
}
func main() {
// 服务注册
err := rpc.RegisterName("HelloService", new(HelloService))
if err != nil {
log.Fatal("RegisterName error:", err)
}
http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
var conn io.ReadWriteCloser = struct {
io.Writer
io.ReadCloser
}{
ReadCloser: r.Body,
Writer: w,
}
rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
})
http.ListenAndServe(":8080", nil)
}
客户端
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
func main() {
req := map[string]interface{}{
"jsonrpc": "2.0",
"method": "HelloService.Hello",
"params": []string{"world"},
"id": 0,
}
res, _ := json.Marshal(req)
request, err := http.NewRequest("POST", "http://localhost:8080/jsonrpc", bytes.NewReader(res))
if err != nil {
panic(err)
}
defer request.Body.Close()
result := make([]byte, 1024)
request.Body.Read(result)
fmt.Println(string(res))
}
或者使用curl 10.60.80.28:8080/jsonrpc -X POST --data '{"method":"HelloService.Hello","params":["world"],"id":0}'
或者Python
import requests
req = {
"id": 0,
"params": ["world"],
"method": "HelloService.Hello"
}
res = requests.post("http://localhost:8080/jsonrpc", json=req)
print(res.text)
像本地调用一样
要实现像本地调用一样,需要实现client,client_proxy
,server,server_proxy
,handler
client
:就像本地调用一样,调用远程的方法,通过client_proxy
实现;
client_proxy
:负责建立连接,与server_proxy
通信,并实现相关服务调用的代码
server_proxy
:负责注册相应的服务,
server
:监听服务,通过server_proxy
注册服务,处理服务
handler
:为服务的具体实现
具体代码:
handler/handler.go
package handler
const HelloServiceName = "handler/HelloService"
type HelloService struct{}
// 为HelloService实现方法
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request // 传出参数
return nil
}
server_proxy/server_proxy.go
package server_proxy
import (
"RPC/05_new_helloworld/handler"
"net/rpc"
)
type HelloServicer interface {
Hello(request string, reply *string) error
}
// 如何解耦? 用接口
func RegisterHelloService(srv HelloServicer) error {
return rpc.RegisterName(handler.HelloServiceName, srv)
}
server/main.go
package main
import (
"RPC/05_new_helloworld/handler"
"RPC/05_new_helloworld/server_proxy"
"net"
"net/rpc"
)
func main() {
// 1. 实例化一个server
listener, err := net.Listen("tcp", ":8888")
if err != nil {
panic(err)
}
// 2. 注册处理逻辑 handler
_ = server_proxy.RegisterHelloService(&handler.HelloService{}) //注册服务,其中new(HelloService)是服务的具体实现
// 3. 启动服务
for {
conn, err := listener.Accept()
if err != nil {
panic(err)
}
go rpc.ServeConn(conn) // 处理连接请求
}
}
client_proxy/client_proxy.go
package client_proxy
import (
"RPC/05_new_helloworld/handler"
"log"
"net/rpc"
)
type HelloServiceStub struct {
*rpc.Client
}
// go中没有类、对象,就没有初始化方法
// 但是可以通过定义一个New方法来实现初始化
func NewHelloServiceClient(proto, address string) HelloServiceStub {
conn, err := rpc.Dial(proto, address)
if err != nil {
log.Fatal(err)
}
return HelloServiceStub{conn}
}
// 封装调用细节
func (c *HelloServiceStub) Hello(request string, reply *string) error {
err := c.Call(handler.HelloServiceName+".Hello", request, reply)
if err != nil {
log.Fatal(err)
}
return nil
}
client/main.go
package main
import (
"RPC/05_new_helloworld/client_proxy"
)
func main() {
// 1. 建立连接
serviceClient := client_proxy.NewHelloServiceClient("tcp", ":8888")
var reply string
// 2. 调用方法
serviceClient.Hello("hello", &reply)
// 3. 打印结果
println(reply)
}
需要解决的问题:
- server_proxy与client_proxy是否可以自动生成?多种语言
- 解决方法:protobuf + grpc
gRPC初步
gRPC 使用了 http 2.0 + protobuf; http 2.0 解决了通信问题; protobuf解决了序列化与反序列化问题;
- protocol buffer 是 Google 提供的一种轻量&高效的结构化数据存储格式,性能比
Json、XML
强很多;优点:跨平台、便捷性、高性能;缺点:通用性差,自解释性差; - http 2.0 长连接
protobuf 体验:
go 与protobuf
安装protobuf
,官方连接,windows安装win版本,linux安装linux版本;
设置环境变量:D:\Program Files\protoc-23.2\bin
go 安装依赖包go get github.com/golang/protobuf/protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
关于protoc-gen-go不是内外部命令的方案
的一种解决方案:
-
下载源码
git clone https://github.com/golang/protobuf
-
生成
protoc-gen-go.exe
cd protobuf/protoc-gen-go go build -o protoc-gen-go.exe main.go
-
添加环境变量
proto文件,作为中间文件,可以生成其他语言的代码,创建helloworld.proto
文件,编写代码:
syntax = "proto3";
option go_package = "./";
message HelloRequest{
string name = 1;//1 是编号不是值
}
在创建文件的目录运行protoc --go_out=. --go-grpc_out=. helloworld.proto
初步使用
-
编写proto文件
syntax = "proto3"; option go_package = ".;proto";//Go的包名 service Greeter{ rpc SayHello(HelloRequest) returns (HelloResponse); } message HelloRequest{ string name = 1;//1 是编号不是值 } message HelloResponse{ string msg = 1; }
生成代码
protoc --go_out=. --go-grpc_out=. helloworld.proto
-
编写服务端代码
package main import ( "context" "google.golang.org/grpc" "net" "RPC/07_grpc_helloworld/proto" ) type Server struct { proto.UnimplementedGreeterServer } func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error) { //TODO implement me return &proto.HelloResponse{Msg: "Hello " + request.Name}, nil } func main() { g := grpc.NewServer() proto.RegisterGreeterServer(g, &Server{}) listen, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } err = g.Serve(listen) if err != nil { panic(err) } }
-
编写客户端代码
package main import ( "RPC/07_grpc_helloworld/proto" "context" "fmt" "google.golang.org/grpc" ) func main() { conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure()) if err != nil { panic(err) } defer conn.Close() client := proto.NewGreeterClient(conn) hello, err := client.SayHello(context.Background(), &proto.HelloRequest{Name: "World"}) if err != nil { panic(err) } fmt.Println(hello.Msg) }
gRPC流模式
- 服务端流模式;
- 客户端流模式;
- 双向流模式;
对三种模式进行测试,
首先是创建proto
文件,内容如下:
syntax = "proto3";
option go_package = ".;proto";
service Greeter{
rpc GetStream(StreamReqData) returns (stream StreamResData);//返回值是流,也就是服务端流模式
rpc PutStream(stream StreamReqData) returns (StreamResData);//客户端流模式
rpc AllStream(stream StreamReqData) returns (stream StreamResData);//双向流模式
}
message StreamReqData{
string data = 1;
}
message StreamResData{
string data = 1;
}
其中,stream
关键字表示我们传递的是流数据;
服务端代码,完成相关功能的实现即可。
package main
import (
"fmt"
"net"
"sync"
"time"
"google.golang.org/grpc"
"RPC/08_grpc_stream/proto"
)
const PORT = ":8080"
type server struct {
proto.UnimplementedGreeterServer
}
// 此时客户端是流模式,服务器接受客户端的流
func (s *server) PutStream(cliStream proto.Greeter_PutStreamServer) error {
//TODO implement me
for {
if recv, err := cliStream.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println(recv)
}
}
return nil
}
func (s *server) AllStream(allStream proto.Greeter_AllStreamServer) error {
//TODO implement me
// 接收发送同时进行,可以使用协程,并行
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
if data, err := allStream.Recv(); err != nil {
fmt.Println(err)
} else {
fmt.Printf("客户端的消息:%v\n", data)
}
}
}()
go func() {
defer wg.Done()
i := 0
for {
_ = allStream.Send(&proto.StreamResData{Data: fmt.Sprintf("%v : 服务器消息 [%d] ", time.Now().Unix(), i)})
time.Sleep(time.Second * 1)
i++
}
}()
wg.Wait()
return nil
}
// 方法实现
// contex 不需要了,服务器端是流模式
func (s *server) GetStream(request *proto.StreamReqData, serverStream proto.Greeter_GetStreamServer) error {
//TODO implement me
i := 0
for {
_ = serverStream.Send(&proto.StreamResData{Data: fmt.Sprintf("%v", time.Now().Unix())}) //异步?
time.Sleep(time.Second * 1)
if i > 10 {
break
}
i++
}
return nil
}
func main() {
lis, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
s := grpc.NewServer()
proto.RegisterGreeterServer(s, &server{})
err = s.Serve(lis)
if err != nil {
panic(err)
}
}
客户端代码,完成接收,与发送,注意sync.WaitGroup{}
的用法,它是一种同步机制,会保证所有的协程的同步。
package main
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
"RPC/08_grpc_stream/proto"
)
func main() {
//连接到服务端
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
c := proto.NewGreeterClient(conn)
stream, err := c.GetStream(context.Background(), &proto.StreamReqData{Data: "hello server stream!"})
if err != nil {
panic(err)
}
//接收
fmt.Println("测试服务端流模式")
for {
recv, err := stream.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(recv.Data)
}
// 客户端流模式
fmt.Println("测试客户端流模式")
putStream, err := c.PutStream(context.Background())
if err != nil {
panic(err)
}
// 发送 10次
for i := 0; i < 10; i++ {
err := putStream.Send(&proto.StreamReqData{Data: fmt.Sprintf("%d: %v", i, time.Now().Unix())})
time.Sleep(time.Second * 1)
if err != nil {
fmt.Println(err)
break
}
}
//err = putStream.CloseSend()
if err != nil {
fmt.Println(err)
}
//双向流模式
fmt.Println("测试双向流模式")
wg := sync.WaitGroup{}
allStream, _ := c.AllStream(context.Background())
wg.Add(2)
go func() {
defer wg.Done()
for {
data, _ := allStream.Recv()
fmt.Println("服务端的消息:" + data.Data)
}
}()
go func() {
defer wg.Done()
i := 0
for {
//客户端发送请求消息,
_ = allStream.Send(&proto.StreamReqData{Data: fmt.Sprintf("%v : 客户端消息 [%d] ", time.Now().Unix(), i)})
time.Sleep(time.Second * 1)
i++
}
}()
wg.Wait() //等待协程执行完毕
}
protobuf
官方文档
定义消息
默认数值类型
.proto Type | Notes | Go Type |
---|---|---|
double | float64 | |
float | float32 | |
int32 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead. | int32 |
int64 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead. | int64 |
uint32 | Uses variable-length encoding. | uint32 |
uint64 | Uses variable-length encoding. | uint64 |
sint32 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s. | int32 |
sint64 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s. | int64 |
fixed32 | Always four bytes. More efficient than uint32 if values are often greater than 228. | uint32 |
fixed64 | Always eight bytes. More efficient than uint64 if values are often greater than 256. | uint64 |
sfixed32 | Always four bytes. | int32 |
sfixed64 | Always eight bytes. | int64 |
bool | bool | |
string | A string must always contain UTF-8 encoded or 7-bit ASCII text, and cannot be longer than 232. | string |
bytes | May contain any arbitrary sequence of bytes no longer than 232. | []byte |
几乎是一一对应,默认值
- string默认为空string
- 对于bytes默认为空bytes
- 对于bools默认为false
- 对于数值类型默认为0
- 对于枚举类型默认第一个枚举值,且为0
几乎与Go的默认值一样。
option go_package
在 Go 中,option go_package = ".;proto"
; 是 Protocol Buffers(protobuf)语法中的一个选项,用于指定生成的 Go 代码的包名。
具体解释如下:
option
:表示这是一个选项(Option)声明。
go_package
:表示这个选项是用于指定生成的 Go 代码的包名。
".;proto"
:是一个字符串,其中 .
表示当前目录(即与 .proto
文件相同的目录),proto
表示包的名称。也可以放到其他目录
该选项的作用是告诉编译器生成的 Go 代码应该放在当前目录(与 .proto 文件相同的目录)下的名为 proto 的包中。
例如,如果有一个名为 example.proto 的文件,其中包含 option go_package = “.;proto”;,则生成的 Go 代码将被放置在名为 proto 的包中。
这种方式的好处是,可以方便地将生成的 Go 代码放在与 .proto 文件相同的包中,以简化导入路径和包名的管理。
请注意,option go_package = ".;proto"
; 是 protobuf 特有的选项语法,其他编程语言可能使用不同的选项语法来指定包名。在不同的编程语言中,使用合适的选项语法来设置包名以确保生成的代码可以正确导入和使用。
从一个proto引入另一个proto的消息
一个基本proto;base.proto
syntax = "proto3";
option go_package = ".;proto";
message Pong{
string id = 1;
}
另一个具体的服务hello.proto
syntax = "proto3";
import "base.proto";
import "google/protobuf/empty.proto";//内置
option go_package=".;proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
rpc Ping(google.protobuf.Empty) returns(Pong);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
生成方式需要指定相关文件的目录
protoc --go_out=. --go-grpc_out=. -I="C:\Users\Rao\AppData\Local\JetBrains\GoLand2022.3\protoeditor" -I="." .\proto\*.proto
message嵌套
定义:
syntax = "proto3";
import "proto/base.proto";
import "google/protobuf/empty.proto";
option go_package = ".;proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
rpc Ping(google.protobuf.Empty) returns(Pong);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
message Result{
string name = 1;
int32 age = 2;
}
repeated Result data = 2;
}
使用:
package main
import (
proto "RPC/09_proto"
"fmt"
)
func main() {
a := proto.HelloReply_Result{}
fmt.Println(a)
b := proto.Pong{}
fmt.Println(b)
}
emun
message HelloRequest {
string name = 1;
Gender g = 3;
}
enum Gender{
MALE = 0;
FEMALE = 1;
}
使用
a := proto.HelloRequest{Name: "Tom", G: proto.Gender_MALE}
map
proto
message HelloRequest {
string name = 1;
Gender g = 3;
map<string,string> mp = 4;
}
使用
a := proto.HelloRequest{Name: "Tom", G: proto.Gender_MALE, Mp: map[string]string{"a": "b"}}
时间戳
syntax = "proto3";
import "base.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "./proto;proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
rpc Ping(google.protobuf.Empty) returns(Pong);
}
message HelloRequest {
string name = 1;
Gender g = 3;
map<string,string> mp = 4;
google.protobuf.Timestamp addTime = 5;
}
enum Gender{
MALE = 0;
FEMALE = 1;
}
message HelloReply {
string message = 1;
message Result{
string name = 1;
int32 age = 2;
}
repeated Result data = 2;
}
代码生成:
protoc --go_out=. --go-grpc_out=. -I="C:\Users\Rao\AppData\Local\JetBrains\GoLand2022.3\protoeditor" -I="." .\proto\*.proto
使用
package main
import (
proto "RPC/09_proto/proto"
"fmt"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"time"
)
func main() {
a := proto.HelloRequest{Name: "Tom", G: proto.Gender_MALE, Mp: map[string]string{"a": "b"}, AddTime: timestamppb.New(time.Now())}
fmt.Printf("%v\n", a)
b := proto.Pong{}
fmt.Println(b)
}
gRPC进阶
metadata机制
相当于HTTP的头部,不放在业务代码里。
metadata的存储形式,key:value
,map[string][]string
type MD map[string][]string
-
新建metadata
第一种创建metadata的方法 md := metadata.New(map[string]string{"k1": "v1", "k2": "v2"}) //第二种创建metadata的方法 md = metadata.Pairs( "k1", "v1", "k2", "v2", )
-
发送metadata
//发送metadata ctx := metadata.NewOutgoingContext(context.Background(), md) //单向RPC response, err := client.SomeRPC(ctx, someRequest)
-
接收metadata
//接收metadata func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, error) { md, ok := metadata.FromIncomingContext(ctx) // do something with metadata }
简单的metadata例子
-
创建proto文件
syntax = "proto3"; option go_package = "./proto;proto"; service Greeter{ rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest{ string name = 1; } message HelloReply{ string message = 1; }
protoc --go_out=. --go-grpc_out=. -I="C:\Users\Rao\AppData\Local\JetBrains\GoLand2022.3\protoeditor" -I="proto" ./proto/*.proto
-
创建服务端代码
package main import ( "RPC/10_metadata/proto" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "net" ) type Server struct { proto.UnimplementedGreeterServer } func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { fmt.Println("No metadata") } else { if cookieSlice, ok := md["cookie"]; ok { fmt.Println(cookieSlice[0]) } else { fmt.Println(cookieSlice) } } return &proto.HelloReply{ Message: "hello " + req.Name, }, nil } func main() { g := grpc.NewServer() proto.RegisterGreeterServer(g, &Server{}) listen, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } err = g.Serve(listen) if err != nil { panic(err) } }
-
创建客户端代码
package main import ( "RPC/10_metadata/proto" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "time" ) func main() { conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure()) if err != nil { panic(err) } defer conn.Close() client := proto.NewGreeterClient(conn) md := metadata.New(map[string]string{"Cookie": "sadfasf4511", "Token": "asdfa21215fas22", "timestamp": fmt.Sprintf("%d", time.Now().Unix())}) ctx := metadata.NewOutgoingContext(context.Background(), md) res, err := client.SayHello(ctx, &proto.HelloRequest{Name: "Tom"}) if err != nil { panic(err) } fmt.Println(res.Message) }
grpc 拦截器 -go
拦截器几乎等同于web服务器的拦截器;
服务端拦截器,代码使用以上代码,
//创建拦截器
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
fmt.Println("一个新的请求") //统计时间开始
res, err := handler(ctx, req) //放行
fmt.Println("请求处理完毕") //统计时间结束
return res, nil
}
//grpc包装拦截器
opt := grpc.UnaryInterceptor(interceptor)
g := grpc.NewServer(opt) //可以传入多个拦截器
客户端拦截器:
//客户端拦截器
interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
fmt.Println("耗时:%s", time.Since(start))
return err
}
//grpc包装拦截器
opt := grpc.WithUnaryInterceptor(interceptor)
//建立连接
conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), opt)
中间件:https://github.com/grpc-ecosystem/go-grpc-middleware
大部分通过拦截器实现。
拦截器例子,Auth
-
创建proto文件
syntax = "proto3"; option go_package = "./proto;proto"; service Greeter{ rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest{ string name = 1; } message HelloReply{ string message = 1; }
-
创建客户端代码
package main import ( "RPC/12_grpc_auth_token/proto" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "time" ) func main() { //客户端拦截器 interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() //获取新的上下文 md := metadata.New(map[string]string{"appid": "12548", "appkey": "I am key!"}) //将新的上下文放入 ctx = metadata.NewOutgoingContext(context.Background(), md) //调用方法 err := invoker(ctx, method, req, reply, cc, opts...) fmt.Println("耗时:", time.Since(start)) return err } //grpc包装拦截器 opt := grpc.WithUnaryInterceptor(interceptor) //建立连接 conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), opt) if err != nil { panic(err) } defer conn.Close() client := proto.NewGreeterClient(conn) res, err := client.SayHello(context.Background(), &proto.HelloRequest{Name: "Tom"}) if err != nil { panic(err) } fmt.Println(res.Message) }
另外一种实现方式,实际上是一种封装。
package main import ( "RPC/12_grpc_auth_token/proto" "context" "fmt" "google.golang.org/grpc" ) type CustomCredential struct { } func (s *CustomCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { return map[string]string{ "appid": "12548", "appkey": "I am key!", }, nil } // 是否需要基于TLS认证进行安全传输 func (s *CustomCredential) RequireTransportSecurity() bool { return false } func main() { //grpc包装拦截器 opt := grpc.WithPerRPCCredentials(&CustomCredential{}) //建立连接 conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure(), opt) if err != nil { panic(err) } defer conn.Close() client := proto.NewGreeterClient(conn) res, err := client.SayHello(context.Background(), &proto.HelloRequest{Name: "Tom"}) if err != nil { panic(err) } fmt.Println(res.Message) }
-
服务端代码
package main import ( "RPC/12_grpc_auth_token/proto" "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "net" ) type Server struct { proto.UnimplementedGreeterServer } func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { fmt.Println("No metadata") } else { if cookieSlice, ok := md["cookie"]; ok { fmt.Println(cookieSlice[0]) } else { fmt.Println("无cookie") } } return &proto.HelloReply{ Message: "hello " + req.Name, }, nil } func main() { //创建拦截器 interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { //错误处理 return resp, status.Error(codes.Unauthenticated, "无Token认证信息") } var ( appid string appkey string ) if var1, ok := md["appid"]; ok { appid = var1[0] } if var2, ok := md["appkey"]; ok { appkey = var2[0] } //真正的鉴权逻辑 if appid != "12548" || appkey != "I am key!" { //错误处理 return resp, status.Error(codes.Unauthenticated, "Token认证信息无效") } fmt.Println("鉴权成功") res, err := handler(ctx, req) //放行 return res, nil } //grpc包装拦截器 opt := grpc.UnaryInterceptor(interceptor) g := grpc.NewServer(opt) //可以传入多个拦截器 proto.RegisterGreeterServer(g, &Server{}) listen, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } err = g.Serve(listen) if err != nil { panic(err) } }
验证器
基于https://github.com/bufbuild/protoc-gen-validate
下载对应系统版本发行版,移动到对应目录,go/bin下
下载validate.proto
文件:https://github.com/bufbuild/protoc-gen-validate/tree/main/validate,移动到proto目录下,
创建自己的proto
syntax = "proto3";
option go_package = "./proto;proto";
import "validate.proto";
service Greeter{
rpc SayHello (Person) returns (Person) {}
}
// 这个相当于文档,验证功能
message Person {
uint64 id = 1 [(validate.rules).uint64.gt = 999];
string email = 2 [(validate.rules).string.email = true];
string mobile = 3 [(validate.rules).string = {
pattern: "^1[3456789]\\d{9}",
}];
}
protoc --go_out=. --go-grpc_out=. --validate_out="lang=go:." -I="C:\Users\Rao\AppData\Local\JetBrains\GoLand2022.3\protoeditor" -I="proto" ./proto/*.proto
使用Interceptor
grpc异常处理
grpc 的状态码
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GrWl50WQ-1687505592051)(./assets/image-20230613101825802.png)]
服务端返回状态码,
错误返回
return nil, status.Error(codes.NotFound, "服务未找到!"+req.Name)
客户端处理错误
res, err := client.SayHello(ctx, &proto.HelloRequest{Name: "Tom"})
if err != nil {
s, ok := status.FromError(err)
if !ok {
panic("解析错误失败")
}
fmt.Println(s.Message())
fmt.Println(s.Code())
}
grpc超时机制
- 网络抖动 网络拥塞
- 服务器很慢
客户端设置
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
protoc 生成的go源码有什么
message 对应一个 struct
service:对应服务端和客户端的stub的实现,是接口;对于服务端还生成了Registerxxx
,客户端会生成接口,是调用所需的参数。
roto.RegisterGreeterServer(g, &Server{})
listen, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
err = g.Serve(listen)
if err != nil {
panic(err)
}
}
```
验证器
基于https://github.com/bufbuild/protoc-gen-validate
下载对应系统版本发行版,移动到对应目录,go/bin下
下载validate.proto
文件:https://github.com/bufbuild/protoc-gen-validate/tree/main/validate,移动到proto目录下,
创建自己的proto
syntax = "proto3";
option go_package = "./proto;proto";
import "validate.proto";
service Greeter{
rpc SayHello (Person) returns (Person) {}
}
// 这个相当于文档,验证功能
message Person {
uint64 id = 1 [(validate.rules).uint64.gt = 999];
string email = 2 [(validate.rules).string.email = true];
string mobile = 3 [(validate.rules).string = {
pattern: "^1[3456789]\\d{9}",
}];
}
protoc --go_out=. --go-grpc_out=. --validate_out="lang=go:." -I="C:\Users\Rao\AppData\Local\JetBrains\GoLand2022.3\protoeditor" -I="proto" ./proto/*.proto
使用Interceptor
grpc异常处理
grpc 的状态码
服务端返回状态码,
错误返回
return nil, status.Error(codes.NotFound, "服务未找到!"+req.Name)
客户端处理错误
res, err := client.SayHello(ctx, &proto.HelloRequest{Name: "Tom"})
if err != nil {
s, ok := status.FromError(err)
if !ok {
panic("解析错误失败")
}
fmt.Println(s.Message())
fmt.Println(s.Code())
}
grpc超时机制
- 网络抖动 网络拥塞
- 服务器很慢
客户端设置
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
protoc 生成的go源码有什么
message 对应一个 struct
service:对应服务端和客户端的stub的实现,是接口;对于服务端还生成了Registerxxx
,客户端会生成接口,是调用所需的参数。