quinn源码解析:QUIC数据包是如何发送的

news2025/1/17 5:52:52

quinn源码解析:QUIC数据包是如何发送的

  • 简介
  • QUIC协议中的概念
    • endpoint(端点)
    • connection(连接)
    • Stream(流)
    • Frame (帧)
  • 发包过程解析
    • SendStream::write_all
    • ConnectionDriver
    • EndpointDriver

简介

quinn是Rust编程语言中用于实现QUIC(Quick UDP Internet Connections)协议的一个crate(包)。它提供了一个高级别的API,用于构建基于QUIC的网络应用程序。quinn crate的设计目标是提供一个简洁、安全和高性能的QUIC实现。它内部使用了Rust的异步编程模型(async/await),使得编写异步网络代码更加方便和高效。
本文主要介绍其发送数据的流程

QUIC协议中的概念

endpoint(端点)

在QUIC(Quick UDP Internet Connections)协议中,Endpoint(端点)是指QUIC连接的一端,可以是客户端或服务器。每个端点都有自己的网络地址,并与其他端点进行通信以建立和管理QUIC连接。在quinn中,endpoint对应一个操作系统的socket。例如client的Endpoint创建时就是bind了一个本地的地址。

    pub fn client(addr: SocketAddr) -> io::Result<Self> {
        let socket = std::net::UdpSocket::bind(addr)?;
        let runtime = default_runtime()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no async runtime found"))?;
        Self::new_with_runtime(
            EndpointConfig::default(),
            None,
            runtime.wrap_udp_socket(socket)?,
            runtime,
        )
    }

connection(连接)

两个endpoint之间可以建立connection,并且一个endpoint可以向多个endpoint建立连接。

注意与TCP不同的是,QUIC的一个socket可以同时向多个其他socket建立连接。而TCP中每一个连接都对应client和server端的两个socket。

在这里插入图片描述

Stream(流)

一条连接可以同时存在多条流,每条流上的数据相互独立,一条流发生阻塞不会影响其他流。(TCP相当于只有一条流,所以会有对头阻塞的缺陷。)

client的流ID为奇数,server的流ID为偶数

在这里插入图片描述

Frame (帧)

流是抽象出的概念,而实际上在链路上传输的只是不同的帧,不同流的帧中会有流ID用于标识此帧属于哪条流,接收端收到后根据流ID将对应的帧放入对应的流缓冲区。
在这里插入图片描述

发包过程解析

以官方的client Example为例。其关键步骤如下述伪代码所示,主要包括:创建endpoint、创建连接、创建流、最后写入数据。

 //创建endpoint
 let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?; 
    ...
    //创建连接
    let conn = endpoint
        .connect(remote, host)?
        .await
        .map_err(|e| anyhow!("failed to connect: {}", e))?;
    //创建流
    let (mut send, mut recv) = conn
        .open_bi()
        .await
        .map_err(|e| anyhow!("failed to open stream: {}", e))?;

	//写数据
    send.write_all(request.as_bytes())
        .await
        .map_err(|e| anyhow!("failed to send request: {}", e))?;

SendStream::write_all

首先我们以流写入数据为切入点来看。
write_all接口实际上是产生了一个WriteAllFuture,数据会暂时放在WriteAll结构体里。当Runtime(默认为Tokio的运行时)下一次pollFuture时才会将数据写入到该流的缓冲区中。

impl<'a> Future for WriteAll<'a> {
    type Output = Result<(), WriteError>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            if this.buf.is_empty() {
                return Poll::Ready(Ok(()));
            }
            let buf = this.buf;
            #将数据写入缓冲区
            let n = ready!(this.stream.execute_poll(cx, |s| s.write(buf)))?;
            this.buf = &this.buf[n..];
        }
    }
}

注意向流的缓冲区写数据时,是经过了流控逻辑的:当可写空间为0时,写操作会被block。可写空间一般由send_window-unacked_datasend_windowunacked_data都是连接级的,所有流都受此限制。send_window是开始时设置的,此值决定整个连接的发送缓冲区的峰值大小。当应用连接数较多时应该谨慎设置此值,避免因内存占用过多而引起OOM。

    /// Returns the maximum amount of data this is allowed to be written on the connection
    pub(crate) fn write_limit(&self) -> u64 {
        (self.max_data - self.data_sent).min(self.send_window - self.unacked_data)
    }

写入的数据最终又被暂时放在SendBufferunacked_segments里。

impl SendBuffer {
    /// Append application data to the end of the stream
    pub(super) fn write(&mut self, data: Bytes) {
        self.unacked_len += data.len();
        self.offset += data.len() as u64;
        self.unacked_segments.push_back(data);
    }
}

