Boost Asio TCP异步服务端和客户端

news2025/1/17 15:55:38

服务端
消息分两次发送,第一次发送head,第二次发送body。接收也是先接收head,然后通过head结构中的body长度字段再接收body。
TcpServer.h

#pragma once
#include <atomic>
#include <vector>
#include <unordered_set>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/system_error.hpp>
#include "Connection.h"

using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::system;

class TcpServer : public Connection::Listener {
public:
    using Handler = std::function<void(std::vector<uint8_t>, MessageType)>;
    TcpServer(uint16_t port, Handler&& handler);
    ~TcpServer();
    void _startListen();
    void _startAccept();
    void _handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket);

    virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type);

    void send(const char*, int size);

private:
    uint16_t m_localPort;
    io_service m_oAcceptService;
    io_service::work m_oAcceptWork;
    tcp::acceptor *m_pAcceptor = nullptr;
    std::atomic_bool m_bStop = false;

    mutable boost::shared_mutex _connectionMutex;
    std::unordered_set<ConnectionPtr> _connections;

    Handler m_handler;
};


TcpServer.cpp

#include "TcpServer.h"
#include <boost/asio/buffer.hpp>
#include <fstream>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio.hpp>

TcpServer::TcpServer(uint16_t port, Handler&& handler)
    : m_oAcceptWork(m_oAcceptService)
    , m_localPort(port)
    , m_handler(handler)
{
    m_pAcceptor = new boost::asio::ip::tcp::acceptor(m_oAcceptService);

    _startListen();
    _startAccept();

    std::thread([&]() {
        while (1)
        {
            m_oAcceptService.run();
        }
    }).detach();
}

TcpServer::~TcpServer() {
    m_bStop = true;
}

void TcpServer::_startListen() {
    boost::asio::ip::tcp::endpoint localEndpoint(boost::asio::ip::tcp::v4(), m_localPort);
    m_pAcceptor->open(localEndpoint.protocol());

    m_pAcceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    boost::asio::ip::tcp::no_delay noDelayOption(false);
    m_pAcceptor->set_option(noDelayOption);

    boost::system::error_code ec;
    boost::system::error_code code = m_pAcceptor->bind(localEndpoint, ec);
    if (!ec.value())
    {
        m_pAcceptor->listen();
    }
    else
        std::cout << (std::string("TcpServer start listen exception: ") + ec.message().c_str()) << std::endl;

}

void TcpServer::_startAccept() {

    if (m_bStop)
    {
        return;
    }

    auto socket = std::make_shared<boost::asio::ip::tcp::socket>(m_oAcceptService);
    m_pAcceptor->async_accept(*socket, boost::bind(&TcpServer::_handleAccept
        , this
        , boost::asio::placeholders::error, socket));

}

void TcpServer::_handleAccept(const error_code& error, std::shared_ptr<tcp::socket> socket) {

    if (!error) {
        // read and write.
        std::cout << "_handleAccept" << std::endl;

        auto connection = std::make_shared<Connection>(std::move(*socket)
            , socket->get_io_service()
            , this);

        boost::unique_lock<boost::shared_mutex> lock(_connectionMutex);
        _connections.emplace(connection);
        lock.unlock();
        connection->start();
    }

    _startAccept();

}

void TcpServer::onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) {

    //connection->send(data);

    m_handler(data, type);
}

void TcpServer::send(const char* data, int size)
{
    for (auto conn : _connections)
    {
        conn->send(data, size);
    }
}


Connection.h

#pragma once
#define BOOST_ASIO_DISABLE_STD_CHRONO
#include <boost/asio.hpp>
#include <boost/date_time/time_duration.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/mutex.hpp>
#include <atomic>
#include <memory>
#include <list>
#include <future>
#include <boost/asio/steady_timer.hpp>
#include "message.h"

namespace pt = boost::posix_time;
namespace placeholders = boost::asio::placeholders;
using boost::asio::buffer;
using boost::asio::const_buffer;

