zlMediaKit 3 socket模块--怎么封装socket,怎么connect listen/bind write read

news2024/11/27 21:54:24

socket.cpp socket.h

socket

在这里插入图片描述

SockInfo类,有四个获取四元组信息的虚函数+一个获取自身标识符的虚函数

shared_from_this

原理关于boost中enable_shared_from_this类的原理分析 - 阿玛尼迪迪 - 博客园 (cnblogs.com)

shared_ptr<Tp> shared_from_this() { return shared_ptr<T>(M_weak_this); }

从上面的说明来看,需要小心的是shared_from_this()仅在shared_ptr<T>的构造函数被调用之后才能使用,原因是enable_shared_from_this::weak_this_并不在构造函数中设置,而是在shared_ptr<T>的构造函数中设置

#include <iostream>
#include <vector>
#include <list>
#include<iostream>
#include <memory>
#include <sstream>
using namespace std;

class A : public std::enable_shared_from_this<A>
{
public:
    int i{1};
};

int main()
{
    
    //auto p = new A;
    auto p = std::make_shared<A>();
    auto a = p->shared_from_this();// auto p = new A;  --> what():  bad_weak_ptr 没有给m_weak_ptr赋值
    printf("i:%d",p->i);
    return 0;   
}

Socket

结构

//和send_l部分关联
//一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
//一级发送缓存锁
MutexWrapper<std::recursive_mutex> _mtx_send_buf_waiting;
//二级发送缓存, socket可写时,会把二级缓存批量写入到socket
List<BufferList::Ptr> _send_buf_sending;
//二级发送缓存锁
MutexWrapper<std::recursive_mutex> _mtx_send_buf_sending;
//发送buffer结果回调
BufferList::SendResult _send_result;
//对象个数统计
ObjectStatistic<Socket> _statistic;

构造函数

//接收数据回调
using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
//发生错误回调
using onErrCB = std::function<void(const SockException &err)>; //只有错误码,socket可能已经断开了
//tcp监听接收到连接请求
using onAcceptCB = std::function<void(Socket::Ptr &sock, std::shared_ptr<void> &complete)>;
//socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止
using onFlush = std::function<bool()>;
//在接收到连接请求前,拦截Socket默认生成方式
using onCreateSocket = std::function<Ptr(const EventPoller::Ptr &poller)>;
//发送buffer成功与否回调
using onSendResult = BufferList::SendResult;

//类内部的静态方法,设置socket的poller和锁属性
static Socket::Ptr toolkit::Socket::createSocket(const EventPoller::Ptr &poller, bool enable_mutex)

Socket(const toolkit::EventPoller::Ptr &poller, bool enable_mutex)
{
    //1 绑定poller
    //2 使能锁  _mtx_sock_fd|_mtx_event|_mtx_send_buf_waiting|_mtx_send_buf_sending
}

封装可控制开关的递归锁

template<class Mtx = std::recursive_mutex>
class MutexWrapper {
public:
    MutexWrapper(bool enable) {
        _enable = enable;
    }

    ~MutexWrapper() = default;

    inline void lock() {
        if (_enable) {
            _mtx.lock();
        }
    }

    inline void unlock() {
        if (_enable) {
            _mtx.unlock();
        }
    }

private:
    bool _enable;
    Mtx _mtx;
};

lock_guard任意类型的锁的宏

#define LOCK_GUARD(mtx) lock_guard<decltype(mtx)> lck(mtx)

connect

使用weak_ptr作为一个观察指针,在执行异步时不确定socket是否还在,所以要用weak_ptr.lock()再判断下

con_cb 执行成功后,释放async_con_cb/_con_timer对象,如果con失败,则释放自己对应的文件描述符

async_con_cb:con_cb的声明周期由async_con_cb保证

lambda函数本质:
遵循了类的特征,生命周期和类是一样的
如果有参数传入好比类的成员变量
传入引用就是引用本身的生命周期

为什么要有con_cb和async_con_cb,后面的if-else中可知,如果时直接给出ip,直接连接,连接的结果作为con_cb的参数传入

如果需要DNS解析,那么这个操作是阻塞的,异步连接。无论同步异步,连接操作完毕之后,poll该fd上的可写事件,可写后调用onConnected事件

todo

  • 定时器的实现
  • 工作线程分配任务的实现
  • poller添加事件的实现
