day1 服务端与消息编码

news2024/9/9 3:54:25

文章目录

  • 消息的序列化与反序列化
  • 通信过程
  • 服务端的实现
  • main 函数(一个简易的客户端)

本文代码地址:

本文是7天用Go从零实现RPC框架GeeRPC的第一篇。

  • 使用 encoding/gob 实现消息的编解码(序列化与反序列化)
  • 实现一个简易的服务端,仅接受消息,不处理,代码约 200

消息的序列化与反序列化

一个典型的 RPC 调用如下:

err = client.Call("Arith.Multiply", args, &reply)

客户端发送的请求包括服务名 Arith,方法名 Multiply,参数 args 三个,服务端的响应包括错误 error,返回值 reply 2 个。我们将请求和响应中的参数和返回值抽象为 body,剩余的信息放在 header 中,那么就可以抽象出数据结构 Header

day1-codec/codec/codec.go

package codec

import "io"

type Header struct {
	ServiceMethod string // format "Service.Method"
	Seq           uint64 // sequence number chosen by client
	Error         string
}
  • ServiceMethod 是服务名和方法名,通常与 Go 语言中的结构体和方法相映射。
  • Seq 是请求的序号,也可以认为是某个请求的 ID,用来区分不同的请求。
  • Error 是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于 Error 中。

我们将和消息编解码相关的代码都放到 codec 子目录中,在此之前,还需要在根目录下使用 go mod init geerpc 初始化项目,方便后续子 package 之间的引用。

进一步,抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例:

type Codec interface {
	io.Closer
	ReadHeader(*Header) error
	ReadBody(interface{}) error
	Write(*Header, interface{}) error
}

紧接着,抽象出 Codec 的构造函数,客户端和服务端可以通过 CodecType 得到构造函数,从而创建 Codec 实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。

type NewCodecFunc func(io.ReadWriteCloser) Codec

type Type string

const (
	GobType  Type = "application/gob"
	JsonType Type = "application/json" // not implemented
)

var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
	NewCodecFuncMap = make(map[Type]NewCodecFunc)
	NewCodecFuncMap[GobType] = NewGobCodec
}

我们定义了 2CodecGobJson,但是实际代码中只实现了 Gob 一种,事实上,2 者的实现非常接近,甚至只需要把 gob 换成 json 即可。

首先定义 GobCodec 结构体,这个结构体由四部分构成,conn 是由构建函数传入,通常是通过 TCP 或者 Unix 建立 socket 时得到的链接实例,dec enc 对应 gobDecoderEncoderbuf 是为了防止阻塞而创建的带缓冲的 Writer,一般这么做能提升性能。

day1-codec/codec/gob.go

package codec

import (
	"bufio"
	"encoding/gob"
	"io"
	"log"
)

type GobCodec struct {
	conn io.ReadWriteCloser
	buf  *bufio.Writer
	dec  *gob.Decoder
	enc  *gob.Encoder
}

var _ Codec = (*GobCodec)(nil)

func NewGobCodec(conn io.ReadWriteCloser) Codec {
	buf := bufio.NewWriter(conn)
	return &GobCodec{
		conn: conn,
		buf:  buf,
		dec:  gob.NewDecoder(conn),
		enc:  gob.NewEncoder(buf),
	}
}

Go 语言中,json.NewDecoder json.Unmarshal 都用于将 JSON 数据解析为 Go中的数据结构,但它们有一些区别:

  • json.NewDecoder 是通过创建一个 Decoder 对象,从一个 io.Reader(如os.Stdin、文件、网络连接等)中读取 JSON 数据并进行解码。
  • json.Unmarshal 则是直接将 JSON 数据(以字节切片 []byte 或者字符串的形式)解析并映射到指定的数据结构。

使用场景上,如果数据是从一个输入流中读取,通常使用 json.NewDecoder;如果已经有了 JSON 数据的字节切片或字符串,使用json.Unmarshal 会更方便。json.NewEncoder json.Marshal 同理。

