CS 144 Lab Four 收尾 -- 网络交互全流程解析
- 引言
- Tun/Tap简介
- tcp_ipv4.cc文件
- 配置信息初始化
- cs144实现的fd家族体系
- 基于自定义fd体系进行数据读写的adapter适配器体系
- 自定义socket体系
- 自定义事件循环EventLoop
- 模板类TCPSpongeSocket详解
- listen_and_accept方法
- _tcp_main方法
- _initialize_TCP初始化Tcp连接和事件循环
- _tcp_loop函数启动tcp事件循环
- connect 方法
- bidirectional_stream_copy方法
- TCPSpongeSocket的wait_until_closed方法
- 通道串联起子主线程
- 小结
对应课程视频: 【计算机网络】 斯坦福大学CS144课程
本节作为Lab Four的收尾,主要带领各位来看看网络交互的整体流程是怎样的。
引言
这里以tcp_ipv4.cc文件为起点,来探究一下cs144是如何实现整个协议栈的。
首先,项目根路径中的 tun.sh 会使用 ip tuntap 技术创建虚拟 Tun/Tap 网络设备。这类接口仅能工作在内核中。不同于普通的网络接口,没有物理硬件。这样做的目的应该是为了模拟真实网络环境下的网络环境。
Tun/Tap简介
关于Tun/Tap的介绍可以参考:
- 虚拟设备之TUN和TAP
- Linux官方内核文档: Tun/Tap驱动程序说明
TUN/TAP提供了用户空间程序的数据包接收和传输功能。
它可以被视为一个简单的点对点或以太网设备,不是从物理媒体接收数据包,而是从用户空间程序接收数据包,并且不是通过物理媒体发送数据包,而是将数据包写入用户空间程序。
为了使用驱动程序,程序必须打开/dev/net/tun,并发出相应的ioctl()来向内核注册一个网络设备。网络设备将显示为tunXX或tapXX,这取决于所选择的选项。当程序关闭文件描述符时,网络设备和所有相应的路由都将消失。
根据所选择的设备类型,用户空间程序必须读取/写入IP数据包(对于tun)或以太网帧(对于tap),使用哪种取决于ioctl()给定的标志。
- TUN 是一个虚拟网络设备,它模拟的是一个三层设备,通过它可以处理来自网络层的数据包,也就是 IP 数据包。由于它只模拟到了 IP 层,所以它无法与物理网卡做 bridge,也没有 MAC 地址,但是可以通过三层交换的方式来与物理网卡相互通信。
- TAP 模拟的是一个二层设备,它比 TUN 更加深入,它可以处理数据链路层的数据包,拥有 MAC 地址,可以与物理网卡做 bridge,支持 MAC 层广播,也可以给它设置 IP 地址。
tcp_ipv4.cc文件
当 Tun/Tap 网络设备建立好后,接下来我们进入到 tcp_ipv4.cc 的main函数中:
int main(int argc, char **argv) {
try {
// 参数个数检查: 第一个参数是编译器传入的程序名,然后是我们需要传入的host和port
if (argc < 3) {
show_usage(argv[0], "ERROR: required arguments are missing.");
return EXIT_FAILURE;
}
// 解析参数,获取TCPConfig,FdAdapterConfig,当前启动的模式(server or client) 和 选择哪个网卡
auto [c_fsm, c_filt, listen, tun_dev_name] = get_config(argc, argv);
// 借助Tun/Tap实现一个虚拟网卡,该虚拟网络设备实现到了IP层
// TunFD是tun设备的文件描述符
// TCPOverIPv4OverTunFdAdapter封装从tun设备读取和写入IPV4数据报的操作
// LossyTCPOverIPv4OverTunFdAdapter采用装饰器模式在前者基础上,增加写入时根据先前设置的丢包率随机丢包的功能
// LossyTCPOverIPv4SpongeSocket 对上层提供一个标准Socket接口,进行调用
LossyTCPOverIPv4SpongeSocket tcp_socket(LossyTCPOverIPv4OverTunFdAdapter(
TCPOverIPv4OverTunFdAdapter(TunFD(tun_dev_name == nullptr ? TUN_DFLT : tun_dev_name))));
// 如果启动的是server mode,那么在监听指定端口上的消息
if (listen) {
tcp_socket.listen_and_accept(c_fsm, c_filt);
} else {
// 如果启动的是client mode,那么主动与对应server建立连接
tcp_socket.connect(c_fsm, c_filt);
}
// 键盘输入的数据会写入socket,socket有可读的数据会输出到屏幕上
bidirectional_stream_copy(tcp_socket);
// 同步等待直到_tcp_thread线程结束
tcp_socket.wait_until_closed();
} catch (const exception &e) {
cerr << "Exception: " << e.what() << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
配置信息初始化
下面给出get_config方法源码解析,感兴趣可以瞅两眼:
//! Config for TCP sender and receiver
class TCPConfig {
public:
// 发送器和接收器缓冲区的默认容量。缓冲区容量指的是在给定时间内可以存储的最大数据量
static constexpr size_t DEFAULT_CAPACITY = 64000; //!< Default capacity
// tcp数据报中payload部分最大容量限制
static constexpr size_t MAX_PAYLOAD_SIZE = 1000; //!< Conservative max payload size for real Internet
// 默认的重传超时时间,以毫秒为单位。
// 当TCP发送器向接收器传输数据时,它期望在规定的超时时间内收到一个确认(ACK)。如果发送器在超时时间内没有收到确认,它会重新传输数据
static constexpr uint16_t TIMEOUT_DFLT = 1000; //!< Default re-transmit timeout is 1 second
// 数据包在放弃之前允许的最大重传次数。如果发送器在经过指定的重传尝试次数后仍未收到确认,它会认为连接不可靠并采取适当的措施
static constexpr unsigned MAX_RETX_ATTEMPTS = 8; //!< Maximum re-transmit attempts before giving up
// 用于保存重传超时的初始值,以毫秒为单位。它指定发送器在重新传输数据之前应等待ACK的时间
// 由于重传超时时间会在网络拥塞的时候动态增加,因此当重置超时重传计数器时,需要将重传超时时间恢复为初始值
uint16_t rt_timeout = TIMEOUT_DFLT; //!< Initial value of the retransmission timeout, in milliseconds
// 接收和发送缓冲区默认大小
size_t recv_capacity = DEFAULT_CAPACITY; //!< Receive capacity, in bytes
size_t send_capacity = DEFAULT_CAPACITY; //!< Sender capacity, in bytes
// 初始序列号,如果没有设置,那么会采用随机值策略
std::optional<WrappingInt32> fixed_isn{};
};
//! Config for classes derived from FdAdapter
class FdAdapterConfig {
public:
// 源ip地址和端口号
Address source{"0", 0}; //!< Source address and port
// 目的ip地址和端口号
Address destination{"0", 0}; //!< Destination address and port
// 下行丢包率,即从服务器发往客户端的数据包丢失的概率
uint16_t loss_rate_dn = 0; //!< Downlink loss rate (for LossyFdAdapter)
// 上行丢包率,即从客户端发往服务器的数据包丢失的概率
uint16_t loss_rate_up = 0; //!< Uplink loss rate (for LossyFdAdapter)
};
static tuple<TCPConfig, FdAdapterConfig, bool, char *> get_config(int argc, char **argv) {
TCPConfig c_fsm{};
FdAdapterConfig c_filt{};
char *tundev = nullptr;
int curr = 1;
bool listen = false;
// 如果我们不指定Host和Port,那么使用默认提供的ip地址和随机端口号
string source_address = LOCAL_ADDRESS_DFLT;
string source_port = to_string(uint16_t(random_device()()));
// 判断是否传入了相关参数,保留最后两个host和port值
while (argc - curr > 2) {
// 打开server端的Listen模式
if (strncmp("-l", argv[curr], 3) == 0) {
listen = true;
curr += 1;
} else if (strncmp("-a", argv[curr], 3) == 0) {
// -a 用来指定自己的ip地址
check_argc(argc, argv, curr, "ERROR: -a requires one argument.");
source_address = argv[curr + 1];
curr += 2;
} else if (strncmp("-s", argv[curr], 3) == 0) {
// -s 用来指定自己的端口号
check_argc(argc, argv, curr, "ERROR: -s requires one argument.");
source_port = argv[curr + 1];
curr += 2;
} else if (strncmp("-w", argv[curr], 3) == 0) {
// -w 用来指定自己接收窗口大小
check_argc(argc, argv, curr, "ERROR: -w requires one argument.");
c_fsm.recv_capacity = strtol(argv[curr + 1], nullptr, 0);
curr += 2;
} else if (strncmp("-t", argv[curr], 3) == 0) {
// -t 指定RTO超时时间
check_argc(argc, argv, curr, "ERROR: -t requires one argument.");
c_fsm.rt_timeout = strtol(argv[curr + 1], nullptr, 0);
curr += 2;
} else if (strncmp("-d", argv[curr], 3) == 0) {
// -d 指定要连接的tundev也就是网卡
check_argc(argc, argv, curr, "ERROR: -t requires one argument.");
tundev = argv[curr + 1];
curr += 2;
} else if (strncmp("-Lu", argv[curr], 3) == 0) {
// -Lu 此选项设置上行丢包率,即从客户端发往服务器的数据包丢失的概率
check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");
float lossrate = strtof(argv[curr + 1], nullptr);
using LossRateUpT = decltype(c_filt.loss_rate_up);
c_filt.loss_rate_up =
static_cast<LossRateUpT>(static_cast<float>(numeric_limits<LossRateUpT>::max()) * lossrate);
curr += 2;
} else if (strncmp("-Ld", argv[curr], 3) == 0) {
// -Ld 此选项设置下行丢包率,即从服务器发往客户端的数据包丢失的概率
check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");
float lossrate = strtof(argv[curr + 1], nullptr);
using LossRateDnT = decltype(c_filt.loss_rate_dn);
c_filt.loss_rate_dn =
static_cast<LossRateDnT>(static_cast<float>(numeric_limits<LossRateDnT>::max()) * lossrate);
curr += 2;
} else if (strncmp("-h", argv[curr], 3) == 0) {
// -h 显示提示信息
show_usage(argv[0], nullptr);
exit(0);
} else {
show_usage(argv[0], string("ERROR: unrecognized option " + string(argv[curr])).c_str());
exit(1);
}
}
// parse positional command-line arguments
// 是否打开了server端LISTEN模式
if (listen) {
// 说明当前启动的是server端 --> 从参数中获取监听端口号
// 将过滤器的源地址配置为 "0"(表示监听所有本地网络接口的地址)
c_filt.source = {"0", argv[curr + 1]};
if (c_filt.source.port() == 0) {
show_usage(argv[0], "ERROR: listen port cannot be zero in server mode.");
exit(1);
}
} else {
// 说明当前启动的是client端 -- 目的ip地址和端口号从最后两个参数获取
c_filt.destination = {argv[curr], argv[curr + 1]};
// 我们可以通过-a或者-s参数指定启动的客户端监听的ip地址和端口
c_filt.source = {source_address, source_port};
}
return make_tuple(c_fsm, c_filt, listen, tundev);
}
cs144实现的fd家族体系
main函数中会建立一个 TCPOverIPv4OverTunFdAdapter
。TunFd
指的是连接进 Tun 设备上的 socket :
TunFD具体应用可以看app/tun.cc :
int main() {
try {
TunFD tun("tun144");
while (true) {
auto buffer = tun.read();
cout << "\n\n***\n*** Got packet:\n***\n";
hexdump(buffer.data(), buffer.size());
IPv4Datagram ip_dgram;
cout << "attempting to parse as ipv4 datagram... ";
if (ip_dgram.parse(move(buffer)) != ParseResult::NoError) {
cout << "failed.\n";
continue;
}
cout << "success! totlen=" << ip_dgram.header().len << ", IPv4 header contents:\n";
cout << ip_dgram.header().to_string();
if (ip_dgram.header().proto != IPv4Header::PROTO_TCP) {
cout << "\nNot TCP, skipping.\n";
continue;
}
cout << "\nAttempting to parse as a TCP segment... ";
TCPSegment tcp_seg;
if (tcp_seg.parse(ip_dgram.payload(), ip_dgram.header().pseudo_cksum()) != ParseResult::NoError) {
cout << "failed.\n";
continue;
}
cout << "success! payload len=" << tcp_seg.payload().size() << ", TCP header contents:\n";
cout << tcp_seg.header().to_string() << endl;
}
} catch (const exception &e) {
cout << "Exception: " << e.what() << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
基于自定义fd体系进行数据读写的adapter适配器体系
TCPOverIPv4OverTunFdAdapter
是一个 IP 层面的封装接口。当调用 adapter 向其写入 TCP 报文段时,它会自动 wrap 上 IP 段并传输进网络设备中;读取也是亦然,会自动解除 IP 段并返回其内部封装的 TCP报文段。
LossyTCPOverIPv4OverTunFdAdapter本身由模板类LossyFdAdapter实例化而来,该模板类通过装饰器模式对内部持有的Adapter进行功能增强,主要增加在读写数据时,根据先前设置丢包率来判断是否丢弃此次的数据报:
template <typename AdapterT>
class LossyFdAdapter {
private:
//! Fast RNG used by _should_drop()
std::mt19937 _rand{get_random_generator()};
//! The underlying FD adapter
AdapterT _adapter;
...
bool _should_drop(bool uplink) {
const auto &cfg = _adapter.config();
const uint16_t loss = uplink ? cfg.loss_rate_up : cfg.loss_rate_dn;
return loss != 0 && uint16_t(_rand()) < loss;
}
//! \brief Read from the underlying AdapterT instance, potentially dropping the read datagram
//! \returns std::optional<TCPSegment> that is empty if the segment was dropped or if
//! the underlying AdapterT returned an empty value
std::optional<TCPSegment> read() {
auto ret = _adapter.read();
if (_should_drop(false)) {
return {};
}
return ret;
}
//! \brief Write to the underlying AdapterT instance, potentially dropping the datagram to be written
//! \param[in] seg is the packet to either write or drop
void write(TCPSegment &seg) {
if (_should_drop(true)) {
return;
}
return _adapter.write(seg);
}
...
};
自定义socket体系
cs144中封装的Socket继承体系如下所示:
自定义事件循环EventLoop
cs144在Linux提供的多路复用模型Poll基础上进行封装,造出了一个简易版本的事件循环机制EventLoop:
//! Waits for events on file descriptors and executes corresponding callbacks.
class EventLoop {
public:
// 对fd的读事件还是写事件感兴趣
enum class Direction : short {
In = POLLIN,
Out = POLLOUT
};
private:
using CallbackT = std::function<void(void)>;
using InterestT = std::function<bool(void)>;
// 内部类Rule,说白了就是持有用户对哪个fd的那些事件感兴趣的信息载体
// 同时持有对应事件发生和取消时的回调接口
class Rule {
public:
FileDescriptor fd;
Direction direction;
// 发生感兴趣事件的时候回调该接口
CallbackT callback;
// 返回值决定当前fd是否需要被监听
InterestT interest;
// 当对应fd关闭,出错时,回调该接口
CallbackT cancel;
// 根据direction的不同返回当前fd已经被读取或者写入了多少次
unsigned int service_count() const;
};
// 用户注册的感兴趣的事件集合
std::list<Rule> _rules{};
public:
// 事件监听的返回结果
enum class Result {
Success, // At least one Rule was triggered.
Timeout, // No rules were triggered before timeout.
Exit // All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_event.
};
// 用户添加感兴趣的事件
void add_rule(
const FileDescriptor &fd,
const Direction direction,
const CallbackT &callback,
const InterestT &interest = [] { return true; },
const CallbackT &cancel = [] {});
// 等待下一个感兴趣的事件发生 --- 参数是等待超时时间
Result wait_next_event(const int timeout_ms);
};
- add_rule函数: 注册感兴趣的事件
void EventLoop::add_rule(const FileDescriptor &fd,
const Direction direction,
const CallbackT &callback,
const InterestT &interest,
const CallbackT &cancel) {
_rules.push_back({fd.duplicate(), direction, callback, interest, cancel});
}
- service_count函数: 当前fd已经被读取或者写入了多少次
unsigned int EventLoop::Rule::service_count() const {
return direction == Direction::In ? fd.read_count() : fd.write_count();
}
- wait_next_event函数: 等待获取下一个发生的感兴趣的事件
EventLoop::Result EventLoop::wait_next_event(const int timeout_ms) {
vector<pollfd> pollfds{};
pollfds.reserve(_rules.size());
bool something_to_poll = false;
// set up the pollfd for each rule
// 遍历所有Rule
for (auto it = _rules.cbegin(); it != _rules.cend();) { // NOTE: it gets erased or incremented in loop body
const auto &this_rule = *it;
// 如果当前rule期望从fd中读取数据,并且此时fd已经没有数据可以读取了,那么回调当前rule的cacel回调接口
// 并且将当前rule从已有的rule集合中移除
if (this_rule.direction == Direction::In && this_rule.fd.eof()) {
// no more reading on this rule, it's reached eof
this_rule.cancel();
it = _rules.erase(it);
continue;
}
// 如果当前fd关闭了,同上处理
if (this_rule.fd.closed()) {
this_rule.cancel();
it = _rules.erase(it);
continue;
}
// 判断是否对当前rule感兴趣,如果感兴趣则加入pollfds进入下面事件轮询阶段
if (this_rule.interest()) {
// pollfd由三个属性: 需要轮询的fd,是对fd的可读还是可写事件感兴趣,实际发生了什么事件
pollfds.push_back({this_rule.fd.fd_num(), static_cast<short>(this_rule.direction), 0});
something_to_poll = true;
} else {
// 为了保持 pollfds 数组和规则列表 _rules 中的规则一一对应,仍然需要将一个 pollfd 结构体添加到 pollfds 数组中
// 但是对应的事件设置为 0,表示不关注任何事件,相当于占位符
pollfds.push_back({this_rule.fd.fd_num(), 0, 0}); // placeholder --- we still want errors
}
++it;
}
// quit if there is nothing left to poll --- 没有任何rule需要轮询
if (not something_to_poll) {
return Result::Exit;
}
// call poll -- wait until one of the fds satisfies one of the rules (writeable/readable)
try {
// 通过调用poll对pollfds集合中所有pollfd开启事件轮询
// 最后一个参数: 如果没有感兴趣事件发生,最多轮询等待多久
if (0 == SystemCall("poll", ::poll(pollfds.data(), pollfds.size(), timeout_ms))) {
return Result::Timeout;
}
} catch (unix_error const &e) {
if (e.code().value() == EINTR) {
return Result::Exit;
}
}
// go through the poll results
// 遍历poll结果 -- rules和pollfds集合索引是一一对应的
for (auto [it, idx] = make_pair(_rules.begin(), size_t(0)); it != _rules.end(); ++idx) {
const auto &this_pollfd = pollfds[idx];
// revents保存着实际发生的事件 -- 是否发生错误
const auto poll_error = static_cast<bool>(this_pollfd.revents & (POLLERR | POLLNVAL));
if (poll_error) {
throw runtime_error("EventLoop: error on polled file descriptor");
}
const auto &this_rule = *it;
// 获取发生了哪些感兴趣的事件
const auto poll_ready = static_cast<bool>(this_pollfd.revents & this_pollfd.events);
// 当描述符关闭时或者对端连接关闭时,会设置描述符挂起事件
const auto poll_hup = static_cast<bool>(this_pollfd.revents & POLLHUP);
// 如果当前描述符被挂起了,那么将当前rule移除
if (poll_hup && this_pollfd.events && !poll_ready) {
// if we asked for the status, and the _only_ condition was a hangup, this FD is defunct:
// - if it was POLLIN and nothing is readable, no more will ever be readable
// - if it was POLLOUT, it will not be writable again
this_rule.cancel();
it = _rules.erase(it);
continue;
}
// 如果存在感兴趣的事件发生
if (poll_ready) {
// we only want to call callback if revents includes the event we asked for
const auto count_before = this_rule.service_count();
// 回调Rule对应的接口
this_rule.callback();
// only check for busy wait if we're not canceling or exiting
if (count_before == this_rule.service_count() and this_rule.interest()) {
throw runtime_error(
"EventLoop: busy wait detected: callback did not read/write fd and is still interested");
}
}
++it; // if we got here, it means we didn't call _rules.erase()
}
return Result::Success;
}
模板类TCPSpongeSocket详解
TCPSpongeSocket本身是一个模板类,再该模板类基础上衍生出大量实例化类型:
using TCPOverUDPSpongeSocket = TCPSpongeSocket<TCPOverUDPSocketAdapter>;
using TCPOverIPv4SpongeSocket = TCPSpongeSocket<TCPOverIPv4OverTunFdAdapter>;
using TCPOverIPv4OverEthernetSpongeSocket = TCPSpongeSocket<TCPOverIPv4OverEthernetAdapter>;
using LossyTCPOverUDPSpongeSocket = TCPSpongeSocket<LossyTCPOverUDPSocketAdapter>;
using LossyTCPOverIPv4SpongeSocket = TCPSpongeSocket<LossyTCPOverIPv4OverTunFdAdapter>;
TCPSpongeSocket类中重要的属性如下所示:
//! Multithreaded wrapper around TCPConnection that approximates the Unix sockets API
template <typename AdaptT>
class TCPSpongeSocket : public LocalStreamSocket {
private:
//! Stream socket for reads and writes between owner and TCP thread
LocalStreamSocket _thread_data;
protected:
//! Adapter to underlying datagram socket (e.g., UDP or IP)
AdaptT _datagram_adapter;
private:
//! Set up the TCPConnection and the event loop
void _initialize_TCP(const TCPConfig &config);
//! TCP state machine -- Lab Four实现的
std::optional<TCPConnection> _tcp{};
//! eventloop that handles all the events (new inbound datagram, new outbound bytes, new inbound bytes)
// 事件循环机制 -- 参考Select和Epoll模型
EventLoop _eventloop{};
//! Process events while specified condition is true
void _tcp_loop(const std::function<bool()> &condition);
//! Main loop of TCPConnection thread
void _tcp_main();
//! Handle to the TCPConnection thread; owner thread calls join() in the destructor
std::thread _tcp_thread{};
//! Construct LocalStreamSocket fds from socket pair, initialize eventloop
TCPSpongeSocket(std::pair<FileDescriptor, FileDescriptor> data_socket_pair, AdaptT &&datagram_interface);
std::atomic_bool _abort{false}; //!< Flag used by the owner to force the TCPConnection thread to shut down
bool _inbound_shutdown{false}; //!< Has TCPSpongeSocket shut down the incoming data to the owner?
bool _outbound_shutdown{false}; //!< Has the owner shut down the outbound data to the TCP connection?
bool _fully_acked{false}; //!< Has the outbound data been fully acknowledged by the peer?
...
listen_and_accept方法
我们先来看一下TCPSpongeSocket类的listen_and_accept方法实现,服务端会调用该方法进行端口监听:
//! \param[in] c_tcp is the TCPConfig for the TCPConnection
//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
if (_tcp) {
throw runtime_error("listen_and_accept() with TCPConnection already initialized");
}
// 初始化TCP连接和事件循环
_initialize_TCP(c_tcp);
_datagram_adapter.config_mut() = c_ad;
_datagram_adapter.set_listening(true);
cerr << "DEBUG: Listening for incoming connection...\n";
// 启动tcp事件循环,传入的函数为condition,其返回值决定事件循环是否继续
// 该事件循环只负责将连接建立起来,三次握手结束后,退出事件循环 -- 事务循环函数解析下面会给出
_tcp_loop([&] {
const auto s = _tcp->state();
return (s == TCPState::State::LISTEN or s == TCPState::State::SYN_RCVD or s == TCPState::State::SYN_SENT);
});
cerr << "New connection from " << _datagram_adapter.config().destination.to_string() << ".\n";
// _tcp_thread线程负责完成当前TCP连接后续数据传输,此时线程已经启动
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
}
_tcp_main方法
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_tcp_main() {
try {
if (not _tcp.has_value()) {
throw runtime_error("no TCP");
}
// 开启tcp事件循环,不断运行,直到TCP连接断开
_tcp_loop([] { return true; });
// 关闭当前Socket
shutdown(SHUT_RDWR);
if (not _tcp.value().active()) {
cerr << "DEBUG: TCP connection finished "
<< (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");
}
// 将optional里面保存的TCPConnection清空
_tcp.reset();
} catch (const exception &e) {
cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
throw e;
}
}
_initialize_TCP初始化Tcp连接和事件循环
_initialize_TCP负责初始化tcp连接和事件循环:
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_initialize_TCP(const TCPConfig &config) {
// 将tcpConfig设置到TCPConnection中
_tcp.emplace(config);
// Set up the event loop
// There are four possible events to handle:
//
// 1) Incoming datagram received (needs to be given to
// TCPConnection::segment_received method)
//
// 2) Outbound bytes received from local application via a write()
// call (needs to be read from the local stream socket and
// given to TCPConnection::data_written method)
//
// 3) Incoming bytes reassembled by the TCPConnection
// (needs to be read from the inbound_stream and written
// to the local stream socket back to the application)
//
// 4) Outbound segment generated by TCP (needs to be
// given to underlying datagram socket)
// rule 1: read from filtered packet stream and dump into TCPConnection
// 监听网络是否有数据报到达
_eventloop.add_rule(
// 监听的fd本质是tun设备
_datagram_adapter,
Direction::In,
// 当感兴趣事件发生时,会回调该接口
[&] {
// 从tun设备读取数据
auto seg = _datagram_adapter.read();
// 交给TcpConnection进行处理
if (seg) {
_tcp->segment_received(move(seg.value()));
}
// debugging output:
if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " has been fully acknowledged.\n";
_fully_acked = true;
}
},
// 只要tcp连接还活跃,那么就继续轮询当前rule
[&] { return _tcp->active(); });
// rule 2: read from pipe into outbound buffer
// 监听应用程序是否有数据需要传输
_eventloop.add_rule(
// 监听_thread_data -- 竖立在应用程序和协议栈直接的数据传输通道
_thread_data,
Direction::In,
[&] {
// 应用程序向_thread_data中写入数据,然后通知协议栈有数据需要发送
// 根据tcp写入窗口剩余空闲大小读取指定的需要写出的数据量
const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());
const auto len = data.size();
// 调用TCPConnection的write方法进行写出
const auto amount_written = _tcp->write(move(data));
if (amount_written != len) {
throw runtime_error("TCPConnection::write() accepted less than advertised length");
}
// 如果应用程序主动调用close关闭了_thread_data通道,那么tcp写入通道也可以关闭了
if (_thread_data.eof()) {
_tcp->end_input_stream();
// 输出通道关闭
_outbound_shutdown = true;
// debugging output:
cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " finished (" << _tcp.value().bytes_in_flight() << " byte"
<< (_tcp.value().bytes_in_flight() == 1 ? "" : "s") << " still in flight).\n";
}
},
// 只要当前tcp连接还活跃并且输出通道还没有关闭并且当前tcp写入窗口大小不为0,就继续轮询当前rule
[&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },
// fd发生错误时,回调该接口
[&] {
_tcp->end_input_stream();
_outbound_shutdown = true;
});
// rule 3: read from inbound buffer into pipe
// 监听是否有按序到达的字节流还未写入,同时_thread_data通道还未关闭,如果有则写入_thread_data通道
_eventloop.add_rule(
// 监听thread_data
_thread_data,
// 关注可写事件
Direction::Out,
[&] {
// 获取tcp接收器的读取流
ByteStream &inbound = _tcp->inbound_stream();
// Write from the inbound_stream into
// the pipe, handling the possibility of a partial
// write (i.e., only pop what was actually written).
// 一口气把所有已经按序达到的字节流全部读取出来
const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());
const std::string buffer = inbound.peek_output(amount_to_write);
// 将读取出来的数据全部写入_thread_data管道中
const auto bytes_written = _thread_data.write(move(buffer), false);
// 已经成功被应用程序接收的字节流可以丢掉了
inbound.pop_output(bytes_written);
// 如果tcp进入四次挥手阶段或者断开连接了,那么关闭_thread_data管道
if (inbound.eof() or inbound.error()) {
_thread_data.shutdown(SHUT_WR);
_inbound_shutdown = true;
// debugging output:
cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()
<< " finished " << (inbound.error() ? "with an error/reset.\n" : "cleanly.\n");
// 满足下面这个条件说明目前此端为客户端,并且进入了四次挥手的TIME_WAIT阶段
if (_tcp.value().state() == TCPState::State::TIME_WAIT) {
cerr << "DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";
}
}
},
// 如果tcp接收器还存在按序到达的字节流没有读取,或者tcp_receiver还没有接收到FIN包,那么就继续轮询当前rule
[&] {
return (not _tcp->inbound_stream().buffer_empty()) or
((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);
});
// rule 4: read outbound segments from TCPConnection and send as datagrams
// 监听TCPConnection是否有数据需要发送,如果有则发送,前提是_datagram_adapter可写
_eventloop.add_rule(
_datagram_adapter,
Direction::Out,
[&] {
// 如果TCPConnection的segments_out等待队列不为空,说明存在待传输的数据包
while (not _tcp->segments_out().empty()) {
// 写入segments_out,进行数据包的实际传输
_datagram_adapter.write(_tcp->segments_out().front());
_tcp->segments_out().pop();
}
},
// 只要segments_out不为空,就继续轮询当前rule
[&] { return not _tcp->segments_out().empty(); });
}
_tcp_loop函数启动tcp事件循环
_tcp_loop函数启动tcp事件循环:
//! \param[in] condition is a function returning true if loop should continue
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {
auto base_time = timestamp_ms();
// 什么时候停止事件循环取决于condition函数返回值
while (condition()) {
// 等待获取下一个待发生的rule,超时则返回 -- 超时时间为10毫秒
auto ret = _eventloop.wait_next_event(TCP_TICK_MS);
// 没有事件发生,说明TCP断开了连接
if (ret == EventLoop::Result::Exit or _abort) {
break;
}
// 如果tcp连接仍然活跃
if (_tcp.value().active()) {
// 每隔10毫秒,调用一次TCPConnection的tick方法
const auto next_time = timestamp_ms();
// 传入参数: 距离上次调用该方法过了多久
_tcp.value().tick(next_time - base_time);
// 只有TCPOverIPv4OverEthernetAdapter的tick函数才有意义 -- 这里暂时不解释
// 其他adapter均为空实现
_datagram_adapter.tick(next_time - base_time);
base_time = next_time;
}
}
}
connect 方法
//! \param[in] c_tcp is the TCPConfig for the TCPConnection
//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {
if (_tcp) {
throw runtime_error("connect() with TCPConnection already initialized");
}
// 初始化TCP连接和事件循环
_initialize_TCP(c_tcp);
_datagram_adapter.config_mut() = c_ad;
cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "...\n";
// 开始三次握手,首先由Client发出一个SYN包
_tcp->connect();
const TCPState expected_state = TCPState::State::SYN_SENT;
if (_tcp->state() != expected_state) {
throw runtime_error("After TCPConnection::connect(), state was " + _tcp->state().name() + " but expected " +
expected_state.name());
}
// 使用事件循环,等待三次连接建立完毕
_tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });
cerr << "Successfully connected to " << c_ad.destination.to_string() << ".\n";
// 单独开启一个线程用于后续数据传输
_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);
}
bidirectional_stream_copy方法
无论对于 Server 还是 Client,在三次握手之后,都会建立一个新的线程,来专门执行 LossyTCPOverIPv4SpongeSocket
中的 eventloop。而主线程会另起一个 eventloop 以及另外开辟两个缓冲区,用于存放用户写入的数据与即将输出至屏幕的数据。当用户通过 stdin 输入数据时, eventloop 中所注册的 poll 事件被检测到,则数据将会被写入进本地输入缓冲区中。当 TCPOverIPv4OverTunFdAdapter
可写时,它会将本地输入缓冲区中的数据全部写入至 TCPOverIPv4OverTunFdAdapter
,并最终传输至远程。
而 webget 与真实服务器通信的原理,也是通过将 IP 报文写入 tun 虚拟网络设备,将其注入进 OS 协议栈中,模拟实际的发包情况。
// 在标准输入(stdin)和标准输出(stdout)之间以及一个自定义的 socket 对象之间进行双向数据复制
// 标准输入 --> socket --> 标准输出
// 键盘输入的数据会写入socket,socket有可读的数据会输出到屏幕上
void bidirectional_stream_copy(Socket &socket) {
constexpr size_t max_copy_length = 65536;
constexpr size_t buffer_size = 1048576;
EventLoop _eventloop{};
FileDescriptor _input{STDIN_FILENO};
FileDescriptor _output{STDOUT_FILENO};
ByteStream _outbound{buffer_size};
ByteStream _inbound{buffer_size};
bool _outbound_shutdown{false};
bool _inbound_shutdown{false};
socket.set_blocking(false);
_input.set_blocking(false);
_output.set_blocking(false);
// rule 1: read from stdin into outbound byte stream
// 标准输入有数据可读则写入_outbound通道
_eventloop.add_rule(
_input,
Direction::In,
[&] {
_outbound.write(_input.read(_outbound.remaining_capacity()));
if (_input.eof()) {
_outbound.end_input();
}
},
[&] { return (not _outbound.error()) and (_outbound.remaining_capacity() > 0) and (not _inbound.error()); },
[&] { _outbound.end_input(); });
// rule 2: read from outbound byte stream into socket
// socket可写,则将_outbound通道中数据写入socket
_eventloop.add_rule(
socket,
Direction::Out,
[&] {
const size_t bytes_to_write = min(max_copy_length, _outbound.buffer_size());
const size_t bytes_written = socket.write(_outbound.peek_output(bytes_to_write), false);
_outbound.pop_output(bytes_written);
if (_outbound.eof()) {
socket.shutdown(SHUT_WR);
_outbound_shutdown = true;
}
},
[&] { return (not _outbound.buffer_empty()) or (_outbound.eof() and not _outbound_shutdown); },
[&] { _outbound.end_input(); });
// rule 3: read from socket into inbound byte stream
// socket有可读数据,则读取数据并写入_inbound通道
_eventloop.add_rule(
socket,
Direction::In,
[&] {
_inbound.write(socket.read(_inbound.remaining_capacity()));
if (socket.eof()) {
_inbound.end_input();
}
},
[&] { return (not _inbound.error()) and (_inbound.remaining_capacity() > 0) and (not _outbound.error()); },
[&] { _inbound.end_input(); });
// rule 4: read from inbound byte stream into stdout
// 如果标准输出可写,则将数据从_inbound中读取出来,然后写入标准输出
_eventloop.add_rule(
_output,
Direction::Out,
[&] {
const size_t bytes_to_write = min(max_copy_length, _inbound.buffer_size());
const size_t bytes_written = _output.write(_inbound.peek_output(bytes_to_write), false);
_inbound.pop_output(bytes_written);
if (_inbound.eof()) {
_output.close();
_inbound_shutdown = true;
}
},
[&] { return (not _inbound.buffer_empty()) or (_inbound.eof() and not _inbound_shutdown); },
[&] { _inbound.end_input(); });
// loop until completion -- 死循环,每次都阻塞到下一次事件发生
while (true) {
if (EventLoop::Result::Exit == _eventloop.wait_next_event(-1)) {
return;
}
}
}
TCPSpongeSocket的wait_until_closed方法
wait_until_closed方法负责同步等待直到_tcp_thread线程结束:
template <typename AdaptT>
void TCPSpongeSocket<AdaptT>::wait_until_closed() {
// 关闭当前socket
shutdown(SHUT_RDWR);
// 同步等待直到_tcp_thread线程结束
if (_tcp_thread.joinable()) {
cerr << "DEBUG: Waiting for clean shutdown... ";
_tcp_thread.join();
cerr << "done.\n";
}
}
通道串联起子主线程
首先,我们来看一下TCPSpongeSocket的构造函数和析构函数:
// socketpair系统调用的作用是在本地进程间创建一对已连接的套接字(sockets)。
// 这对套接字可用于本地通信,类似于网络套接字的用法,但是不需要通过网络协议栈进行通信,而是直接在内核中完成通信,因此效率更高。
static inline pair<FileDescriptor, FileDescriptor> socket_pair_helper(const int type) {
int fds[2];
// 具体来说,socketpair创建了两个相关联的套接字,一个作为读取套接字(reading socket),另一个作为写入套接字(writing socket)。
// 这两个套接字之间形成了一条双向的通信通道,任何通过写入套接字发送的数据都可以通过读取套接字接收,并且反之亦然。
SystemCall("socketpair", ::socketpair(AF_UNIX, type, 0, static_cast<int *>(fds)));
return {FileDescriptor(fds[0]), FileDescriptor(fds[1])};
}
//! \param[in] datagram_interface is the underlying interface (e.g. to UDP, IP, or Ethernet)
template <typename AdaptT>
TCPSpongeSocket<AdaptT>::TCPSpongeSocket(AdaptT &&datagram_interface)
: TCPSpongeSocket(socket_pair_helper(SOCK_STREAM), move(datagram_interface)) {}
template <typename AdaptT>
TCPSpongeSocket<AdaptT>::TCPSpongeSocket(pair<FileDescriptor, FileDescriptor> data_socket_pair,
AdaptT &&datagram_interface)
// 主线程拿着通道一端
: LocalStreamSocket(move(data_socket_pair.first))
// 子线程拿着通道的另一端
, _thread_data(move(data_socket_pair.second))
, _datagram_adapter(move(datagram_interface)) {
_thread_data.set_blocking(false);
}
template <typename AdaptT>
TCPSpongeSocket<AdaptT>::~TCPSpongeSocket() {
try {
if (_tcp_thread.joinable()) {
cerr << "Warning: unclean shutdown of TCPSpongeSocket\n";
// force the other side to exit
_abort.store(true);
_tcp_thread.join();
}
} catch (const exception &e) {
cerr << "Exception destructing TCPSpongeSocket: " << e.what() << endl;
}
}
主线程和子线程通过socketpair系统调用创建的一对已连接的套接字(sockets)进行本地通信。
- 主线程中发生键盘输入事件,到输入的内容通过socktpair创建的双向通道传输到子线程,然后由子线程将数据最终通过tun设备发送出去,这中间结合了两个eventloop共同协作完成
- 当tun设备接收到网络数据包的时候,会将数据包传输给TCP协议栈进行处理,TCP协议栈处理完后,如果发现_thread_data双向通道可写,则将处理完毕的数据包丢到通道中,主线程中的Socket发现来数据了,将数据写入_inbound通道中,此时发现标准输出可写,最终将接收到的数据包输出到屏幕上
- 这中间同样结合了两个eventloop共同协作工作,大家可以好好理解一下
- 这中间同样结合了两个eventloop共同协作工作,大家可以好好理解一下
小结
以上就是我个人对cs144 Lab Four测试文件tcp_ipv4.cc文件大体流程的理解,可能会存在错误,欢迎各位大佬评论区指出,同时由于篇幅有限,不能将所有源码一一贴出讲解,所以阅读过程中大家可以对照cs144 lab four相关源码进行学习