CS 144 Lab Three -- the TCP sender
- TCPSender 功能
- 如何检测丢包
- TCPSender 要求
- TCPSender 状态转换图
- TCPSender 实现
- 测试
对应课程视频: 【计算机网络】 斯坦福大学CS144课程
Lab Three 对应的PDF: Lab Checkpoint 3: the TCP sender
TCPSender 功能
TCP Sender 负责将数据以 TCP 报文的形式发送,其需要完成的功能有:
- 将 ByteStream 中的数据以 TCP 报文形式持续发送给接收者。
- 处理 TCPReceiver 传入的 ackno 和 window size,以追踪接收者当前的接收状态,以及检测丢包情况。
- 若经过一个超时时间后仍然没有接收到 TCPReceiver 发送的针对某个数据包的 ack 包,则重传对应的原始数据包。
如何检测丢包
TCP 使用超时重传机制。TCPSender 除了将原始数据流分解成众多 TCP 报文并发送以外,它还会追踪每个已发送报文(已被发送但还未被接收)的发送时间。如果某些已发送报文太久没有被接收方确认(即接收方接收到对应的 ackno),则该数据包必须重传。
需要注意的是,接收方返回的 ackno 并不一定对应着发送方返回的 seqno(也不和 seqno 有算数关系),这是因为发送的数据可能会因为内存问题,被接收方截断。
接收方确认某个报文,指的是该报文的所有字节索引都已被确认。这意味着如果该报文只有部分被确认,则不能说明该报文已被完全确认。
TCP 的超时机制比较麻烦,这是因为超时机制直接影响到应用程序从远程服务器上读取数据的响应时间,以及影响到网络拥堵的程度。以下是实现 TCPSender 时需要注意的一些点:
-
每隔几毫秒,TCPSender的 tick 函数将会被调用,其参数声明了过去的时间。这是 TCPSender 唯一能调用的超时时间相关函数。因为直接调用 clock 或者 time 将会导致测试套件不可用。
-
TCPSender 在构造时会被给予一个重传超时时间 RTO的初始值。RTO 是在重新发送未完成 TCP 段之前需要等待的毫秒数。RTO值将会随着时间的流逝(或者更应该说是网络环境的变化)而变化,但初始的RTO将始终不变。
-
在 TCPSender 中,我们需要实现一个重传计时器。该计时器将会在 RTO 结束时进行一些操作。
-
当每次发送包含数据的数据包时,都需要启动重传计时器,并让它在 RTO 毫秒后超时。若所有发送中报文均被确认,则终止重传计时器。
-
如果重传计时器超时,则需要进行以下几步(稍微有点麻烦)
-
重传尚未被 TCP 接收方完全确认的最早报文(即最低 ackno所对应的报文)。这一步需要我们将发送中的报文数据保存至一个新的数据结构中,这样才可以追踪正处于发送状态的数据。
-
如果接收者的 window size 不为 0,即可以正常接收数据,则
- 跟踪连续重传次数。过多的重传次数可能意味着网络的中断,需要立即停止重传。
- 将RTO的值设置为先前的两倍,以降低较差网络环境的重传速度,以避免加深网络环境的拥堵。
- 重置并重启重传计时器。
-
接收者 window size 为 0 的情况将在下面说明。
-
当接收者给发送者一个确认成功接收新数据的 ack 包时(absolute ack seqno 比之前接收到的 ackno 更大):
- 将 RTO 设置回初始值
- 如果发送方存在尚未完成的数据,则重新启动重传定时器
- 将连续重传计数清零。
TCPSender 要求
在该实验中,我们需要完成 TCPSender 的以下四个接口:
-
fill_window:TCPSender 从 ByteStream 中读取数据,并以 TCPSegement 的形式发送,尽可能地填充接收者的窗口。但每个TCP段的大小不得超过
TCPConfig::MAX PAYLOAD SIZE
。-
若接收方的 Windows size 为 0,则发送方将按照接收方 window size 为 1 的情况进行处理,持续发包。
-
因为虽然此时发送方发送的数据包可能会被接收方拒绝,但接收方可以在反向发送 ack 包时,将自己最新的 window size 返回给发送者。否则若双方停止了通信,那么当接收方的 window size 变大后,发送方仍然无法得知接收方可接受的字节数量。
-
若远程没有 ack 这个在 window size 为 0 的情况下发送的一字节数据包,那么发送者重传时不要将 RTO 乘2。这是因为将 RTO 双倍的目的是为了避免网络拥堵,但此时的数据包丢弃并不是因为网络拥堵的问题,而是远程放不下了。
-
-
ack_received:对接收方返回的 ackno 和 window size 进行处理。丢弃那些已经完全确认但仍然处于追踪队列的数据包。同时如果 window size 仍然存在空闲,则继续发包。
-
tick:该函数将会被调用以指示经过的时间长度。发送方可能需要重新发送一些超时且没有被确认的数据包。
-
send_empty_segment:生成并发送一个在 seq 空间中长度为 0 并正确设置 seqno 的 TCPSegment,这可让用户发送一个空的 ACK 段。
TCPSender 状态转换图
我们无需定义新的状态变量,只需合理利用好各个公共接口的状态,即可快速确认当前的状态:
TCPSender 实现
注意点:
- 当 SYN 设置后,payload 应该在尽可能装的基础之上,少装入 1byte,因为这个 byte 大小被 SYN 占用。
- 而在 payload 尽可能装的基础上,若 FIN 装不下了,则必须在下一个包中装入 FIN 。
- FIN 包的发送必须满足三个条件:
- 从来没发送过 FIN。这是为了防止发送方在发送 FIN 包并接收到 FIN ack 包之后,循环用 FIN 包填充发送窗口的情况。
- 输入字节流处于 EOF
- window 减去 payload 大小后,仍然可以存放下 FIN
- 当循环填充发送窗口时,若发送窗口大小足够但本地没有数据包需要发送,则必须停止发送。
- 若当前 Segment 是 FIN 包,则在发送完该包后,立即停止填充发送窗口。
- 重传定时器追踪的是发送者距离上次接收到新 ack 包的时间,而不是每个处于发送中的包的超时时间。因此除 SYN 包以外(它会启动定时器),其他发包操作将不会重置 重传定时器,同时也无需为每个数据包配备一个定时器。
- 同时,只有存在新数据包被接收方确认后,才会重置定时器。
- tick 函数也是类似,只有存在处于发送状态的数据包时,重传定时器才起作用。若重传定时器超时,则重传的是第一个 seqno 最小且尚未重传的数据包。
- 当接收方的 window size 为 0 时,仍旧按照 window size 为 1 时去处理,发送一字节数据。但是,若远程没有发送 ack 包的时候,不要将 RTO 双倍,还是重置为之前的 RTO。
此部分可参考<<自顶向下学习计算机网络>> 3.5 小节
首先,我们来看一下代码中涉及到相关类:
- TCPSegment : tcp报文内存中的数据载体,主要负责Buffer和Tcp数据报结构体之间的序列号与反序列化
// tcp_segment.hh
class TCPSegment {
private:
TCPHeader _header{};
// Buffer就是对String字符串相关操作的封装
Buffer _payload{};
public:
// Parse the segment from a string
ParseResult parse(const Buffer buffer, const uint32_t datagram_layer_checksum = 0);
// Serialize the segment to a string
BufferList serialize(const uint32_t datagram_layer_checksum = 0) const;
const TCPHeader &header() const { return _header; }
TCPHeader &header() { return _header; }
const Buffer &payload() const { return _payload; }
Buffer &payload() { return _payload; }
// Segment's length in sequence space
// Equal to payload length plus one byte if SYN is set, plus one byte if FIN is set
size_t length_in_sequence_space() const;
};
// tcp_segment.hh
// buffer string/Buffer to be parsed
// datagram_layer_checksum pseudo-checksum from the lower-layer protocol
// datagram_layer_checksum 是当前层下一层对数据报计算得到的校验和
ParseResult TCPSegment::parse(const Buffer buffer, const uint32_t datagram_layer_checksum) {
// 对buffer内容计算校验和,与下面一层传入的校验和进行比对,如果不一致直接返回
InternetChecksum check(datagram_layer_checksum);
check.add(buffer);
if (check.value()) {
return ParseResult::BadChecksum;
}
// 解析拿到tcp协议请求头和请求体
NetParser p{buffer};
_header.parse(p);
_payload = p.buffer();
return p.get_error();
}
// 请求体大小+syn标志位占据的一个序列号+fin标志位占据的一个序列号
size_t TCPSegment::length_in_sequence_space() const {
return payload().str().size() + (header().syn ? 1 : 0) + (header().fin ? 1 : 0);
}
// datagram_layer_checksum pseudo-checksum from the lower-layer protocol
BufferList TCPSegment::serialize(const uint32_t datagram_layer_checksum) const {
// 拿到TCP请求头
TCPHeader header_out = _header;
header_out.cksum = 0;
// calculate checksum -- taken over entire segment
// 计算TCP报文的校验和 --- 如果TCP报文校验和与传入的校验和计算一致,那么check.value()返回值应该为0
InternetChecksum check(datagram_layer_checksum);
check.add(header_out.serialize());
check.add(_payload);
header_out.cksum = check.value();
// 将TCP报文添加到BufferList中
BufferList ret;
ret.append(header_out.serialize());
ret.append(_payload);
return ret;
}
SYN和FIN标志需要占用一个序列号,tcp使用序列号来标识一段字节流,但是序列号和流重组器中的index流索引之间并不是一一对应的关系,序列号和index流索引进行转换时,需要去掉SYN和FIN标志占用的序列号。
- libsponge/tcp_sender.hh
//! Accepts a ByteStream, divides it up into segments and sends the
//! segments, keeps track of which segments are still in-flight,
//! maintains the Retransmission Timer, and retransmits in-flight
//! segments if the retransmission timer expires.
class TCPSender {
private:
int _timeout{-1};
int _timecount{0};
// 记录已经发送但是还没有确认的TCP报文段及其起始序列号---该集合是有序的
std::map<size_t, TCPSegment> _outgoing_map{};
// 记录已经发送但是还没有确认的字节数量
size_t _outgoing_bytes{0};
// 记录接收端上一次传回来的窗口大小
size_t _last_window_size{1};
// 是否设置SYN标志
bool _set_syn_flag{false};
// 是否设置FIN标志
bool _set_fin_flag{false};
// 连续重传计数
size_t _consecutive_retransmissions_count{0};
// 初始序列号
WrappingInt32 _isn;
// outbound queue of segments that the TCPSender wants sent
// 将需要发送的TCP数据报塞入这个队列即可发送出去
std::queue<TCPSegment> _segments_out{};
// 重传计时器初始的重传时间
unsigned int _initial_retransmission_timeout;
// 等待被发送的字节流
ByteStream _stream;
// 下一个发送的字节对应的序列号
uint64_t _next_seqno{0};
public:
...
};
- libsponge/tcp_sender.cc
// 返回已经发送但是没有手动ack的字节数量
uint64_t TCPSender::bytes_in_flight() const { return _outgoing_bytes; }
void TCPSender::fill_window() {
// 如果远程窗口大小为 0, 则把其视为 1 进行操作
size_t curr_window_size = _last_window_size ? _last_window_size : 1;
// 循环填充窗口
while (curr_window_size > _outgoing_bytes) {
// 尝试构造单个数据包
// 如果此时尚未发送 SYN 数据包,则立即发送
TCPSegment segment;
if (!_set_syn_flag) {
segment.header().syn = true;
_set_syn_flag = true;
}
// 设置 seqno --- 当前这批数据字节流的起始序号
segment.header().seqno = next_seqno();
// 装入 payload
const size_t payload_size =
min(TCPConfig::MAX_PAYLOAD_SIZE,
// 减去SYN可能占据的一个序列号大小 -- SYN和FIN虽然不占据实际payload空间,但是会占据一个序列号
curr_window_size - _outgoing_bytes - segment.header().syn);
// 从待读取字节流中读取payload_size大小的字节数据出来
string payload = _stream.read(payload_size);
/**
* 读取好后,如果满足以下条件,则增加 FIN
* 1. 从来没发送过 FIN
* 2. 输入字节流处于 EOF
* 3. window 减去 payload 大小后,仍然可以存放下 FIN
*/
if (!_set_fin_flag && _stream.eof() && payload.size() + _outgoing_bytes < curr_window_size)
_set_fin_flag = segment.header().fin = true;
// 将payload载入tcp报文结构体对象
segment.payload() = Buffer(move(payload));
// 如果没有任何数据,则停止数据包的发送
if (segment.length_in_sequence_space() == 0)
break;
// 如果没有正在等待的数据包,则重设重传计时器的超时时间
if (_outgoing_map.empty()) {
_timeout = _initial_retransmission_timeout;
_timecount = 0;
}
// 发送组装好的TCP报文
_segments_out.push(segment);
// 追踪这些数据包 --> 累加记录已经发送但是还没有ack的字节数
_outgoing_bytes += segment.length_in_sequence_space();
_outgoing_map.insert(make_pair(_next_seqno, segment));
// 更新待发送 abs seqno
_next_seqno += segment.length_in_sequence_space();
// 如果设置了 fin,则直接退出填充 window 的操作
if (segment.header().fin)
break;
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
size_t abs_seqno = unwrap(ackno, _isn, _next_seqno);
// 如果传入的 ack 是不可靠的,则直接丢弃
if (abs_seqno > _next_seqno)
return;
// 遍历数据结构,将已经接收到的数据包丢弃
for (auto iter = _outgoing_map.begin(); iter != _outgoing_map.end();) {
// 如果一个发送的数据包已经被成功接收
const TCPSegment &seg = iter->second;
// 当前数据包的起始序列号加上总字节数 <= 接收端传回的ackno,说明当前数据包已经被成功接收了
if (iter->first + seg.length_in_sequence_space() <= abs_seqno) {
// 已经发出但是还未确认的字节数减去对应的大小
_outgoing_bytes -= seg.length_in_sequence_space();
// 从map集合中移除当前数据包
iter = _outgoing_map.erase(iter);
// 如果有新的数据包被成功接收,则清空超时时间
_timeout = _initial_retransmission_timeout;
_timecount = 0;
}
// 如果当前遍历到的数据包还没被接收,则说明后面的数据包均未被接收,因此直接返回
else
break;
}
// 重传次数归零
_consecutive_retransmissions_count = 0;
// 更新当前接收方窗口大小
_last_window_size = window_size;
// 尝试传输数据
fill_window();
}
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_timecount += ms_since_last_tick;
auto iter = _outgoing_map.begin();
// 如果存在发送中的数据包,并且定时器超时
if (iter != _outgoing_map.end() && _timecount >= _timeout) {
// 如果窗口大小不为0还超时,则说明网络拥堵
if (_last_window_size > 0)
_timeout *= 2;
_timecount = 0;
_segments_out.push(iter->second);
// 连续重传计时器增加
++_consecutive_retransmissions_count;
}
}
unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions_count; }
void TCPSender::send_empty_segment() {
TCPSegment segment;
segment.header().seqno = next_seqno();
_segments_out.push(segment);
}
测试
在 build 目录下执行 make 后执行 make check_lab3: