Golang学习之路——之tinyrpc源码阅读

news2025/1/11 22:48:18

tinyrpc是一个高性能的基于protocol buffer的rpc框架。项目代码非常少,很适合初学者进行golang的学习。

如果你正在为没有资料学习发愁,文末有相关的学习资料获取方式

tinyrpc功能

tinyrpc基于TCP协议,支持各种压缩格式,基于protocol buffer的序列化协议。其rpc是基于golang原生的net/rpc开发而成。

tinyrpc项目结构

tinyrpc基于net/rpc开发而成,在此基础上集成了额外的能力。项目结构如图:

功能目录如下:

  • codec 编码模块
  • compressor 压缩模块
  • header 请求/响应头模块
  • protoc-gen-tinyrpc 代码生成插件
  • serializer 序列化模块

tinyrpc源码解读

客户端和服务端构建

客户端是以net/rpcrpc.Client为基础构建,在此基础上定义了Option以配置压缩方式和序列化方式:

type Option func(o *options)

type options struct {
    compressType compressor.CompressType
    serializer   serializer.Serializer
}

在创建客户端的时候将配置好的压缩算法和序列化方式作为创建客户端的参数:

func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
    options := options{
        compressType: compressor.Raw,
        serializer:   serializer.Proto,
    }
    for _, option := range opts {
        option(&options)
    }
    return &Client{rpc.NewClientWithCodec(
        codec.NewClientCodec(conn, options.compressType, options.serializer))}
}

服务端是以net/rpcrpc.Server为基础构建,在此基础上扩展了Server的定义:

type Server struct {
    *rpc.Server
    serializer.Serializer
}

在创建客户端和开启服务时传入序列化方式:

func NewServer(opts ...Option) *Server {
    options := options{
        serializer: serializer.Proto,
    }
    for _, option := range opts {
        option(&options)
    }

    return &Server{&rpc.Server{}, options.serializer}
}

func (s *Server) Serve(lis net.Listener) {
    log.Printf("tinyrpc started on: %s", lis.Addr().String())
    for {
        conn, err := lis.Accept()
        if err != nil {
            continue
        }
        go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
    }
}

压缩算法compressor

压缩算法的实现中首先是定义了压缩的接口:

type Compressor interface {
    Zip([]byte) ([]byte, error)
    Unzip([]byte) ([]byte, error)
}

压缩的接口包含压缩和解压方法。

压缩算法使用的是uint类型,使用iota来初始化,并且使用map来进行所有压缩算法实现的管理:

type CompressType uint16

const (
    Raw CompressType = iota
    Gzip
    Snappy
    Zlib
)

// Compressors which supported by rpc
var Compressors = map[CompressType]Compressor{
    Raw:    RawCompressor{},
    Gzip:   GzipCompressor{},
    Snappy: SnappyCompressor{},
    Zlib:   ZlibCompressor{},
}

序列化 serializer

序列化部分代码非常简单,提供了一个接口:

type Serializer interface {
    Marshal(message interface{}) ([]byte, error)
    Unmarshal(data []byte, message interface{}) error
}

目前只有ProtoSerializer一个实现,ProtoSerializer内部的实现是基于"google.golang.org/protobuf/proto"来实现的,并没有什么特殊的处理,因此就不花费笔墨详述了。

请求/响应头 header

tinyrpc定义了自己的请求头和响应头:

// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType |      Method    |    ID    | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// |    uint16    | uvarint+string |  uvarint |   uvarint  |  uint32  |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
    sync.RWMutex
    CompressType compressor.CompressType
    Method       string
    ID           uint64
    RequestLen   uint32
    Checksum     uint32
}

请求头由压缩类型,方法,id,请求长度和校验码组成。

// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType |    ID   |      Error     | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// |    uint16    | uvarint | uvarint+string |    uvarint  |  uint32  |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
    sync.RWMutex
    CompressType compressor.CompressType
    ID           uint64
    Error        string
    ResponseLen  uint32
    Checksum     uint32
}

响应头由压缩类型,id,错误信息,返回长度和校验码组成。

为了实现头的重用,tinyrpc为头构建了缓存池:

var (
    RequestPool  sync.Pool
    ResponsePool sync.Pool
)

