C++编程:利用环形缓冲区优化 TCP 发送流程,避免 Short Write 问题

news2024/12/23 8:25:48

文章目录

    • 1. 什么是 Short Write 问题?
    • 2. 如何解决 Short Write 问题?
      • 2.1 方法 1:将 Socket 设置为阻塞模式
      • 2.2 方法 2:用户态维护发送缓冲区
    • 3. 用户态维护发送缓冲区实现
      • 3.1 核心要点
      • 3.2 代码实现
      • 3.3 测试程序
    • 参考文档

1. 什么是 Short Write 问题?

在 TCP 编程中,short write 问题指的是在调用 sendwrite 系统调用时,实际发送的数据量比预期要少。这通常是因为网络协议栈发送缓冲区的空间不足,导致不能一次性发送完整的数据。遇到这种情况时,系统调用会返回实际发送的字节数,并将 errno 设置为 EAGAIN,表示缓冲区没有足够的空间来继续发送数据。

2. 如何解决 Short Write 问题?

针对 EPOLL 模型 中的 LT(Level Triggered)模式,可以采取以下几种方案来解决 short write 问题:

2.1 方法 1:将 Socket 设置为阻塞模式

将 Socket 设置为阻塞模式时,send 系统调用会一直阻塞,直到有足够的缓冲区空间发送完整的数据。这种方法能够避免 short write 问题,但会导致线程阻塞,从而影响性能。因此,通常不推荐在高并发或需要高吞吐量的场景中使用此方法。

2.2 方法 2:用户态维护发送缓冲区

更推荐的方法是用户态维护一个发送缓冲区,并结合 EPOLLONESHOTEPOLLOUT 事件来控制数据发送。这种方法不需要阻塞线程,能够有效地处理 short write 问题。

3. 用户态维护发送缓冲区实现

3.1 核心要点

使用环形缓冲区来保存待发送的数据,当系统发送缓冲区不够时,数据会被存入环形缓冲区,并在后续的 EPOLLOUT 事件触发时继续发送。

  • 环形缓冲区设计

环形缓冲区(Circular Buffer)是一个固定大小的缓存,用于暂存数据。当数据无法完全写入网络协议栈缓冲区时,可以将其暂存,并在缓冲区有足够空间时继续写入。通过注册 EPOLLOUT 事件,当缓冲区有空闲空间时,程序可以重新尝试发送数据。

  • 数据收发管理类设计

    • asyncSend:当数据网络协议栈发送缓冲区没有足够空间时,会将数据存储到环形缓冲区,并通过 EPOLLONESHOTEPOLLOUT 事件确保数据能在后续时刻继续发送。

    • doSend:此函数会被 EPOLLOUT 事件触发,它从环形缓冲区中取出数据并尝试发送。如果发送成功,则释放相应的缓冲区空间;如果发送失败,且错误码为 EAGAINEINTER,则会重试。

3.2 代码实现

#include <atomic>
#include <cstring>
#include <memory>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>
#include <mutex>
#include <cassert>
#include <fcntl.h>

class LockFreeBytesBuffer {
 public:
  static const std::size_t kBufferSize = 10240U;  // 缓冲区大小

  LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {
    std::memset(buffer_, 0, kBufferSize);
  }

  bool append(const char* data, std::size_t length) noexcept;
  std::size_t beginRead(const char** target) noexcept;
  void endRead(std::size_t length) noexcept;

 private:
  char buffer_[kBufferSize];
  std::atomic<std::size_t> reader_index_;
  std::atomic<std::size_t> writer_index_;
};

bool LockFreeBytesBuffer::append(const char* data, std::size_t length) noexcept {
  const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed);
  const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire);

  const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize;
  if (length > free_space) {
    return false;  // 缓冲区满
  }

  const std::size_t pos = current_write_index % kBufferSize;
  const std::size_t first_part = std::min(length, kBufferSize - pos);
  std::memcpy(&buffer_[pos], data, first_part);
  std::memcpy(&buffer_[0], data + first_part, length - first_part);

  writer_index_.store(current_write_index + length, std::memory_order_release);
  return true;
}

std::size_t LockFreeBytesBuffer::beginRead(const char** target) noexcept {
  const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);
  const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire);

  const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize;
  if (available_data == 0U) {
    return 0U;  // 缓冲区空
  }

  const std::size_t pos = current_read_index % kBufferSize;
  *target = &buffer_[pos];
  return std::min(available_data, kBufferSize - pos);
}

void LockFreeBytesBuffer::endRead(std::size_t length) noexcept {
  const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);
  reader_index_.store(current_read_index + length, std::memory_order_release);
}