接着实现 ReadHeaderReadBodyWriteClose 方法。

func (c *GobCodec) ReadHeader(h *Header) error {
	return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body interface{}) error {
	return c.dec.Decode(body)
}

func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
	defer func() {
		_ = c.buf.Flush()
		if err != nil {
			_ = c.Close()
		}
	}()
	if err := c.enc.Encode(h); err != nil {
		log.Println("rpc codec: gob error encoding header:", err)
		return err
	}
	if err := c.enc.Encode(body); err != nil {
		log.Println("rpc codec: gob error encoding body:", err)
		return err
	}
	return nil
}

func (c *GobCodec) Close() error {
	return c.conn.Close()
}

通信过程

客户端与服务端的通信需要协商一些内容,例如 HTTP 报文,分为headerbody 2 部分,body 的格式和长度通过 header 中的 Content-TypeContent-Length 指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。对于 RPC 协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来表示序列化方式,第2个字节表示压缩方式,第3-6字节表示 header 的长度,7-10 字节表示 body 的长度。

对于 GeeRPC 来说,目前需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息,放到结构体 Option 中承载。目前,已经进入到服务端的实现阶段了。

day1-codec/server.go

package geerpc

const MagicNumber = 0x3bef5c

type Option struct {
	MagicNumber int        // MagicNumber marks this's a geerpc request
	CodecType   codec.Type // client may choose different Codec to encode body
}

var DefaultOption = &Option{
	MagicNumber: MagicNumber,
	CodecType:   codec.GobType,
}

一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 headerbody 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 OptionCodeType 解码剩余的内容。即报文将以这样的形式发送:

| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------      固定 JSON 编码      ------>  | <-------   编码方式由 CodeType 决定   ------->|

在一次连接中,Option 固定在报文的最开始,HeaderBody 可以有多个,即报文可能是这样的。

| Option | Header1 | Body1 | Header2 | Body2 | ...

服务端的实现

通信过程已经定义清楚了,那么服务端的实现就比较直接了。

day1-codec/server.go

// Server represents an RPC Server.
type Server struct{}

// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Println("rpc server: accept error:", err)
			return
		}
		go server.ServeConn(conn)
	}
}

// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
  • 首先定义了结构体 Server,没有任何的成员字段。
  • 实现了 Accept 方式,net.Listener 作为参数,for 循环等待 socket 连接建立,并开启子协程处理,处理过程交给了 ServerConn 方法。
  • DefaultServer 是一个默认的 Server 实例,主要为了用户使用方便。

如果想启动服务,过程是非常简单的,传入 listener 即可,tcp 协议和 unix 协议都支持。

lis, _ := net.Listen("tcp", ":9999")
geerpc.Accept(lis)

ServeConn 的实现就和之前讨论的通信过程紧密相关了,首先使用 json.NewDecoder 反序列化得到 Option 实例,检查 MagicNumber CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器,接下来的处理交给 serverCodec

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	defer func() { _ = conn.Close() }()
	var opt Option
	if err := json.NewDecoder(conn).Decode(&opt); err != nil {
		log.Println("rpc server: options error: ", err)
		return
	}
	if opt.MagicNumber != MagicNumber {
		log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
		return
	}
	f := codec.NewCodecFuncMap[opt.CodecType]
	if f == nil {
		log.Printf("rpc server: invalid codec type %s", opt.CodecType)
		return
	}
	server.serveCodec(f(conn))
}

// invalidRequest is a placeholder for response argv when error occurs
var invalidRequest = struct{}{}

func (server *Server) serveCodec(cc codec.Codec) {
	sending := new(sync.Mutex) // make sure to send a complete response
	wg := new(sync.WaitGroup)  // wait until all request are handled
	for {
		req, err := server.readRequest(cc)
		if err != nil {
			if req == nil {
				break // it's not possible to recover, so close the connection
			}
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			continue
		}
		wg.Add(1)
		go server.handleRequest(cc, req, sending, wg)
	}
	wg.Wait()
	_ = cc.Close()
}