到这里,write_all这个操作就算是结束了。那么放入缓冲区的数据又是如何进一步被发送的呢?

ConnectionDriver

我们把视线回到 endpoint.connect(remote, host)?.await,在连接建立时,产生了一个ConnectionDriverFuture,此ConnectionDriver一产生就被丢进runtime中去持续地执行了。

        runtime.spawn(Box::pin(
            ConnectionDriver(conn.clone()).instrument(Span::current()),
        ));

而这个ConnectionDriver在被poll时最终会调用Connection::poll_transmit–>Connection::populate_packet获取将要发送的帧

    fn populate_packet(
        &mut self,
        now: Instant,
        space_id: SpaceId,
        buf: &mut BytesMut,
        max_size: usize,
        pn: u64,
    ) -> SentFrames {
        let mut sent = SentFrames::default();

		...
		...

        // STREAM
        if space_id == SpaceId::Data {
            sent.stream_frames = self.streams.write_stream_frames(buf, max_size);
            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
        }

        sent
    }

StreamsState::write_stream_frames方法中从优先级队列中取出优先级最高的流并将其数据写入buf,如果流的数据都已发送完毕则将此流从优先级队列中取出。

pub(crate) fn write_stream_frames(
        &mut self,
        buf: &mut BytesMut,
        max_buf_size: usize,
    ) -> StreamMetaVec {
        let mut stream_frames = StreamMetaVec::new();
        while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
            if max_buf_size
                .checked_sub(buf.len() + frame::Stream::SIZE_BOUND)
                .is_none()
            {
                break;
            }
			//不同优先级的数量
            let num_levels = self.pending.len();
            //获取优先级最高的队列
            let mut level = match self.pending.peek_mut() {
                Some(x) => x,
                None => break,
            };
            // Poppping data from the front of the queue, storing as much data
            // as possible in a single frame, and enqueing sending further
            // remaining data at the end of the queue helps with fairness.
            // Other streams will have a chance to write data before we touch
            // this stream again.
            //从队列中拿到第一个流
            let id = match level.queue.get_mut().pop_front() {
                Some(x) => x,
                None => {
                    debug_assert!(
                        num_levels == 1,
                        "An empty queue is only allowed for a single level"
                    );
                    break;
                }
            };
            //拿到具体的流
            let stream = match self.send.get_mut(&id) {
                Some(s) => s,
                // Stream was reset with pending data and the reset was acknowledged
                None => continue,
            };

            // Reset streams aren't removed from the pending list and still exist while the peer
            // hasn't acknowledged the reset, but should not generate STREAM frames, so we need to
            // check for them explicitly.
            if stream.is_reset() {
                continue;
            }

            // Now that we know the `StreamId`, we can better account for how many bytes
            // are required to encode it.
            let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());
            //从流中获取到本次要写的偏移量
            let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);
            //如果流中的数据都已经发送完,则将此流从pending队列中移除
            let fin = offsets.end == stream.pending.offset()
                && matches!(stream.state, SendState::DataSent { .. });
            if fin {
                stream.fin_pending = false;
            }

            if stream.is_pending() {
                if level.priority == stream.priority {
                    // Enqueue for the same level
                    level.queue.get_mut().push_back(id);
                } else {
                    // Enqueue for a different level. If the current level is empty, drop it
                    if level.queue.borrow().is_empty() && num_levels != 1 {
                        // We keep the last level around even in empty form so that
                        // the next insert doesn't have to reallocate the queue
                        PeekMut::pop(level);
                    } else {
                        drop(level);
                    }
                    push_pending(&mut self.pending, id, stream.priority);
                }
            } else if level.queue.borrow().is_empty() && num_levels != 1 {
                // We keep the last level around even in empty form so that
                // the next insert doesn't have to reallocate the queue
                PeekMut::pop(level);
            }

            let meta = frame::StreamMeta { id, offsets, fin };
            trace!(id = %meta.id, off = meta.offsets.start, len = meta.offsets.end - meta.offsets.start, fin = meta.fin, "STREAM");
            //写入帧的头部
            meta.encode(encode_length, buf);

            // The range might not be retrievable in a single `get` if it is
            // stored in noncontiguous fashion. Therefore this loop iterates
            // until the range is fully copied into the frame.
            let mut offsets = meta.offsets.clone();
            while offsets.start != offsets.end {
                let data = stream.pending.get(offsets.clone());
                offsets.start += data.len() as u64;
                //写入具体数据
                buf.put_slice(data);
            }
            stream_frames.push(meta);
        }

        stream_frames
    }