func init() {
    RequestPool = sync.Pool{New: func() interface{} {
        return &RequestHeader{}
    }}
    ResponsePool = sync.Pool{New: func() interface{} {
        return &ResponseHeader{}
    }}
}

在使用时get出来,生命周期结束后放回池子,并且在put之前需要进行重置:

    h := header.RequestPool.Get().(*header.RequestHeader)
    defer func() {
        h.ResetHeader()
        header.RequestPool.Put(h)
    }()

// ResetHeader reset request header
func (r *RequestHeader) ResetHeader() {
    r.Lock()
    defer r.Unlock()
    r.ID = 0
    r.Checksum = 0
    r.Method = ""
    r.CompressType = 0
    r.RequestLen = 0
}

// ResetHeader reset response header
func (r *ResponseHeader) ResetHeader() {
    r.Lock()
    defer r.Unlock()
    r.Error = ""
    r.ID = 0
    r.CompressType = 0
    r.Checksum = 0
    r.ResponseLen = 0
}

搞清楚了头的结构以及对象池的复用逻辑,那么具体的头的编码与解码就是很简单的拆装工作,就不在此一行一行解析了,大家有兴趣可以自行去阅读。

编码 codec

由于tinyrpc是基于net/rpc开发,那么其codec模块自然也是依赖于net/rpcClientCodecServerCodec接口来实现的。

客户端实现

客户端是基于ClientCodec实现的能力:

type ClientCodec interface {
    WriteRequest(*Request, any) error
    ReadResponseHeader(*Response) error
    ReadResponseBody(any) error

    Close() error
}

client定义了一个clientCodec类型,并且实现了ClientCodec的接口方法:

type clientCodec struct {
    r io.Reader
    w io.Writer
    c io.Closer

    compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
    serializer serializer.Serializer
    response   header.ResponseHeader // rpc response header
    mutex      sync.Mutex            // protect pending map
    pending    map[uint64]string
}

WriteRequest实现:

// WriteRequest Write the rpc request header and body to the io stream
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
    c.mutex.Lock()
    c.pending[r.Seq] = r.ServiceMethod
    c.mutex.Unlock()

    if _, ok := compressor.Compressors[c.compressor]; !ok {
        return NotFoundCompressorError
    }
    reqBody, err := c.serializer.Marshal(param)
    if err != nil {
        return err
    }
    compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
    if err != nil {
        return err
    }
    h := header.RequestPool.Get().(*header.RequestHeader)
    defer func() {
        h.ResetHeader()
        header.RequestPool.Put(h)
    }()
    h.ID = r.Seq
    h.Method = r.ServiceMethod
    h.RequestLen = uint32(len(compressedReqBody))
    h.CompressType = compressor.CompressType(c.compressor)
    h.Checksum = crc32.ChecksumIEEE(compressedReqBody)

    if err := sendFrame(c.w, h.Marshal()); err != nil {
        return err
    }
    if err := write(c.w, compressedReqBody); err != nil {
        return err
    }

    c.w.(*bufio.Writer).Flush()
    return nil
}

可以看到代码的实现还是比较清晰的,主要分为几个步骤:

  1. 将数据进行序列化构成请求体
  2. 选择相应的压缩算法进行压缩
  3. 从Pool中获取请求头实例将数据全部填入其中构成最后的请求头
  4. 分别通过io操作发送处理过的请求头和请求体

ReadResponseHeader实现:

// ReadResponseHeader read the rpc response header from the io stream
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
    c.response.ResetHeader()
    data, err := recvFrame(c.r)
    if err != nil {
        return err
    }
    err = c.response.Unmarshal(data)
    if err != nil {
        return err
    }
    c.mutex.Lock()
    r.Seq = c.response.ID
    r.Error = c.response.Error
    r.ServiceMethod = c.pending[r.Seq]
    delete(c.pending, r.Seq)
    c.mutex.Unlock()
    return nil
}

此方法作用是读取返回的响应头,并解析成具体的结构体

ReadResponseBody实现:

