CS144-Lab1

news2025/1/11 7:44:36

实验架构

TCP实施中模块和数据流的排列 :

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pmRfy6Va-1676857163260)(null)]

字节流是Lab0。TCP的工作是通过不可靠的数据报网络传输两个字节流(每个方向一个),以便写入连接一侧套接字的字节显示为可在对等端读取的字节,反之亦然。Lab1是StreamReAssemer,在Lab2、3和4中,您将实施TCPReceiverTCPSender,然后实施 TCPConnection 将它们连接在一起。

  1. 在Lab1中,您将实现一个流重组器-该模块将字节流的一小部分(称为子串或段)按正确的顺序缝合回连续的字节流。

  2. 在Lab2中,您将实现TCP中处理入站字节流的部分:TCPReceiver。这涉及到考虑TCP将如何表示流中每个字节的位置-称为“序列号”。TCPReceiver 负责告诉发送者(A)它已经能够成功组装多少入站字节流(这称为“确认”)和(B)发送者现在被允许发送多少字节(“flow控制”)。(B)TCPReceiver 负责告诉发送者(A)它已经能够成功组装多少入站字节流(这称为“确认”)和(B)允许发送者现在发送多少字节(“flow control”)。

  3. 在Lab3中,您将实现TCP中处理出站字节流的部分:TCPSender。当发送方怀疑其传输的数据段在途中丢失并且从未到达接收方时,它应该如何反应?它应该在什么时候重试并重新传输丢失的数据段?

  4. 在Lab4中,您将结合前面的工作和Lab来创建工作的TCP实现:包含TCPSenderTCPReceiverTCPConnection。您将使用它与世界各地的真实服务器进行对话。

您的Push Substring方法将忽略会导致 StreamReAssembly 超出其“容量”的字符串的任何部分:内存使用限制,即允许它存储的最大字节数。这可以防止重新组装器使用无限量的内存,无论TCP发送器决定执行什么操作。我们已经在下面的图片中对此进行了说明。“容量”是两者的上限:

  1. 重组的ByteStream中的字节数(如下绿色所示),以及

  2. unassembled”的子字符串可以使用的最大字节数(以红色显示)

image-20220323012722658

  • 红色:re-assembler 保存在辅助存储器中的已接收字节
  • 绿色:re-assembler 保存在字节流中的已接收字节数
  • 蓝色:已读取的已接收字节数

说明

  • 整个数据流中第一个字节的索引是什么?
    • 0。
  • 我的实现应该有多大的效率?
    • 我们还不打算指定一个效率的概念,但请不要建立一个严重影响空间或时间的数据结构——这个数据结构将是你的TCP实现的基础。
  • 应该如何处理不一致的子串?
    • 你可以假设它们不存在。也就是说,你可以假设有一个唯一的底层字节流,而所有的子串都是它的(精确)片段。
  • 我可以使用什么?
    • 你可以使用你认为有用的标准库的任何部分。特别是,我们希望你至少要使用一个数据结构。
  • 字节什么时候应该被写入流中?
    • 越快越好。一个字节不应该出现在流中的唯一情况是,在它之前有一个字节还没有被”push”。
  • 子串可能重叠吗?
    • 可能。
  • 我是否需要向StreamReassembler添加私有成员?
    • 是的。由于段可能以任何顺序到达,你的数据结构将不得不记住子串,直到它们准备好被放入流中,也就是说,直到它们之前的所有索引都已填充。

实现思路

1. 要求

在我们所实现的 StreamReassembler 中,有以下几种特性:

  • 接收子字符串。这些子字符串中包含了一串字节,以及该字符串在总的数据流中的第一个字节的索引
  • 流的每个字节都有自己唯一的索引,从零开始向上计数。
  • StreamReassembler 中存在一个 ByteStream 用于输出,当 StreamReassembler 知道了流的下一个字节,它就会将其写入至 ByteStream 中。

需要注意的是,传入的子串中:

  • 子串之间可能相互重复,存在重叠部分

    但假设重叠部分数据完全重复。

    不存在某些 index 下的数据在某个子串中是一种数据,在另一个子串里又是另一种数据。

    重叠部分的处理最为麻烦。

  • 可能会传一些已经被装配了的数据

  • 如果 ByteStream 已满,则必须暂停装配,将未装配数据暂时保存起来

除了上面的要求以外,容量 Capacity 需要严格限制:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-34kDAJL2-1676857165397)(null)]

为了便于说明,将图中的绿色区域称为 ByteStream,将图中**存放红色区域的内存范围(即 first unassembled - first unacceptable)**称为 Unassembled_strs。

