字节开源的netPoll多路复用器源码解析
- 引言
- NetPoll
- epoll API
- 原生网络库实现
- netpoll 设计思路
- netpoll 对比 go net
- 数据结构
- 源码解析
- 多路复用池初始化
- Epoll相关API
- 可读事件处理
- server启动
- accept 事件
- 客户端连接初始化
- 客户端连接建立
- 可读事件
- 等待读取数据
- 可写事件处理
- 客户端启动
- 写数据
- 可写事件
- 客户端socket可写事件
- 服务端socket可写事件
引言
IO 有阻塞和非阻塞两种模式,在阻塞IO下,我们需要耗费一个线程去阻塞在read操作下,去等待有足够多的数据可读并返回。在非阻塞IO下,不停对所有fd集合进行轮询,筛选出所有可读fd进行处理。
阻塞IO浪费线程(会占用内存和上下文切换开销),非阻塞IO会浪费CPU做大量无效操作。而基于IO多路复用系统调用实现的poll的意义在于将可读/可写状态通知和实际文件操作分开,并支持多个文件描述符通过一个系统调用监听以提升性能。
网络库的核心功能就是去同时监听大量的文件描述符的状态变化(通过操作系统调用),并对于不同状态变更,高效,安全地进行对应的文件操作。
对于一个高效的网络库而言,它的设计需要考虑以下几个场景:
- 连接数量密集型: 如长连接场景,每个连接请求并不多,但是需要一直维护着长连接
- 连接创建/销毁密集型: 如短连接场景,会频繁创建销毁连接
这类场景下,对 listener fd 的压⼒很⼤,监听 listener fd 的系统调⽤会被频繁唤醒。⽽⼀个 fd 只能被⼀个线程处理,这样的话创建连接的压⼒只能由单个 CPU 承担⽆法充分利⽤多核。Linux 后⾯增加SO_REUSEPORT 功能,可以对同⼀个 bind ip+port 创建多个 listener fd,内核提供负载均衡分发,这样来实现多核处理连接创建密集型场景。需要注意的是,即便是那些⻓连接场景下,如果遇到⼀些特殊业务场景(例如准点秒杀)也会出现瞬间创建⼤量连接的情况。
- 请求密集型: 如支持连接多路复用的RPC服务,点对点的所有请求都可以基于一个长连接进行,此时单连接会频繁被唤醒处理事件
同时由于网络库不仅要管理监听文件事件,还需要管理用户业务逻辑层handler的执行 ,因此一个设计优秀的网络库,还应当具备以下指标:
- QPS 尽量高
QPS 要⾼意味着要让单请求开销低(即平均 latency 低),⾸先要保证充分利⽤CPU ,其次是要让 CPU 尽可能少执⾏内存拷⻉等和我们⾸要⼯作⽆关的代码。简⽽⾔之,CPU处于满负荷⼯作,且做有效的⼯作的状态。⽆效⼯作是指那些和业务⽆关的事情,例如GC,线程&& 协程上下⽂切换开销,锁竞争等。在 Golang 中,G 依赖 P 运⾏,⽽ P ⾃⾝有调度逻辑,所以需要尽可能充分利⽤ P,不让 P 空转
- P99 延迟尽量低
P99 ⽐ Avg ⾼的根因是在运⾏中间遇到⼀些原因导致 CPU 腾出去进⾏了其他的⼯作,或是整个⼯作循环被暂停了(如 GC stop the world,或是 Goroutine 陷⼊ syscall导致的暂时卡顿,⼜或是锁竞争)。
NetPoll
epoll API
在正式开始讲解NetPoll源码前,我们先来快速复习一下多路复用API实现,本文基于Linux系统进行展开,所有此处多路复用器实现基于epoll展开:
typedef union epoll_data {
int fd;
//...
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
- epoll_create: 创建 epollfd 对象,后续 epoll 操作都围绕该对象。
- epoll_ctl: 在 epollfd 对象上,对 fd 使⽤某个
op(ADD/DEL/...)
,并声明关⼼什么events(EPOLLIN/EPOLLOUT/...)
。 - epoll_wait: 阻塞等待直到 epollfd 内有就绪事件便返回,返回值为有效事件数,并且有效事件会记录再传⼊的 events 地址中。
- timeout > 0 时:超过 timeout ms 后返回,若⽆事件发⽣返回值为 0
- timeout = 0 时:⾮阻塞,即便没有任何事件发⽣,也会⽴刻返回,返回值为 0
- timeout = -1 时: 阻塞,直到有事件发⽣
Epoll 在使⽤上有两种模式:边缘触发(ET)和⽔平触发(LT)
- 边缘触发只有在从⽆数据到有数据时通知⼀次,⽽⽔平触发只要 fd 处于可读状态就会⼀直触发。
- ⽔平触发的缺点在于,如果没有读完 fd 继续调⽤ epoll_wait 还会再次触发 EPOLLOUT 导致会⼀直尝试进⾏读完所有数据。这导致如果包体积特别⼤的情况下,会占⽤更多内存开销。
原生网络库实现
golang 原生网络库基于epoll et模式开发,基本架构如下图所示:
- 每个 fd 对应⼀个 goroutine,业务⽅对 conn 发起主动的读写,底层使⽤⾮阻塞 IO,当事件未就
绪,将 fd 注册(epoll_ctl)进 epoll fd,通过把 goroutine 设置(park)成 GWaiting 状态。当有就绪
事件后,唤醒(ready) 对应 goroutine 成 GRunnable 状态。 - ⽤⼾⾃⼰的 Goroutine 负责进⾏实际的 read/write syscal,Go Net 负责事件监听,以及帮助 block/ready ⽤⼾ Goroutine。
golang原生网络库的特点就是:
- 从⽤⼾视⻆来看 net.Conn 接⼝的函数都是阻塞的,即便底层 IO 是⾮阻塞的
- Read 接⼝能够填充满缓冲区就填充,填充不满也会直接返回⻓度 n
- 上层调⽤⽅既可以控制从内核缓冲区中的读取速率,也可以控制读取块⼤⼩
type Conn interface {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
Read(b []byte) (n int, err error)
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(b []byte) (n int, err error)
}
由于go net采用ET模式,所以只会在数据就绪时通知一次,用户在自己的线程中调用read api不断读取数据,直到返回的n等于0,说明数据全部读取完毕了。
golang 原生网络库的优势如下:
- 创建连接后,由调⽤⽅ Goroutine ⾃⾝决定是否要进⾏读写,何时进⾏读写,以多少 size 进⾏读写。
- ⽤⼾只需关⼼ net.Conn 暴露的同步接⼝,使⽤上⾮常⽅便。
但是有利必有弊,golang原生网络库的设计会导致如下问题:
- 1 Goroutine : 1 Connection 模型,在连接⾮常多时,Goroutine 数量会爆炸从⽽在调度上产⽣较⼤开销,进⽽影响到 P99 指标。
- 不⽀持零拷⻉,⽤⼾使⽤
conn.Read(b []byte)
后,完成⼀次内核缓冲区到⽤⼾缓冲区的复制。拿到[]byte
后再传递给上层进⾏协议解析(反序列化)时,往往还需要再进⾏⼀次拷⻉
根本性原因还是协议反序列化时拿到的内存和内核缓冲区复制到⽤⼾缓冲区的内存不是同⼀块导致的
- net.Conn 是⽤⼾主动调⽤才触发事件监听,所以如果⼀个连接对端已经关闭,此时需要下⼀次写/读时,才能根据返回 error 进⾏判断连接状态。如果该连接⼀直没被调⽤到,会⼀直存在,占⽤内存。
netpoll 设计思路
Netpoll 主要由两⼤部分构成:
- EventLoop(Polls):⽤来监听⽂件描述符事件
- FDOperators:⽤来根据不同事件进⾏不同操作
这里官方提供了一幅图画的很好:
整个流程要分为三部分来看:
-
netpoll 初始化:
- netpoll 启动时,会初始化poll manager , 依次初始化池中每个poll对象
- 首先调用EpollCreate api创建一个新的Epoll对象,然后将其与当前poll对象绑定,同时还会为当前poll对象分配linkbuffer等缓冲区
- 为每个poll对象开启一个协程来不断轮询当前epoll上的可读可写等事件
-
server 端:
- 启动后,从poll manager中获取一个空闲的poll ,将listener fd注册到poll中,监听accept事件
- 当accept 到客户端连接后,从poll manager中获取一个空闲的poll ,将客户端socket fd注册到poll中,监听可读事件
- 每个poll会关联一个LinkBuffer对象,当监听到客户端连接上的可读事件后,从linkbuffer中预定一块内存,将数据都读取到这块内存中来
- 包装一个模版任务,用于不断轮询处理linkbuffer上剩余可读数据,同时每次轮询完后,都会回调用户设置好的OnRequest函数,就是上图的hanler函数
- 包装的模版任务会被提交到协程池中执行,也就是上图中的gopool
- 与内核的系统调⽤交互完全由⽹络库进⾏控制,⽤⼾对 Conn 的读写都只是在操作⼀段 Buffer ⽽已
-
client 端:
- 启动后,建立和server端的连接 , 从opCache对象池中获取一个空闲的FDOperator对象返回,然后等待直到client socket可写
- 调用connection提供的相关写api如malloc,先分配一块内存用于写数据
- 写完需要发送给server的数据,调用flush api进行数据提交
- flush api会首先尝试将数据写入socket内核缓冲区中,如果一次没写完,说明socket缓冲区写满了,此时会在poll上注册对当前socket fd可写事件监听
- 然后调用waitFlush api阻塞等待writeTrigger通道发送过来的可写通知
- 当poll线程监听到当前socket fd上发生了可写事件的时候,会向writeTrigger通道发送消息,唤醒等待的客户端
netpoll 对比 go net
netpoll 实现思路和 golang 原生网络库的区别如下:
- Go Net 使⽤ Epoll ET ,Netpoll 使⽤ LT。
- Netpoll 在⼤包场景下会占⽤更多的内存。Go Net 只有⼀个 Epoll 事件循环(因为 ET 模式被唤醒的少,且事件循环内⽆需负责读写,所以⼲的活少),⽽ Netpoll 允许有多个事件循环(循环内需要负责读写,⼲的活多,读写越重,越需要开更多 Loops)。
- Go Net ⼀个连接⼀个 Goroutine,Netpoll 连接数和 Goroutine 数量没有关系,和请求数有⼀定
关系,但是有 Gopool 重⽤。 - Go Net 不⽀持 Zero Copy,甚⾄于如果⽤⼾想要实现 BufferdConnection 这类缓存读取,还会产
⽣⼆次拷⻉。Netpoll ⽀持管理⼀个 Buffer 池直接交给⽤⼾,且上层⽤⼾可以不使⽤ Read(p []byte) 接⼝⽽使⽤特定零拷⻉读取接⼝对 Buffer 进⾏管理,实现零拷⻉能⼒的传递。
ET模式在高并发下调度压力比较大,因为 EventLoop 本⾝只是监听事件,真正的读写操作都在⽤⼾⾃⼰的 Goroutine 函数中执⾏,不由⽹络库控制;因此每次 EventLoop监听到事件发生后,都需要唤醒对应的线程去读写数据,这里存在上下文切换开销。
而 LT 单线程轮询对 cache/计算类业务更友好,因为 Cache 的特点是业务逻辑执⾏的⾮常快,所以在 readv 完了后可以⽴刻执⾏ handler 同时执⾏write,整个过程都不需要进⾏线程调度。对于计算类任务⽽⾔,越少协程切换能够让 CPU 尽可能少的做⽆效⼯作。
数据结构
此处只列举核心的几个对象:
- server 服务端对象:
type server struct {
operator FDOperator // ⽤来根据不同事件进⾏不同操作
ln Listener.
opts *options // 配置相关回调接口
onQuit func(err error) // 退出回调
connections sync.Map // 记录当前server上accept得到的活跃客户端连接: key=fd, value=connection
}
- 封装多路复用器操作的对象:
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
// epoll 监听的 fd
FD int
// 监听到读写事件后的回调函数
OnRead func(p Poll) error // accept 事件回调
OnWrite func(p Poll) error // 客户端 socket 写回调
OnHup func(p Poll) error
// linkbuffer 与 socket 缓冲区之间的读写API
Inputs func(vs [][]byte) (rs [][]byte)
InputAck func(n int) (err error)
Outputs func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
OutputAck func(n int) (err error)
// epoll 对象
poll Poll
...
}
- 封装多路复用器的对象
type Poll interface {
Wait() error
Close() error
Trigger() error
Control(operator *FDOperator, event PollEvent) error
Alloc() (operator *FDOperator)
Free(operator *FDOperator)
}
type defaultPoll struct {
pollArgs
fd int // 监听的fd
wop *FDOperator // eventfd(轻量级进程通信机制), wake epoll_wait -- 用于唤醒wait上阻塞的线程
...
Handler func(events []epollevent) (closed bool) // 发生感兴趣事件时,回调该接口处理这些事件
}
type pollArgs struct {
...
events []epollevent // 发送/接收感兴趣事件
barriers []barrier。 // 用于实现分散读/集中写的向量缓冲区
}
type epollevent struct {
events uint32 // 事件位图
_ int32
data [8]byte // 注册感兴趣事件时,可以携带用户数据的指针
}
源码解析
多路复用池初始化
netpoll使用pollmanager维护着一组epoll对象池,以此来实现对象复用,每次有客户端新连接被Accept时,都会从epoll池中按照对应的负载均衡策略,pick出一个空闲的epoll对象来监听客户端连接上后续的读写事件。
netpoll多路复用池初始化的流程图如下所示:
具体源码如下:
// poll_manager.go
var pollmanager *manager // 多路复用器池子管理器
func init() {
var loops = runtime.GOMAXPROCS(0)/20 + 1
pollmanager = &manager{}
// 设置负载均衡器,默认采用轮询策略从epoll池中挑选空闲epoll
pollmanager.SetLoadBalance(RoundRobin)
// 设置epoll池的大小,同时会初始化池中的epoll对象
pollmanager.SetNumLoops(loops)
...
}
golang 程序启动时,会去自动调用每个go文件的init方法,所以pollmanager会在程序启动时被初始化。
真正初始化epoll池中epoll对象的逻辑是在设置eventLoopNum时完成的:
// poll_manager.go
func (m *manager) SetNumLoops(numLoops int) error {
..
// netpoll支持运行时动态调整epoll池大小,所以此处存在该分支
// 如果我们打算缩小epoll池大小,则进入下面这个分支
if numLoops < m.NumLoops {
// 创建一个新的epoll池
var polls = make([]Poll, numLoops)
for idx := 0; idx < m.NumLoops; idx++ {
// 对于无需缩减的部分,直接重新指向即可
if idx < numLoops {
polls[idx] = m.polls[idx]
} else {
// 对于需要缩减的部分,直接Close关闭该多路复用器
if err := m.polls[idx].Close(); err != nil {
logger.Printf("NETPOLL: poller close failed: %v\n", err)
}
}
}
// 更新多路复用池管理器的相关状态
m.NumLoops = numLoops
m.polls = polls
m.balance.Rebalance(m.polls)
// 如果是动态缩容,缩容完毕后,直接返回
return nil
}
// 进入初始化或者扩容逻辑
m.NumLoops = numLoops
return m.Run()
}
从上面代码可以看出,netpoll支持在运行时动态调整池子的大小,下面我们看看初始化和扩容逻辑是如何完成的:
// poll_manager.go
// 扩容或者初始化epoll池
func (m *manager) Run() (err error) {
defer func() {
if err != nil {
_ = m.Close()
}
}()
// 如果是初始化epoll池,此处的polls大小应该为0
// 如果时扩容逻辑,此处的polls大小为当前池中已有的多路复用器个数
for idx := len(m.polls); idx < m.NumLoops; idx++ {
var poll Poll
// 创建一个新的多路复用器
poll, err = openPoll()
if err != nil {
return
}
// 新创建的多路复用器追加到polls集合
m.polls = append(m.polls, poll)
// 每个多路复用器绑定一个协程,不断轮询注册到该epoll上的fd事件
go poll.Wait()
}
// 更新多路复用池管理器的相关状态
m.balance.Rebalance(m.polls)
return nil
}
初始化epoll池时,首先是将池中每个epoll对象创建出来:
// poll_default_linux.go
// 打开多路复用器
func openPoll() (Poll, error) {
return openDefaultPoll()
}
func openDefaultPoll() (*defaultPoll, error) {
var poll = new(defaultPoll)
poll.buf = make([]byte, 8)
// 创建Epoll对象
var p, err = EpollCreate(0)
...
// 保存epoll的fd
poll.fd = p
// eventfd是一种进程/线程通信的机制,他类似信号,不过eventfd只是一种通知机制
// 无法承载数据(eventfd承载的数据是8个字节),他的好处是简单并且只消耗一个fd
// 进程间通信机制: https://zhuanlan.zhihu.com/p/383395277
var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
...
// TODO: 这几个回调接口干啥的 ?
poll.Reset = poll.reset
// 处理当前epoll fd上所发生的感兴趣的事件
poll.Handler = poll.handler
// eventFd 通信机制
poll.wop = &FDOperator{FD: int(r0)}
// 在epoll上注册并监听eventFd的可读事件 -- 监听r0上的可读事件
if err = poll.Control(poll.wop, PollReadable); err != nil {
_ = syscall.Close(poll.wop.FD)
_ = syscall.Close(poll.fd)
return nil, err
}
// 初始化FDOperator缓存
poll.opcache = newOperatorCache()
return poll, nil
}
这里使用defaultPoll保存多路复用器上下文信息,同时还为每个多路复用器创建出了一个eventFD用于实现进程间通信,同时在当前epoll上注册监听eventFD的可读事件。
此处使用eventFD是为了epoll池关闭的时候,通知那些阻塞在epoll_wait系统调用上的线程可以醒过来,然后结束自己。
当创建出来多路复用器后,下一步便是将其加入epoll池中,最后为每个多路复用器绑定一个协程,然后不断轮询注册到该epoll上的fd事件:
// poll_default_linux.go
func (p *defaultPoll) Wait() (err error) {
// init
var caps, msec, n = barriercap, -1, 0
p.Reset(128, caps)
// wait
for {
if n == p.size && p.size < 128*1024 {
p.Reset(p.size<<1, caps)
}
// p.fd 就是 epoll fd
// events 就是挂载到epoll tree上的epoll item
// mesc 用于指定阻塞时间,是永久阻塞,还是阻塞一段时间,还是非阻塞IO
// 等待当前epoll上发生感兴趣的事件
n, err = EpollWait(p.fd, p.events, msec)
if err != nil && err != syscall.EINTR {
return err
}
// 如果没有发生感兴趣的事件,则将msec设置为-1,表示下一次采用永久阻塞策略来等待感兴趣的事件发生
// 然后调用Gosched完成协程调度
if n <= 0 {
msec = -1
runtime.Gosched()
continue
}
msec = 0
// 处理感兴趣的事件
if p.Handler(p.events[:n]) {
return nil
}
// we can make sure that there is no op remaining if Handler finished
p.opcache.free()
}
}
defaultPoll的Handler回调接口是在openDefaultPoll函数中被赋值的,实际调用的是poll_default_linux.go
文件中的handler函数:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
if operator == nil || !operator.do() {
continue
}
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
// trigger or exit gracefully
// 是否是eventFD可读事件发生了
if operator.FD == p.wop.FD {
// must clean trigger first
// 从eventFD中读取数据到buf中
syscall.Read(p.wop.FD, p.buf)
atomic.StoreUint32(&p.trigger, 0)
// if closed & exit
// 说明接收到了关闭信号,那么就关闭当前epoll
if p.buf[0] > 0 {
// 关闭eventFD
syscall.Close(p.wop.FD)
// 关闭epoll fd
syscall.Close(p.fd)
operator.done()
return true
}
operator.done()
continue
}
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
// 否则说明发生的是某个客户端连接上的可读事件
} else if operator.Inputs != nil {
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
...
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
// 其他感兴趣事件的触发此处暂时不展开
...
}
Epoll相关API
本节介绍一下netpoll为Linux下的epoll系统调用封装的API接口:
Linux 底层的epoll系统调用由红黑树实现,netpoll 给红黑树上每个节点都关联一个epollevent类型,该类型由一个事件位图和用户数据指针组成:
// sys_epoll_linux_arm64.go
type epollevent struct {
events uint32 // events:表示要监听的事件类型,如可读、可写等。这是一个位掩码,可以设置多个事件类型,例如 EPOLLIN 表示可读事件,EPOLLOUT 表示可写事件。
_ int32
data [8]byte // 可以携带用户数据。这里的用户数据通常是一个指针,指向与文件描述符关联的对象或其他相关数据。
}
netpoll 还提供了对epoll对象创建,感兴趣事件监听,等待感兴趣事件发生等操作的API封装:
- 创建epoll对象
func EpollCreate(flag int) (fd int, err error) {
var r0 uintptr
// 执行epoll_create系统调用
r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
if err == syscall.Errno(0) {
err = nil
}
return int(r0), err
}
- 注册感兴趣事件
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
if err == syscall.Errno(0) {
err = nil
}
return err
}
- 等待感兴趣事件发生
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
var r0 uintptr
var _p0 = unsafe.Pointer(&events[0])
if msec == 0 {
r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
} else {
r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
}
if err == syscall.Errno(0) {
err = nil
}
return int(r0), err
}
关于注册感兴趣的事件,netpoll在此基础之上又封装了一层:
// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
var op int
// TODO: evt.data = operator
var evt epollevent
// 将epollevent对象的data指针指向传入的FDOperator对象
p.setOperator(unsafe.Pointer(&evt.data), operator)
// 根据监听的事件类型,更新事件位图
switch event {
case PollReadable: // server accept a new connection and wait read
operator.inuse()
op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollWritable: // client create a new connection and wait connect finished
operator.inuse()
op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollDetach: // deregister
p.delOperator(operator)
op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollR2RW: // connection wait read/write
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollRW2R: // connection wait read
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
}
// 完成监听事件信息注册
return EpollCtl(p.fd, op, operator.FD, &evt)
}
从Control函数中可以看出来,netpoll会在epollevent的data字段中保存监听的fd对象信息,这里fd对象是netpoll经过封装后的FDOperator对象。
FDOperator对象中又保存了对当前fd对象的读写API封装。
可读事件处理
可读事件有两类,一种是accept事件,另一种是readable事件:
accept事件针对的是server端
下面给出整个读事件处理的流程图,大家可以时不时回看本图:
server启动
服务提供方server在启动时,会创建一个新的server端套接字,然后在该套接字上打开并监听对应的端口,随后向poll manager获取一个空闲poller对象 , 并在该对象上监听server端套接字的可读事件,这里实际是客户端的accept事件:
netpoll server 方代码模版写法如下:
// 1. OnRequest: 有可读数据时回调该接口
var OnRequest = func(ctx context.Context, connection netpoll.Connection) error { return nil }
// 2. OnPrepare: 客户端连接建立完毕后,回调该接口
var OnPrepare = func(connection netpoll.Connection) context.Context { return nil }
func main() {
// 1. 建立连接
listen, err := net.Listen("tcp", ":1234")
if err != nil {
return
}
// 2. 创建eventLoop
eventLoop, _ := netpoll.NewEventLoop(OnRequest, netpoll.WithOnPrepare(OnPrepare), netpoll.WithReadTimeout(time.Second))
// 3. 启动服务
eventLoop.Serve(listen)
}
eventLoop的Serve方法会创建一个新的Server对象,并启动netpoll服务端:
// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
// 将原生的listener对象转换为netpoll包装后的Listener对象
npln, err := ConvertListener(ln)
if err != nil {
return err
}
evl.Lock()
// 创建新的server对象
evl.svr = newServer(npln,
// opts对象保存了相关事件回调
evl.opts,
// 退出事件回调监听函数
evl.quit)
// 启动回调
evl.svr.Run()
evl.Unlock()
// 监听到停止信号后,从此处返回
err = evl.waitQuit()
// ensure evl will not be finalized until Serve returns
runtime.SetFinalizer(evl, nil)
return err
}
创建完server对象后,会调用server对象的Run方法启动服务:
// Run this server.
func (s *server) Run() (err error) {
// 当前FDOperator对象封装的是server socket套接字对象
s.operator = FDOperator{
FD: s.ln.Fd(), // 服务端Socket监听器
OnRead: s.OnRead, // 可读事件发生
OnHup: s.OnHup, // 挂断事件发生
}
// 挑选一个空闲的多路复用器
s.operator.poll = pollmanager.Pick()
// 监听服务端套接字上的可读事件
err = s.operator.Control(PollReadable)
if err != nil {
// 错误退出时,回调该方法
s.onQuit(err)
}
return err
}
netpoll 在对server socket执行事件注册时,会设置FDOperator的OnRead接口,用于处理服务端套接字上的可读事件。
netpoll 也是通过FDOperator的OnRead接口是否为nil来判断当前发生的事件是accept还是readable事件。
只有server端启动时才会对服务端套接字设置OnRead回调接口,client端是不会设置的。
accept 事件
在defaultPoll的handler函数中,我们暂时只关心读事件是如何被处理的,而关于可读事件,本节我们来看看客户端accept事件是如何处理的:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
...
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
...
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
// 否则说明发生的是某个客户端连接上的可读事件
} else if operator.Inputs != nil {
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
...
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
// 其他感兴趣事件的触发此处暂时不展开
...
}
回顾上面给出的handler函数可知,netpoll会依次遍历感兴趣的事件集合中每个事件,然后获取与当前事件绑定的FDOperator对象;首先判断当前发生的是否死可读事件,再根据FDOperator的OnRead接口是否为空,来判断发生的是accept事件,还是readable事件。
在server启动一节我们已经知道了,如果FDOperator的OnRead接口不为空,那么说明发生的是客户端的accept事件,此时会调用FDOperator的OnRead回调来处理客户端的连接事件;此处实际调用的是server的OnRead的方法;
// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
// 获取客户端连接
conn, err := s.ln.Accept()
...
// 包装一下原生的conn连接
var connection = &connection{}
// 初始化一下连接
connection.init(conn.(Conn),
// 初始化完毕后,回调用户注册进来的prepare接口
s.opts)
// 连接不活跃,直接返回
if !connection.IsActive() {
return nil
}
// 返回客户端连接套接字对应的文件描述符
var fd = conn.(Conn).Fd()
// 添加关闭回调接口 --- netpoll回调接口这里采用的是回调链的形式,可以添加多个回调接口
connection.AddCloseCallback(func(connection Connection) error {
// 当前连接关闭时,将自己从server连接集合中移除
s.connections.Delete(fd)
return nil
})
// 在server对象中保存 < fd , 已打开连接 >
s.connections.Store(fd, connection)
// 调用连接建立接口
connection.onConnect()
return nil
}
处理客户端accept事件的过程主要分为三步:
- 获取原生conn连接对象,对其进行包装,然后为当前连接初始化相关数据结构和回调接口
- 从poller池中挑选出一个poll对象与当前连接进行绑定,并在该poll上注册对当前连接可读事件的监听
- 为当前连接包装一个任务对象,然后丢入协程池中之行,该任务负责死循环轮询,发现可读数据立马回调用户提供的OnRequest接口进行处理
客户端连接初始化
server.OnRead函数中调用的connection.init函数主要是用来为当前连接初始化相关数据结构,回调接口,以及在poll上注册对当前connection可读事件的监听
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
// 初始化LinkBuffer相关数据结构
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier) // 用于聚集读写的缓冲区
// 初始化
c.initNetFD(conn) // 确保conn是被netpoll包装后的netFD类型
c.initFDOperator() // 初始化FDOperator
c.initFinalizer() // 添加close回调函数
// 将客户端连接套接字设置为非阻塞模式
syscall.SetNonblock(c.fd, true)
// enable TCP_NODELAY by default
switch c.network {
case "tcp", "tcp4", "tcp6":
// 禁用 Nagle 算法
setTCPNoDelay(c.fd, true)
}
// 启用零拷贝传输的 TCP Socket 选项 和 阻塞超时时间
if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
c.supportZeroCopy = true
}
// connection initialized and prepare options
// 设置相关回调接口,在poll上注册对当前connection可读事件的监听
return c.onPrepare(opts)
}
netpoll 里面为原生的Listener,Connection,Epoll,Fd等对象都进行了一层自己的封装,initNetFD函数便是对原生客户端套接字文件描述符的封装:
func (c *connection) initNetFD(conn Conn) {
if nfd, ok := conn.(*netFD); ok {
c.netFD = *nfd
return
}
c.netFD = netFD{
fd: conn.Fd(),
localAddr: conn.LocalAddr(),
remoteAddr: conn.RemoteAddr(),
}
}
FDOperator 是对需要注册在epoll上进行监听的fd的封装,其是netpoll中的一个核心对象,内部持有被监听的fd和poll对象,同时对外提供fd数据读写回调接口 , 当fd上发生可读可写事件时,便会回调FDOperator上注册好的回调接口进行处理:
func (c *connection) initFDOperator() {
// 通过负载均衡器挑选一个可用的poll
poll := pollmanager.Pick()
// 从opcache中分配一个可用的poll对象
op := poll.Alloc()
// 拿到当前客户端连接对应的socket文件描述符
op.FD = c.fd
// 回调接口初始化 -- 注意OnRead回调被设置为了nil
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck
c.operator = op
}
connection 的 onPrepare 函数主要用来将用户提供的相关回调接口设置到当前connection对象上,以及相关读写超时参数等;如果用户提供了OnPrepare接口,此处会进行回调通知。
该函数最后会在当前poll上注册对当前客户端connection的读事件监听:
// onPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {
if opts != nil {
// 将用户通过options设置的回调接口都赋值给当前accept得到的客户端连接
c.SetOnConnect(opts.onConnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)
// calling prepare first and then register.
// 如果我们指定了onPrepare回调,此处会执行回调
if opts.onPrepare != nil {
c.ctx = opts.onPrepare(c)
}
}
// 初始化连接上下文
if c.ctx == nil {
c.ctx = context.Background()
}
// prepare may close the connection.
if c.IsActive() {
// 在当前poll上注册对当前客户端connection的读事件监听
return c.register()
}
return nil
}
connection的register函数负责在poll上注册对当前客户端connection的读事件监听:
// register only use for connection register into poll.
func (c *connection) register() (err error) {
err = c.operator.Control(PollReadable)
...
return nil
}
客户端连接建立
当accept得到的客户端连接初始化完毕后,会调用onConnect函数对客户端连接进行任务包装,然后提交到协程池执行任务:
// onConnect is responsible for executing onRequest if there is new data coming after onConnect callback finished.
func (c *connection) onConnect() {
// 获取用户设置的OnConnect回调和OnRequest回调接口 --- 如果没有设置OnConnect回调,此处直接返回
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
if onConnect == nil {
return
}
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
var connected int32
c.onProcess(
// 第一个回调函数用于判断当前是否连接此刻是否可被处理
func(c *connection) bool {
// 在当前客户端连接初始化完毕后,会在onConnect函数中回调一次客户端提供的OnConnect接口
// 此处通过标记确保只会调用一次OnConnect函数
if atomic.LoadInt32(&connected) == 0 {
return true
}
// check for onRequest
return onRequest != nil &&
// 存在可读数据
c.Reader().Len() > 0
},
// 第二个回调函数会在第一个回调函数返回true的前提下,进行处理
func(c *connection) {
// 回调OnConnect函数
if atomic.CompareAndSwapInt32(&connected, 0, 1) {
c.ctx = onConnect(c.ctx, c)
return
}
// 处理可读数据,回调用户提供的回调函数
if onRequest != nil {
_ = onRequest(c.ctx, c)
}
},
)
}
onProcess 函数内部负责实现一套模版方法,用于不断轮询连接状态,如果可处理,则调用执行处理,直到接收到停止信号或连接不可处理时,才会退出循环:
// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {
...
// 准备任务
var task = func() {
// 如果当前任务可执行,确保至少被执行过一次
if isProcessable(c) {
process(c)
}
// 死循环处理任务,直到接收到关闭信号或者任务不再可处理
var closedBy who
for {
closedBy = c.status(closing)
// close by user or no processable
if closedBy == user || !isProcessable(c) {
break
}
process(c)
}
...
return
}
// 异步跑这个任务 --- gopool.CtxGo 字节开源的协程池
runTask(c.ctx, task)
return true
}
但是这里要注意的是,如果连接上一段时间都没有可读数据,那么与当前连接绑定的协程在发现无数据可读时,会退出返回,也就是说当前协程就与当前连接解绑,并重新放回了协程池中。
大家要注意此处netpoll的实现思路:
- 连接初始化完毕的最后,会调用onConnect函数,该函数主要作用是调用用户设置好的onConnect回调,通知用户连接已经建立完毕了;而还需要OnRequest回调,只是为了顺道检查是否有可读数据准备就绪,如果准备就绪了,那么就顺道处理一波。
- onProcess函数主要做的事情就是不断轮询处理当前连接上的可读数据,直到接送到停止信号或者当前连接此刻没有可读数据了,则结束轮询,释放当前协程。
netpoll 通过一个单独的协程来监听fd上的可读可写事件,当监听到可读可写事件时,不是在当前协程内进行同步处理,而是将可读可写事件包装为一个任务,然后从协程池中取出一个空闲协程进行处理,这是典型的Reactor模式实现思路。
可读事件
当netpoll accept到一个连接后,会从poller池中挑选一个空闲poll,然后在当前poll上执行对当前conn可读事件的监听。
后续当conn上发生可读事件时,便会被与该conn绑定的poll感知到,然后通过判断FDOperator的OnRead接口为nil,知道当前发生的是可读事件,而非accept事件。
此时我们再来回看defaultPoll的handler,看看当发生可读事件时,netpoll是如何处理的:
// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
// epollevent.data保存的是与之关联的FDOperator对象
operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
...
var totalRead int
// 判断当前发生了什么事件
evt := events[i].events
triggerRead = evt&syscall.EPOLLIN != 0
triggerWrite = evt&syscall.EPOLLOUT != 0
triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
triggerError = evt&syscall.EPOLLERR != 0
...
// 发生了可读事件
if triggerRead {
// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
if operator.OnRead != nil {
// 调用OnRead来接收并处理客户端连接
operator.OnRead(p)
// 否则说明发生的是某个客户端连接上的可读事件
} else if operator.Inputs != nil {
// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
// 此处是从LinkBuffer中分配出一块空闲内存
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
// 读取数据到bs缓存区中
var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
operator.InputAck(n)
...
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
// 其他感兴趣事件的触发此处暂时不展开
...
}
关于LinkBuffer的源码解析本文就不过多展开了,感兴趣的小伙伴可以阅读我之前写的这篇文章:
- 字节开源的netPoll底层LinkBuffer设计与实现
FDOperator的Inputs和InputAck回调接口都是在客户端连接初始化时,在initFDOperator方法中被设置的:
func (c *connection) initFDOperator() {
...
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck
...
}
connection的inputs函数就是调用linkbuffer提供的book方法预定一块内存用于接收socket缓冲区中的可读数据:
// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
return vs[:1]
}
inputAck则复杂一些,首先会调用linkbuffer的bookAck函数完成预留内存的提交,这样已经从socket缓冲区写入linkbuffer的数据就对用户可见了:
// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
...
// 提交预留内存,提交后,用户便可以读取这部分内存数据了
length, _ := c.inputBuffer.bookAck(n)
...
var needTrigger = true
// 从协程池中取出一个空闲协程来处理当前连接上的可读数据
if length == n { // first start onRequest
needTrigger = c.onRequest() // 返回值表示是否读取完毕了所有需要的数据, 如果返回false,说明读完了,否则说明没有读完
}
// 单开协程处理客户端连接上的可读数据时,可能在回调用户OnRequest接口时,调用读数据接口从而阻塞等待数据准备就绪
// 此处当数据就绪时,会唤醒对应的协程
if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
c.triggerRead(nil)
}
return nil
}
此处调用connection的onRequest方法,并非直接就是调用的用户提供的回调接口,而是和OnConnect方法一样,创建一个读数据任务去处理当前连接上的可读数据:
// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
// 加载用户设置的回调接口
var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
if !ok {
return true
}
// 处理请求
processed := c.onProcess(
// 第一个回调函数用于判断当前连接是否活跃并且还有未读取数据
func(c *connection) bool {
return c.Reader().Len() > 0
},
// 第二个回调才是真正将请求交给用户回调来处理
func(c *connection) {
_ = onRequest(c.ctx, c)
},
)
// if not processed, should trigger read
return !processed
}
connection的onProcess方法上文已经说过,就是从协程池中捞取一个空闲协程来处理当前连接上的可读数据;
如果当前连接上一直有数据可读,便会一直处理,如果当前协程上没有数据可读了,协程便会被释放,重新返回池中。
等待读取数据
上文说到,当poll线程监听到可读可写数据的时候,会单开一个线程去处理当前连接上的可读可写数据;如果此时发生的是可读事件,那么最终会回调到用户提供的OnRequest接口。
而用户可以在OnRequest接口中去调用connection相关读API去读取数据:
// ReadString implements Connection.
func (c *connection) ReadString(n int) (s string, err error) {
if err = c.waitRead(n); err != nil {
return s, err
}
return c.inputBuffer.ReadString(n)
}
// ReadBinary implements Connection.
func (c *connection) ReadBinary(n int) (p []byte, err error) {
if err = c.waitRead(n); err != nil {
return p, err
}
return c.inputBuffer.ReadBinary(n)
}
// Next implements Connection.
func (c *connection) Next(n int) (p []byte, err error) {
if err = c.waitRead(n); err != nil {
return p, err
}
return c.inputBuffer.Next(n)
}
这些读API在方法开头都会调用waitRead等待所读数据量就绪后或者读超时后,才会进行数据读取或者超时返回:
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
// 如果当前可读数据大于需要的了,直接返回,无需等待
if n <= c.inputBuffer.Len() {
return nil
}
// 存储自己希望读取的数据量
atomic.StoreInt64(&c.waitReadSize, int64(n))
// 返回时,清空变量
defer atomic.StoreInt64(&c.waitReadSize, 0)
// 如果设置了读超时属性,就有限期等待,直到数据就绪
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
}
// wait full n
// 否则无限期等待,直到数据准备就绪
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
default:
// 等待接收数据就绪的信号
err = <-c.readTrigger
if err != nil {
return err
}
}
}
return nil
}
// waitReadWithTimeout will wait full n bytes or until timeout.
// 有限期等待
func (c *connection) waitReadWithTimeout(n int) (err error) {
// set read timeout
if c.readTimer == nil {
c.readTimer = time.NewTimer(c.readTimeout)
} else {
c.readTimer.Reset(c.readTimeout)
}
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
// cannot return directly, stop timer first!
err = Exception(ErrEOF, "wait read")
goto RET
case user:
// cannot return directly, stop timer first!
err = Exception(ErrConnClosed, "wait read")
goto RET
default:
select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
}
continue
}
}
}
RET:
// clean timer.C
if !c.readTimer.Stop() {
<-c.readTimer.C
}
return err
}
可写事件处理
可写事件有两类,一种是client端socket套接字可写事件,另一种是server端socket套接字可写事件:
注意区分server socket和socket套接字的区别 , 前者是server端启动绑定并监听的套接字,用于accept客户端连接,后者是accept得到的客户端socket连接套接字 和 客户端connect 服务端成功后得到的 socket套接字。
下面还是给出一幅写数据流程图:
客户端启动
客户端代码典型写法如下:
func main() {
// 1. 建立连接
dialer := netpoll.NewDialer()
conn, _ := dialer.DialConnection("tcp", ":1234", time.Second)
var reader, writer = conn.Reader(), conn.Writer()
// 2. 写数据
write_data := []byte("hello world")
alloc, _ := writer.Malloc(len(write_data))
copy(alloc, write_data) // write data
writer.Flush()
// 3. 读数据
buf, _ := reader.Next(reader.Len())
fmt.Println("服务端响应的数据:" + string(buf))
reader.Release()
}
我们下面来看一下客户端启动过程:
- 建立连接
func (d *dialer) DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
...
switch network {
case "tcp", "tcp4", "tcp6":
// 走tcp连接
return d.dialTCP(ctx, network, address)
...
}
}
func (d *dialer) dialTCP(ctx context.Context, network, address string) (connection *TCPConnection, err error) {
...
connection, err = DialTCP(ctx, "tcp", nil, tcpAddr)
...
return nil, firstErr
}
func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {
...
sd := &sysDialer{network: network, address: raddr.String()}
c, err := sd.dialTCP(ctx, laddr, raddr)
...
return c, nil
}
- 连接建立成功后,返回对应的客户端连接socket
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {
conn, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
...
return newTCPConnection(conn)
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {
...
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) {
// syscall.Socket & set socket options
var fd int
// 创建客户端socket对象,同时设置为非阻塞模式
fd, err = sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
...
// 包装客户端socket fd 为netFD
netfd = newNetFD(fd, family, sotype, net)
// 建立与server的连接
err = netfd.dial(ctx, laddr, raddr)
...
return netfd, nil
}
func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) {
...
// 连接server
if crsa, err = c.connect(ctx, lsa, rsa); err != nil {
return err
}
...
return nil
}
- 通过connect系统调用真正完成连接建立,然后为当前client socket创建一个FDOperator
func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {
// 系统调用connect,连接server
syscall.Connect(c.fd, ra)
...
// 为当前client socket创建一个新的FDOperator
c.pd = newPollDesc(c.fd)
for {
// 等待直到当前client socket可写为止
if err := c.pd.WaitWrite(ctx); err != nil {
return nil, err
}
switch err := syscall.Errno(nerr); err {
...
// 如果没有错误发生,直接返回Sockaddr
case syscall.Errno(0):
// The runtime poller can wake us up spuriously;
// see issues 14548 and 19289. Check that we are
// really connected; if not, wait again.
if rsa, err := syscall.Getpeername(c.fd); err == nil {
return rsa, nil
}
...
}
}
}
- 为当前client socket创建一个FDOperator,同时设置其OnWrite回调接口
func newPollDesc(fd int) *pollDesc {
pd := &pollDesc{}
poll := pollmanager.Pick()
pd.operator = &FDOperator{
poll: poll,
FD: fd,
OnWrite: pd.onwrite, // 设置OnWrite回调接口
OnHup: pd.onhup,
}
pd.writeTrigger = make(chan struct{})
pd.closeTrigger = make(chan struct{})
return pd
}
- 注册可写事件,同时等待直到client socket可写
// WaitWrite .
func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) {
if pd.operator.isUnused() {
// add ET|Write|Hup
// 在当前连接绑定的poll上注册等待可写事件
if err = pd.operator.Control(PollWritable); err != nil {
logger.Printf("NETPOLL: pollDesc register operator failed: %v", err)
return err
}
}
// 等待直到接收到终止信号或者可写事件(当前client socket缓冲区为空)
select {
case <-pd.closeTrigger: // triggered by poller
// no need to detach, since poller has done it in OnHup.
return Exception(ErrConnClosed, "by peer")
case <-pd.writeTrigger: // triggered by poller
err = nil
case <-ctx.Done(): // triggered by ctx
pd.detach()
pd.operator.unused()
err = mapErr(ctx.Err())
}
// double check close trigger
// 如果没有接收到停止信号,此处就直接返回
select {
case <-pd.closeTrigger:
return Exception(ErrConnClosed, "by peer")
default:
return err
}
}
写数据
当我们需要写数据时,通常都会先调用connection的malloc方法分配一块写缓冲区:
func (c *connection) Malloc(n int) (buf []byte, err error) {
return c.outputBuffer.Malloc(n)
}
然后调用connection的flush方法刷新写缓冲区中的数据:
func (c *connection) Flush() error {
...
// 刷新写缓冲区中的数据,让其对外可见
c.outputBuffer.Flush()
// 将数据写入内核socket缓冲区
return c.flush()
}
最终调用connection的flush方法,将linkbuffer中的数据写入socket内核缓冲区中:
func (c *connection) flush() error {
...
// netpoll采用聚集写,所以第一步是将写缓冲区中的数据都读取到写缓冲区向量数组中
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
// 将数据都写入内核socket缓冲区中
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
...
// 如果写入了部分数据,则释放掉这部分内存中间
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// 如果所有数据都已经成功写入内核socket缓冲区中,则直接返回
if c.outputBuffer.IsEmpty() {
return nil
}
// 可能是因为socket缓冲区满了,导致还有一部分数据没写完
// 此处注册对可写事件的监听
err = c.operator.Control(PollR2RW)
...
// 等待直到可写才会返回
return c.waitFlush()
}
如果socket内核缓冲区被写满了,则进行等待,具体是进行无限期等待,还是有限期等待,取决于我们是否设置了写超时时间:
func (c *connection) waitFlush() (err error) {
// 如果我们没有设置写超时事件,则进行无限期等待
if c.writeTimeout == 0 {
select {
case err = <-c.writeTrigger:
}
return err
}
// 如果我们设置了写超时事件,则执行有限期等待
if c.writeTimer == nil {
c.writeTimer = time.NewTimer(c.writeTimeout)
} else {
c.writeTimer.Reset(c.writeTimeout)
}
select {
case err = <-c.writeTrigger:
if !c.writeTimer.Stop() { // clean timer
<-c.writeTimer.C
}
return err
case <-c.writeTimer.C:
select {
// try fetch writeTrigger if both cases fires
case err = <-c.writeTrigger:
return err
default:
}
// if timeout, remove write event from poller
// we cannot flush it again, since we don't if the poller is still process outputBuffer
c.operator.Control(PollRW2R)
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}
可写事件
可写事件分为两类,一类是客户端socket可写,一类是服务端socket可写,本节我们来分别看看这两类可写事件都是如何处理的:
// 当感兴趣事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
var triggerRead, triggerWrite, triggerHup, triggerError bool
var err error
// 遍历所有感兴趣的事件
for i := range events {
...
// 触发写事件
if triggerWrite {
// 处理client socket可写事件
if operator.OnWrite != nil {
operator.OnWrite(p)
} else if operator.Outputs != nil {
// 处理服务端socket可写事件
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil {
p.appendHup(operator)
continue
}
}
} else {
logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
}
}
operator.done()
}
...
}
客户端socket可写事件
当客户端socket可写事件发生时,也就是客户端socket内核缓冲区有空闲空间可写时,会调用FDOperator的onwrite回调方法进行处理。
onwrite回调中会向writeTrigger通道写入消息,唤醒阻塞等待可写事件的线程:
func (pd *pollDesc) onwrite(p Poll) error {
select {
case <-pd.writeTrigger:
default:
pd.detach()
close(pd.writeTrigger)
}
return nil
}
服务端socket可写事件
当服务端socket可写事件发生时,也就是在server accept到客户端连接后,发现客户端连接对应的socket可写时,会经历下面三步:
- 如果写缓冲区数据为空,那么就移除对当前fd上可写事件的监听,否则读取数据到传入的vs缓冲区中
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
// 如果写缓冲区为空
if c.outputBuffer.IsEmpty() {
// 移除对当前fd上可写事件的监听
c.rw2r()
return rs, c.supportZeroCopy
}
// 读取数据到rs中
rs = c.outputBuffer.GetBytes(vs)
return rs, c.supportZeroCopy
}
// 不再监听当前FD上的可写事件
func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
c.triggerWrite(nil) // 唤醒等到可写事件的线程
}
- 采用分散写技术,将bs向量中所有数据写入socket内核缓冲区中
var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
3,释放掉已经用完的写缓冲区空间,同时移除对当前fd上可写事件的监听
func (c *connection) outputAck(n int) (err error) {
// 将已经用完的部分内存回收掉
if n > 0 {
c.outputBuffer.Skip(n)
c.outputBuffer.Release()
}
// 如果此时发现所有待写入数据都写入完毕了,那么就移除对当前fd上可写事件的监听
if c.outputBuffer.IsEmpty() {
c.rw2r()
}
return nil
}