serveCodec 的过程非常简单。主要包含三个阶段

  • 读取请求 readRequest
  • 处理请求 handleRequest
  • 回复请求 sendResponse

之前提到过,在一次连接中,允许接收多个请求,即多个 request headerrequest body,因此这里使用了for无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:

  • handleRequest 使用了协程并发执行请求。
  • 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
  • 尽力而为,只有在 header 解析失败时,才终止循环。
// request stores all information of a call
type request struct {
	h            *codec.Header // header of request
	argv, replyv reflect.Value // argv and replyv of request
}

func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
	var h codec.Header
	if err := cc.ReadHeader(&h); err != nil {
		if err != io.EOF && err != io.ErrUnexpectedEOF {
			log.Println("rpc server: read header error:", err)
		}
		return nil, err
	}
	return &h, nil
}

func (server *Server) readRequest(cc codec.Codec) (*request, error) {
	h, err := server.readRequestHeader(cc)
	if err != nil {
		return nil, err
	}
	req := &request{h: h}
	// TODO: now we don't know the type of request argv
	// day 1, just suppose it's string
	req.argv = reflect.New(reflect.TypeOf(""))
	if err = cc.ReadBody(req.argv.Interface()); err != nil {
		log.Println("rpc server: read argv err:", err)
	}
	return req, nil
}

func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
	sending.Lock()
	defer sending.Unlock()
	if err := cc.Write(h, body); err != nil {
		log.Println("rpc server: write response error:", err)
	}
}

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
	// TODO, should call registered rpc methods to get the right replyv
	// day 1, just print argv and send a hello message
	defer wg.Done()
	log.Println(req.h, req.argv.Elem())
	req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
	server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}

目前还不能判断 body 的类型,因此在readRequesthandleRequest 中,day1 body 作为字符串处理。接收到请求,打印 header,并回复 geerpc resp ${req.h.Seq}。这一部分后续再实现。

main 函数(一个简易的客户端)

day1 的内容就到此为止了,在这里我们已经实现了一个消息的编解码器 GobCodec,并且客户端与服务端实现了简单的协议交换(protocol exchange),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并回复客户端的请求。

接下来,我们就在 main 函数中看看如何使用刚实现的 GeeRPC 吧。

day1-codec/main/main.go

package main

import (
	"encoding/json"
	"fmt"
	"geerpc"
	"geerpc/codec"
	"log"
	"net"
	"time"
)

func startServer(addr chan string) {
	// pick a free port
	l, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal("network error:", err)
	}
	log.Println("start rpc server on", l.Addr())
	addr <- l.Addr().String()
	geerpc.Accept(l)
}

func main() {
	addr := make(chan string)
	go startServer(addr)

	// in fact, following code is like a simple geerpc client
	conn, _ := net.Dial("tcp", <-addr)
	defer func() { _ = conn.Close() }()

	time.Sleep(time.Second)
	// send options
	_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
	cc := codec.NewGobCodec(conn)
	// send request & receive response
	for i := 0; i < 5; i++ {
		h := &codec.Header{
			ServiceMethod: "Foo.Sum",
			Seq:           uint64(i),
		}
		_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
		_ = cc.ReadHeader(h)
		var reply string
		_ = cc.ReadBody(&reply)
		log.Println("reply:", reply)
	}
}
  • startServer 中使用了信道 addr,确保服务端端口监听成功,客户端再发起请求。
  • 客户端首先发送 Option 进行协议交换,接下来发送消息头 h := &codec.Header{},和消息体 geerpc req ${h.Seq}
  • 最后解析服务端的响应 reply,并打印出来。

执行结果如下:

start rpc server on [::]:63662
&{Foo.Sum 0 } geerpc req 0
reply: geerpc resp 0
&{Foo.Sum 1 } geerpc req 1
reply: geerpc resp 1
&{Foo.Sum 2 } geerpc req 2
reply: geerpc resp 2
&{Foo.Sum 3 } geerpc req 3
reply: geerpc resp 3
&{Foo.Sum 4 } geerpc req 4
reply: geerpc resp 4

