1、什么是kcp协议
了解kcp协议之前先回顾一下传输层的两大协议TCP和UDP。
kcp是一个快速可靠协议(也可以叫udp的可靠性传输)。结合了tcp的可靠性和udp的传输速度等优点,能以⽐ TCP浪费10%-20%带宽的代价,换取平均延迟降低 30%-40%,且 最⼤延迟降低三倍的传输效果。使用纯算法实现,不负责底层协议(如udp)的收发,内部没有系统调用。
kcp协议头简介
- conv :连接号。UDP是⽆连接的,conv⽤于表示来⾃于哪个客户端。对连接的⼀种替代, 因为有 conv , 所以KCP也是⽀持多路复⽤的。
- cmd :命令类型,只有四种。
IKCP_CMD_ACK,确认命令;IKCP_CMD_PUSH,数据推送命令;
IKCP_CMD_WASK,接收窗⼝⼤⼩询问命令; IKCP_CMD_WINS,接收窗⼝⼤⼩告知命令。 IKCP_CMD_PUSH 和 IKCP_CMD_ACK 关联; IKCP_CMD_WASK 和 IKCP_CMD_WINS 关联。- frg :分⽚,⽤户数据可能会被分成多个KCP包,发送出去。
- wnd :接收窗⼝⼤⼩,发送⽅的发送窗⼝不能超过接收⽅给出的数值, (其实是接收窗⼝的剩余⼤⼩,这个⼤⼩是动态变化的)
- ts : 时间序列
- sn : 序列号
- una :下⼀个可接收的序列号(连续的序列)。其实就是确认号,收到sn=10的包,una为11
- len :数据⻓度(DATA的⻓度)
- data :⽤户数据
kcp协议中相关名词说明
- 用户数据:应用层发送的数据,如一张图片2Kb的数据
- MTU:最大传输单元。即每次发送的最大数据
- RTO:Retransmission TimeOut,重传超时时间。
- RTT: 一个报文段发送出去,到收到对应确认包的时间差。
- cwnd:congestion window,拥塞窗口,表示发送方可发送多少个KCP数据包。
- 与接收方窗口有关,与网络状况(拥塞控制)有关,与发送窗口大小有关。
- rwnd:receiver window,接收方窗口大小,表示接收方还可接收多少个KCP数据包
- snd_buf:发送消息的缓存,存放处于发送状态的数据(比如调用了sendto后)
- snd_queue:待发送KCP数据包队列,暂存分片、排序后的数据,当发送缓存有空之后就会放入发送缓存。
- snd_nxt:下一个即将发送的kcp数据包序列号
- snd_una:下一个待确认的序列号
- rcv_buf:接收消息的缓存, 还不能直接供用户读取的数据
- rcv_queue:接收消息的队列, 是已经确认可以供用户读取的数据(已经排好序)
2、KCP对比TCP协议的技术特点
tcp是为了流量设计的(传输的是字节流数据,单位时间传输多少字节的数据),讲究充分利用带宽;而kcp是为了流速设计的(传输的是数据包,单个数据包从源端发到目的端需要的时间)。
- 超时重传时间对比
- TCP超时计算是RTOx2,这样连续丢三次包就变成RTOx8了,⼗分恐怖;
- ⽽KCP启动快速模式后不x2, 只是x1.5(实验证明1.5这个值相对⽐较好),提⾼了传输速度。
- 超时重传机制对比
- TCP丢包时会全部重传从丢的那个包开始以后的数据;
- KCP是选择性重传,只重传真正丢失的数据包。
- 快速重传机制对比
发送端发送了1,2,3,4,5⼏个包,如果2包丢失;
- tcp快速重传机制为:接受端收到1,3包后,连续回复发送端三次ack=2,表示2包丢失,发送端则会立即重传2包,但是多发了好几个ACK会导致网络更加拥塞。
- kcp快速重传机制为:收到远端回复的ACK: 1, 3, 4, 5,当收到ACK3时,KCP知道2被跳过1 次,收到ACK4时,知道2被跳过了2次,此时可以认为2号丢失,不⽤等超时,直接重传2号包,⼤⼤改善 了丢包时的传输速度。
- ACK机制对比
- TCP为了充分利⽤带宽,延迟发送ACK(NODELAY都没⽤),这样超时计算会算出较⼤ RTT时间,延⻓ 了丢包时的判断过程。
- KCP的ACK是否延迟发送可以调节。
- 收包确认机制对比
ARQ模型响应有两种,UNA(此编号前所有包已收到,如TCP)和ACK(该编号包已收到),光⽤UNA将 导致全部重传,光⽤ACK则丢失成本太⾼,以往协议都是⼆选其⼀,⽽ KCP协议中,除去单独的 ACK包 外,所有包都有UNA信息。
- 非退让流控
KCP正常模式同TCP⼀样使⽤公平退让法则,即发送窗⼝⼤⼩由:发送缓存⼤⼩、接收端剩余接收缓存大小、丢包退让及慢启动这四要素决定。但传送及时性要求很⾼的⼩数据时,可选择通过配置跳过后两步,仅⽤前两项来控制发送频率。以牺牲部分公平性及带宽利⽤率之代价,换取了开着BT都能流畅传输的效果。
3、kcp协议的开源案例
- kcptun: 基于 kcp-go做的⾼速远程端⼝转发(隧道) ,配合ssh -D,可以⽐ shadowsocks 更流畅的看在线视频。
- dog-tunnel: GO开发的⽹络隧道,使⽤ KCP极⼤的改进了传输速度,并移植了⼀份 GO版本 KCP
- v2ray: 著名代理软件,Shadowsocks 代替者,1.17后集成了 kcp协议,使⽤UDP传输,⽆数据包特征。
- HP-Socket: ⾼性能⽹络通信框架 HP-Socket。
- frp: ⾼性能内⽹穿透的反向代理软件,可将将内⽹服务暴露映射到外⽹服务器。
- asio-kcp: 使⽤ KCP的完整 UDP⽹络库,完整实现了基于 UDP的链接状态管理,会话控制,KCP协议调度等
- kcp-java: Java版本 KCP协议实现。
- kcp-netty: kcp的Java语⾔实现,基于netty。
- java-kcp: JAVA版本KCP,基于netty实现(包含fec功能)
- kcp-go: ⾼安全性的kcp的 GO语⾔实现,包含 UDP会话管理的简单实现,可以作为后续开发的基础库。
- kcp-csharp: kcp的 csharp移植,同时包含⼀份回话管理,可以连接上⾯kcp-go的服务端。
- kcp-csharp: 新版本 Kcp的 csharp移植。线程安全,运⾏时⽆alloc,对gc⽆压⼒。
- kcp-rs: KCP的 rust移植
- kcp-rust:新版本 KCP的 rust 移植
- tokio-kcp:rust tokio 的 kcp 集成
- lua-kcp: KCP的 Lua扩展,⽤于 Lua服务器
- node-kcp: node-js 的 KCP 接⼝
- nysocks: 基于libuv实现的node-addon,提供nodejs版本的代理服务,客户端接⼊⽀持SOCKS5和ss两种协议
- shadowsocks-android: Shadowsocks for android 集成了 kcptun 使⽤ kcp协议加速shadowsocks,效果不错
- kcpuv: 使⽤ libuv开发的kcpuv库,⽬前还在 Demo阶段
- Lantern:更好的 VPN,Github 50000 星,使⽤ kcpgo 加速
- rpcx :RPC 框架,1000+ 星,使⽤ kcpgo 加速 RPC
- xkcptun: c语⾔实现的kcptun,主要⽤于OpenWrt, LEDE开发的路由器项⽬上
- et-frame: C#前后端框架(前端unity3d),统⼀⽤C#开发游戏,实现了前后端kcp协议
4、kcp配置模式
- 工作模式:int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc)
- nodelay :是否启用 nodelay模式,0不启用;1启用。
- interval :协议内部工作的 interval,单位毫秒,比如 10ms或者 20ms
- resend :快速重传模式,默认0关闭,可以设置2(2次ACK跨越将会直接重传)
- nc :是否关闭流控,默认是0代表不关闭,1代表关闭
- 普通模式: ikcp_nodelay(kcp, 0, 40, 0, 0)
- 极速模式: ikcp_nodelay(kcp, 1, 10, 2, 1)
- 最大窗口:int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd);
该调用将会设置协议的最大发送窗口和最大接收窗口大小,默认为32,单位为包。
- 最大传输单元:int ikcp_setmtu(ikcpcb *kcp, int mtu);
kcp协议并不负责探测MTU (最大传输单元),默认 mtu 是 1400 字节
- 最小RTO:不管是 TCP还是 KCP计算 RTO时都有最小 RTO的限制,即便计算出来RTO为40ms,由于默认的 RTO是100ms,协议只有在100ms后才能检测到丢包,快速模式下为30ms,可以手动更改该值: kcp->rx_minrto = 10;
5、源码分析
KCP 协议本质上是用纯算法实现的一个应用层协议,在使用KCP之前,需要先确定下层传输协议,即要先设置底层调用函数,也就是下文讲到的output回调函数,一般情况,KCP 采用 UDP 作为传输层协议。下图展示了 KCP 在协议栈中所处的位置:
- kcp源码整体架构图
整个KCP协议主要依靠⼀个循环ikcp_update来驱动整个算法的运转,所有的数据送,接收,状态变化 都依赖于此,所以如果有操作占⽤每⼀次update的周期过⻓,或者设置内部刷新的时间间隔过⼤,都会导 致整个算法的效率降低。在ikcp_update中最终调⽤的是ikcp_flush,这是协议中的⼀个核⼼函数,将数 据,确认包,以及窗⼝探测和应答发送到对端。
kcp源文件一共就两个文件,ikcp.c和ikcp.h
ikcp_create:创建kcp对象,每条链路都回应一个kcp对象;
ikcp_update:它是一个状态机,处理数据排序、校正、重传等,需要循环调用;
ikcp_send:调用将数据进行分片、编号等处理,放入snd_queue的待发送队列中;
ikcp_output:发送消息的回调函数
ikcp_input:预接收数据,从recvfrom读出原始数据交给kcp;
ikcp_recv:提取真正数据,返回提取到的数据大小;
发送数据流程:用户数据 -> kcp_send -> kcp_update -> output -> sendto;
接收数据流程:recvfrom -> kcp_input -> kcp_update -> kcp_recv -> 用户读出数据。
-
kcp发送数据过程:
应⽤层调⽤ ikcp_send 后,数据将会进⼊到 snd_queue 中,⽽下层函数 ikcp_flush 将会决定将多少数据从 snd_queue 中移到 snd_buf 中,然后通过回调函数kcp->output() 将snd_buf中的数据发送出去。
ikcp_send:
1、此函数的主要作用是:把用户发送的数据根据MSS(max segment size)分片成KCP的数据分片格式,插入待发送队列中。
1)当用户的数据超过一个MSS(最大分片大小)的时候,会对发送的数据进行分片处理。通过frg进行排序区分,frg即message中的segment分片ID,在message中的索引,由大到小,0表示最后一个分片。分成4片时,frg为3,2,1,0。
2)如用户发送2900字节的数据,MSS为1400byte。因此,该函数会把2900byte的用户数据分成三个分片,一个数据大小为1400,头frg设置为2,len设置为1400;第二个分片,头frg设置为1,len设置为1400; 第三个分片, 头frg设置为0,len设置为100。切好KCP分片之后,放入到名为snd_queue的待发送队列中。
2、分片方式共有两种。
1) 流模式情况下,检测每个发送队列里的分片是否达到最大MSS,如果没有达到就会用新的数据填充分片。接收端会把多片发送的数据重组为一个完整的KCP帧。
2)消息模式下,将用户数据分片,为每个分片设置sn和frag,将分片后的数据一个一个地存入发送队列,接收方通过sn和frag解析原来的包,消息方式一个分片的数据量可能不能达到MSS,也会作为一个包发送出去。3、源码:
/* 不能一下send太长的数据, 当数据长度/mss大于对方接收窗口的时候则返回错误 */ int ikcp_send(ikcpcb *kcp, const char *buffer, int len) { IKCPSEG *seg; int count, i; assert(kcp->mss > 0); // 从mtu if (len < 0) return -1; // append to previous segment in streaming mode (if possible) // 1 如果当前的 KCP 开启流模式,取出 `snd_queue` 中的最后一个报文 将其填充到 mss 的长度,并设置其 frg 为 0. if (kcp->stream != 0) { // 先不考虑流式的 if (!iqueue_is_empty(&kcp->snd_queue)) { IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node); /*节点内数据长度小于mss,计算还可容纳的数据大小,以及本次占用的空间大小, 以此新建segment,将新建segment附加到发送队列尾,将old节点内数据拷贝过去, 然后将buffer中也拷贝其中,如果buffer中的数据没有拷贝完,extend为拷贝数据, 开始frg计数。更新len为剩余数据,删除old */ if (old->len < kcp->mss) { int capacity = kcp->mss - old->len; int extend = (len < capacity)? len : capacity; seg = ikcp_segment_new(kcp, old->len + extend); assert(seg); if (seg == NULL) { return -2; } iqueue_add_tail(&seg->node, &kcp->snd_queue); // 重新一个新segment加入 memcpy(seg->data, old->data, old->len); if (buffer) { memcpy(seg->data + old->len, buffer, extend); buffer += extend; } seg->len = old->len + extend; seg->frg = 0; len -= extend; iqueue_del_init(&old->node); ikcp_segment_delete(kcp, old); } } if (len <= 0) { return 0; } } // 2 计算数据可以被最多分成多少个frag if (len <= (int)kcp->mss) count = 1; // 分片(头部24) + user data(mss 1376) = mtu (1400) else count = (len + kcp->mss - 1) / kcp->mss; if (count >= (int)IKCP_WND_RCV) return -2; // 超过对方的初始接收窗口 // 这里的设计有个疑问,如果一致send则snd_queue一致增长, 如果去也去判断主机的发送窗口大小可能更优些 if (count == 0) count = 1; // ? // fragment // 3 将数据全部新建segment插入发送队列尾部,队列计数递增, frag递减 for (i = 0; i < count; i++) { int size = len > (int)kcp->mss ? (int)kcp->mss : len; seg = ikcp_segment_new(kcp, size); assert(seg); if (seg == NULL) { return -2; } if (buffer && len > 0) { memcpy(seg->data, buffer, size); // 拷贝数据 } seg->len = size; // 每seg 数据大小 seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; // frg编号 , 流模式情况下分片编号不用填写 iqueue_init(&seg->node); iqueue_add_tail(&seg->node, &kcp->snd_queue); // 发送队列 kcp->nsnd_que++; // 发送队列++ if (buffer) { buffer += size; } len -= size; } return 0; }
ikcp_flush :
准备将 acklist 中记录的 ACK 报文发送出去,即从 acklist 中填充 ACK 报文的 sn 和 ts 字段;
检查当前是否需要对远端窗口进行探测。由于 KCP 流量控制依赖于远端通知其可接受窗口的大小,一旦远端接受窗口 kcp->rmt_wnd 为0,那么本地将不会再向远端发送数据,因此就没有机会从远端接受 ACK 报文,从而没有机会更新远端窗口大小。在这种情况下,KCP 需要发送窗口探测报文到远端,待远端回复窗口大小后,后续传输可以继续。
在发送数据之前,先设置快重传的次数和重传间隔;KCP 允许设置快重传的次数,即 fastresend 参数。
例如设置 fastresend 为2,并且发送端发送了1,2,3,4,5几个分片,收到远端的ACK: 1, 3, 4, 5,当收到ACK3时,KCP知道2被跳过1次,收到ACK4时,知道2被“跳过”了2次,此时可以认为2号丢失,不用等超时,直接重传2号包;每个报文的 fastack 记录了该报文被跳过了几次,由函数 ikcp_parse_fastack 更新。
于此同时,KCP 也允许设置 nodelay 参数,当激活该参数时,每个报文的超时重传时间将由 x2 变为 x1.5,即加快报文重传:
ikcp_flush函数的主要作用:
* ack确认包
* 探测远端窗口
* 发送snd_buf数据分片
* 更新拥塞窗口
源码:
void ikcp_flush(ikcpcb *kcp) { IUINT32 current = kcp->current; char *buffer = kcp->buffer; char *ptr = buffer; int count, size, i; IUINT32 resent, cwnd; IUINT32 rtomin; struct IQUEUEHEAD *p; int change = 0; int lost = 0; IKCPSEG seg; // 'ikcp_update' haven't been called. if (kcp->updated == 0) return; seg.conv = kcp->conv; seg.cmd = IKCP_CMD_ACK; // 这里的ack后 seg.frg = 0; seg.wnd = ikcp_wnd_unused(kcp); // 应答的时候携带了剩余的接收窗口大小 seg.una = kcp->rcv_nxt; // 已经处理到具体的分片 seg.len = 0; seg.sn = 0; seg.ts = 0; // 逐一获取acklist中的sn和ts,编码成segment // flush acknowledges count = kcp->ackcount; // 需要应答的分片数量 for (i = 0; i < count; i++) { size = (int)(ptr - buffer); if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; } ikcp_ack_get(kcp, i, &seg.sn, &seg.ts); // 应答包 把时间戳发回去是为了能够计算RTT ptr = ikcp_encode_seg(ptr, &seg); // 编码segment协议头 } kcp->ackcount = 0; // probe window size (if remote window size equals zero) if (kcp->rmt_wnd == 0) { if (kcp->probe_wait == 0) { // 初始化探测间隔和下一次探测时间 kcp->probe_wait = IKCP_PROBE_INIT; // 默认7秒探测 kcp->ts_probe = kcp->current + kcp->probe_wait; // 下一次探测时间 } else { //远端窗口为0,发送过探测请求,但是已经超过下次探测的时间 //更新probe_wait,增加为IKCP_PROBE_INIT+ probe_wait /2,但满足KCP_PROBE_LIMIT //更新下次探测时间 ts_probe与 探测变量 为 IKCP_ASK_SEND,立即发送探测消息 if (_itimediff(kcp->current, kcp->ts_probe) >= 0) { // 检测是否到了探测时间 if (kcp->probe_wait < IKCP_PROBE_INIT) kcp->probe_wait = IKCP_PROBE_INIT; kcp->probe_wait += kcp->probe_wait / 2; if (kcp->probe_wait > IKCP_PROBE_LIMIT) kcp->probe_wait = IKCP_PROBE_LIMIT; kcp->ts_probe = kcp->current + kcp->probe_wait; kcp->probe |= IKCP_ASK_SEND; } } } else { // 远端窗口正常,则不需要探测 远端窗口不等于0,更新下次探测时间与探测窗口等待时间为0,不发送窗口探测 kcp->ts_probe = 0; kcp->probe_wait = 0; } // flush window probing commands if (kcp->probe & IKCP_ASK_SEND) { seg.cmd = IKCP_CMD_WASK; // 窗口探测 [询问对方窗口size] size = (int)(ptr - buffer); if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; } ptr = ikcp_encode_seg(ptr, &seg); } // flush window probing commands if (kcp->probe & IKCP_ASK_TELL) { seg.cmd = IKCP_CMD_WINS; // [告诉对方我方窗口size], 如果不为0,可以往我方发送数据 size = (int)(ptr - buffer); if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) { ikcp_output(kcp, buffer, size); ptr = buffer; } ptr = ikcp_encode_seg(ptr, &seg); } kcp->probe = 0; //清空标识 // calculate window size 取发送窗口和远端窗口最小值得到拥塞窗口小 cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd); // 当rmt_wnd为0的时候, // 如果没有做流控制则取配置拥塞窗口、发送窗口和远端窗口三者最小值 if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd); // 进一步控制cwnd大小 // move data from snd_queue to snd_buf // 从snd_queue移动到snd_buf的数量不能超出对方的接收能力 此时如果 // 发送那些符合拥塞范围的数据分片 while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) { IKCPSEG *newseg; if (iqueue_is_empty(&kcp->snd_queue)) break; newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node); iqueue_del(&newseg->node); iqueue_add_tail(&newseg->node, &kcp->snd_buf); // 从发送队列添加到发送缓存 kcp->nsnd_que--; kcp->nsnd_buf++; //设置数据分片的属性 newseg->conv = kcp->conv; newseg->cmd = IKCP_CMD_PUSH; newseg->wnd = seg.wnd; newseg->ts = current; newseg->sn = kcp->snd_nxt++; // 序号 newseg->una = kcp->rcv_nxt; newseg->resendts = current; newseg->rto = kcp->rx_rto; newseg->fastack = 0; newseg->xmit = 0; } // calculate resent (1)使用快速重传时 resent = fastresend; (2)不使用时resent = 0xffffffff resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff; // 使用快速重传时 rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0; // 最小超时时间 // flush data segments // 只要还在snd_buf 说明对方还没有应答 // 发送snd buf的分片 for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) { IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node); int needsend = 0; if (segment->xmit == 0) { //1 如果该报文是第一次传输,那么直接发送 needsend = 1; segment->xmit++; // 发送次数技术 segment->rto = kcp->rx_rto; // 超时时间 segment->resendts = current + segment->rto + rtomin; // 下一次要发送的时间 } else if (_itimediff(current, segment->resendts) >= 0) { //2 当前时间达到了重发时间,但并没有新的ack到达,出现丢包, 重传 needsend = 1; segment->xmit++; kcp->xmit++; if (kcp->nodelay == 0) { segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto); } else { IINT32 step = (kcp->nodelay < 2)? ((IINT32)(segment->rto)) : kcp->rx_rto; segment->rto += step / 2; } segment->resendts = current + segment->rto; lost = 1; // 丢包,反应到拥塞控制策略去了 } else if (segment->fastack >= resent) { //3 segment的累计被跳过次数大于快速重传设定,需要重传 if ((int)segment->xmit <= kcp->fastlimit || kcp->fastlimit <= 0) { needsend = 1; segment->xmit++; segment->fastack = 0; segment->resendts = current + segment->rto; change++; } } if (needsend) { int need; segment->ts = current; segment->wnd = seg.wnd; // 剩余接收窗口大小(接收窗口大小-接收队列大小), 告诉对方目前自己的接收能力 segment->una = kcp->rcv_nxt; // 待接收的下一个包序号, 即是告诉对方una之前的包都收到了, 你不用再发送发送缓存了 size = (int)(ptr - buffer); need = IKCP_OVERHEAD + segment->len; if (size + need > (int)kcp->mtu) { // 小包封装成大包取发送 500 500 , 按1000发 ikcp_output(kcp, buffer, size); ptr = buffer; } ptr = ikcp_encode_seg(ptr, segment); // 把segment封装成线性buffer发送 头部+数据 if (segment->len > 0) { memcpy(ptr, segment->data, segment->len); ptr += segment->len; } if (segment->xmit >= kcp->dead_link) { kcp->state = (IUINT32)-1; } } } // flash remain segments size = (int)(ptr - buffer); // 剩余的数据 if (size > 0) { ikcp_output(kcp, buffer, size); } // update ssthresh 看完 用户态协议栈再来看这里的拥塞控制 if (change) { //如果发生了快速重传,拥塞窗口阈值降低为当前未确认包数量的一半或最小值 IUINT32 inflight = kcp->snd_nxt - kcp->snd_una; kcp->ssthresh = inflight / 2; if (kcp->ssthresh < IKCP_THRESH_MIN) kcp->ssthresh = IKCP_THRESH_MIN; kcp->cwnd = kcp->ssthresh + resent; // 动态调整拥塞控制窗口 kcp->incr = kcp->cwnd * kcp->mss; } if (lost) { kcp->ssthresh = cwnd / 2; if (kcp->ssthresh < IKCP_THRESH_MIN) kcp->ssthresh = IKCP_THRESH_MIN; kcp->cwnd = 1; //丢失则阈值减半, cwd 窗口保留为 1 动态调整拥塞控制窗口 kcp->incr = kcp->mss; } if (kcp->cwnd < 1) { kcp->cwnd = 1; kcp->incr = kcp->mss; } }
-
kcp接收数据过程:
应用层调用下层recvfrom方法接收数据,然后通过ikcp_input将数据输入到接收缓存rcv_buf中,最后通过ikcp_recv将合适的数据从接收缓存rcv_buf转移到接收队列rcv_queue中供用户直接读取。
ikcp_input:
该函数主要功能:
* 接收对方的数据输入
* 该函数主要是处理接收到的数据
* 校验数据=》解析数据=》处理数据(将合法的数据分片添加到接收buf中)=》拥塞窗口处理
分片处理:
* 1. 检测una,将una之前的分片从snd_buf清除(批量)
* 2. 检测ack,对应ack sn分片从snd_buf清除(单个)
源码:
int ikcp_input(ikcpcb *kcp, const char *data, long size) { IUINT32 prev_una = kcp->snd_una; // 最新应答的序号 IUINT32 maxack = 0, latest_ts = 0; int flag = 0; if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) { ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size); } // 据和长度的初步校验 数据太小时异常,因为kcp头部都占用了24字节了,即时sendto最小的大小为 IKCP_OVERHEAD if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1; while (1) { IUINT32 ts, sn, len, una, conv; IUINT16 wnd; IUINT8 cmd, frg; IKCPSEG *seg; if (size < (int)IKCP_OVERHEAD) break; // 校验数据分片 data = ikcp_decode32u(data, &conv); // 获取segment头部信息 if (conv != kcp->conv) return -1; // 特别需要注意会话id的匹配 data = ikcp_decode8u(data, &cmd); data = ikcp_decode8u(data, &frg); data = ikcp_decode16u(data, &wnd); data = ikcp_decode32u(data, &ts); data = ikcp_decode32u(data, &sn); data = ikcp_decode32u(data, &una); data = ikcp_decode32u(data, &len); size -= IKCP_OVERHEAD; //剔除固定的包头信息长度 if ((long)size < (long)len || (int)len < 0) return -2; // 数据不足或者, 没有真正的数据存在 //只支持{IKCP_CMD_PUSH, IKCP_CMD_ACK, IKCP_CMD_WASK, IKCP_CMD_WINS}指令 //其他不合法 if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) return -3; kcp->rmt_wnd = wnd; // 携带了远端的接收窗口 ikcp_parse_una(kcp, una); // 删除小于snd_buf中小于una的segment, 意思是una之前的都已经收到了 ikcp_shrink_buf(kcp); // 更新snd_una为snd_buf中seg->sn或kcp->snd_nxt ,更新下一个待应答的序号 if (cmd == IKCP_CMD_ACK) { if (_itimediff(kcp->current, ts) >= 0) { // 根据应答判断rtt //更新rx_srtt,rx_rttval,计算kcp->rx_rto ikcp_update_ack(kcp, _itimediff(kcp->current, ts)); } //遍历snd_buf中(snd_una, snd_nxt),将sn相等的删除,直到大于sn ikcp_parse_ack(kcp, sn); // 将已经ack的分片删除 ikcp_shrink_buf(kcp); // 更新控制块的 snd_una if (flag == 0) { flag = 1; //快速重传标记 maxack = sn; // 记录最大的 ACK 编号 latest_ts = ts; } else { if (_itimediff(sn, maxack) > 0) { #ifndef IKCP_FASTACK_CONSERVE maxack = sn; // 记录最大的 ACK 编号 latest_ts = ts; #else if (_itimediff(ts, latest_ts) > 0) { maxack = sn; latest_ts = ts; } #endif } } if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) { ikcp_log(kcp, IKCP_LOG_IN_ACK, "input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn, (long)_itimediff(kcp->current, ts), (long)kcp->rx_rto); } } else if (cmd == IKCP_CMD_PUSH) { //接收到具体的数据包 if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) { ikcp_log(kcp, IKCP_LOG_IN_DATA, "input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts); } if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) { ikcp_ack_push(kcp, sn, ts); // 对该报文的确认 ACK 报文放入 ACK 列表中 // 判断接收的数据分片编号是否符合要求,即:在接收窗口(滑动窗口)范围之内 if (_itimediff(sn, kcp->rcv_nxt) >= 0) { // 是要接受起始的序号 seg = ikcp_segment_new(kcp, len); seg->conv = conv; seg->cmd = cmd; seg->frg = frg; seg->wnd = wnd; seg->ts = ts; seg->sn = sn; seg->una = una; seg->len = len; if (len > 0) { memcpy(seg->data, data, len); } //1. 丢弃sn > kcp->rcv_nxt + kcp->rcv_wnd的segment; //2. 逐一比较rcv_buf中的segment,若重复丢弃,非重复,新建segment加入; //3. 检查rcv_buf的包序号sn,如果是待接收的序号rcv_nxt,且可以接收(接收队列小 于接收窗口), // 转移segment到rcv_buf,nrcv_buf减少,nrcv_que增加,rcv_nxt增加; ikcp_parse_data(kcp, seg); // 将该报文插入到 rcv_buf 链表中 } } } else if (cmd == IKCP_CMD_WASK) { // ready to send back IKCP_CMD_WINS in ikcp_flush // 如果是探测包 // tell remote my window size 添加相应的标识位 kcp->probe |= IKCP_ASK_TELL; // 收到对方请求后标记自己要告诉对方自己的窗口 if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) { ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe"); } } else if (cmd == IKCP_CMD_WINS) { // do nothing 如果是tell me 远端窗口大小,什么都不做 if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) { ikcp_log(kcp, IKCP_LOG_IN_WINS, "input wins: %lu", (unsigned long)(wnd)); } } else { return -3; } data += len; size -= len; } if (flag != 0) { ikcp_parse_fastack(kcp, maxack, latest_ts); } // 如果snd_una增加了那么就说明对端正常收到且回应了发送方发送缓冲区第一个待确认的包, // 此时需要更新cwnd(拥塞窗口) if (_itimediff(kcp->snd_una, prev_una) > 0) { //如何拥塞窗口小于远端窗口 if (kcp->cwnd < kcp->rmt_wnd) { IUINT32 mss = kcp->mss; //最大分片大小 if (kcp->cwnd < kcp->ssthresh) { //拥塞窗口小于阈值 kcp->cwnd++; // 扩大窗口? kcp->incr += mss; } else { if (kcp->incr < mss) kcp->incr = mss; kcp->incr += (mss * mss) / kcp->incr + (mss / 16); if ((kcp->cwnd + 1) * mss <= kcp->incr) { #if 1 kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1); #else kcp->cwnd++; #endif } } //如果拥塞窗口大于远端窗口 if (kcp->cwnd > kcp->rmt_wnd) { kcp->cwnd = kcp->rmt_wnd; //则使用远端窗口 kcp->incr = kcp->rmt_wnd * mss; //并设置相应数据量,该数据量以字节数 } } } return 0; }
ikcp_recv:
该函数主要功能:
* 读取组好包的数据
* 将接收缓存rcv_buf的分片转移到接收队列rcv_queue
* 如果有接收空间则将kcp->probe |= IKCP_ASK_TELL; 以在update的时候告知对方可以发送数据了。
源码:
int ikcp_recv(ikcpcb *kcp, char *buffer, int len) { struct IQUEUEHEAD *p; int ispeek = (len < 0)? 1 : 0; // peek:窥视; 偷看. 如果是 ispeek 说明只是为了拿数据看看,则不用将数据从queue删除 int peeksize; int recover = 0; IKCPSEG *seg; assert(kcp); // 排序好的数据存放在rcv_queue, 反过来待发送的数据则存放在snd_queue if (iqueue_is_empty(&kcp->rcv_queue)) // 如果为空则没有数据可读 return -1; if (len < 0) len = -len; //计算当前接收队列中的属于同一个消息的数据总长度(不是所有消息的总长度),注意这里的同一个消息是seg->frg进行标记 peeksize = ikcp_peeksize(kcp); // 当我们没有采用流式传输的时候, 我们接收的则是类似 udp一样的报文传输方式 if (peeksize < 0) // 没有数据可读 return -2; if (peeksize > len) // 可读数据大于 用户传入的长度,每次读取需要一次性读取完毕,类似udp报文的读取 return -3; // KCP 协议在远端窗口为0的时候将会停止发送数据 if (kcp->nrcv_que >= kcp->rcv_wnd) // 接收队列segment数量大于等于接收窗口,标记窗口可以恢复 recover = 1; // 标记可以开始窗口恢复 // merge fragment 将属于同一个消息的各分片重组完整数据,并删除rcv_queue中segment,nrcv_que减少 // 经过 ikcp_send 发送的数据会进行分片,分片编号为倒序序号,因此 frg 为 0 的数据包标记着完整接收到了一次 send 发送过来的数据 for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) { int fragment; seg = iqueue_entry(p, IKCPSEG, node); p = p->next; if (buffer) { memcpy(buffer, seg->data, seg->len); // 把queue的数据就放入用户buffer buffer += seg->len; } len += seg->len; fragment = seg->frg; if (ikcp_canlog(kcp, IKCP_LOG_RECV)) { ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn); } if (ispeek == 0) { iqueue_del(&seg->node); ikcp_segment_delete(kcp, seg); // 删除节点 kcp->nrcv_que--; // nrcv_que接收队列-1 } if (fragment == 0) // 完整的数据接收到, send的时候如果数据被分片,比如分成 n个片,则0 .. n-2的fragment为1,n-1的为0 break; } assert(len == peeksize); // move available data from rcv_buf -> rcv_queue // 将合适的数据从接收缓存rcv_buf 转移到接收队列rcv_queue(这个过程就是排序), 已经确认的以及接收窗口未满 while (! iqueue_is_empty(&kcp->rcv_buf)) { seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node); // 条件1 序号是该接收的数据 // 条件2 接收队列nrcv_que < 接收窗口rcv_wnd; if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) { iqueue_del(&seg->node); kcp->nrcv_buf--; iqueue_add_tail(&seg->node, &kcp->rcv_queue); kcp->nrcv_que++; // 接收队列 有多少个分片 + 1 kcp->rcv_nxt++; // 接收序号 + 1 } else { break; } } // fast recover ,nrcv_que小于rcv_wnd, 说明接收端有空间继续接收数据了 if (kcp->nrcv_que < kcp->rcv_wnd && recover) { // ready to send back IKCP_CMD_WINS in ikcp_flush // tell remote my window size kcp->probe |= IKCP_ASK_TELL; } return len; }
6、待完善
kcp协议的分片机制、ack和una确认机制、重传机制、拥塞控制机制等具体实现。