// Connection的最大作用是保存TcpServer连接的客户端socket,以及单独启动线程或异步进行数据收发
class Connection : public std::enable_shared_from_this<Connection> {
public:
    class Listener;
    Connection(boost::asio::ip::tcp::socket&& socket
        , boost::asio::io_service& oTimerService
        , Listener* listener);
    ~Connection();
    void start();
    void stop();
    void _ranDataReception();
    void _handleReadHeader(const boost::system::error_code& error);
    void _handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type);

    void send(const char* data, int size);
    void send(const std::vector<uint8_t>& data);
    void on_write(const boost::system::error_code & err, size_t bytes);

private:
    bool _stopped = false;
    boost::asio::ip::tcp::socket _socket;
    MessageHeader _header;
    Listener* _listener;
};

typedef std::shared_ptr<Connection> ConnectionPtr;

class Connection::Listener {
public:
    virtual ~Listener() {}
    virtual void onDataReceived(ConnectionPtr connection, const std::vector<uint8_t>& data, MessageType type) = 0;
};

Connection.cpp

#include "Connection.h"
#include <boost/bind.hpp>
#include <functional>
#include <iostream>

Connection::Connection(boost::asio::ip::tcp::socket&& socket, boost::asio::io_service& oTimerService, Listener* listener)
    : _socket(std::move(socket))
    , _listener(listener)
{
}

Connection::~Connection()
{

}

void Connection::start()
{
    _stopped = false;
    _ranDataReception();
}

void Connection::stop()
{
    _stopped = true;
}

void Connection::_ranDataReception() {
    if (!_stopped)
    {
        memset(&_header, 0, sizeof(MessageHeader));
        boost::system::error_code oError;
        boost::asio::async_read(_socket, boost::asio::buffer(&_header, sizeof(MessageHeader)),
            boost::asio::transfer_exactly(sizeof(MessageHeader)),
            boost::bind(&Connection::_handleReadHeader, shared_from_this(), oError));
    }
}

void Connection::_handleReadHeader(const boost::system::error_code& error) {

    if (!_stopped) {
        if (!error) {

            MessageType type = _header.type;
            int bodyLen = _header.length;

            //std::string strBody;
            std::vector<uint8_t> strBody;
            strBody.resize(bodyLen);

            //
            boost::system::error_code error;
            size_t iReadSize = _socket.read_some(boost::asio::buffer(strBody.data(), bodyLen), error);
            while (!error)
            {
                if (iReadSize < bodyLen)
                {
                    iReadSize += _socket.read_some(boost::asio::buffer(strBody.data() + iReadSize
                        , bodyLen - iReadSize), error);
                }
                else
                {
                    break;
                }
            }

            if (!error && iReadSize == bodyLen)
            {
                _handleReadData(error, strBody, type);
            }
            else
            {
            }


        }
    }
}

void Connection::_handleReadData(const boost::system::error_code& error, const std::vector<uint8_t>& body, MessageType type)
{
    //
    if (!_stopped)
    {
        if (!error)
        {
            _listener->onDataReceived(shared_from_this(), body, type);
            _ranDataReception();
        }
    }
}

void Connection::send(const char* data, int size)
{
    boost::system::error_code error;
    _socket.async_write_some(boost::asio::buffer(data, size),
        boost::bind(&Connection::on_write, this,
            boost::placeholders::_1,
            boost::placeholders::_2));
}

void Connection::send(const std::vector<uint8_t>& data)
{
    boost::system::error_code error;
    _socket.async_write_some(boost::asio::buffer(data.data(), data.size()), 
        boost::bind(&Connection::on_write, this, 
        boost::placeholders::_1, 
        boost::placeholders::_2));
}


void Connection::on_write(const boost::system::error_code & err, size_t bytes)
{

}

客户端
Network.h

#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>

namespace sinftech {
namespace tv {
class Network {
public:
    Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port);
    ~Network();
    void start();
    void stop();
    size_t send(char* data, size_t size);
    size_t receive(char* data, size_t size);

private:
    bool _running;
    boost::asio::ip::tcp::socket _socket;
    boost::asio::ip::tcp::endpoint _remoteEndpoint;
};
}//namespace tv
}//namespace sinftech