CS144 要求将 ByteStream + Unassembled_strs 的内存占用总和限制在 Reassember 中构造函数传入的 capacity 大小。因此我们在构造 Reassembler 时,需要既将传入的 capacity 参数设置为 ByteStream的缓冲区大小上限,也将其设置为first unassembled - first unacceptable的范围大小,以避免极端情况下的内存使用。

注意:first unassembled - first unacceptable的范围大小,并不等同于存放尚未装配子串的结构体内存大小上限,别混淆了。

Capacity 这个概念很重要,因为它不仅用于限制高内存占用,而且它还会起到流量控制的作用(见 lab2)。

本节实验需要安装 pcap 库和 pcap-dev 库才能正常编译

sudo apt-get install libpcap-dev

在新的 Segment 到来的时候,如果他能和已储存的包 “合并” 的话,我们可以不更改已储存的包,而是把这个新包修剪一下,利用 BufferPlus 修剪前后缀的两个函数去掉它的重复的部分


class StreamBlock {
  private:
    BufferPlus _buffer{};
    size_t _begin_index;

  public:
    StreamBlock(const int begin, std::string &&str) noexcept
      : _buffer(std::move(str)), _begin_index(begin) {};

    StreamBlock(const StreamBlock &Other) noexcept
      : _buffer(Other._buffer), _begin_index(Other._begin_index) {};

    StreamBlock(const int begin, const Buffer &data) noexcept
      : _buffer(data), _begin_index(begin) {};

    bool operator<(const StreamBlock sb) const { return begin() < sb.begin(); }
    inline size_t end() const { return _begin_index + _buffer.starting_offset() + _buffer.size(); }
    inline size_t len() const { return _buffer.size(); }
    inline size_t begin() const { return _begin_index + _buffer.starting_offset(); }
    BufferPlus &buffer() { return _buffer; }
    const BufferPlus &buffer() const { return _buffer; }
};

定义一个名为 StreamBlock

它包含了一个私有成员 _buffer,类型为 BufferPlus,另一个私有成员 _begin_index,类型为 size_t。类的定义中包含了三个构造函数:

  • 第一个构造函数接受两个参数:一个整数 begin 和一个右值引用类型的 std::string 对象 str。它使用 std::movestr 移动到 _buffer 成员中,并将 _begin_index 初始化为 begin
  • 第二个构造函数接受一个参数:另一个 StreamBlock 类型的对象 Other。它将 Other_buffer_begin_index 成员的值分别赋值给当前对象的 _buffer_begin_index 成员。
  • 第三个构造函数接受两个参数:一个整数 begin 和一个 Buffer 类型的对象 data。它将 data 的值复制到 _buffer 成员中,并将 _begin_index 初始化为 begin

该类还包含了四个公共成员函数:

  • 一个重载了小于号 < 的运算符,用于比较两个 StreamBlock 对象的起始位置,返回值为布尔类型。
  • 一个返回 StreamBlock 对象的结束位置的函数 end(),返回值为 size_t 类型。
  • 一个返回 StreamBlock 对象的长度的函数 len(),返回值为 size_t 类型。
  • 一个返回 StreamBlock 对象的起始位置的函数 begin(),返回值为 size_t 类型。

最后,类中还有两个 BufferPlus 类型的成员函数 buffer()buffer() const,用于返回 _buffer 成员。前一个是非 const 成员函数,可以修改 _buffer 成员的值,后一个是 const 成员函数,不允许修改 _buffer 成员的值。