void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in, float timeout_sec, const string &local_ip, uint16_t local_port)
{
        closeSock();

    weak_ptr<Socket> weak_self = shared_from_this();
    auto con_cb = [con_cb_in, weak_self](const SockException &err) {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
            return;
        }
        strong_self->_async_con_cb = nullptr;//
        strong_self->_con_timer = nullptr;//释放智能指针指向的对象
        if (err) {
            LOCK_GUARD(strong_self->_mtx_sock_fd);
            strong_self->_sock_fd = nullptr;
        }
        con_cb_in(err);
    };

    auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb](int sock) {
        //con_cb的生命周期由async_con_cb保证
        auto strong_self = weak_self.lock();
        if (sock == -1 || !strong_self) {
            if (!strong_self) {
                CLOSE_SOCK(sock);
            } else {
                con_cb(SockException(Err_dns, get_uv_errmsg(true)));
            }
            return;
        }

        auto sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
        weak_ptr<SockFD> weak_sock_fd = sock_fd;

        //监听该socket是否可写,可写表明已经连接服务器成功
        int result = strong_self->_poller->addEvent(sock, EventPoller::Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
            auto strong_sock_fd = weak_sock_fd.lock();
            auto strong_self = weak_self.lock();
            if (strong_sock_fd && strong_self) {
                //socket可写事件,说明已经连接服务器成功
                strong_self->onConnected(strong_sock_fd, con_cb);
            }
        });

        if (result == -1) {
            con_cb(SockException(Err_other, "add event to poller failed when start connect"));
            return;
        }

        //保存fd
        LOCK_GUARD(strong_self->_mtx_sock_fd);
        strong_self->_sock_fd = sock_fd;
    });

    if (isIP(url.data())) {
        (*async_con_cb)(SockUtil::connect(url.data(), port, true, local_ip.data(), local_port));
    } else {
        auto poller = _poller;
        weak_ptr<function<void(int)>> weak_task = async_con_cb;
        WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, weak_task, poller]() {
            //阻塞式dns解析放在后台线程执行
            int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port);
            poller->async([sock, weak_task]() {
                auto strong_task = weak_task.lock();
                if (strong_task) {
                    (*strong_task)(sock);
                } else {
                    CLOSE_SOCK(sock);
                }
            });
        });
        _async_con_cb = async_con_cb;//async_con_cb的生命周期由类成员_async_con_cb保证(续命)
    }

    //连接超时定时器 定时器堆上分配,生命周期由类成员_con_timer保证(续命)
    _con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
        con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
        return false;
    }, _poller);
}

onConnected函数由connect函数调用,先删除可写事件监听,再调用attachEvent(正式进入工作状态了),初始化所有线程下共享的读缓存(_read_buffer,默认256KB),连接时的可写事件表明socket是否连接服务器成功。连接后的可读可写事件表明socket上确实有事件到来了,要调用onRead|onWrite|emitErr等方法

onRead

使用socket共享的读buffer调用recvfrom,调用_on_read,即同步触发onReadCB回调:接受到了哪个地址的多少数据

send/send_l

send(Buffer::Ptr buf, struct sockaddr *addr, socklen_t addr_len, bool try_flush)
{
    if (!addr || !addr_len) {
      	return send_l(std::move(buf), false, try_flush); //没有sock地址的buf
    }
    return send_l(std::make_shared<BufferSock>(std::move(buf), addr, addr_len), true, try_flush);
}

send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush)
// 
{
    flushData
}

flushData

//一级缓存是多个buffer
//二级缓存是多个bufferlist,一个bufferlist是一个一级缓存,在创建时已经绑定对应的发送类型了BufferSendMsg/BufferSendMMsg

消费sock二级缓存中的数据,如果二级缓存为空就消费一级的,一级缓存清空是通过move操作

​ 如果一级缓存也为空,那么说明所有数据均写入socket了,poller停止监听sock可写事件,即使不可写也可以往多级缓存中写,返回

​ 需要发数据,while循环中把二级缓存中的BufferList依次发送,同时设置sock的可写事件,有空间写了。udp发送失败,丢弃(pop_front),tcp触发异常,

​ 回滚未发送出去的数据,写回二级缓存中

如果是poller线程就再flushData一次

startWriteAbleEvent 开始监听可写事件: send失败了,缓冲区不够了,监听以便下次接着写

stopWriteAbleEvent 停止监听可写事件: 多级buffer空了,即使不可写也能写buffer