Network.cpp (windows平台setopt设置超时时间使用整数,Linux平台使用结构体struct timeval)

#include "Network.h"
#include <boost/asio/buffer.hpp>
#include <thread>

namespace sinftech {
namespace tv {

Network::Network(boost::asio::io_service& ioService, const std::string& address, uint16_t port)
    : _running(false)
    , _socket(ioService)
    , _remoteEndpoint(boost::asio::ip::address::from_string(address), port) 
{
    int timeout = 1000;
    int iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout));
    if (0 != iRet)
    {
        printf("Set rcv time out error\n");
    }

    int iRcvSize = 1024 * 1000;
    iRet = setsockopt(_socket.native(), SOL_SOCKET, SO_RCVBUF, (char *)&iRcvSize, sizeof(iRcvSize));
    if (0 != iRet)
    {
        printf("Set rcv buffer size error\n");
    }

    start();
}

Network::~Network() {
    stop();
}

void Network::start() {
    _running = true;
}

void Network::stop() {
    _running = false;
    boost::system::error_code ec;
    _socket.close(ec);
}

size_t Network::send(char* data, size_t size) {
    size_t bytesSent = 0;
    if (_running) {
        boost::system::error_code ec;
        if (!_socket.is_open()) {
            _socket.connect(_remoteEndpoint, ec);
        }
        if (!ec) {            
            bytesSent = _socket.write_some(boost::asio::buffer(data, size), ec);
        }
        if (ec) {
            _socket.close(ec);
        }
    }
    return bytesSent;
}

size_t Network::receive(char* data, size_t size) {
    size_t bytesRecv = 0;
    if (_running) {
        boost::system::error_code ec;
        if (!_socket.is_open()) {
            _socket.connect(_remoteEndpoint, ec);
        }
        if (!ec) {            
            bytesRecv = _socket.read_some(boost::asio::buffer(data, size), ec);
        }
        if (ec) {
            _socket.close(ec);
        }
    }

    return bytesRecv;
}

}//namespace tv
}//namespace sinftech

注意,Linux和Windows平台使用setopt设置超时参数的方式是不同的。在Linux上,你可以使用setsockopt来设置套接字选项,包括读取和写入超时。具体的选项是SO_RCVTIMEO和SO_SNDTIMEO。

#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

int set_socket_timeout(int sockfd, int timeout_ms) {
    struct timeval timeout;
    timeout.tv_sec = timeout_ms / 1000;
    timeout.tv_usec = (timeout_ms % 1000) * 1000;

    // 设置接收超时
    if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
        perror("setsockopt failed");
        return -1;
    }

    // 设置发送超时
    if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) {
        perror("setsockopt failed");
        return -1;
    }

    return 0;
}

在Windows上,setsockopt同样用于设置套接字选项,但超时时间是以毫秒为单位的整数,而不是timeval结构体。你需要使用SO_RCVTIMEO和SO_SNDTIMEO选项,并传递一个DWORD类型的值。

#include <winsock2.h>
#include <ws2tcpip.h>

#pragma comment(lib, "Ws2_32.lib")

int set_socket_timeout(SOCKET sockfd, DWORD timeout_ms) {
    // 设置接收超时
    if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {
        printf("setsockopt failed with error: %ld\n", WSAGetLastError());
        return -1;
    }

    // 设置发送超时
    if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char*)&timeout_ms, sizeof(timeout_ms)) == SOCKET_ERROR) {
        printf("setsockopt failed with error: %ld\n", WSAGetLastError());
        return -1;
    }

    return 0;
}