func (c *clientCodec) ReadResponseBody(param interface{}) error {
    if param == nil {
        if c.response.ResponseLen != 0 {
            if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
                return err
            }
        }
        return nil
    }

    respBody := make([]byte, c.response.ResponseLen)
    err := read(c.r, respBody)
    if err != nil {
        return err
    }

    if c.response.Checksum != 0 {
        if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
            return UnexpectedChecksumError
        }
    }

    if c.response.GetCompressType() != c.compressor {
        return CompressorTypeMismatchError
    }

    resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
    if err != nil {
        return err
    }

    return c.serializer.Unmarshal(resp, param)
}

此方法是用于读取返回的响应结构体,流程如下:

  1. 读取流获取响应体
  2. 根据响应头中的校验码来比对响应体是否完整
  3. 根据压缩算法来解压具体的结构体
  4. 进行反序列化

服务端实现

服务端是基于ServerCodec实现的能力:

type ServerCodec interface {
    ReadRequestHeader(*Request) error
    ReadRequestBody(any) error
    WriteResponse(*Response, any) error

    // Close can be called multiple times and must be idempotent.
    Close() error
}

和客户端类似,server定义了一个serverCodec类型,并且实现了ServerCodec的接口方法:

type serverCodec struct {
    r io.Reader
    w io.Writer
    c io.Closer

    request    header.RequestHeader
    serializer serializer.Serializer
    mutex      sync.Mutex // protects seq, pending
    seq        uint64
    pending    map[uint64]*reqCtx
}

ReadRequestHeader实现:

// ReadRequestHeader read the rpc request header from the io stream
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
    s.request.ResetHeader()
    data, err := recvFrame(s.r)
    if err != nil {
        return err
    }
    err = s.request.Unmarshal(data)
    if err != nil {
        return err
    }
    s.mutex.Lock()
    s.seq++
    s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
    r.ServiceMethod = s.request.Method
    r.Seq = s.seq
    s.mutex.Unlock()
    return nil
}

此方法用于读取请求头并解析成结构体

ReadRequestBody实现:

// ReadRequestBody read the rpc request body from the io stream
func (s *serverCodec) ReadRequestBody(param interface{}) error {
    if param == nil {
        if s.request.RequestLen != 0 {
            if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
                return err
            }
        }
        return nil
    }

    reqBody := make([]byte, s.request.RequestLen)

    err := read(s.r, reqBody)
    if err != nil {
        return err
    }

    if s.request.Checksum != 0 {
        if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
            return UnexpectedChecksumError
        }
    }

    if _, ok := compressor.
        Compressors[s.request.GetCompressType()]; !ok {
        return NotFoundCompressorError
    }

    req, err := compressor.
        Compressors[s.request.GetCompressType()].Unzip(reqBody)
    if err != nil {
        return err
    }

    return s.serializer.Unmarshal(req, param)
}

此方法用于读取请求体,流程和读取响应体差不多,大致如下:

  1. 读取流并解析成请求体
  2. 根据请求头中的校验码进行校验
  3. 根据压缩算法进行解压
  4. 反序列化

WriteResponse实现:

// WriteResponse Write the rpc response header and body to the io stream
func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
    s.mutex.Lock()
    reqCtx, ok := s.pending[r.Seq]
    if !ok {
        s.mutex.Unlock()
        return InvalidSequenceError
    }
    delete(s.pending, r.Seq)
    s.mutex.Unlock()

    if r.Error != "" {
        param = nil
    }
    if _, ok := compressor.
        Compressors[reqCtx.compareType]; !ok {
        return NotFoundCompressorError
    }

    var respBody []byte
    var err error
    if param != nil {
        respBody, err = s.serializer.Marshal(param)
        if err != nil {
            return err
        }
    }

    compressedRespBody, err := compressor.
        Compressors[reqCtx.compareType].Zip(respBody)
    if err != nil {
        return err
    }
    h := header.ResponsePool.Get().(*header.ResponseHeader)
    defer func() {
        h.ResetHeader()
        header.ResponsePool.Put(h)
    }()
    h.ID = reqCtx.requestID
    h.Error = r.Error
    h.ResponseLen = uint32(len(compressedRespBody))
    h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
    h.CompressType = reqCtx.compareType

    if err = sendFrame(s.w, h.Marshal()); err != nil {
        return err
    }

    if err = write(s.w, compressedRespBody); err != nil {
        return err
    }
    s.w.(*bufio.Writer).Flush()
    return nil
}