onWriteAble: 多级缓存空了就停止监听可写事件,否则多级缓存写入socket缓冲区

listen/bindUdpSock

监听读事件,对应的回调是onAccept

onAccept

SockUtil::setNoSigpipe(fd);
SockUtil::setNoBlocked(fd);
SockUtil::setNoDelay(fd);
SockUtil::setSendBuf(fd);
SockUtil::setRecvBuf(fd);
SockUtil::setCloseWait(fd);
SockUtil::setCloExec(fd);

// tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
peer_sock = _on_before_accept(_poller);//?

// 监听都时间  利用自己传入deleter析构的时候执行 completed
//这样的目的是 用户处理完onAccept时间后 才能再收到onRead事件
//因为onAccept是可能异步处理的 所以不能触发事件后立即加入epoll监听onRead事件
shared_ptr<void> completed(nullptr, [peer_sock, peer_sock_fd](void *) {
    try {
        //然后把该fd加入poll监听(确保先触发onAccept事件然后再触发onRead等事件)
        if (!peer_sock->attachEvent(peer_sock_fd, false)) {
            //加入poll监听失败,触发onErr事件,通知该Socket无效
            peer_sock->emitErr(SockException(Err_eof, "add event to poller failed when accept a socket"));
        }
    } catch (std::exception &ex) {
        ErrorL << ex.what();
    }
});


_socket->setOnAccept([this](Socket::Ptr &sock, shared_ptr<void> &complete) {
    auto ptr = sock->getPoller().get();
    auto server = getServer(ptr);
    ptr->async([server, sock, complete]() {
        //该tcp客户端派发给对应线程的TcpServer服务器
        server->onAcceptConnection(sock);
    });
});

bindUdpSock

udp没有listen_fd。就是直接监听这个端口的读写事件就行

bool Socket::bindUdpSock(uint16_t port, const string &local_ip, bool enable_reuse) {
    closeSock();
    int fd = SockUtil::bindUdpSock(port, local_ip.data(), enable_reuse);
    if (fd == -1) {
        return false;
    }
    auto sock = makeSock(fd, SockNum::Sock_UDP);
    if (!attachEvent(sock, true)) {
        return false;
    }
    LOCK_GUARD(_mtx_sock_fd);
    _sock_fd = sock;
    return true;
}


bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) {
    weak_ptr<Socket> weak_self = shared_from_this();
    weak_ptr<SockFD> weak_sock = sock;
    _enable_recv = true;
    _read_buffer = _poller->getSharedBuffer();
    int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self,weak_sock,is_udp](int event) {
        auto strong_self = weak_self.lock();
        auto strong_sock = weak_sock.lock();
        if (!strong_self || !strong_sock) {
            return;
        }

        if (event & EventPoller::Event_Read) {
            strong_self->onRead(strong_sock, is_udp);
        }
        if (event & EventPoller::Event_Write) {
            strong_self->onWriteAble(strong_sock);
        }
        if (event & EventPoller::Event_Error) {
            strong_self->emitErr(getSockErr(strong_sock));
        }
    });

    return -1 != result;
}

SockSender

子类send的实现!

virtual ssize_t send(Buffer::Ptr buf) = 0;
virtual void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) = 0;

//这么多发送类型,都依赖send(Buffer::Ptr buf)的重写实现!
//发送char *
SockSender &operator << (const char *buf);
ssize_t send(const char *buf, size_t size = 0);
//发送字符串
SockSender &operator << (std::string buf);
ssize_t send(std::string buf);
//发送Buffer对象
SockSender &operator << (Buffer::Ptr buf);
//发送其他类型是数据
template<typename T>
SockSender &operator << (T &&buf) {
    std::ostringstream ss;
    ss << std::forward<T>(buf);
    send(ss.str());
    return *this;
}

test_tcpClient.cpp 这样也可以发数据

auto buf = BufferRaw::create();
if(buf){
    buf->assign("[BufferRaw]\0");
    (*this) << _nTick++ << " "
            << 3.14 << " "
            << string("string") << " "
            <<(Buffer::Ptr &)buf;
}

SocketHelper

在这里插入图片描述

has a sock

对于send的实现,是对Socket::send的封装

ssize_t SocketHelper::send(Buffer::Ptr buf) {
    if (!_sock) {
        return -1;
    }
    return _sock->send(std::move(buf), nullptr, 0, _try_flush);
}//nullptr判断是否是一个bufSock getBufferSockPtr,给sendmmsg指定地址用