//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.

    ByteStream _output;  //!< The reassembled in-order byte stream
    size_t _capacity;    //!< The maximum number of bytes
    size_t _first_uass;  // index of segment waiting for
    size_t _unassembled_bytes;
    bool _eof;           // whether _eof_ is effecitve
    size_t _eof_idx;     // where the eof is
    std::set<StreamBlock> _blocks;

    //! Merge the two blocks "blk" and "new_block"
    //! the result will stored in new_block
    //! nothing happens if two blocks can't merge
    //! return ture if merge happens, false otherwise

    //! add "to_add" blocks to set blocks
    //! merge all the blocks mergeable
    inline void add_block(StreamBlock &new_block);

    bool overlap(const StreamBlock &blk, const StreamBlock &new_blk) const;

    //! Write the first block to the stream, this block should begin at  '_first_uass'
    inline void write_to_stream();

    //! Check if eof is written to the stream
    //! If true, end the stream
    inline void EOFcheck();

  public:
    //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
    //! \note This capacity limits both the bytes that have been reassembled,
    //! and those that have not yet been reassembled.
    StreamReassembler(const size_t capacity);

    //! \brief Receive a substring and write any newly contiguous bytes into the stream.
    //!
    //! The StreamReassembler will stay within the memory limits of the `capacity`.
    //! Bytes that would exceed the capacity are silently discarded.
    //!
    //! \param data the substring
    //! \param index indicates the index (place in sequence) of the first byte in `data`
    //! \param eof the last byte of `data` will be the last byte in the entire stream
    void push_substring(const std::string &data, const uint64_t index, const bool eof);
    void push_substring(const Buffer &data, const size_t index, const bool eof);

    //! \name Access the reassembled byte stream
    //!@{
    const ByteStream &stream_out() const { return _output; }
    ByteStream &stream_out() { return _output; }
    //!@}

    uint64_t first_unassembled() const;

    //! The number of bytes in the substrings stored but not yet reassembled
    //!
    //! \note If the byte at a particular index has been pushed more than once, it
    //! should only be counted once for the purpose of this function.
    size_t unassembled_bytes() const;

    //! \brief Is the internal state empty (other than the output stream)?
    //! \returns `true` if no substrings are waiting to be assembled
    bool empty() const;
};

StreamReassembler 的类,用于将一个字节流中的多个子字符串(可能无序、可能重叠)重新组装成有序的字节流。类中包含了一些私有成员和公有成员。

类的私有成员包括:

  • ByteStream _output:存储重新组装后的有序字节流。
  • size_t _capacityStreamReassembler 可以存储的最大字节数。
  • size_t _first_uass:未组装的第一个字节在原始字节流中的索引。
  • size_t _unassembled_bytes:已接收但未组装的字节数。
  • bool _eof:标记是否已经收到 EOF(文件结束)。
  • size_t _eof_idx:EOF 在原始字节流中的索引。
  • std::set<StreamBlock> _blocks:存储已接收但未组装的数据块。

类的公有成员包括:

  • StreamReassembler(const size_t capacity):构造函数,创建一个 StreamReassembler 实例,设置它的最大容量为 capacity
  • void push_substring(const std::string &data, const uint64_t index, const bool eof):将子字符串 data 添加到 StreamReassembler 中,并将任何新接收到的连续字节写入到 _output 中。index 表示 data 中第一个字节在原始字节流中的索引,eof 表示 data 是否包含文件结束符。
  • const ByteStream &stream_out() constByteStream &stream_out():获取 _output 中存储的有序字节流。
  • uint64_t first_unassembled() const:获取未组装的第一个字节在原始字节流中的索引。
  • size_t unassembled_bytes() const:获取已接收但未组装的字节数。
  • bool empty() const:判断 StreamReassembler 是否为空,即判断是否有待组装的子字符串。
//! \details This function check if eof is written to the stream
inline void StreamReassembler::EOFcheck() {
    if (!_eof) {
        return; 
    }
    
    if (static_cast<size_t>(_eof_idx) == _first_uass) {
        _output.end_input();
    }
}

这是 StreamReassembler 类的成员函数,用于检查文件结束标记是否已写入输出流。该函数首先检查 _eof 是否为 true ,这意味着文件结束标记已写入流。如果 _eof 不为 true,函数立即返回,不做任何操作。
如果 _eoftrue,该函数将检查流中文件结束标记的索引 _eof_idx 是否等于 _first_uass_first_uass 是流中第一个未使用的字节的索引,这意味着在 _first_uass 之前的所有字节都已被输出流使用。如果 _eof_idx 等于 _first_uass,这意味着流中的所有字节,包括文件结束标记,都已被输出流占用。在这种情况下,函数调用 _output 对象的 end_input() 函数,这表明流中没有更多的输入。


//! \details This function write the first block into the stream,
//! the first block should begin at '_first_uass'
inline void StreamReassembler::write_to_stream() {
    while (!_blocks.empty()) {
        auto block = *_blocks.begin();
        if (block.begin() != _first_uass) {
            return;
        }

        size_t bytes_written = _output.write(block.buffer());

        if (bytes_written == 0) {
            return;
        }

        _first_uass += bytes_written;
        _unassembled_bytes -= bytes_written;
        _blocks.erase(move(_blocks.begin()));

        // partially written
        if (bytes_written != block.len()) {
            block.buffer().remove_prefix(move(bytes_written));
            _blocks.insert(move(block));
        }
    }
}