此方法用于写入响应体,大致与写入请求体差不多,流程如下:

  1. 将响应体序列化
  2. 使用压缩算法将响应体进行压缩
  3. 使用Pool管理响应头
  4. 分别发送返回头和返回体

总结

tinyrpc是基于golang原生的net/rpc包实现,在此基础上实现了压缩和序列化等能力扩展。整体来看tinyrpc的代码非常简单,比较适合刚接触golang的程序员来进行阅读学习,学习一些golang的基础的开发技巧和一些语言特性。
最后如果你正在为没有资料学习发愁,需要Golang学习资料可以点击下方卡片,把相关的学习资料分享给你

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/163235.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Junit单元测试

Junit测试简介什么是单元测试单元测试是针对最小的功能单元编写测试代码Java程序最小的功能单元是方法单元测试就是针对单个Java方法的测试测试驱动开发(TDD)使用main()方法测试的缺点:只能有一个main()方法,不能把测试代码分离没有打印出测试结果和期望…

python+vue2+nodejs 搜索引擎课设 SCAU数信学院本科生通知检索(附源码)

前言 这个系统主要实现了以下功能: 爬虫:数据爬取及分词后端:数据库全文模糊搜索、高频词获取前端:输入拼音缩写或文字后匹配输入建议、搜索、列表分页、高亮关键词、相关度排序及时间排序、深色模式及浅色模式切换 爬虫&#x…

兔八哥与猎人

兔八哥与猎人 题目描述 兔八哥躲藏在树林旁边的果园里。果园有 MNM \times NMN 棵树,组成一个 MMM 行 NNN 列的矩阵,水平或垂直相邻的两棵树的距离为 111。兔八哥在一棵果树下。 猎人背着猎枪走进了果园,他爬上一棵果树,准备杀…

springboot整合JSR303校验

4.7 JSR303校验 4.7.1 统一校验的需求 前端请求后端接口传输参数,是在controller中校验还是在Service中校验? 答案是都需要校验,只是分工不同。 Contoller中校验请求参数的合法性,包括:必填项校验,数据…

Python_内置函数

1、abs():绝对值 2、all():接受一个可迭代对象,如果对象里的所有元素的bool运算值都是True,那么返回True,否则返回False 3、any():接受一个可迭代对象,如果对象里有一个元素的bool运算值都是True&#xff0…

CSS实现从下至上弹出的抽屉动画

从下至上展开抽屉动画<!DOCTYPE html> <html><head><meta charset"UTF-8"><meta name"viewport" content"initial-scale1.0, maximum-scale1.0, user-scalableno" /><title></title><style>.co…

码农抓取商品详情API调用,Json和XML等格式

API 指的 是一些预定义的函数&#xff0c; 可以 提供给应用程序和开发人员基于软件或硬件访问一组例程的 功能 &#xff0c; 而 不再需要访问源代码或理解内部工作机制细节。 API 可以用于 于开发使用相同数据的其他应用程序&#xff0c;比如公司&#xff0c;他们可以创建一个A…

携手向前,欧拉沙龙双品牌联合运营纯电赛道再提速

面对波诡云谲的市场环境和竞争格局&#xff0c;企业只有不断变革&#xff0c;才能赢得更多的发展机遇&#xff0c;拥有属于自己的生存空间。 在2022年12月底广州国际车展和今年1月初的海口新能源车展上&#xff0c;欧拉携好猫、好猫GT、芭蕾猫、闪电猫&#xff0c;沙龙携高端车…

【Linux】-- 进程程序替换

目录 引入进程程序替换 进程程序替换 初步使用exec系列函数 原理分析 做一个简易的shell cd - 内置命令的理解 export - 环境变量的深入理解 引入进程程序替换 对于fork的学习让我们知道&#xff1a;fork()之后的&#xff0c;父子进程各自执行父进程代码的一部分。但是创…

IO初识233

绝对路径和相对路径 路径是用来描述一个文件在电脑上的具体位置。 这里的 E:\绘画合集\CCE展会logo 2.0就是绝对路径 目录之间的分隔符可以用\也可以用/来表示 相对路径就是以一个基准路径&#xff08;工作路径&#xff09;&#xff0c;以基准路径为起点往下走要怎么表示目标…