class SocketContext {
 public:
  SocketContext(int epoll_fd, int sock_fd)
      : epoll_fd_(epoll_fd), sock_fd_(sock_fd) {
    setNonblocking();
    addFd();
  }

  ~SocketContext() {
    removeFd();
    close(sock_fd_);
  }

  bool asyncSend(const char* data, int size) {
    bool result = buffer_.append(data, static_cast<std::size_t>(size));
    if (result) {
      modifyEvent(false, true);  // 修改 EPOLLONESHOT 和 EPOLLOUT
    }
    return result;
  }

  int doRecv() {
    char buffer[1024] = {};
    int count = read(sock_fd_, buffer, sizeof(buffer));
    if (count <= 0) {
      return count;  // 读取失败或连接关闭
    }

    modifyEvent(true, false);  // 恢复 EPOLLIN 事件
    fprintf(stderr, "Received: %s\n", buffer);
    return count;
  }

  int doSend() {
    const char* pdata = nullptr;
    std::size_t data_size = buffer_.beginRead(&pdata);
    if (data_size == 0) {
      return 0;  // 没有数据可以发送
    }

    int send_size = send(sock_fd_, pdata, static_cast<int>(data_size), MSG_DONTWAIT);
    if (send_size > 0) {
      buffer_.endRead(static_cast<std::size_t>(send_size));  // 更新已发送数据
    } else if (send_size == -1 && errno != EAGAIN) {
      fprintf(stderr, "send failed, error: %s\n", strerror(errno));
    }

    return send_size;
  }

 protected:
  void setNonblocking() {
    int flags = fcntl(sock_fd_, F_GETFL, 0);
    if (flags == -1) {
      fprintf(stderr, "fcntl GETFL failed: %s\n", strerror(errno));
      return;
    }
    fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK);
  }

  void addFd() {
    struct epoll_event event;
    event.data.ptr = this;
    event.events = EPOLLIN | EPOLLONESHOT | EPOLLOUT;
    if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, sock_fd_, &event) == -1) {
      fprintf(stderr, "epoll_ctl add failed: %s\n", strerror(errno));
    }
  }

  void removeFd() {
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, sock_fd_, nullptr);
  }

  inline void modifyEvent(bool in_event = true, bool out_event = true) {
    struct epoll_event event;
    event.data.ptr = this;
    event.events = EPOLLONESHOT;
    if (in_event) {
        event.events |= EPOLLIN;
    }
    if (out_event) {
        event.events |= EPOLLOUT;
    }
    epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, sock_fd_, &event);
  }

 private:
  int epoll_fd_;
  int sock_fd_;
  LockFreeBytesBuffer buffer_;
};

代码说明

  • 无锁环形缓冲区LockFreeBytesBuffer 类通过原子操作(std::atomic)来确保线程安全,避免了传统的锁机制。
    更多请见:C++生产者-消费者无锁缓冲区的简单实现
  • 事件驱动机制:通过 EPOLLINEPOLLOUT 事件来控制数据的接收和发送,避免了阻塞操作。
  • 非阻塞发送:通过 send 函数的 MSG_DONTWAIT 标志来确保发送操作不会阻塞,遇到 EAGAIN 错误时会重试。

在这里插入图片描述

3.3 测试程序

#include <iostream>
#include <memory>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <cassert>

#define MAX_EVENTS 10

int createServerSocket(int port) {
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  if (sockfd == -1) {
    fprintf(stderr, "socket creation failed: %s\n", strerror(errno));
    return -1;
  }

  sockaddr_in server_addr;
  std::memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = INADDR_ANY;
  server_addr.sin_port = htons(port);

  if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
    fprintf(stderr, "bind failed: %s\n", strerror(errno));
    close(sockfd);
    return -1;
  }

  if (listen(sockfd, 5) == -1) {
    fprintf(stderr, "listen failed: %s\n", strerror(errno));
    close(sockfd);
    return -1;
  }

  return sockfd;
}