write_to_stream,作用是将数据块写入流中。根据代码中的注释,这个函数写入的是第一个块,即起始位置为 _first_uass 的块。

这个函数首先进入一个 while 循环,只要数据块队列 _blocks 不为空,就会一直循环。然后,它取出 _blocks 队列中的第一个块,如果这个块的起始位置不等于 _first_uass,说明还没有到该块,就直接返回。

如果该块的起始位置等于 _first_uass,则将该块写入流中,并记录已写入的字节数,更新 _first_uass,减少 _unassembled_bytes 的值(表示还未组装的字节数),然后将该块从队列中删除。

如果该块只写入了部分数据,即字节数小于该块的长度,就将该块的缓冲区前缀截去已写入的字节数,并将该块重新插入到队列中。然后,这个函数就继续处理下一个数据块,直到队列为空或者写入的字节数为 0。


//! \details This function add "to_add" blocks to set blocks
// merge all the blocks mergeable
inline void StreamReassembler::add_block(StreamBlock &new_block) {
    if (new_block.len() == 0) {
        return;
    }

    vector<StreamBlock> blks_to_add;
    blks_to_add.emplace_back(move(new_block));

    if (!_blocks.empty()) {
        auto nblk = blks_to_add.begin();
        auto iter = _blocks.lower_bound(*nblk);
        auto prev = iter;

        while (iter != _blocks.end() && overlap(*iter, *nblk)) {
            if ((*iter).end() >= (*nblk).end()) {
                (*nblk).buffer().remove_suffix((*nblk).end() - (*iter).begin());
                break;
            }
            
            StreamBlock last(*nblk);
            (*nblk).buffer().remove_suffix((*nblk).end() - (*iter).begin());
            last.buffer().remove_prefix((*iter).end() - (*nblk).begin());
            blks_to_add.push_back(move(last));
            nblk = blks_to_add.end();
            nblk -- ;
            iter ++ ;
        }

        // compare with prevs
        // check one previous block is enough
        if (prev != _blocks.begin()) {
            prev -- ;
            nblk = blks_to_add.begin();

            if (overlap(*nblk, *prev)) {
                (*nblk).buffer().remove_prefix((*prev).end() - (*nblk).begin());
            }   
        }
    }

    for (auto &blk : blks_to_add) {
        if (blk.len() != 0) {
            _blocks.emplace(move(blk));
            _unassembled_bytes += blk.len();
        }
    }
}

这段代码是 StreamReassembler 类中的 add_block 函数,用于向一个缓存区 _blocks 中添加新的数据块。这些数据块需要和缓存区中的已有数据块合并,如果新数据块和已有数据块可以合并成一个连续的数据块,则合并它们。如果新数据块和已有数据块不能合并,就将新数据块插入到缓存区 _blocks 中。

具体实现过程如下:

  • 如果新数据块的长度为 0,直接返回。
  • 如果缓存区 _blocks 不为空,则从头开始遍历它,如果有数据块和新数据块可以合并,就将它们合并成一个数据块。如果遍历到一个数据块和新数据块不能合并,就停止遍历。
  • 如果新数据块可以和一个已有数据块合并,就将新数据块和该数据块合并。
  • 如果新数据块和已有数据块不能合并,就插入新数据块到缓存区 _blocks 中。

//! \details This function check if the two blocks have overlap part
bool StreamReassembler::overlap(const StreamBlock &blk, const StreamBlock &new_blk) const {
    if (blk.begin() < new_blk.begin()) {
        return new_blk.begin() < blk.end();
    }

    return blk.begin() < new_blk.end();
}

这个函数用于判断两个数据块 (StreamBlock)是否有重叠的部分。重叠的部分指的是两个数据块在数据流中存在相同的字节范围。函数接收两个参数,blknew_blk,分别代表已有的数据块和待添加的新数据块。如果这两个数据块有重叠的部分,则返回 true,否则返回 false

具体实现中,首先比较 blk 的起始位置和 new_blk 的起始位置,如果 blk 的起始位置在 new_blk 的起始位置之前,那么只需比较 new_blk 的起始位置是否在 blk 的结束位置之前;否则,只需比较 blk 的起始位置是否在 new_blk 的结束位置之前。如果满足这两个条件之一,则说明这两个数据块存在重叠部分,返回 true;否则返回 false


