0. 前言
这一节要熟悉Go中的反射reflet,不然可能比较难理解。在使用到反射的一些函数时候,我也会讲解关于反射reflect的用法。
1.引出反射reflect
这个例子是表示客户端想使用Foo服务的Sum方法。即是想调用Foo结构体的Sum方法。
client.Call("Foo.Sum", args, &reply);
服务端那肯定还有其他结构体的,例如Say结构体,Speak结构体;Hello方法。
有这么多结构体和方法,那服务器要如何判断是用使用那个服务呢。按照硬编码实现的话,就是遍历所有的服务
switch req.ServiceMethod {
case "T.MethodName":
t := new(t)
reply := new(T2)
var argv T1
gob.NewDecoder(conn).Decode(&argv)
err := t.MethodName(argv, reply)
server.sendMessage(reply, err)
case "Foo.Sum":
f := new(Foo)
...
}
这肯定是很繁琐且不现实的。那之后,会想到map[string]service。例如访问"Foo.Sum",那就直接获得该service,那就要为"Foo.Sum"编写特定的执行流程,要是访问"Cat.Speak",那就要为其编写特定的执行流程,那也又回到之前的困境了。
这时候就需要使用反射了。
同Java
语言一样,Go
语言也有运行时反射,这为我们提供了一种可以在运行时操作任意类型对象的能力。比如查看一个接口变量的具体类型、看看一个结构体有多少字段、修改某个字段的值等。Go
语言是静态编译类语言,比如在定义一个变量的时候,已经知道了它是什么类型,那么为什么还需要反射呢?这当然是因为有些事情只有在运行时才知道。比如你定义了一个函数,它有一个any类型的参数,这也就意味着调用者可以传递任何类型的参数给这个函数。在这种情况下,如果你想知道调用者传递的是什么类型的参数,就需要用到反射。如果你想知道一个结构体有哪些字段和方法,也需要反射。
那通过反射,我们就可以知道该结构体的字段和方法,还有方法的参数返回值这些。这样我们就可以对该服务进行统一编写了。
3. 通过反射实现 service
前面我们完成了客户端和服务端。客户端相对来说功能是比较完整的,但是服务端的功能并不完整,仅仅将请求的 header 打印了出来,并没有真正地处理body。那今天的主要目的是补全这部分功能。
首先通过反射实现结构体与服务的映射关系,代码独立放置在 service.go
中。
先来看如何描述一个方法, 表示方法的结构体,用于注册服务方法
- method表示方法本身的反射类型,即是其方法名字
- ArgType表示参数的类型,即是例子Call中的第二个参数args
- replyType表示响应的类型,即是第三参数reply
- numCalls:后续统计方法调用次数时会用到
type methodType struct {
method reflect.Method
ArgType reflect.Type
replyType reflect.Type
numCalls uint64
}
func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem())
} else {
argv = reflect.New(m.ArgType).Elem()
}
return argv
}
func (m *methodType) newReplyv() reflect.Value {
// reply must be a pointer type
replyv := reflect.New(m.replyType.Elem())
switch m.replyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.replyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.replyType.Elem(), 0, 0))
}
return replyv
}
reflect更为强大的功能是可以在运行时动态创建各种类型的实例,用New方法。
先讲讲Type
和 Kind
的区别
Type
是类型,Kind
是类别,Type 和Kind
可能是相同的,也可能是不同的。
比如:var num int
= 10,num
的 Type
是 int
, Kind
也是 int
;
比如: var stu Student,stu
的Type
是 packageXXX.Student
, Kind
是 struct
。
我们还实现了 2 个方法 newArgv
和 newReplyv
,用于创建对应类型的实例。newArgv
方法有一个小细节,指针类型和值类型创建实例的方式有细微区别。
需要讲解下reflect.Value.Elem(),其是Go语言程序中对指针获取反射对象时,可以通过 reflect.Elem() 方法获取这个指针指向的元素,这个获取过程被称为取元素,等效于对指针类型变量做了一个*
操作
reflect.ValueOf(xxx).Elem() //xxx需要是指针,不然会出现panic
//例子
i := 100
v := reflect.ValueOf(&i).Elem()
fmt.Println(v) //打印出 100
fmt.Println(reflect.ValueOf(&i)) //打印出地址 0xc00009a298
//当xxx是指针时候,就可以使用Elem()来获取xxx指向的值
看回函数newArgv
,当m.ArgType是指针类型时候,New方法中的参数就需要使用Elem()来获取其值,而不是地址。当m.ArgType是值类型时,New方法的参数就是m.ArgType,但应该new之后是指针,所以需要使用Elem()获取值。
再到newReplyv方法,因为响应就是指针类型的,所以New中需要使用Elem()来获取值。
反射的map和slice和其他普通的创建方式是不一样的,所以New之后要是其是map类型,那就需要修改,而Go语言反射修改变量需要使用Set方法。有两种方法,一种是使用 Set 方法,一种是使用 SetXXX() 方法,比如 SetString()、SetInt() 等,但是没有setMap。那就使用Set()。
//语法 使用 reflect.ValueOf 传入我们要获取的变量的地址,并且使用 Elem 获取指针信息
reflect.ValueOf(&x).Elem().Set()
前面定义了方法,即是有了"Foo.Sum"中的Sum,那接着定义Foo,即是服务结构体service。表示服务的结构体,用于注册服务对象。
type service struct {
name string
typ reflect.Type
rcvr reflect.Value
method map[string]*methodType
}
name即是映射的结构体的名称,比如T,WaitGroup;typ是结构体的类型,rcvr是结构体实例本身;method是map类型,存储映射的结构体的所有符合条件的方法,比如Foo服务中可以有Sum方法,也可以有Check方法等等。
那接着完成其构造函数newService
// 语法 func Indirect(v Value) Value
// Indirect返回v持有的指针指向的值的Value封装。若v持有的值为nil,会返回Value零值。若v持有的变量不是指针,那么将返回原值v
func newService(rcvr any) *service {
s := new(service)
s.rcvr = reflect.ValueOf(rcvr)
//通过reflect.Value.Type.Name()获取结构体名
//,但是当reflect.Value是指针时候,Name()返回空字符串。所以要先通过Indirect取指针的值
s.name = reflect.Indirect(s.rcvr).Type().Name()
s.typ = reflect.TypeOf(rcvr)
//该函数是判断s.name是否是以大写字母开头的
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods() //判断方法是否符合条件的
return s
}
registerMethods就是判断方法是否符合条件的,有些方法是不能导出或者参数个数不符合等等情况。
过滤出符合条件的方法:
- 两个可导出或者内置类型的入参(反射时为3个,第0个是自身)
- 返回值只有一个,类型为error
func (s *service) registerMethods() {
s.method = make(map[string]*methodType)
//reflect.Type.NumMethod()是获取该结构体的方法个数,只能获取能导出的(方法首字母大写的)
for i := 0; i < s.typ.NumMethod(); i++ {
method := s.typ.Method(i) //reflect.Type.Method()获取对应类型对应的方法
mType := method.Type //获取方法类型
//reflect.Type.NumIn()是获取参数个数,NumOut()是返回值个数
if mType.NumIn() != 3 || mType.NumOut() != 1 {
continue
}
//reflect.Type.Out()是返回值类型, 判断返回值是不是error类型
if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
continue
}
argType, replyType := mType.In(1), mType.In(2) //In()是方法参数类型
// 响应值必须为可导出或者内置类型
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {
continue
}
s.method[method.Name] = &methodType{
method: method,
ArgType: argType,
replyType: replyType,
}
log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
}
}
func isExportedOrBuiltinType(t reflect.Type) bool {
if t.Kind() == reflect.Pointer {
t = t.Elem()
}
return ast.IsExported(t.Name()) || t.PkgPath() == ""
}
注册好服务后,那如何才能通过这反射去调用到需要的方法呢。那需要实现call方法,即是能够通过反射调用方法。
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
f := m.method.Func //m.method是reflect.method类型
returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})
if errInter := returnValues[0].Interface(); errInter != nil {
return errInter.(error) //从Value转为原始数据类型
}
return nil
}
这里讲解下反射中调用方法。有两种情况。
reflect.Value.Method(i)
返回一个reflect.Value
对象,它总是以调用Method(i)
方法的reflect.Value
作为接收器对象,不需要额外传入。而且直接使用Call()
发起方法调用:
m.Call(...args)
//前面的方法registerMethods中就可以使用 s.typ.Method(i).Call(...args) 来调用方法
如果要通过reflect.Method
调用方法,必须使用Func
字段,而且要传入接收器的reflect.Value
作为第一个参数:
m.Func.Call(v, ...args)
//Call方法原型 func (v Value) Call(in []Value) []Value
Call()
的返回值为 []Value
,切片中元素的个数就是返回值的个数,下标也一一对应。可被调用的方法返回值只有一个,是error类型。Interface().(int)
4.service 的测试用例
定义结构体Hello,实现两个方法,可导出的方法Sum、 不可导出方法sum,进行测试 newService 和 call 方法。
type Hello int //服务结构体类型
type Args struct{ Num1, Num2 int } //方法的参数类型Args
func (h Hello) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
// 不能导出的,方法sum的首字母是小写
func (h Hello) sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
func _assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assertion failed: "+msg, v...))
}
}
//测试 newService 方法。
func TestNewService(t *testing.T) {
var h Hello
s := newService(&h)
//判断s的方法的个数,是可导出的方法
_assert(len(s.method) == 1, "wrong service Method, expect 1, but got %d", len(s.method))
mType := s.method["Sum"]
//判断是否有这个方法
_assert(mType != nil, "wrong Method, Sum shouldn't nil")
}
//测试 call 方法。
func TestMethodType_Call(t *testing.T) {
var h Hello
s := newService(h)
mType := s.method["Sum"]
//new 方法的参数和返回结果
argv := mType.newArgv()
reply := mType.newReplyv()
//设置参数值
argv.Set(reflect.ValueOf(Args{Num1: 3, Num2: 47}))
err := s.call(mType, argv, reply)
_assert(err == nil && *reply.Interface().(*int) == 50, "failed to call Foo.Sum")
}
5.集成到服务端
需要把该服务注册到服务端。那么服务端肯定是有多个服务的。那如何存储服务呢,我们肯定是想通过服务名字找到服务,时间复杂度是O(1)的,那就要用到map。而map又是并发不安全的。进而可以想到使用sync.Map。这是并发安全的。
可能有读者想到使用原生 map +互斥锁/读写锁。首先我们明确服务肯定是读多写少的,(写即是注册服务,读即是通过名查找服务),读多写少的话,可能大家也会想到使用原生 map读写锁的。
而sync.map是用读写分离实现的,其思想是空间换时间。和map+RWLock的实现方式相比,它做了一些优化:可以无锁访问read map,而且会优先操作read map,倘若只操作read map就可以满足要求(增删改查遍历),那就不用去操作write map(它的读写都要加锁),所以在某些特定场景中它发生锁竞争的频率会远远小于map+RWLock的实现方式。
那在Server结构体中添加服务存储map,并添加一个注册服务的方法。
type Server struct {
serviceMap sync.Map
}
func (server *Server) Register(rcvr any) error {
s := newService(rcvr)
//如果获取的 key 存在,就返回 key 对应的元素,
//若获取的 key 不存在,就返回我们设置的值,并且将我们设置的值,存入 map
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {
return errors.New("rpc: service already defined: " + s.name)
}
return nil
}
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
实现了注册服务后,之后还要实现找到服务。
其返回值就是服务service,和其对应的方法名(即是要methodType)。
func (server *Server) findService(serviceMethod string) (svc *service, mtype *methodType, err error) {
//serviceMethod例子 "myservice.say"
dot := strings.LastIndex(serviceMethod, ".")
if dot < 0 {
err = errors.New("rpc server: service/method request ill-formed: " + serviceMethod)
return
}
//获取服务名字和方法名
serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]
//Load是sync.Map获取value的方法,返回值类型是any
svci, ok := server.serviceMap.Load(serviceName)
if !ok {
err = errors.New("rpc server: can't find service " + serviceName)
return
}
svc = svci.(*service) //这个是any类型转成*service类型
mtype = svc.method[methodName] //找到对应的 methodType
if mtype == nil {
err = errors.New("rpc server: can't find method " + methodName)
}
return
}
实现了查找服务的方法后,那该方法要在何处使用的呢。在读取请求(server).readRequest中,我们没有对请求进行处理,也没有获取到服务名和方法名,这是不正确的。所以我们需要从readRequest中开始。
那我们需要先填补下request结构体。readRequest后请求的所有数据都保存在request结构体中。
那该结构体就需要有请求方法的参数和返回值,还有就是需要服务service和方法名字methodType。
type request struct {
h *codec.Header
// argv, replyv reflect.Value
argv, replyv reflect.Value
mtype *methodType
svc *service
//这是之前的
// h *codec.Header
// // argv, replyv reflect.Value
// requestData uint64
// replyData string
}
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
//代码中省略了一些错误处理
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := &request{h: h}
req.svc, req.mtype, err = server.findService(h.ServiceMethod) //在此处使用findService
//创建方法参数和返回值,new出来的
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()
// make sure that argvi is a pointer, ReadBody need a pointer as parameter
argvi := req.argv.Interface() //使用Interface()方法是为了req.argv转回any类型, cc.ReadBody入参需要的
if req.argv.Type().Kind() != reflect.Pointer {
argvi = req.argv.Addr().Interface()
}
err := cc.ReadBody(argvi);
return req, nil
//之前的写法
// req.requestData = reflect.New(reflect.TypeOf(uint64(1)))
// err = cc.ReadBody(req.requestData.Interface())
// TODO: now we don't know the type of request argv
//这一章节,我们只能处理用户发送过来的uint64类型的数据
// if err = cc.ReadBody(&req.requestData); err != nil {
// log.Println("rpc server: read argv err:", err)
// }
// return req, nil
}
(server).readRequest方法中调用readRequestHeader读取head(head有服务名和方法名),之后调用findService方法获取服务。再通过 newArgv()
和 newReplyv()
两个方法创建出两个入参实例。之后就是调用ReadBody。
这里要强调一点,(dec *Decoder) Decode(e any)需要确保参数e是个指针,是Decode方法要求的。而Go源码Decode方法中会调用DecodeValue(v reflect.Value)方法。所以通过调用一个reflect.Value值的Interface方法得到一个interface{}值,就可以传入了。
跟着到handleRequest方法。这个方法是处理请求,即是调用该服务的对应的方法,即是要用call。之后调用sendResponse返回结果给客户端。 req.replyv是结果,将其传递给 sendResponse 完成序列化。
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()
err := req.svc.call(req.mtype, req.argv, req.replyv)
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
//之前的做法
// defer wg.Done()
// req.replyData = fmt.Sprintf(" ok my resp %d", req.h.Seq)
// server.sendResponse(cc, req.h, &req.replyData, sending)
}
到这里,我们成功在服务端实现了服务注册与调用。
6.测试
第一步定义结构体My和Sum方法(其方法要符合rpc的要求)
type My int
type Args struct{ Num1, Num2 int }
func (m *My) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
第二步,注册服务,并启动rpc服务(这里对比之前的,主要是添加了注册服务这步骤)
func startServer(addr chan string) {
//注册服务
var myServie My
//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以
if err := geerpc.Register(&myServie); err != nil {
slog.Error("register error:", err) //slog是Go官方建议的日志库
os.Exit(1)
}
//启动服务端
l, err := net.Listen("tcp", "localhost:10000")
if err != nil {
slog.Error("network error:", err)
os.Exit(1)
}
slog.Info("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
第三步,主要是客户端的操作,构造参数,发送 RPC 请求,并打印结果。Call中的"My.Sum"就要对应。是结构体My,服务名字就要写My,方法名是Sum,其就要写Sum。
func main() {
addr := make(chan string)
go startServer(addr)
client, _ := geerpc.Dail("tcp", <-addr)
defer client.Close()
time.Sleep(time.Second * 1)
num := 3
var wg sync.WaitGroup
wg.Add(num)
for i := 0; i < num; i++ {
go func(i int) {
defer wg.Done()
args := &Args{Num1: i, Num2: i * i}
var reply int
if err := client.Call("My.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
fmt.Println("reply: ", reply)
}(i)
}
wg.Wait()
}
7.总结
到这里,和官方的net/rpc库已经接近了,已实现rpc基础的重要部分。
前面的代码例子,总结下主要2步:
1.注册服务,并启动rpc服务,
2.创建rpc客户端,客户端发起请求Call.
注册服务
通过newService方法新建服务,通过reflect获取服务的结构体名,类型等等,之后调用(service.) registerMethods方法来获取可以导出的方法,然后存储在服务端。这里使用了很多反射的知识。
rpc客户端
客户端主要有两个操作
- 创建客户端,通过parseOptions解析规定编解码方式,之后在newClientCodec方法中新开协程执行(Client.)receive方法,该方法是读取客户端发送的请求并进行解析。
- 发起请求(即是调用Call)。其内部调用(Client.) Go方法。(
Go
是一个异步接口,返回 call 实例),最终再调用(Client.)send方法发送给服务端。
完整代码: https://githubfast.com/liwook/Go-projects/tree/main/geerpc/3-service