到了这里,要发送的数据实际上还是暂存在缓冲区了。然后又以EndpointEvent::Transmit事件的方式通过channel发送到endpoint的协程里。

fn drive_transmit(&mut self) -> bool {
        let now = Instant::now();
        let mut transmits = 0;

        let max_datagrams = self.socket.max_transmit_segments();
        let capacity = self.inner.current_mtu();
        let mut buffer = BytesMut::with_capacity(capacity as usize);

        while let Some(t) = self.inner.poll_transmit(now, max_datagrams, &mut buffer) {
            transmits += match t.segment_size {
                None => 1,
                Some(s) => (t.size + s - 1) / s, // round up
            };
            // If the endpoint driver is gone, noop.
            let size = t.size;
            //将要发送的数据发送到endpoint协程
            let _ = self.endpoint_events.send((
                self.handle,
                EndpointEvent::Transmit(t, buffer.split_to(size).freeze()),
            ));

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return true;
            }
        }

        false
    }

ConnectionDriver的任务到这里就完成了,总的来说ConnectionDriver的任务就是从流中取出数据,并最终将数据通过channel发送给endpoint

EndpointDriver

connection的逻辑类似,endpoints建立时就已经spawn了一个EndpointDriver在后台一直poll,正是在poll方法中会处理来自ConnectionDriver发来的events,并写入outgoing缓冲区中。

    fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
        use EndpointEvent::*;
        for _ in 0..IO_LOOP_BOUND {
            match self.events.poll_recv(cx) {
                Poll::Ready(Some((ch, event))) => match event {
                    ...
                    ...
                    //接受从ConnectionDriver发过来的Transmit,并写入到outgoing缓冲区中
                    Transmit(t, buf) => {
                        let contents_len = buf.len();
                        self.outgoing.push_back(udp_transmit(t, buf));
                        self.transmit_queue_contents_len = self
                            .transmit_queue_contents_len
                            .saturating_add(contents_len);
                    }
                },
                Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),
                Poll::Pending => {
                    return false;
                }
            }
        }

        true
    }

drive_send中从outgoing缓冲区中取出数据并写入socket

 fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
        self.send_limiter.start_cycle();

        let result = loop {
            if self.outgoing.is_empty() {
                break Ok(false);
            }

            if !self.send_limiter.allow_work() {
                break Ok(true);
            }
			//实际写入
            match self.socket.poll_send(cx, self.outgoing.as_slices().0) {
                Poll::Ready(Ok(n)) => {
                    let contents_len: usize =
                        self.outgoing.drain(..n).map(|t| t.contents.len()).sum();
                    self.transmit_queue_contents_len = self
                        .transmit_queue_contents_len
                        .saturating_sub(contents_len);
                    // We count transmits instead of `poll_send` calls since the cost
                    // of a `sendmmsg` still linearly increases with number of packets.
                    self.send_limiter.record_work(n);
                }
                Poll::Pending => {
                    break Ok(false);
                }
                Poll::Ready(Err(e)) => {
                    break Err(e);
                }
            }
        };

        self.send_limiter.finish_cycle();
        result
    }

至此,整个发送过程就算完了。写入socket的数据由具体的操作系统底层去实现了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1228781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格…

项目自动化构建工具——make/Makefile

目录 一、概念 二、使用实例 三、原理 四、进度条程序 1、缓冲区问题 1、概念 2、\r和\n 2、代码编写 一、概念 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的规则来指定&#xff0c;哪些文件需要先…

第五篇 《随机点名答题系统》——抽点答题详解(类抽奖系统、在线答题系统、线上答题系统、在线点名系统、线上点名系统、在线考试系统、线上考试系统)

目录 1.功能需求 2.界面设计 3.流程设计 4.关键代码 随机点名答题系统&#xff08;类抽奖系统、在线答题系统、线上答题系统、在线点名系统、线上点名系统、在线考试系统、线上考试系统&#xff09;&#xff0c;是基于php&#xff08;8.2.11&#xff09;&#xff0c;Java…

迪克森电荷泵

迪克森电荷泵&#xff08;Dickson Charge Pump&#xff09;是一种电压倍增器电路&#xff0c;可以将低电压升高到较高电压&#xff0c;相对于其他电压升压电路&#xff0c;迪克森电荷泵具有较高的效率和较简单的电路结构。该电路的基本原理是通过电容和开关来实现电荷的积累和转…

上海亚商投顾:三大指数小幅上涨 HBM概念股全天强势

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数早盘窄幅震荡&#xff0c;午后集体拉升翻红&#xff0c;黄白二线走势分化&#xff0c;题材热点快速轮…