//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    // the data that have been reassembled
    if (index + data.size() < _first_uass) {
        return;
    }

    if (eof && !_eof) {
        _eof = true;
        _eof_idx = index + data.size();
    }

    StreamBlock blk(index, move(string(data)));

    // if a part of the data have been reassembled
    if (index < _first_uass) {
        blk.buffer().remove_prefix(_first_uass - index);
    }

    // if a part of the data out of the capacity
    if (index + data.size() > _capacity + _first_uass) {
        blk.buffer().remove_suffix(index + data.size() - _capacity - _first_uass);
    }

    add_block(blk);
    write_to_stream();
    EOFcheck();
}

这段代码是一个函数,用于处理来自逻辑流的子字符串(即段)数据,该数据可能是乱序的,然后组装任何新的连续的子字符串并按顺序将其写入输出流。该函数的实现分为三个步骤:

  1. 首先检查输入的数据是否已经在已组装的数据范围内,如果已经在范围内,则直接返回,不做处理。
  2. 如果输入数据已经包含 EOF 标记,将标记设置为 true,记录标记的位置,以便后续处理。
  3. 根据输入数据的索引和内容创建一个 StreamBlock 对象。如果输入数据的索引小于已组装数据的范围,将数据前面的部分丢弃;如果输入数据的索引加上数据长度超出了容量,将数据后面的部分丢弃。然后将新的 StreamBlock 添加到已有的 StreamBlock 集合中,并检查是否有连续的 StreamBlock,将它们合并成一个更大的 StreamBlock。接着将可写入的数据写入到输出流中,并检查是否已经写入了 EOF 标记。

总之,这个函数的作用是将输入数据组装成完整的数据块,然后将这些数据块按顺序写入到输出流中,同时处理 EOF 标记。

完整代码

  • stream_reassembler.hh
  • stream_reassembler.cc

“stream_reassembler.hh”

#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

#include "byte_stream.hh"
#include "buffer.hh"

#include <cstdint>
#include <string>
#include <set>

class StreamBlock {
  private:
    BufferPlus _buffer{};
    size_t _begin_index;

  public:
    StreamBlock(const int begin, std::string &&str) noexcept
      : _buffer(std::move(str)), _begin_index(begin) {};

    StreamBlock(const StreamBlock &Other) noexcept
      : _buffer(Other._buffer), _begin_index(Other._begin_index) {};

    StreamBlock(const int begin, const Buffer &data) noexcept
      : _buffer(data), _begin_index(begin) {};

    bool operator<(const StreamBlock sb) const { return begin() < sb.begin(); }
    inline size_t end() const { return _begin_index + _buffer.starting_offset() + _buffer.size(); }
    inline size_t len() const { return _buffer.size(); }
    inline size_t begin() const { return _begin_index + _buffer.starting_offset(); }
    BufferPlus &buffer() { return _buffer; }
    const BufferPlus &buffer() const { return _buffer; }
};

//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.

    ByteStream _output;  //!< The reassembled in-order byte stream
    size_t _capacity;    //!< The maximum number of bytes
    size_t _first_uass;  // index of segment waiting for
    size_t _unassembled_bytes;
    bool _eof;           // whether _eof_ is effecitve
    size_t _eof_idx;     // where the eof is
    std::set<StreamBlock> _blocks;

    //! Merge the two blocks "blk" and "new_block"
    //! the result will stored in new_block
    //! nothing happens if two blocks can't merge
    //! return ture if merge happens, false otherwise

    //! add "to_add" blocks to set blocks
    //! merge all the blocks mergeable
    inline void add_block(StreamBlock &new_block);

    bool overlap(const StreamBlock &blk, const StreamBlock &new_blk) const;

    //! Write the first block to the stream, this block should begin at  '_first_uass'
    inline void write_to_stream();

    //! Check if eof is written to the stream
    //! If true, end the stream
    inline void EOFcheck();

  public:
    //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
    //! \note This capacity limits both the bytes that have been reassembled,
    //! and those that have not yet been reassembled.
    StreamReassembler(const size_t capacity);

    //! \brief Receive a substring and write any newly contiguous bytes into the stream.
    //!
    //! The StreamReassembler will stay within the memory limits of the `capacity`.
    //! Bytes that would exceed the capacity are silently discarded.
    //!
    //! \param data the substring
    //! \param index indicates the index (place in sequence) of the first byte in `data`
    //! \param eof the last byte of `data` will be the last byte in the entire stream
    void push_substring(const std::string &data, const uint64_t index, const bool eof);
    void push_substring(const Buffer &data, const size_t index, const bool eof);

    //! \name Access the reassembled byte stream
    //!@{
    const ByteStream &stream_out() const { return _output; }
    ByteStream &stream_out() { return _output; }
    //!@}

    uint64_t first_unassembled() const;

    //! The number of bytes in the substrings stored but not yet reassembled
    //!
    //! \note If the byte at a particular index has been pushed more than once, it
    //! should only be counted once for the purpose of this function.
    size_t unassembled_bytes() const;

    //! \brief Is the internal state empty (other than the output stream)?
    //! \returns `true` if no substrings are waiting to be assembled
    bool empty() const;
};