int main() {
  int epoll_fd = epoll_create1(0);
  if (epoll_fd == -1) {
    fprintf(stderr, "epoll_create1 failed: %s\n", strerror(errno));
    return -1;
  }

  int server_fd = createServerSocket(8080);
  if (server_fd == -1) {
    return -1;
  }

  // Set the server socket to non-blocking mode
  fcntl(server_fd, F_SETFL, O_NONBLOCK);

  // Add server socket to epoll
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = server_fd;
  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
    fprintf(stderr, "epoll_ctl: server_fd failed: %s\n", strerror(errno));
    return -1;
  }

  fprintf(stderr, "Server listening on port 8080...\n");

  while (true) {
    struct epoll_event events[MAX_EVENTS];
    int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);

    for (int i = 0; i < n; ++i) {
      if (events[i].data.fd == server_fd) {
        // Accept new client connection
        int client_fd = accept(server_fd, NULL, NULL);
        if (client_fd == -1) {
          fprintf(stderr, "accept failed: %s\n", strerror(errno));
          continue;
        }

        fcntl(client_fd, F_SETFL, O_NONBLOCK);
        std::unique_ptr<SocketContext> client = std::make_unique<SocketContext>(epoll_fd, client_fd);

        ev.events = EPOLLIN | EPOLLOUT | EPOLLONESHOT;
        ev.data.ptr = client.get();
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
          fprintf(stderr, "epoll_ctl: client_fd failed: %s\n", strerror(errno));
        }
      } else {
        SocketContext* client = static_cast<SocketContext*>(events[i].data.ptr);

        if (events[i].events & EPOLLIN) {
          client->doRecv();
        }

        if (events[i].events & EPOLLOUT) {
          client->doSend();
        }
      }
    }
  }

  close(server_fd);
  close(epoll_fd);
  return 0;
}

参考文档

  • tcp 解决short write问题
  • C++生产者-消费者无锁缓冲区的简单实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2240722.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据网格能替代数据仓库吗?

一、数据网格是什么&#xff1f; 数据网格&#xff1a;是一种新兴的数据管理架构和理念&#xff0c;主要用于解决大规模、复杂数据环境下的数据管理和利用问题。 核心概念&#xff1a; 1、数据即产品&#xff1a;将数据看作一种产品&#xff0c;每个数据域都要对其生产的数据负…

力扣经典面试26题删除有序数组中的重复项1

给你一个非严格递增排列的数组nums&#xff0c;请你原地删除重复出现的元素&#xff0c; 使每个元素 只出现一次&#xff0c;返回删除后数组的新长度。元素的相对顺序 应该保持 一致。然后返回 nums 中唯一元素的个数。 考虑 nums 的唯一元素的数量为 k&#xff0c; 你需要做以…

LLM: AI Mathematical Olympiad (上)

文章目录 一、项目简介二、first place 攻略三、必备知识1、COT思维链技术2、ToRA 四、first place 训练功略五、数据集构建1、COT数据集2、TIR数据集 六、数据集详细技术报告总结 本文较长分成两个部分分析 | ू•ૅω•́)ᵎᵎᵎ 第一部分&#xff1a;预备知识介绍和数据准备…

GA/T1400视图库平台EasyCVR视频融合平台HLS视频协议是什么?

在数字化时代&#xff0c;视频监控系统已成为保障安全、提升效率的关键技术。EasyCVR视频融合云平台&#xff0c;作为TSINGSEE青犀视频在“云边端”架构体系中的重要一环&#xff0c;专为大中型项目设计&#xff0c;提供了一个跨区域、网络化的视频监控综合管理系统平台。它不仅…

给阿里云OSS绑定域名并启用SSL

为什么要这么做&#xff1f; 问题描述&#xff1a; 当用户通过 OSS 域名访问文件时&#xff0c;OSS 会在响应头中增加 Content-Disposition: attachment 和 x-oss-force-download: true&#xff0c;导致文件被强制下载而不是预览。这个问题特别影响在 2022/10/09 之后新开通 OS…

`node-gyp` 无法找到版本为 `10.0.19041.0` 的 Windows SDK

从你提供的错误信息来看&#xff0c;问题出在 node-gyp 无法找到版本为 10.0.19041.0 的 Windows SDK。我们可以尝试以下几种方法来解决这个问题&#xff1a; 完整示例 方法 1&#xff1a;安装指定版本的 Windows SDK 下载并安装 Windows SDK&#xff1a; 访问 Windows SDK 下…

【Hive】【HiveQL】【大数据技术基础】 实验四 HBase shell命令实验

实验四&#xff1a;熟悉常用的HBase操作 实验概览 在本次实验中&#xff0c;我们将深入探索HBase在Hadoop生态系统中的角色&#xff0c;并熟练掌握常用的HBase Shell命令和Java API操作。通过这些实践&#xff0c;我们能够更好地理解HBase的工作原理以及如何在实际项目中应用。…

3D意识(3D Awareness)浅析

一、简介 3D意识&#xff08;3D Awareness&#xff09;主要是指视觉基础模型&#xff08;visual foundation models&#xff09;对于3D结构的意识或感知能力&#xff0c;即这些模型在处理2D图像时是否能够理解和表示出图像中物体或场景的3D结构&#xff0c;其具体体现在编码场景…