对于async的实现,是对EventPoller::async的封装

总结

  • weak_ptr|shared_from_this|的使用场景,异步操作时,判断对象生命周期

  • decltype常用在<>中,不像auto一样需要定义,只是声明

  • lock_guard的使用,传入一个可以lock和unlock的对象

  • recursive_mutex的使用场景

  • 二级缓存发送数据时的作用,以及回滚的实现

  • 写事件的监听的目的时机

  • shared<void> xxx(nullptr, [](){} ) 用法,自己传入deleter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/3866.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RabbitMQ学习(一)

目录&#xff1a; &#xff08;1&#xff09;什么是消息队列 &#xff08;2&#xff09;为什么要使用消息队列 &#xff08;3&#xff09;RabbitMQ特点 &#xff08;4&#xff09;RabbitMQ的安装 &#xff08;5&#xff09;RabbitMQ常用命令 &#xff08;6&#xff09;Ra…

【HTML】标签简单融合运用

&#x1f60a;博主页面&#xff1a;鱿年年 &#x1f449;博主推荐专栏&#xff1a;《WEB前端》&#x1f448; ​&#x1f493;博主格言&#xff1a;追风赶月莫停留&#xff0c;平芜尽处是春山❤️ 一、目录文件夹 1.在vscode建立一个新的目录文件夹如15-综合案例 2.将imag…

图的存储结构

图的存储结构 1.邻接矩阵表示法 设图G (V, E)是具有n个顶点的图&#xff0c;顶点顺序依次为{v1,v2,v3.......} 设a[N][N]为 n 阶方阵 G 的邻接矩阵具有此种性质&#xff1a; 若a[i][j]1&#xff0c;则存在边(vi, vj)或者弧<vi, vj> (即两点之间存在边或弧)若a[i][j]0…

day01 计算机基础和环境搭建

day01 计算机基础和环境搭建 课程目标&#xff1a;让大家了解计算机基础知识并完成python的环境搭建 课程概要&#xff1a; 计算机基础 编程的本质 python的介绍 python环境的搭建 1.计算机基础 1.1 基本概念 计算机的组成 计算机的组计算机是由多个硬件组合而成&#…

快2023了你不会还没学uni-app吧?(uniapp开发快速上手,uniapp项目创建,基础目录介绍)

uniapp新人上手指南前言开发工具快速尝鲜—创建uni-app项目项目基础目录介绍最后前言 uni-app 是一个使用 Vue.js (opens new window)开发所有前端应用的框架&#xff0c;开发者编写一套代码&#xff0c;可发布到14个平台&#xff0c;听起来是不是非常厉害&#xff0c;如果你后…

【论文阅读】Semi-supervised Sequence Learning半监督序列学习