#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

“stream_reassembler.cc”

#include "stream_reassembler.hh"

// Dummy implementation of a stream reassembler.

// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.

// You will need to add private members to the class declaration in `stream_reassembler.hh`

template <typename... Targs>
void DUMMY_CODE(Targs &&... /* unused */) {}

using namespace std;

StreamReassembler::StreamReassembler(const size_t capacity)
    : _output(capacity)
    , _capacity(capacity)
    , _first_uass(0)
    , _unassembled_bytes(0)
    , _eof(false)
    , _eof_idx(0)
    , _blocks() {}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    // the data that have been reassembled
    if (index + data.size() < _first_uass) {
        return;
    }

    if (eof && !_eof) {
        _eof = true;
        _eof_idx = index + data.size();
    }

    StreamBlock blk(index, move(string(data)));

    // if a part of the data have been reassembled
    if (index < _first_uass) {
        blk.buffer().remove_prefix(_first_uass - index);
    }

    // if a part of the data out of the capacity
    if (index + data.size() > _capacity + _first_uass) {
        blk.buffer().remove_suffix(index + data.size() - _capacity - _first_uass);
    }

    add_block(blk);
    write_to_stream();
    EOFcheck();
}

void StreamReassembler::push_substring(const Buffer &data, const size_t index, const bool eof) {
    // the data that have been reassembled
    if (index + data.size() < _first_uass) {
        return;
    }

    if (eof && !_eof) {
        _eof = true;
        _eof_idx = index + data.size();
    }

    StreamBlock blk(index, move(data));

    // if a part of the data have been reassembled
    if (index < _first_uass) {
        blk.buffer().remove_prefix(_first_uass - index);
    }

    // if a part of the data out of the capacity
    if (index + data.size() > _capacity + _first_uass) {
        blk.buffer().remove_suffix(index + data.size() - _capacity - _first_uass);
    }

    add_block(blk);
    write_to_stream();
    EOFcheck();
}

//! \details This function check if eof is written to the stream
inline void StreamReassembler::EOFcheck() {
    if (!_eof) {
        return; 
    }
    
    if (static_cast<size_t>(_eof_idx) == _first_uass) {
        _output.end_input();
    }
}

//! \details This function write the first block into the stream,
//! the first block should begin at '_first_uass'
inline void StreamReassembler::write_to_stream() {
    while (!_blocks.empty()) {
        auto block = *_blocks.begin();
        if (block.begin() != _first_uass) {
            return;
        }

        size_t bytes_written = _output.write(block.buffer());

        if (bytes_written == 0) {
            return;
        }

        _first_uass += bytes_written;
        _unassembled_bytes -= bytes_written;
        _blocks.erase(move(_blocks.begin()));

        // partially written
        if (bytes_written != block.len()) {
            block.buffer().remove_prefix(move(bytes_written));
            _blocks.insert(move(block));
        }
    }
}

//! \details This function add "to_add" blocks to set blocks
// merge all the blocks mergeable
inline void StreamReassembler::add_block(StreamBlock &new_block) {
    if (new_block.len() == 0) {
        return;
    }

    vector<StreamBlock> blks_to_add;
    blks_to_add.emplace_back(move(new_block));

    if (!_blocks.empty()) {
        auto nblk = blks_to_add.begin();
        auto iter = _blocks.lower_bound(*nblk);
        auto prev = iter;

        while (iter != _blocks.end() && overlap(*iter, *nblk)) {
            if ((*iter).end() >= (*nblk).end()) {
                (*nblk).buffer().remove_suffix((*nblk).end() - (*iter).begin());
                break;
            }
            
            StreamBlock last(*nblk);
            (*nblk).buffer().remove_suffix((*nblk).end() - (*iter).begin());
            last.buffer().remove_prefix((*iter).end() - (*nblk).begin());
            blks_to_add.push_back(move(last));
            nblk = blks_to_add.end();
            nblk -- ;
            iter ++ ;
        }

        // compare with prevs
        // check one previous block is enough
        if (prev != _blocks.begin()) {
            prev -- ;
            nblk = blks_to_add.begin();

            if (overlap(*nblk, *prev)) {
                (*nblk).buffer().remove_prefix((*prev).end() - (*nblk).begin());
            }   
        }
    }

    for (auto &blk : blks_to_add) {
        if (blk.len() != 0) {
            _blocks.emplace(move(blk));
            _unassembled_bytes += blk.len();
        }
    }
}

