1. base 模块
1.1 API
1.1.1 eventfd
int eventfd(unsigned int initval, int flags);
(1)类似信号量;其内部保存了一个 uint64_t
计数器 count
,使用 initval 初始化;
(2)read
没有设置 EFD_SEMAPHORE 并且 count
不为 0,返回 count
值,并将 count
设为 0;
如果 count
值为 0,阻塞直到其非 0; 设置 EFD_NONBLOCK 后,返回 EAGAIN
(3)write
调用整数相加,count
最大值为 (64位最大值 - 1),如果超过该值会阻塞;
(4)IO 复用事件
可读:计数器大于 0
可写:不会阻塞的情况下,至少还能加 1;
(5)主要作用:用来替换只作为发出事件信号的 pipe
参考网址:https://www.man7.org/linux/man-pages/man2/eventfd.2.html
1.1.2 timerfd
参考网址:https://www.man7.org/linux/man-pages/man2/timerfd_create.2.html
(1)timerfd_create
创建定时器对象
(2)timerfd_settime
启动 / 停止定时器( new_value.it_value
设为非 0 或 0,该值是相对值,相对调用时的当前时间)
struct itimerspec
{
struct timespec it_interval;
struct timespec it_value;
};
it_interval
设为 0,只触发一次;非 0 会重复触发;
(3)交互
可读:上次调用 timerfd_settime()
启动后,触发一次或多次
调用 read
时,返回 uint64_t
整数,表明触发次数;
1.2 其他类
1.2.1 ThreadPool
ThreadPool
任务队列:生产者消费者模型
(1) setMaxQueueSize 设置队列的最大大小
(2) start 创建并启动 numThreads 个线程,都放入了 threads_ 中,线程函数为 runInThread
(3) run 是向任务队列中添加任务;
(4) runInThread 不断调用 take 函数,从任务队列中取出任务并执行;(如果有 threadInitCallback_,需要先执行该初始化函数)
1.2.2 CountDownLatch
CountDownLatch
倒计时,需要显式初始化指明最开始的计数值;内部封装了一个条件变量和一个锁(只有自己使用),当计数减为 0 时,会等待;
(1) countDown 函数;计数减1,若减为 0,则唤醒所有阻塞线程;
(2) wait 函数;等待 当前计数变为 0;(若为 0,则返回);
1.2.3 Thread
Thread
template< class R, class... Args >
class function<R(Args...)>;
R 为返回值类型,Args 为可变参数
(1) start; 内部调用了 pthread_create 创建线程,使用初始计数为 1 的 CountDownLatch 来确保获取到 tid 后再返回
(2) join; 内部调用了 pthread_join
(3) ~Thread; 不会等待线程结束,如果没有调用 join 函数,就会调用 pthread_detach;
(1)pthread_create
和 gettid
区别
pthread_create
返回的是 pthread_t 类型,是一个进程中各个线程之间的标识号,对于这个进程内是唯一的,而不同进程中,每个线程返回线程 ID 的可能是一样的。glibc 的 Pthreads 只保证同一进程内,同一时刻的各个线程 id 不同,不能保证同一进程先后多个线程具有不同 id;
在 Linux 下,建议使用gettid
的返回值作为线程 id;其类型是 pid_t,保证任何时刻都是全局唯一的,0 是非法值;
而gettid
获取的 线程ID 和 PID 是有关系的,因为在linux中线程其实也是一个进程(clone)。在一个进程中,主线程的线程ID 和进程 PID 是一样的,gettid是不可移植的。
(2)为了效率,使用 thread local 缓存了线程 tid
// 内部调用 gettid() ,并缓存了线程 tid;
CurrentThread::tid()
调用 fork
如何保证子进程不会看到这个缓存结果?
使用 pthread_atfork
注册回调函数 child
清空缓存,重新调用 tid()
;
1.2.4 Condition
对条件变量进行了封装;MutexLock
的 holder_
中记录了当前持有锁的线程 TID;
(1)Condition 构造时,MutexLock
已经构造完毕,当该线程阻塞在该条件变量时,其他线程获取到锁时,需要改变 holder_
值;
直观想法是,在调用 pthread_cond_wait
前后重新对 holder_
进行赋值,这里采用了 UnassignGuard
的思想,构造时,将 holder_
赋值为 0,析构时,赋值为当前线程 TID;
1.2.5 ThreadLocal
(1)内部创建了 pthread_key_t 类型变量;
1.2.6 FixedBuffer
内部为 固定大小的字符数组 char data_[SIZE]
向 Buffer 中写数据时调用 memcpy
cur_
指向当前待写入的位置;
全初始化为 0,内部调用 memset
1.2.7 AsyncLogging
使用 unique_ptr
指向缓存块 Buffer
;
双缓存方案,前端和后端分别使用两块缓存;
(1)构造时
初始化一个后台线程,申请两块内存,buffers_
数组大小设置为 16;
(2)start
启动后台线程,这里 CountDownLatch
的作用是确保线程函数执行后,再退出 (确保在线程中 running_
一定为 Ture
,确保不发生重排序? );
(3)append
buffers_
为当前写满的缓存块数组;
前端调用,向缓存区写入 Log,写满缓存后,唤醒后台线程清理缓存;
currentBuffer_
写满后会替换为 nextBuffer_
,若其也写满则申请新的缓存块;
(4)threadFunc
持有两个空的缓存块,newBuffer1
和 newBuffer2
,局部变量数组 buffersToWrite
后端线程调用(若当前 buffers_
为空,则阻塞;每隔一段时间醒来或可有前端提前唤醒);
将 currentBuffer_
放入 buffers_
,使用 newBuffer1
替换 currentBuffer_
,交换 buffersToWrite
和 buffers_
,若 nextBuffer_
为空,使用 newBuffer2
替换;
将 buffersToWrite
中缓存块写入日志文件,对 newBuffer1
和 newBuffer2
重新赋值,清空其他缓存块;
2. net 模块
2.1 时间相关类
2.1.1 Timestamp
内部使用 int64_t
表示,以 微秒 为单位
(1)now()
静态方法,返回 Timestamp
(调用时的当前时间构造)
内部调用 gettimeofday(&tv, NULL);
,
会计算从1970年1月1号00:00(UTC)到当前的时间跨度
struct timeval
{
__time_t tv_sec; /* Seconds. */
__suseconds_t tv_usec; /* Microseconds. */
};
time()
返回的 time_t ,是以秒为单位的;
(2)toFormattedString
gmtime_r
将 time_t 转换为年月日时分秒的形式;
2.1.2 Timer
时间事件,包含 到期时间、时间回调函数、时间间隔 interval_
、是否可重复触发、sequence_
(使用静态原子整数生成序列号)
(1)run
调用时间回调函数
(2)restart( now )
将到期时间改为 now + interval_
2.1.3 TimerId
包含一个指向 Timer 的指针和一个 sequence_
2.1.4 TimerQueue
内部使用 set
作为 TimerList timers_
,按到期时间 Timestamp
排序各时间事件
(1)构造函数
创建 timerfd 并使用 timerfd 设置 Channel,设置其读回调函数为 handleRead
(2)handleRead
-
1.获取所有过期的时间事件,从
activeTimers_
中删除这些事件; -
2.清空
cancelingTimers_
,调用每个事件对应的回调函数 -
3.调用
reset
函数
(3)insert
将 Timer 根据到期时间插入到 timers_
和 activeTimers_
中;
其返回值标志,新加入的 Timer 是否是第一个元素(即到期时间最早的)
(4)reset
将可重复触发并且不在 cancelingTimers_
中的时间事件调用 restart
,重新插入到 timers_
和 activeTimers_
中;
否则就进行删除
如果当前 timers_
不为空,获取下次到期时间,如果该时间有效,使用该时间调用 resetTimerfd
设置下次定时器到期时间;
(5)resetTimerfd
设置时钟下次到期时间,最小为 100 微妙
(6)addTimer
新构造一个 Timer(动态创建)
(7)addTimerInLoop
对时间事件调用 insert
;
(8)cancelInLoop
若其在 activeTimers_
中,则将其从中删除,并释放;
否则,若 callingExpiredTimers_
为 True,将其加入到 cancelingTimers_
中;(只有在 handleRead
中的第 2 步才有可能发生)
2.2 Channel
封装了 一个 fd 对应的事件及回调函数;(只属于一个 EventLoop)
events_
表示 fd 上的注册事件
revents_
接收到的事件(触发的事件)
封装了一些 readCallback_
回调函数;
index_
表明是否添加到了 Poller
中(-1 为未添加,1 为已添加,2 为已删除)
(1)handleEvent
根据 revents_
调用相应的回调函数
(2)disenableXXX
/enableXXX
方法都会调用 update()
注册读 / 写事件
设置 addedToLoop_
为 Ture
调用 EventLoop 的 updateChannel
函数
调用该方法时,必定是 loop 线程;
(
在 EventLoop 构造时,注册 timefd、wakefd 可读
客户端 Connector::connecting
中,注册 sockfd 可写
服务端 Acceptor::listen()
,注册 listenfd 可读
回调函数中调用
)
2.3 Poller
IO 复用基类(纯虚基类)
channels_
为哈希表,key 为 fd,val 为 channel
(1)poll
内部调用 epoll_wait
如果有就绪的事件,调用 fillActiveChannels
(2)fillActiveChannels
设置触发 channel 的 revents_
将当前触发的 channel,加入到 activeChannels_
(3)updateChannel
根据 channel 的 index_
调用 epoll_ctl
;
如果 index_
为 -1,需要添加到 channels_
中
(4)removeChannel
从 channels_
中删除 channel;
2.4 事件循环类
2.4.1 EventLoop
记录下创建 EventLoop 的线程 threadId_
(1)loop
循环执行下面语句:
- 清空
activeChannels_
- 调用
poll
- 调用
activeChannels_
中的每个 channel 的handleEvent
- 调用
doPendingFunctors
(2)doPendingFunctors
callingPendingFunctors_
设为 Ture
使用 swap 技巧减少了临界区;
调用 pendingFunctors_
中的每个函数;
callingPendingFunctors_
设为 False
(3)runInLoop
如果调用线程是 threadId_
,直接运行函数;否则,调用 queueInLoop
(4)queueInLoop
将 回调函数 加入到 pendingFunctors_
中(此步有锁保护)
如果调用线程不是 threadId_
,调用 wakeup
(5)wakeup
eventfd_
加 1
(6)updateChannel
断言调用线程为 threadId_
调用 poller 的 updateChannel
(7)添加定时器事件的流程如下图所示(在非 IO 线程中)
等定时器到期后,会从 poll
中返回,紧接着下一步调用 TimerQueue::handleRead()
,然后调用定时器任务的回调函数;
(8)quit
设置 quit_ = true
(上图中 loop 的循环条件为该变量)
如果当前不是事件循环的线程,则调用 wakeup()
唤醒事件循环
2.4.2 EventLoopThread
创建额外的一个线程专门执行 loop
,并把 EventLoop 对象返回给用户;
2.4.3 EventLoopThreadPool
setThreadNum
设置线程数量
(1)start
创建并启动 numThreads_
个 EventLoopThread;
2.5 TCP 服务端相关类
2.5.1 InetAddress
封装了 IPv4 或 IPv6 的 ip 地址和端口号;
内部数据为一个 union,为 sockaddr_in
或 sockaddr_in6
strchr()
用于查找字符串中的一个字符,并返回该字符在字符串中第一次出现的位置
(1)resolve
(只能解析 IPv4)
调用 gethostbyname_r
解析主机名为 IP 地址,需要为其提供缓存供其内部查找过程使用;
链接: link
2.5.2 Socket
封装 socket fd;及 socket 选项(保活机制等)
listen
等函数的返回值检查工作
析构时关闭 fd(RAII,该类是 fd 的所有者,fd 的创建由 SocketsOps 封装的系统调用完成)
2.5.3 Buffer
vector<char> buffer_
,读/写索引;类似循环数组,但这里是通过拷贝的方式实现的;
默认大小为 1024 + 8;
预留区(初始为 8 byte) | 读区 | 写区
(1)append
ensureWritableBytes
判断当前可写的空间是否大于 len
拷贝数据到写区,并 writerIndex_ += len
网络序和主机序互换通过 __bswap_
函数完成
(2)makeSpace
当前可用空间是否大于 len,若是,则将读区的数据拷贝到数组最前面;
否则,调用 resize
(3)prepend
向预留区写入数据,并向前调整 readerIndex_ -= len
;
(4)retrieveAsString
或 readInt32
读取完当前全部字节时,会调用 retrieveAll
将读/写索引设置为初始状态
(5)readFd
直接从 fd 读取数据到 Buffer 中,内部调用了 readv
函数
内部使用了额外的栈缓存char extrabuf[65536];
,超出 Buffer 的数据会暂时存储在该缓存中,之后调用 makeSpace
扩容后再进行拷贝;
(6)shrink
使用 swap 技巧减少 Buffer 数组的空间占用;
2.5.4 TcpConnection
保存两个端点的地址
新建 Channel,并设置了 connectfd 的各个回调函数
输入 和 输出 Buffer
highWaterMark_
高水位标志
(1)sendInLoop
如果当前写缓存中没有数据且没有注册写事件,则直接发送数据;
否则,会将数据向拷贝到 outputBuffer_
中,如果没有注册写事件会进行注册;(channel_->enableWriting()
)
(2)handleRead
调用 Buffer 的 readFd
将数据保存到 inputBuffer_
中
如果读取到了数据,调用 messageCallback_
(业务逻辑)
如果 n == 0,调用 handleClose()
(3)handleWrite
如果将 outputBuffer_
的数据全部发送完毕,就会删除写事件;
调用 writeCompleteCallback_
(业务逻辑)
(4)connectEstablished
- 状态从 kConnecting 改为 kConnected
channel_
绑定自身的智能指针,并注册读事件\- 调用
connectionCallback_
(业务逻辑)
(5)startRead()
/ stopRead
调用 runInLoop
注册或删除读事件
(6)send
当前状态为 kConnected 时,通过runInLoop
将 sendInLoop
添加到事件循环的中;
否则,什么也不做;(即关闭写端后再调用 send
是无效的)
(7)handleClose
删除所有注册事件
调用 connectionCallback_
调用 closeCallback_
(业务逻辑)
(8)shutdown
(只是关闭写端)
设置状态为 kDisconnecting
如果当前注册了写事件,则什么也不做(即等待数据发送完毕,在写回调中会处理这种情况)
否则,调用 shutdown
系统调用关闭写端
(9)什么时候关闭读端?如何关闭?
注意:Socket 析构时会调用 close
系统调用,因此其析构时会关闭读写端(即整个连接);
TcpConnection 构造时,会有 socket_(new Socket(sockfd))
,因此 TcpConnection 析构时会关闭整个连接;
std::unique_ptr<Socket> socket_
被动关闭时,相当于直接调用了 close
系统调用;
2.5.5 Acceptor
封装 listenfd 作用(非阻塞)
(1)listen()
向 IO 复用注册读事件
必须设置 newConnectionCallback_
,否则收到连接就会关闭;
(2)当 accept
出现错误 EMFILE 时(本进程的文件描述符达到上限,无法为新连接创建 socket 文件描述符,因为新连接还在等待处理,epoll_wait
会立即返回,造成忙等待)
构造时创建了一个额外的空闲文件描述符 idleFd_
关闭 idleFd_
,再调用 accept
,再关闭 idleFd_
,最后重新打开一个额外的空闲文件描述符(在多线程中不保证正确,猜想是如果不加临界保护,关闭 idleFd_
,其他线程可能正好调用 accept
,此时并未出错,但当前线程又陷入困境)
2.5.6 TcpServer
创建 EventLoopThreadPool
设置 acceptor_
的回调函数 newConnection
维护各个连接的哈希表,key 为IP、Port、连接号组成的 string,val 为 conn 指针;
(1)newConnection
从线程池轮次获得 EventLoop (如果未设置线程池数量会返回 baseloop );
新建 TcpConnection,设置其一系列回调函数;(创建 TcpConnection 时会新建 Channel)
在对应事件循环 ioLoop
中调用 TcpConnection 的 connectEstablished
方法,注册读事件;
(2)start
调用线程池的 start
在 loop_
中调用 acceptor_
的 listen()
函数
(3)setThreadNum
设置线程池中线程的数量,如果不设置将只有一个线程,即一个事件循环;
(4)此处逻辑暗示了:主线程负责 listenfd ,每个子线程轮次负责 connfd
具体读写处理由 loop 中就绪事件的回调函数完成(业务逻辑由 Connect 等类中的回调函数完成,可以看作是在读写回调函数中又调用了业务逻辑函数)
注册修改事件大多都会放到 pendingFunctors_
中
(5)可以在一个 loop 上运行多个服务端,即监听多个 listenfd;
2.6 客户端
2.6.1 Connector
(1)start
/ startInLoop
在 loop 中调用 connect()
函数
(2) connect()
创建 sockfd,调用 connect
函数
如果立即建立或正在建立过程中,调用 connecting
(注册写事件)
handleWrite
中如果出错或是服务端、客户端都位于同一主机,调用 retry
,
否则,成功建立连接,调用 newConnectionCallback_
,即 TcpClient 中的 newConnection
(3)retry
关闭 sockfd
调用 runAfter
设置定时器事件(到期后的回调函数是 startInLoop
)
2.6.2 TcpClient
新建 Connector
设置 Connector 的 connectionCallback_
(1)newConnection
新建 TcpConnection,设置其一系列回调函数;
调用 connectEstablished
注册读事件(注意回调、事件都是由 TcpConnection 管理)
(2)connect()
调用 Connector 的start
(3)disconnect()
调用连接的 connection_->shutdown();
(有锁保护)
3. 使用教程
(1)IO 复用其实是复用线程,而非 IO 连接;
(2)每千兆比特每秒的吞吐量配一个 event loop;
3.1 本质
1.连接的建立
2.连接的断开
3.消息到达
3.5 消息发送完毕(将数据写入操作系统缓冲区)
在 TcpConnection 中的四个回调函数分别对应这几个事件;
3.2 muduo 线程模型
上图为 muduo 库的默认线程模型,适合小规模的计算;
如果有大量计算任务,可以再加一个线程池,如下所示:
3.3 非阻塞 IO 为什么必须使用应用缓存?
(1)接收时,数据可能没有一次性收全,已经收到的数据累积到 Buffer 中;
(2)发送数据时,只发送了一部分就填满了操作系统发送缓冲区,阻塞时间取决于对方接收数据的速率,剩余数据应该复制到应用缓冲区中;
3.4 IO 复用为何最好不要搭配阻塞 IO?
以下假设在 Berkeley 的实现中;
假设 listenfd 为阻塞 IO,以 accpet
为例,select
返回 listenfd 可读后,可能并不会马上调用 accpet
(这也是维护一个已完成连接队列的原因),当客户在这期间中止该连接时(收到客户的 RST)这个已完成的连接被服务器 TCP 驱除出队列,如果此时队列中没有其他已完成的连接时,之后调用 accpet
时会一直阻塞,直到其他某个客户建立一个连接为止;
3.5 chat 高性能程序设计
(1)利用了 ThreadLocalSingleton
即每个事件循环都有自身线程的一个连接队列;这样在每个事件循环的内部分发时,由于是单线程就不需要加锁;
只需在 Server 端遍历各个 loop 时加锁即可;此步的速度比较快,因为只需于在事件循环中注册函数即可,不用进行等待;
(2)在设计并发的 Hub 程序时,也可以参考上述思路
每个事件循环都维护自己的 std::map<string, Topic>
(3)multiplexer
在没有连接到后台服务器时,接收到的连接都放弃掉
3.6
(1)Channel 中回调函数如何被调用?
(2)在实现 TimerQueue 中,不能直接使用 Timestamp 为 Key,可能有多个 Timer 到期时间相同,解决办法是:使用 multiset 或者 设法区分 Key;
muduo 中使用 std::pair<Timestamp, Timer*>
作为 Key
(3)既然使用 wakeupfd 是为了唤醒事件循环去执行用户回调,那么 doPendingFunctors
为什么不直接放到 EventLoop::handleRead
中?
- 假设已经执行过
EventLoop::handleRead
,此时处理其他就绪事件回调函数,此时如果调用queueInLoop
,由于当前处于 IO 线程且callingPendingFunctors_ = false
,是无法调用wakeup
的,即用户回调不能被及时处理;(要么必须得保证调用中不涉及queueInLoop
,需要约束用户,要么设法调用wakeup
) - 若把
doPendingFunctors
直接放到EventLoop::handleRead
,那么想执行用户回调,必须先执行wakeup
函数;需要执行三个系统调用,write-poll-read
,如果把doPendingFunctors
放到外面,那么这些就绪事件回调函数都可以直接使用这个doPendingFunctors
,节省系统调用;
例如,removeConnectionInLoop
中就直接调用了queueInLoop
;
3.6 为何在 removeConnectionInLoop
中调用了 queueInLoop
方法?
(1)这主要是为了延长 TcpConnection 声明周期,使用 queueInLoop
方法后,其生命期会延长到 doPendingFunctors
函数执行完毕;
若不使用 queueInLoop
而使用 runInLoop
时,在 removeConnectionInLoop
执行完毕后,引用计数就会降为 0,将会析构 TcpConnection
,从而析构 Channel
,这会导致无法返回到 Channel::handleEvent
中;(如果直接使用 runInLoop
,在程序中的表现是,析构 Channel 会出错)
需要注意的是,调用方式为 closeCallback_(shared_from_this());
,即使是引用传递但由于是右值,在函数执行完后就会析构;
(2)上述这种做法对 s06 的实现是必须的,但在 muduo 实现中,Channel 使用了弱引用,在调用前会升级成 shared_ptr,这就保证了在从 handleEvent
返回前,Channel 不会被析构,把这部分代码添加到 s06 中 TcpConnection 可以正常析构,那么在 muduo 实现中是否还有必要使用 queueInLoop
呢?
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
// 在建立连接时,用细线绑定
void TcpConnection::connectEstablished()
{
...
channel_->tie(shared_from_this());
...
}
在采用线程池的事件循环中,muduo 中处理被动关闭时的流程可能如下所示:
这里的 queueInLoop()
替换为 runInLoop
效果是相同的;对单线程事件循环而言,如果进行替换,会在 EventLoop 中执行完 connectDestroyed
后才返回到 TcpServer 处,逻辑上感觉没什么问题;
可能有问题的地方在于,在就绪事件回调函数中,删除了这个就绪事件,是否会造成迭代器失效等问题,muduo 中采用了复制的方式将就绪事件都保存到了 currentActiveChannel_
;在源代码中修改运行部分测试程序后,暂未发现什么问题;
参考
《Linux 多线程服务端编程》