计算两个向量的叉积numpy.cross()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 计算两个向量的叉积 numpy.cross() [太阳]选择题 请问代码中最后输出正确的是&#xff1f; import numpy as np a np.array([1, 2, 3]) b np.array([4, 5, 6]) c np.cross(a, b) pri…

C#,数值计算——插值和外推,Laplace_interp的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// Object for interpolating missing data in a matrix by solving Laplaces /// equation.Call constructor once, then solve one or more times /// </summary> …

初刷leetcode题目(3)——数据结构与算法

&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️…

Go语言常用命令详解(二)

文章目录 前言常用命令go bug示例参数说明 go doc示例参数说明 go env示例 go fix示例 go fmt示例 go generate示例 总结写在最后 前言 接着上一篇继续介绍Go语言的常用命令 常用命令 以下是一些常用的Go命令&#xff0c;这些命令可以帮助您在Go开发中进行编译、测试、运行和…

《数字图像处理-OpenCV/Python》连载(44)图像的投影变换

《数字图像处理-OpenCV/Python》连载&#xff08;44&#xff09;图像的投影变换 本书京东优惠购书链接&#xff1a;https://item.jd.com/14098452.html 本书CSDN独家连载专栏&#xff1a;https://blog.csdn.net/youcans/category_12418787.html 第 6 章 图像的几何变换 几何变…

应用开发平台集成表单设计器系列之3——整体集成思路及表单设计器功能深度了解

背景 平台需要实现自定义表单功能&#xff0c;作为低代码开发的一部分&#xff0c;通过技术预研和技术选型&#xff0c;选择form-create和form-create-designer这两个组件进行集成作为实现方案。通过深入了解和技术验证&#xff0c;确认了组件的功能能满足需求&#xff0c;具备…

移动机器人路径规划(二)--- 图搜索基础,Dijkstra,A*,JPS

目录 1 图搜索基础 1.1 机器人规划的配置空间 Configuration Space 1.2 图搜索算法的基本概念 1.3 启发式的搜索算法 Heuristic search 2 A* Dijkstra算法 2.1 Dijkstra算法 2.2 A*&&Weighted A*算法 2.3 A* 算法的工程实践中的应用 3 JPS 1 图搜索基础 1.1…

V100 GPU服务器安装GPU驱动教程

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

计算机网络——物理层-信道的极限容量(奈奎斯特公式、香农公式)

目录 介绍 奈氏准则 香农公式 介绍 信号在传输过程中&#xff0c;会受到各种因素的影响。 如图所示&#xff0c;这是一个数字信号。 当它通过实际的信道后&#xff0c;波形会产生失真&#xff1b;当失真不严重时&#xff0c;在输出端还可根据已失真的波形还原出发送的码元…

JVM垃圾回收相关概念

目录 一、System.gc()的理解 二、内存溢出与内存泄露 &#xff08;一&#xff09;OOM &#xff08;二&#xff09;内存泄露 三、StopTheWorld 四、垃圾回收的并行与并发 五、安全点与安全区域 &#xff08;一&#xff09;安全点 &#xff08;二&#xff09;安全区域 …

数据结构【DS】树与二叉树的应用

哈夫曼树 树的带权路径长度最小的二叉树WPL 路径长度【边数】 * 结点权值n个叶结点的哈夫曼树共有 2n-1 个结点 哈夫曼树的任意非叶结点的左右子树交换后仍是哈夫曼树对同一组权值&#xff0c;可能存在不同构的多棵哈夫曼树&#xff0c;但树的带权路径长度最小且唯一哈夫曼树…

C/C++高精度

个人主页&#xff1a;仍有未知等待探索_C语言疑难,数据结构,小项目-CSDN博客 专题分栏&#xff1a;算法_仍有未知等待探索的博客-CSDN博客 为什么需要高精度算法&#xff1f; 由于c不能进行位数过高的数据运算&#xff0c;所以要通过模拟数组来进行运算&#xff0c;首先是加法。…

基于类电磁机制算法优化概率神经网络PNN的分类预测 - 附代码

基于类电磁机制算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于类电磁机制算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于类电磁机制优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针…

使用ChatGPT进行数据分析案例——贷款数据分析

目录 数据数据 每一行是一个记录,代表着一笔贷款,每一列是一个特征,一共1万多条数据,最后一列非常重要save_loans是否成功收回

SpringBean的配置详解 --中

目录 Bean的初始化和销毁方法配置 Bean的初始化和销毁方法配置 扩展 Bean的实例化 Bean的初始化和销毁方法配置 当lazy-init设置为true时为延迟加载&#xff0c;也就是当Spring容器加载的时候&#xff0c;不会立即创建Bean实例&#xff0c;等待用到时再创建Bean实例并存储到单…