【论文阅读】Semi-supervised Sequence Learning半监督学习 前言 半监督学习(Semi-Supervised Learning&#xff0c;SSL) 是模式识别和机器学习领域研究的重点问题&#xff0c;是监督学习与无监督学习相结合的一种学习方法。半监督学习使用大量的未标记数据&#xff0c;以及同…

别让 Linux 成为拿offer的阻碍

文章目录前言目录结构VI/VIM 编辑器是什么一般模式编辑模式&#xff08;插入模式&#xff09;命令模式模式间转换常用基础命令&#xff08;重要&#xff09;帮助命令man 获得帮助信息help 获得 shell 内置命令的帮助信息type 查看某命令是内置命令还是外部命令常用快捷键文件目…

Java 之 ElasticSearch8.x.x 【一篇文章精通系列】【ES的基本操作,ES安装,ES head + Kibana】

Java 之 ElasticSearch8.x.x 【一篇文章精通系列】【上&#xff1a;ES的基本操作&#xff0c;ES安装&#xff0c;ES head Kibana】一、ElasticSearch的安装1、解压安装ES2、熟悉目录3、启动ES4、安装可视化界面&#xff08;elasticsearch head&#xff09;5、了解ELK6、安装Ki…

【路径插值与抽稀篇】(3)路径插值与抽稀篇

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 TODO:写完再整理 文章目录系列文章目录前言一、路径曲线插值、拟合和逼近的区别1、拟合2、插值3、逼近二、路径点线性插值方法&#xff08;1&#xff09;纯跟踪&#xff…

EDA程序设计--计时器设计

实训题目&#xff1a;计时器的设计 1 系统设计 1.1设计要求 1.1.1 设计任务 设计并制作一台计时器。 1.1.2 性能指标要求 ① 用EDA实训仪的I/O设备和PLD芯片实现计时器的设计。 ② 计时器能够显示时、分和秒。 ③ 用EDA实训仪上的8只八段数码管显示时、分和秒&#xff08;如00…

mysql 常用查询优化策略详解

前言 在程序上线运行一段时间后&#xff0c;一旦数据量上去了&#xff0c;或多或少会感觉到系统出现延迟、卡顿等现象&#xff0c;出现这种问题&#xff0c;就需要程序员或架构师进行系统调优工作了&#xff0c;其中&#xff0c;大量的实践经验表明&#xff0c;调优的手段尽管…

Linux学习——目录操作和库使用

目录 一、打开目录 二、读取目录 三、关闭目录 四、修改文件权限 五、获取文件属性 六、库的概念&#xff1a; 1、静态库 缺点&#xff1a; 优点&#xff1a; 创建静态库步骤&#xff1a; 链接静态库&#xff1a; 2、动态库 动态库的生成步骤&#xff1a; 练习题&a…

论文阅读之Enhancing Transformer with Sememe Knowledge(2020)

文章目录论文阅读Transformmer-SETransformer-SP实验结果总结参考论文阅读 文章建议结合两种简单的方法将义原知识整合&#xff1a; 1&#xff09;基于语言学假设&#xff0c;我们将聚合义原嵌入添加到每个词嵌入中以增强其语义表示&#xff1b; 2&#xff09;我们使用义原预测…

OpenCV实战项目 -- 口罩识别

每次我忘记戴口罩去食堂吃饭的时候&#xff0c;门口都会有志愿者学生提醒你&#xff1a;“你好&#xff0c;麻烦戴下口罩。” 进门后里面那块大屏幕还会发出声音&#xff1a;“请佩戴口罩”。 上次博客仿照宿舍楼下那块大屏幕写了个人脸考勤&#xff0c;所以这次我打算弄一个口…

std::logic_error 错误的解决

今天测试一个程序&#xff0c;突然出现一个 std::logic_error 错误&#xff0c;详细如下&#xff1a; 这个应该是 std::string 相关的一个错误&#xff0c;具体的错误信息还得用 GDB 跟踪一下了。 看第 8 栈帧已经是系统库里的东西了&#xff0c;第 9 帧是我本地的代码&#x…

【C进阶】之定义结构体及使用typedf

1 结构体中包含函数指针类型成员 声明的格式&#xff1a; struct 结构体名 { 返回类型 (*函数指针名)(形参列表); }; 定义结构体类型的变量并对结构体中的函数指针成员进行初始化 struct 结构体名 结构体变量名; 结构体变量名.函数指针名 函数名; // 函数指针指向的函数具有相…

目标检测(7)—— YOLO系列V3

一、YOLOV3 多scale 三种scale&#xff1a; 为了检测到不同大小的物体&#xff0c;设计了3个scale。 特征融合不好。 感受野大的特征图预测大的&#xff0c;中的预测中的&#xff0c;小的预测小的。各自预测各自的&#xff0c;不用做特征融合。 三个候选框&#xff1a; 每个特…

「C++小游戏教程」基本技巧(2)——系统 DOS 命令

0. 引言 「C小游戏教程」基本技巧(1)——随机化 在 (1) 中&#xff0c;我在使用 random_shuffle() 时加了一个 system("pause");。其中 system() 是系统发出 DOS 命令的函数&#xff0c;原型为 int system(char *command);。我们今天就来谈谈这个函数的主要功能用途…

Redis持久化

目录 一、Redis高可用 1.持久化 2.主从复制 3.哨兵 4.Cluster集群 二、Redis持久化 三、RDB持久化 1.概念 2.触发条件 &#xff08;1&#xff09;手动触发 &#xff08;2&#xff09;自动触发 3.执行流程 4.启动时自动加载 四、AOF持久化 1.概念 2.开启AOF持久…

微信公众号获取openid流程

说明 微信公众号获取openid&#xff0c;在官方文档中称为网页授权&#xff0c;授权有两种scope&#xff0c;snsapi_base和snsapi_userinfo&#xff0c;snsapi_base是静默授权&#xff0c;不需要用户同意&#xff0c;以下要说的就是静默授权。 关于网页授权的两种 scope 的区别…