Java字符串训练

Java字符串训练一、用户登录二、统计字符次数三、拼接字符串1. 使用String2. 使用StringBuilder四、字符串反转五、金额转换六、手机号屏蔽七、身份证信息查看八、敏感词替换九、对称字符串十、数字转罗马数字十一、调整字符串十二、打乱字符串一、用户登录 需求&#xff1a;已…

MySQL监控(一):了解SigNoz

1.SigNoz介绍 github SigNoz SigNoz官方文档 2022 年 11 大 MYSQL 监控工具 MySQL | 六个最常用的 MySQL 数据库监控工具 2.SigNoz安装 从官方文档上得知使用以下命令进行安装&#xff1a; git clone -b main https://github.com/SigNoz/signoz.git && cd signoz/d…

SpringSecurity(十三)【授权】

十三、授权 什么是授权权限管理核心概念Spring Security 权限管理策略基于 URL 地址的权限管理基于方法的权限管理实战 权限管理 身份认证&#xff0c;就是判断一个用户是否为合法用户的处理过程。SpringSecurity中支持多种不同方式的认证&#xff0c;但是无论开发者使用那种方…

【uniapp】uniapp使用高德地图定位打包成安卓app的一些记录,比如打包后定位失效、

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、创建你的uniapp1.打开Dcloud开发者后台2.下载你的证书、获取你的SHA1安全码、证书私钥密码二、打开高德开放平台申请key1.打开官网2.创建一个应用三、在unia…

快速傅里叶变换FFT和逆变换的python编程

0. 预备知识 快速傅里叶变换旨在解决离散傅里叶变换DFT计算量大效率低的问题。当我们想要抑制噪声提取出某段信号中的有效信息时&#xff0c;如系统模型辨识或者是使用高精度力传感器测量人体腕部寸关尺脉搏信号这类应用&#xff0c;应该如何设计采样流程&#xff1f; 首先&a…

《通讯录》思路及代码实现详解

目录 一、通讯录功能实现的详细描述 二、通讯录的代码及思路实现 2、1 定义联系人结构体 2、2 初始化就结构体与释放动态开辟空间的实现 2、3 菜单打印 2、4 添加联系人信息 2、5 删除联系人信息 2、6 查询联系人信息 2、7 修改联系人信息 2、8 打印所有联系人信息 2、9 排序整…

75. 序列模型的代码实现

1. 训练 在了解了上述统计工具后&#xff0c;让我们在实践中尝试一下&#xff01; 首先&#xff0c;我们生成一些数据&#xff1a;(使用正弦函数和一些可加性噪声来生成序列数据&#xff0c; 时间步为 1,2,…,1000 。) %matplotlib inline import torch from torch import nn…

新手nvm npm 卸载不用依赖包,项识别为 cmdlet、函数、脚本文件,等命令集合

nvm安装包&#xff1a;Releases coreybutler/nvm-windows GitHub下载ta就不用单独下载node了注意:vnm安装位置尽量不要动C:\Users\Administrator\AppData\Roaming\nvm\settings.txt增加下面代码node_mirror: https://npm.taobao.org/mirrors/node/ npm_mirror: https://npm.t…

java+Springboot交通事故档案管理系统

系统分为用户和管理员两个角色 用户的主要功能有&#xff1a; 1.用户注册和登陆系统 2.用户查看警察相关信息 3.用户查看我的相关事故信息&#xff0c;可以对交通事故进行交通申诉 4.用户查看交通申诉审核信息 5.退出登陆 管理员的主要功能有&#xff1a; 1.管理员输入账户登陆…

Metasploit渗透框架介绍及永恒之蓝复现

Metasploit渗透框架介绍及永恒之蓝复现一、Metasploit渗透框架介绍1.1 名词解释1.2 MSF简介1.3 MSF框架结构1.4 MSF命令汇总1.4.1 常用命令1.4.2 基本命令1.4.3 Exploits模块1.4.4 漏洞名称规则1.5 MSF模块介绍1.5.1 auxiliary(辅助模块)1.5.2 exploits(漏洞利用模块)1.5.3 pay…