快递面单批量导入打印软件小程序下载 佳易王网店快递面单批量打印管理系统操作教程

一、概述 【软件文件资源在文章最后】 快递面单批量导入打印软件小程序下载 快递面单批量打印管理系统操作教程 直接使用快递空白单打印&#xff0c;可以扫描条码并可以查询快递信息&#xff0c;面单内容可以自定义。 可以批量导入批量打印&#xff0c;从而提高效率节省时间…

缓冲区溢出,数据被踩的案例学习

继续在ubuntu上学习GDB&#xff0c;今天要学习的是缓冲区溢出。 程序的地址&#xff1a; GitHub - gedulab/gebypass: bypass password by heap buffer overflow 编译的方法&#xff1a; gcc -g -O2 -o gebypass gebypass.c 照例设置一下科学shangwang代理&#xff1a; e…

数据库SQL——连接表达式(JOIN)图解

目录 一、基本概念 二、常见类型 内连接&#xff08;INNER JOIN&#xff09;&#xff1a; 左连接&#xff08;LEFT JOIN 或 LEFT OUTER JOIN&#xff09;&#xff1a; 右连接&#xff08;RIGHT JOIN 或 RIGHT OUTER JOIN&#xff09;&#xff1a; 全连接&#xff08;FULL…

sql注入之二次注入(sqlilabs-less24)

二阶注入&#xff08;Second-Order Injection&#xff09;是一种特殊的 SQL 注入攻击&#xff0c;通常发生在用户输入的数据首先被存储在数据库中&#xff0c;然后在后续的操作中被使用时&#xff0c;触发了注入漏洞。与传统的 SQL 注入&#xff08;直接注入&#xff09;不同&a…

查询DBA_FREE_SPACE缓慢问题

这个是一个常见的问题&#xff0c;理论上应该也算是一个bug&#xff0c;在oracle10g&#xff0c;到19c&#xff0c;我都曾经遇到过&#xff1b;今天在给两套新建的19C RAC添加监控脚本时&#xff0c;又发现了这个问题&#xff0c;在这里记录一下。 Symptoms 环境&#xff1a;…

实验6记录网络与故障排除

实验6记录网络与故障排除 实验目的及要求&#xff1a; 通过实验&#xff0c;掌握如何利用文档记录网络设备相关信息并完成网络拓扑结构的绘制。能够使用各种技术和工具来找出连通性问题&#xff0c;使用文档来指导故障排除工作&#xff0c;确定具体的网络问题&#xff0c;实施…

「QT」文件类 之 QTextStream 文本流类

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「QT」QT5程序设计&#x1f4da;全部专栏「Win」Windows程序设计「IDE」集成开发环境「UG/NX」BlockUI集合「C/C」C/C程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「UG/NX」NX定制…

【go从零单排】JSON序列化和反序列化

&#x1f308;Don’t worry , just coding! 内耗与overthinking只会削弱你的精力&#xff0c;虚度你的光阴&#xff0c;每天迈出一小步&#xff0c;回头时发现已经走了很远。 &#x1f4d7;概念 在 Go 语言中&#xff0c;处理 JSON 数据主要依赖于 encoding/json 包。这个包提…

网络学习第四篇

引言&#xff1a; 我们在第三篇的时候出现了错误&#xff0c;我们要就行排错&#xff0c;那么我们要知道一下怎么配置静态路由实现ping通&#xff0c;这样子我们才知道下一跳到底是什么&#xff0c;为什么这样子做。 实验目的 理解和掌握静态路由的基本概念和配置方法。 实…

蓝桥杯竞赛单片机组备赛【经验帖】

本人获奖情况说明 笔者本人曾参加过两次蓝桥杯电子赛&#xff0c;在第十二届蓝桥杯大赛单片机设计与开发组获得省级一等奖和国家级二等奖&#xff0c;在第十五届嵌入式设计开发组获得省级二等奖。如果跟着本帖的流程备赛&#xff0c;只要认真勤奋&#xff0c;拿个省二绝对没问…

yolo标签自动标注(使用python和yolo方法)

yolo代码自动标注 1.引言1.初阶“自动标注”&#xff0c;给每个图像都生成一个固定的标注文件&#xff0c;进而在labglimg中对矩形框进行微调&#xff0c;减少标注的工作量2.高阶自动标注&#xff0c;利用我们训练好的&#xff08;但是没有特别精准的&#xff09;yolo文件先对每…

Git在版本控制中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 Git在版本控制中的应用 Git在版本控制中的应用 Git在版本控制中的应用 引言 Git 概述 定义与原理 发展历程 Git 的关键技术 分布…