原文链接:https://geektutu.com/post/geerpc-day1.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1960741.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

国内民营企业「数字化转型」典型案例

一、企业简介 三一集团成立于1989年&#xff0c;现有3家上市公司&#xff08;三一重工、三一国际、三一重能&#xff09;&#xff0c;公司总资产超2000亿元&#xff0c;在国内12个省市设有生产基地&#xff0c;在海外建有印度、美国、德国、巴西四大研发制造基地&#xff0c;业…

【国产化信创平台】麒麟银河V10系统虚拟机创建

目录 一、麒麟V10系统镜像下载 二、虚拟机创建流程 三、麒麟银河系统安装流程 一、麒麟V10系统镜像下载 https://www.kylinos.cn/# 官方访问还是会有问题&#xff0c;如果有需要麒麟银河Kylin系统V10的镜像文件&#xff0c;可以留下邮箱或者私信博主获取。 二、虚拟机创…

【LeetCode】16. 最接近的三数之和

三数之和这道题被反复考到&#xff0c;但是我一次都没给写出来&#xff0c;真是汗颜&#xff01;本题是三数之和的一道变形题&#xff0c;也是一道好题&#xff01;本题有两个关键点&#xff1a;其一&#xff0c;双指针是怎么个用法&#xff1f;在本题中是怎么实现的&#xff1…

智慧的拼图:聚类分析的启示

文章目录 聚类分析简介K-means聚类什么是K-means聚类&#xff1f;K-means聚类的步骤K-means聚类的特点K-means聚类实例 层次聚类什么是层次聚类&#xff1f;层次聚类的类型层次聚类的步骤层次聚类的特点层次聚类实例 K-means与层次聚类对比相同点不同点 结论补充 聚类分析是数据…

【永洪BI】1.添加数据源

目录 一、添加文本数据源&#xff08;EXCEL&TEXT&#xff09; 二、添加Mysql数据源 一、添加文本数据源&#xff08;EXCEL&TEXT&#xff09; 略 二、添加Mysql数据源 1.添加Mysql驱动 2.问题&#xff1a; 问题① 原因&#xff1a; Jar文件上传校验&#xff0c;只允…

类静态方法将python 中把不同脚本融合为一个大脚本 互不影响的方法!首发 以便于维护和扩展。

一般遇见想要方便管理或者集合的大脚本 我们管理很麻烦 用这个方法 无论是什么方面的脚本 都能放在一个大脚本中运行 互不干扰。 将所有功能整合到一个单一的脚本中&#xff0c;而不再依赖外部的.py文件&#xff0c;你可以将rendering.py和sercurity.py中的函数代码直接复制到…

centos7 安装minio

文章目录 下载 Minio 二进制文件配置 Minio将Minio设置成服务 配置Systemd服务启动创建minio变量文件&#xff1a;/etc/default/minio创建Service File&#xff1a;/etc/systemd/system/minio.service设置开启自启动 Nginx反向代理nginx配置 下载 Minio 二进制文件 cd /usr/lo…

VLM系列文章6-Cambrian-1

以视觉为中心的多模态大模型Cambrian-1&#xff0c;主要考虑的是MLLM中视觉模型的影响。 1、评估 LLM 还是 MLLM&#xff1f;&#xff1a;我们在使用 23 种不同的视觉主干训练的 MLLM 中比较了视觉禁用和视觉启用设置之间的性能。我们的研究结果表明&#xff0c;MMMU 和 AI2D 等…

有没有视频ai换脸的软件?

AI视频生成&#xff1a;小说文案智能分镜智能识别角色和场景批量Ai绘图自动配音添加音乐一键合成视频百万播放量https://aitools.jurilu.com/ 发现大家对AI换脸都非常感兴趣&#xff0c;AI换脸一般的步骤是&#xff1a;上传自己的照片→选择不同的风格→制作出被替换的GIF图或视…

