今天这篇是接上上篇RPC原理之后这篇是讲如何使用go本身自带的标准库RPC。这篇篇幅会比较短。重点在于上一章对的补充。
文章目录
- RPC包的概念
- 使用RPC包
- 服务器代码分析
- 如何实现的?
- 总结
- Server还提供了两个注册服务的方法
- 客户端代码分析
- 如何实现的?
- 如何异步编程同步?
- 总结
- codec/序列化框架
- 使用JSON协议的RPC
RPC包的概念
回顾RPC原理
看完回顾后其实就可以继续需了解并使用go中所提供的包。
Go语言的 rpc
包提供对通过网络或其他i/o
连接导出的对象方法的访问,服务器注册一个对象,并把它作为服务对外可见(服务名称就是类型名称)。
注册后,对象的导出方法将支持远程访问。服务器可以注册不同类型的多个对象(服务) ,但是不支持注册同一类型的多个对象。
Go官方提供了一个RPC库: net/rpc
。
包rpc
提供了通过网络访问一个对象的输出方法的能力。
服务器需要注册对象,通过对象的类型名暴露这个服务。
注册后这个对象的输出方法就可以远程调用,这个库封装了底层传输的细节,包括序列化(默认GOB
序列化器)。
对象的方法要能远程访问,它们必须满足一定的条件,否则这个对象的方法会被忽略:
- 方法的
类型
是可输出的 - 方法
本身
是可输出的 - 方法必须由两个参数,必须是输出类型或者是内建类型
- 方法的第二个参数必须是
指针类型
- 方法返回类型为
error
所以一个输出方法的格式如下:
func (t *T) MethodName(argType T1, replyType *T2) error
这里的T
、T1
、T2
能够被encoding/gob
序列化,即使使用其它的序列化框架
,将来这个需求可能回被弱化。
- 第一个参数
(T1)
代表调用者(client)提供的参数 - 第二个参数
(*T2)
代表要返回给调用者的计算结果 - 方法的返回值如果不为空, 那么它作为一个
字符串
返回给调用者(所以需要一个序列化框架) - 如果返回
error
,则reply
参数不会返回给调用者
使用RPC包
简单例子,是一个非常简单的服务。
我们在这个里面就搞1
和12
就好:
在这个例子中定义了一个简单的RPC服务器和客户端,使用的方法是一个
第一步
需要定义传入参数和返回参数的数据结构
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
第二步
定义一个服务对象,这个服务对象可以很简单。
比如类型是int
或者是interface{}
,重要的是它输出的方法。
type Arith int
第三步
实现这个类型的两个方法, 乘法和除法:
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
第四步
实现RPC服务器: 基于tcp实现
生成了一个Arith
对象,并使用rpc.Register
注册这个服务,然后通过HTTP暴露出来
arith := new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":9091")
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
select{
}
客户端可以看到服务Arith
以及它的两个方法Arith.Multiply
和Arith.Divide
第五步
创建一个客户端,建立客户端和服务器端的连接: 分为同步调用和异步调用(都是远程调用)
同步调用:
client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
args := &server.Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
异步调用:
client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check errors, print, etc.
完整的例子:
- 创建一个
service.go
的文件用来保存结构体对象以及方法
package main
import "errors"
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
- 创建一个
RPC
服务端,server.go
package main
import (
"log"
"net"
"net/http"
"net/rpc"
)
func main() {
arith := new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":9091")
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
select {}
}
- 创建一个客户端,
client.go
package main
import (
"fmt"
"log"
"net/rpc"
)
func main() {
// 建立HTTP连接
client, err := rpc.DialHTTP("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
// 同步调用
args := &Args{7, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
// 异步调用
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check errors, print, etc.
fmt.Println(replyCall.Error)
fmt.Println(quotient)
}
打开终端:
先启动服务器:
go run server.go service.go
在打开一个终端:
最后启动一个客户端:
go run client.go service.go
结果为:
服务器代码分析
Server
的很多方法
你可以直接调用,这对于一个简单的Server
的实现更方便,但是你如果需要配置不同的Server,
比如不同的监听地址或端口,就需要自己生成Server:
var DefaultServer = NewServer()
Server
有多种Socket监听
的方式:
func (server *Server) Accept(lis net.Listener)
func (server *Server) HandleHTTP(rpcPath, debugPath string)
func (server *Server) ServeCodec(codec ServerCodec)
func (server *Server) ServeConn(conn io.ReadWriteCloser)
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
func (server *Server) ServeRequest(codec ServerCodec) error
ServeHTTP
: 实现了处理http
请求的业务逻辑,它首先处理http
的CONNECT
请求, 接收后就Hijacker
这个连接conn
, 然后调用ServeConn
在这个连接上处理这个客户端的请求。- 其实是实现了
http.Handler
接口,我们一般不直接调用这个方法。Server.HandleHTTP
设置rpc的上下文路径rpc.HandleHTTP
使用默认的上下文路径`DefaultRPCPath
DefaultDebugPath
- 当你启动一个
http server
的时候http.ListenAndServe
,面设置的上下文将用作RPC传输,这个上下文的请求会教给ServeHTTP
来处理 - 以上是
RPC over http
的实现,可以看出net/rpc
只是利用http CONNECT
建立连接,这和普通的RESTful api
还是不一样的。 - (源码)
- 其实是实现了
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}
如何实现的?
Accept
用来处理一个监听器,一直在监听客户端的连接,一旦监听器接收了一个连接,则还是交给ServeConn
在另外一个goroutine
中去处理:(源码)
//Accept接受侦听器上的连接并提供请求
//每个传入连接。接受阻塞,直到监听器
//返回非nil错误。对象中调用Accept
//go语句
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
}
}
协程进入ServeConn
可以看出,很重要的一个方法就是ServeConn
// ServeConn在单连接上运行服务器。
// ServeConn阻塞,服务连接,直到客户端挂起。
//调用者通常在go语句中调用ServeConn。
// ServeConn使用gob连接格式(参见包gob)
//连接。要使用备用编解码器,请使用ServeCodec。
//有关并发访问的信息,请参阅NewClient的注释。.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
连接其实是交给一个ServerCodec
去处理,这里默认
使用gobServerCodec
去处理,这是一个未输出默认的编解码器,可以使用其它的编解码器。
// ServeCodec类似于ServeConn,但使用指定的编解码器来
//解码请求和编码响应。
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
//我们已经看到没有更多的请求。
//在关闭编解码器之前等待响应。
wg.Wait()
codec.Close()
}
它其实一直从连接中读取请求,然后调用go service.call在另外的goroutine中处理服务调用。
总结
-
对象重用。
Request
和Response
都是可重用的,通过Lock
处理竞争。这在大并发
的情况下很有效。 -
使用了大量的
goroutine
。如果使用一定数量的goroutine
作为worker池
去处理这个case,可能还会有些性能的提升,但是更复杂了。使用goroutine
可以获得了非常好的性能。 -
业务处理是异步的,服务的执行不会阻塞其它消息的读取。
-
一个
codec实例
必然和一个connnection
相关,因为它需要从connection
中读取request和发送response
。
go的rpc官方库
的消息(request
和response
)的定义很简单, 就是消息头(header
)+内容体(body
)。
消息体是reply类型的序列化后的值。
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // 客户端选择的序列号
// 包含过滤或未导出的字段
}
Server还提供了两个注册服务的方法
第二个方法为服务起一个别名,否则服务名已它的类型命名
func (server *Server) Register(rcvr interface{}) error
func (server *Server) RegisterName(name string, rcvr interface{}) error
它们俩底层调用register进行服务的注册(这里的源码太多就不放了)
func (server *Server) register(rcvr interface{}, name string, useName bool) error
受限于Go
语言的特点,我们不可能在接到客户端的请求的时候,根据反射动态的创建一个对象,就是Java
那样。
因此在Go语言中,我们需要预先创建一个服务map这是在编译的时候完成的
说白了这里需要建立一个注册名与服务之间的映射关系
server.serviceMap = make(map[string]*service)
同时每个服务还有一个方法map: map[string]*methodType
,通过suitableMethods
建立映射:
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType
这样rpc
在读取请求header
,通过查找这两个map
,就可以得到要调用的服务及它的对应方法了。
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done()
}
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// 调用该方法,为应答提供一个新值。
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// 该方法的返回值是一个错误。.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}
客户端代码分析
客户端要建立和服务器的连接
func Dial(network, address string) (*Client, error)
func DialHTTP(network, address string) (*Client, error)
func DialHTTPPath(network, address, path string) (*Client, error)
func NewClient(conn io.ReadWriteCloser) *Client
func NewClientWithCodec(codec ClientCodec) *Client
如何实现的?
DialHTTP
和 DialHTTPPath
是通过HTTP的方式和服务器建立连接,他俩的区别之在于是否设置上下文路径:
// DialHTTPPath连接HTTP RPC服务器在指定的网络地址和路径上
func DialHTTPPath(network, address, path string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
// 在切换到RPC协议之前,需要成功的HTTP响应
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn), nil
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
conn.Close()
return nil, &net.OpError{
Op: "dial-http",
Net: network + " " + address,
Addr: nil,
Err: err,
}
}
首先发送 CONNECT
请求,如果连接成功则通过NewClient(conn)
创建client
。
Dial
则通过TCP
直接连接服务器
// Dial连接到指定网络地址的RPC服务器
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
注意:根据服务是over HTTP还是 over TCP选择合适的连接方式
NewClient
则创建一个缺省codec
为glob序列化库
的客户端
// NewClient返回一个新的Client来处理到连接另一端的服务集合。
//在连接的写端添加一个缓冲区,报头和有效载荷作为一个单元发送。
//
//连接的读写部分是独立序列化的,不需要联锁。然而,每一半都可以访问并发,所以conn的实现应该防止,并发读或并发写。
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
如果想用其它的序列化库,你可以调用NewClientWithCodec
方法 一般用来做RPC框架的
// NewClientWithCodec类似于NewClient,但使用指定的编码请求和解码响应。
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
重要的是input
方法,它以一个死循环的方式不断
地从连接中读取response
,然后调用map
中读取等待的Call.Done channel
通知完成。(这个其实有点令牌扫描的作用,消息队列中有说)
消息的结构和服务器一致,都是Header+Body
的方式
客户端的调用有两个方法: Go
和 Call
Go方法
是异步的,它返回一个Call指针对象
, 它的Done
是一个channel
,如果服务返回,Done
就可以得到返回的对象(实际是Call对象
,包含Reply
和error信息
)。Call 方法
是同步的,它实际是调用Go实现的。
如何异步编程同步?
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
从一个Channel
中读取对象会被阻塞住,直到有对象可以读取,这种实现很简单,也很方便。
总结
从源码中:我们还可以学到锁Lock的一种实用方式,也就是尽快的释放锁,而不是defer mu.Unlock
直到函数执行到最后才释放,那样锁
占用的时间太长了。
codec/序列化框架
rpc框架默认使用gob序列化库,很多情况下我们追求更好的效率的情况下,或者追求更通用的序列化格式,我们可能采用其它的序列化方式, 比如protobuf, json, xml等。
市面上也有许多序列化框架。速度快而且非常好用。gRPC
是互联网后台常用的RPC
框架,其内部是由protobuf
协议完成通讯。这个后面再讲。
(JDK Serializable
、FST
、Kryo
、Protobuf
、Thrift
、Hession
和Avro
,Fury
)
Fury
是最新的序列化框架:号称比jdk 快170倍,后面会讲的 支持多种语言
Go官方库实现了JSON-RPC 1.0
。JSON-RPC
是一个通过JSON格式
进行消息传输的RPC规范
,因此可以进行跨语言的调用。
Go的net/rpc/jsonrpc
库可以将JSON-RPC
的请求转换成自己内部的格式:
func (c *serverCodec) ReadRequestHeader(r *rpc.Request) error {
c.req.reset()
if err := c.dec.Decode(&c.req); err != nil {
return err
}
r.ServiceMethod = c.req.Method
c.mutex.Lock()
c.seq++
c.pending[c.seq] = c.req.Id
c.req.Id = nil
r.Seq = c.seq
c.mutex.Unlock()
return nil
}
使用JSON协议的RPC
rpc
包默认使用的是 gob 协议
对传输数据进行序列化/反序列化
,比较有局限性
。
将例子进行修改:
服务器端:
package main
import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func main() {
arith := new(Arith)
rpc.Register(arith)
l, e := net.Listen("tcp", ":9091")
if e != nil {
log.Fatal("listen error:", e)
}
for {
conn, _ := l.Accept()
// 使用JSON协议
rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
客户端:
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func main() {
// 建立HTTP连接
conn, err := net.Dial("tcp", "127.0.0.1:9091")
if err != nil {
log.Fatal("dialing:", err)
}
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
// 同步调用
args := &Args{7, 8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)
// 异步调用
quotient := new(Quotient)
divCall := client.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check errors, print, etc.
fmt.Println(replyCall.Error)
fmt.Println(quotient)
}
如何使用与上面的例子一致。
社区中各式RPC框架(grpc、thrift等)就是为了让RPC调用更方便。