// 在程序开始时需要初始化Winsock库
int main() {
    WSADATA wsaData;
    int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (result != 0) {
        printf("WSAStartup failed: %d\n", result);
        return 1;
    }

    // ... 创建并配置套接字 ...

    // 在程序结束前清理Winsock库
    WSACleanup();
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278069.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第三十八章 Spring之假如让你来写MVC——适配器篇

Spring源码阅读目录 第一部分——IOC篇 第一章 Spring之最熟悉的陌生人——IOC 第二章 Spring之假如让你来写IOC容器——加载资源篇 第三章 Spring之假如让你来写IOC容器——解析配置文件篇 第四章 Spring之假如让你来写IOC容器——XML配置文件篇 第五章 Spring之假如让你来写…

【开源免费】基于SpringBoot+Vue.JS企业OA管理系统(JAVA毕业设计)

本文项目编号 T 135 &#xff0c;文末自助获取源码 \color{red}{T135&#xff0c;文末自助获取源码} T135&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

(12)springMVC文件的上传

SpringMVC文件上传 首先是快速搭建一个springMVC项目 新建项目mvn依赖导入添加webMoudle添加Tomcat运行环境.在配置tomcat时ApplicationContext置为"/"配置Artfact的lib配置WEB-INF配置文件&#xff08;记得添加乱码过滤&#xff09;配置springmvc-servlet文件&…

【华为路由/交换机的ftp文件操作】

华为路由/交换机的ftp文件操作 PC&#xff1a;10.0.1.1 R1&#xff1a;10.0.1.254 / 10.0.2.254 FTP&#xff1a;10.0.2.1 S1&#xff1a;无配置 在桌面创建FTP-Huawei文件夹&#xff0c;里面创建config/test.txt。 点击上图中的“启动”按钮。 然后ftp到server&#xff0c;…

虚拟拨号技术(GOIP|VOIP)【基于IP的语音传输转换给不法分子的境外来电披上一层外衣】: Voice over Internet Protocol

文章目录 引言I 虚拟拨号技术(GOIP|VOIP)原理特性:隐蔽性和欺骗性II “GOIP”设备原理主要功能III 基于IP的语音传输 “VOIP” (Voice over Internet Protocol)IV “断卡行动”“断卡行动”目的电信运营商为打击电诈的工作V 知识扩展虚拟号保护隐私虚拟运营商被用于拨打骚扰…

获取当前页面的url相关信息

引言&#xff1a;如何通过javascript获取当前html页面的链接信息 let currentUrl window.location.href; let protocol window.location.protocol; // 协议 let host window.location.host; // 主机名和端口 let hostname window.location.hostname; // 主机名 le…

【C++】size_t全面解析与深入拓展

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;一、什么是size_t&#xff1f;为什么需要size_t&#xff1f; &#x1f4af;二、size_t的特性与用途1. size_t是无符号类型示例&#xff1a; 2. size_t的跨平台适应性示例对…

Quinlan C4.5剪枝U(0,6)U(1,16)等置信上限如何计算?

之前看到Quinlan中关于C4.5决策树算法剪枝环节中,关于错误率e置信区间估计,为啥 当E=0时,U(0,1)=0.75,U(0,6)=0.206,U(0,9)=0.143? 而当E不为0时,比如U(1,16)=0.157,如图: 关于C4.5决策树,Quinlan写了一本书,如下: J. Ross Quinlan (Auth.) - C4.5. Programs f…

怎么进行论文选题?有没有AI工具可以帮助~

论文选题听起来简单&#xff0c;做起来难&#xff01;尤其是对于我们这群即将毕业的“学术小白”。记得那天导师布置完任务&#xff0c;我整个人就陷入了深深的沉思&#xff08;其实是发呆&#xff09;。直到室友神秘兮兮地告诉我&#xff1a;“你知道AI现在能帮人选题了吗&…

windows 极速安装 Linux (Ubuntu)-- 无需虚拟机

1. 安装 WSL 和 Ubuntu 打开命令行&#xff0c;执行 WSL --install -d ubuntu若报错&#xff0c;则先执行 WSL --update2. 重启电脑 因安装了子系统&#xff0c;需重启电脑才生效 3. 配置 Ubuntu 的账号密码 打开 Ubuntu 的命令行 按提示&#xff0c;输入账号&#xff0c;密…

TCP-IP详解卷 TCP的超时与重传

TCP-IP详解卷1-21&#xff1a;TCP的超时与重传&#xff08;Timeout and Retransmission&#xff09; 一&#xff1a;介绍 1&#xff1a; 与数据链路层的ARQ协议相类似&#xff0c;TCP使用超时重发的重传机制。 即&#xff1a;TCP每发送一个报文段&#xff0c;就对此报文段设置…

【统计的思想】假设检验(一)

假设检验是统计学里的重要方法&#xff0c;同时也是一种“在理想与现实之间观察求索”的测试活动。假设检验从概率的角度去考察理想与现实之间的关系&#xff0c;籍此来缓解测试可信性问题。 我们先来看一个例子。民航旅客服务系统&#xff0c;简称PSS系统&#xff0c;有一种业…

欧拉Euler 21.10 安装Oracle 19c RAC( PDB )到单机ADG

环境说明 主库&#xff08;RAC&#xff09; 备库&#xff08;FS&#xff09; 数据库版本 Oracle19.22.0.0 Oracle19.22.0.0 IP 地址 192.168.40.90-94 192.168.40.95 主机名 hfdb90、hfdb91 hfdb95 DB_UNIQUE_NAME hfdb dghfdb DB_NAME hfdb hfdb DB Instance…

图数据库 | 18、高可用分布式设计(中)

上文我们聊了在设计高性能、高可用图数据库的时候&#xff0c;从单实例、单节点出发&#xff0c;一般有3种架构演进选项&#xff1a;主备高可用&#xff0c;今天我们具体讲讲分布式共识&#xff0c;以及大规模水平分布式。 主备高可用、分布式共识、大规模水平分布式&#xff…

为mysql开启error日志 - phpstudy的数据库启动失败

步骤 找到mysql的配置文件 “my.ini”&#xff0c; windows上直接进入mysql安装目录&#xff0c;或者直接全盘搜&#xff1b; linux上使用命令 locate my.ini 即可搜索 修改"my.ini"&#xff0c;找到 组[mysqld] 下面的“log_error”并设置日志文件绝对路径&#x…

Java设计模式——单例模式(特性、各种实现、懒汉式、饿汉式、内部类实现、枚举方式、双重校验+锁)

我是一个计算机专业研0的学生卡蒙Camel&#x1f42b;&#x1f42b;&#x1f42b;&#xff08;刚保研&#xff09; 记录每天学习过程&#xff08;主要学习Java、python、人工智能&#xff09;&#xff0c;总结知识点&#xff08;内容来自&#xff1a;自我总结网上借鉴&#xff0…

MySQL查询相关内容

创建员工库和表&#xff1b; mysql> create database mydb8_worker; Query OK, 1 row affected (0.01 sec)mysql> use mydb8_worker; Database changed mysql> create table t_worker(-> department_id int(11) not null comment 部门号,-> worker_id int(11)…

微信小程序原生与 H5 交互方式

在微信小程序中&#xff0c;原生与 H5 页面&#xff08;即 WebView 页面&#xff09;之间的交互通常有以下几种方式&#xff1a; 1. 使用 postMessage 进行通信 微信小程序的 WebView 页面和原生小程序页面可以通过 postMessage 来进行数据传递。 WebView 页面向原生小程序发…

shell脚本基础练习

1、需求&#xff1a;判断192.168.1.0/24网络中&#xff0c;当前在线的ip有哪些&#xff0c;并编写脚本打印出来。&#xff08;以前10个网络IP为例&#xff0c;可以进行更改&#xff09; #!/bin/bashfor ((i1;i<10;i)) doping -c1 -w1 192.168.1.$i &> /dev/null &…

【全栈开发】----Mysql基本配置与使用

本篇是在已下载Mysql的情况下进行的&#xff0c;若还未下载或未创建Mysql服务&#xff0c;请转到这篇: 2024 年 MySQL 8.0.40 安装配置、Workbench汉化教程最简易&#xff08;保姆级&#xff09;_mysql8.0.40下载安装教程-CSDN博客 本文对于mysql的操作均使用控制台sql原生代码…