安卓修改logo与开机动画后不显示问题解决

问题描述 替换安卓logo与开机动画bootanimation.zip画面是花的或黑的 修复步骤 手动替换安卓framework中的开机logo后显示异常排查 在电脑上打开logo图片, 查看分辨率, 再adb shell wm size查看设备的分辨率,要求二者一致.电脑上右键点击logo图片,查看属性,要确保它的位深度…

数据结构与算法 - 二分查找

一、二分查找 二分查找算法也称折半查找&#xff0c;是一种非常高效的工作于有序数组的查找算法。 时间复杂度 最坏情况&#xff1a;O(log n)最好情况&#xff1a;如果待查找元素恰好在数组中央&#xff0c;只需要循环一次O(1&#xff09; 空间复杂度 递归->O(log n)&a…

暑期C++ printf和scanf的平替

有任何不懂的问题可以评论区留言&#xff0c;能力范围内都会一一回答 C中也有专门的输入和输出的方法 首先我们需要一个头文件&#xff0c;也就是#include<iostream> 然后根据我们命名空间的知识可知这个地方如果我们要使用必须先展开 可以全部展开比如using namespa…

K8s大模型算力调度策略的深度解析

随着大数据和人工智能技术的飞速发展&#xff0c;Kubernetes&#xff08;简称K8s&#xff09;作为容器编排的领军者&#xff0c;在支撑大规模模型训练和推理方面扮演着越来越重要的角色。在大模型算力的调度过程中&#xff0c;如何高效、合理地分配和管理资源成为了一个亟待解决…

实验2-5-1 求排列数

本题要求实现一个计算阶乘的简单函数&#xff0c;使得可以利用该函数&#xff0c;根据公式 算出从n个不同元素中取出m个元素&#xff08;0<m≤n&#xff09;的排列数。 函数接口定义&#xff1a; double fact( int n );其中n是用户传入的参数&#xff0c;函数返回n的阶乘。…

数据分析概要【数据分析---偏企业】

各位大佬好 &#xff0c;这里是阿川的博客&#xff0c;祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 数据分析概要前 必看 Python 初阶 Python–语言基础…

[数据集][目标检测]易拉罐底部缺陷检测数据集VOC+YOLO格式1122张5类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;1122 标注数量(xml文件个数)&#xff1a;1122 标注数量(txt文件个数)&#xff1a;1122 标注…

企业获客重要途径-大数据获客系统

企业获客的重要途径之一是通过大数据获客系统。这一系统利用大数据技术和分析方法&#xff0c;帮助企业更精准地获取客户&#xff0c;提高市场营销的效率和效果。 所以整理了以下是大数据获客系统作为企业获客重要途径的详细阐述&#xff1a; 一、大数据获客系统的定义与功能…

永磁同步电机谐波抑制算法(8)——基于自适应带宽扩张状态观测器的采样电流偏置误差补偿办法

1.前言 在上一期内容中&#xff0c;已经介绍了采样电流的偏置误差the current measurement offset error /CMOE&#xff08;这个采样电流偏置误差通常认为是直流DC偏置&#xff0c;所以其在dq电流中会造成一次谐波&#xff09;。如果没看过上一期内容&#xff0c;那先需要补一…

SSRF-labs-master靶场

目录 file_get_content.php sql_connect.php download.php dns-spoofing.php dns_rebinding.php 访问链接 http://127.0.0.1/SSRF/# file_get_content.php 在编程语言中&#xff0c;有一些函数可以获取本地保存文件的内容。这些功能可能能够从远程URL以及本地文件 如果没…

C++第二十九弹---C++继承机制深度剖析(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 目录 1.继承的概念及定义 1.1继承的概念 1.2 继承定义 1.2.1定义格式 1.2.2继承关系和访问限定符 1.2.3继承基类成员访问方式的变化 2.基类和派生…