//! \details This function check if the two blocks have overlap part
bool StreamReassembler::overlap(const StreamBlock &blk, const StreamBlock &new_blk) const {
    if (blk.begin() < new_blk.begin()) {
        return new_blk.begin() < blk.end();
    }

    return blk.begin() < new_blk.end();
}

uint64_t StreamReassembler::first_unassembled() const { return _first_uass; }

size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }

bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在superset中快速制作报表或仪表盘

在中小型企业&#xff0c;当下需要快速迭代、快速了解运营效果的业务&#xff0c;急需一款开源、好用、能快速迭代生产的报表系统。 老板很关心&#xff0c;BI工程师很关心&#xff0c;同时系统开发人员也同样关心&#xff0c;一个好的技术选型往往能够帮助公司减少很多成本&a…

软件持续测试的未来题】

测试是软件开发生命周期(SDLC)的重要组成部分。SDLC 的每个阶段都应包含测试&#xff0c;以获得更快的反馈并提高产品质量。如果以有效的方式实施和使用测试自动化&#xff0c;那么它可以为您带来出色的结果&#xff0c;而持续测试是正确的方法.。预计在2018-2023 年的预测期内…

CCNP350-401学习笔记(251-300题)

251、 Which IPv6 OSPF network type is applied to interface Fa0/0 of R2 by default? A. multipointB. broadcast C. Ethernet D. point-to-point 252、Which EIGRP feature allows the use of leak maps? A. neighborB. Stub C. offset-list D. address-family 253、W…

IMS应用领域|IMS连接器系统使自动驾驶成为可能

IMS连接器系统使自动驾驶成为可能极高的创新力和绝对的产品质量&#xff1a;作为高频接插件接口的创新性开发合伙人&#xff0c;我们的产品满足汽车工业对信息娱乐、娱乐、远程信息处理技术、车载电脑和智能汽车天线等不同产品解决方案的多种要求。我们为确保系统的可靠性做出了…

【学习笔记】Docker(一)

Docker为什么会出现&#xff1f;问题&#xff1a;环境配置最初开发上线都需要配置&#xff0c;并且非常麻烦。每一个机器都需要部署环境——费时费力、我在我的电脑上可以运行、版本更新&#xff0c;导致服务不可用传统&#xff1a;开发打包jar包交给运维来做现在&#xff1a;开…

面试经常被问悲观锁和乐观锁?什么是cas?来我花3分钟时间告诉你

锁大家都知道吧&#xff0c;多线程访问资源会存在竞争&#xff0c;那么就需要加锁进而让多个线程一个一个访问。 比如有一个房间&#xff0c;一次只能进一个人&#xff0c;现在有十个人都想进去怎么办&#xff1f; 对&#xff0c;加锁。拿一把钥匙&#xff0c;谁抢到钥匙谁就…

5.4 BGP地址聚合

5.3.1配置BGP地址聚合 1. 实验目的 熟悉BGP地址聚合的应用场景掌握BGP地址聚合的配置方法2. 实验拓扑 实验拓扑如图5-4所示: 图5-4:配置BGP地址聚合 3. 实验步骤 (1)配置IP地址 R1的配置 <Huawe…

skywalking window版使用

文章目录 目录 文章目录 前言 一、skywalking 二、使用步骤 2.1 使用mysql持久化监控数据 2.2 接入到idea的单个微服务和多个微服务 2.3 自定义skywalking的链路追踪 总结 前言 skywalking是一个国产开源框架&#xff0c;是分布式系统的应用程序性能监视工具&#xff0c;专为…

python基于django电影院购票系统(含选座功能

可定制框架:ssm/Springboot/vue/python/PHP/小程序/安卓均可开发 目录 1 绪论 1 1.1课题背景 1 1.2课题研究现状 1 1.3初步设计方法与实施方案 2 1.4本文研究内容 2 2 系统开发环境 4 2.项目介绍影城管理系统的主要使用者分为管理员和用户&#xff0c;实现功能包括管理员&…

UVM实战--加法器

前言 这里以UVM实战&#xff08;张强&#xff09;第二章为基础修改原有的DUT&#xff0c;将DUT修改为加法器&#xff0c;从而修改代码以使得更加深入的了解各个组件的类型和使用。 一. 组件的基本框架 和第二章的平台的主要区别点 &#xff08;1&#xff09;有两个transactio…

全15万字丨PyTorch 深度学习实践、基础知识体系全集;忘记时,请时常回顾。

✨ ✨我们抬头便看到星光&#xff0c;星星却穿越了万年. ✨ ✨ &#x1f3af;作者主页&#xff1a;追光者♂ &#x1f338;个人简介&#xff1a;在读计算机专业硕士研究生、CSDN-人工智能领域新星创作者&#x1f3c6;、2022年度博客之星人工智能领域TOP4&#x1f31f;、阿里云…

国产哪种蓝牙耳机最好?口碑最好的国产蓝牙耳机推荐

随着近几年蓝牙耳机的飞速发展&#xff0c;国产蓝牙耳机也逐渐突破技术壁垒&#xff0c;被更多用户熟知、认可。但&#xff0c;国产蓝牙耳机品牌的多样化&#xff0c;也为人们的选择增添了不少困难。那么&#xff0c;国产哪种蓝牙耳机最好&#xff1f;下面&#xff0c;我来给大…

详解可变形注意力模块(Deformable Attention Module)

Deformable Attention&#xff08;可变形注意力&#xff09;首先在2020年10月初商汤研究院的《Deformable DETR: Deformable Transformers for End-to-End Object Detection》论文中提出&#xff0c;在2022CVPR中《Vision Transformer with Deformable Attention》提出应用了De…

JavaEE简单示例——再插入的同时获取插入的主键列

简单介绍&#xff1a; 在某些时候&#xff0c;我们在插入完成一条语句之后&#xff0c;我们会想要返回之前插入的这条语句的主键列的数据&#xff0c;进行下一步的展示或者修改&#xff0c;我们就可以使用MyBatis的主键回写功能&#xff0c;帮助我们获取插入成功的一条数据的主…

Microsoft Dynamics 365:导入License到服务层,通过Business Central Administration Shell

本文主要是Microsoft Dynamics 365的License导入的图解干货&#xff0c;不多赘述&#xff0c;直接上图&#xff1a;第一步&#xff1a;准备好的License文件放在你喜欢的目录下第二步&#xff1a;到开始程序里找到并打开 Business Central Administration Shell3.第三步&#xf…

klog bug:仅输出到日志文件,不打印到命令行/stderr

一、 问题描述 开发k8s插件&#xff0c;使用klog作为日志工具&#xff0c;开发完成发现在设置将日志打印到文件后&#xff0c;Error级别的日志信息仍然会输出到命令行&#xff0c;过多日志打印会使后期将服务部署于docker有卡死的风险&#xff08;docker的bug&#xff0c;日志…

美国原装二手keysight E4980A(安捷伦)2MHZ LCR表

Agilent E4980A、Keysight E4980A、LCR 表&#xff0c;20 Hz - 2 MHz E4980A 是 Agilent 的 2 MHz LCR 表。LCR表是一种电子测试设备&#xff0c;用于测量电子元件的电感&#xff08;L&#xff09;、电容&#xff08;C&#xff09;和电阻&#xff08;R&#xff09;。LCR 表可…

W800|iot|HLK-W800-KIT-PRO|AliOS|阿里云| |官方demo|学习(1):板载AliOS系统快速上手

板载系统简介 HLK-W800-KIT-PRO 是海凌科电子面向开发者&#xff0c;采用了联盛德 w800 方案&#xff0c;带有一个RGB三色灯&#xff0c;集成了 CHT8305C 温湿度传感器的多功能开发板&#xff0c;用户可以在上面学习、研究嵌入式系统和物联网产品的开发&#xff0c;本套设备运行…

js中的隐式类型转换有哪些

目录一、隐式类型转换条件二、 的隐式类型转换三、 的隐式类型转换四、object 的隐式类型转换探讨 object 的隐式转换执行顺序探讨 Symbol.toPrimitive 属性如何将对象转换为原始值在前端js这门动态弱类型语言中&#xff0c;不仅存在着显示类型转换&#xff0c;还存在许多隐式类…

原画培训机构排名前十名,最新10大原画培训机构

原画培训机构排名出来啦&#xff0c;最新10大原画培训机构出炉&#xff0c;快来看看游戏原画培训机构有哪些吧&#xff0c;对于不知道如何选择靠谱的原画培训班&#xff0c;可以借鉴和参考一下&#xff01; 1、轻微课 国内人气很高的板绘学习平台&#xff0c;主打课程有日系插…