socket.cpp socket.h
socket
SockInfo类,有四个获取四元组信息的虚函数+一个获取自身标识符的虚函数
shared_from_this
原理关于boost中enable_shared_from_this类的原理分析 - 阿玛尼迪迪 - 博客园 (cnblogs.com)
shared_ptr<Tp> shared_from_this() { return shared_ptr<T>(M_weak_this); }
从上面的说明来看,需要小心的是shared_from_this()仅在shared_ptr<T>的构造函数被调用之后才能使用,原因是enable_shared_from_this::weak_this_并不在构造函数中设置,而是在shared_ptr<T>的构造函数中设置
#include <iostream>
#include <vector>
#include <list>
#include<iostream>
#include <memory>
#include <sstream>
using namespace std;
class A : public std::enable_shared_from_this<A>
{
public:
int i{1};
};
int main()
{
//auto p = new A;
auto p = std::make_shared<A>();
auto a = p->shared_from_this();// auto p = new A; --> what(): bad_weak_ptr 没有给m_weak_ptr赋值
printf("i:%d",p->i);
return 0;
}
Socket
结构
//和send_l部分关联
//一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
//一级发送缓存锁
MutexWrapper<std::recursive_mutex> _mtx_send_buf_waiting;
//二级发送缓存, socket可写时,会把二级缓存批量写入到socket
List<BufferList::Ptr> _send_buf_sending;
//二级发送缓存锁
MutexWrapper<std::recursive_mutex> _mtx_send_buf_sending;
//发送buffer结果回调
BufferList::SendResult _send_result;
//对象个数统计
ObjectStatistic<Socket> _statistic;
构造函数
//接收数据回调
using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
//发生错误回调
using onErrCB = std::function<void(const SockException &err)>; //只有错误码,socket可能已经断开了
//tcp监听接收到连接请求
using onAcceptCB = std::function<void(Socket::Ptr &sock, std::shared_ptr<void> &complete)>;
//socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止
using onFlush = std::function<bool()>;
//在接收到连接请求前,拦截Socket默认生成方式
using onCreateSocket = std::function<Ptr(const EventPoller::Ptr &poller)>;
//发送buffer成功与否回调
using onSendResult = BufferList::SendResult;
//类内部的静态方法,设置socket的poller和锁属性
static Socket::Ptr toolkit::Socket::createSocket(const EventPoller::Ptr &poller, bool enable_mutex)
Socket(const toolkit::EventPoller::Ptr &poller, bool enable_mutex)
{
//1 绑定poller
//2 使能锁 _mtx_sock_fd|_mtx_event|_mtx_send_buf_waiting|_mtx_send_buf_sending
}
封装可控制开关的递归锁
template<class Mtx = std::recursive_mutex>
class MutexWrapper {
public:
MutexWrapper(bool enable) {
_enable = enable;
}
~MutexWrapper() = default;
inline void lock() {
if (_enable) {
_mtx.lock();
}
}
inline void unlock() {
if (_enable) {
_mtx.unlock();
}
}
private:
bool _enable;
Mtx _mtx;
};
lock_guard任意类型的锁的宏
#define LOCK_GUARD(mtx) lock_guard<decltype(mtx)> lck(mtx)
connect
使用weak_ptr作为一个观察指针,在执行异步时不确定socket是否还在,所以要用weak_ptr.lock()再判断下
con_cb 执行成功后,释放async_con_cb/_con_timer对象,如果con失败,则释放自己对应的文件描述符
async_con_cb:con_cb的声明周期由async_con_cb保证
lambda函数本质:
遵循了类的特征,生命周期和类是一样的
如果有参数传入好比类的成员变量
传入引用就是引用本身的生命周期
为什么要有con_cb和async_con_cb,后面的if-else中可知,如果时直接给出ip,直接连接,连接的结果作为con_cb的参数传入
如果需要DNS解析,那么这个操作是阻塞的,异步连接。无论同步异步,连接操作完毕之后,poll该fd上的可写事件,可写后调用onConnected事件
todo
- 定时器的实现
- 工作线程分配任务的实现
- poller添加事件的实现
void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in, float timeout_sec, const string &local_ip, uint16_t local_port)
{
closeSock();
weak_ptr<Socket> weak_self = shared_from_this();
auto con_cb = [con_cb_in, weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
strong_self->_async_con_cb = nullptr;//
strong_self->_con_timer = nullptr;//释放智能指针指向的对象
if (err) {
LOCK_GUARD(strong_self->_mtx_sock_fd);
strong_self->_sock_fd = nullptr;
}
con_cb_in(err);
};
auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb](int sock) {
//con_cb的生命周期由async_con_cb保证
auto strong_self = weak_self.lock();
if (sock == -1 || !strong_self) {
if (!strong_self) {
CLOSE_SOCK(sock);
} else {
con_cb(SockException(Err_dns, get_uv_errmsg(true)));
}
return;
}
auto sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
weak_ptr<SockFD> weak_sock_fd = sock_fd;
//监听该socket是否可写,可写表明已经连接服务器成功
int result = strong_self->_poller->addEvent(sock, EventPoller::Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
auto strong_sock_fd = weak_sock_fd.lock();
auto strong_self = weak_self.lock();
if (strong_sock_fd && strong_self) {
//socket可写事件,说明已经连接服务器成功
strong_self->onConnected(strong_sock_fd, con_cb);
}
});
if (result == -1) {
con_cb(SockException(Err_other, "add event to poller failed when start connect"));
return;
}
//保存fd
LOCK_GUARD(strong_self->_mtx_sock_fd);
strong_self->_sock_fd = sock_fd;
});
if (isIP(url.data())) {
(*async_con_cb)(SockUtil::connect(url.data(), port, true, local_ip.data(), local_port));
} else {
auto poller = _poller;
weak_ptr<function<void(int)>> weak_task = async_con_cb;
WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, weak_task, poller]() {
//阻塞式dns解析放在后台线程执行
int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port);
poller->async([sock, weak_task]() {
auto strong_task = weak_task.lock();
if (strong_task) {
(*strong_task)(sock);
} else {
CLOSE_SOCK(sock);
}
});
});
_async_con_cb = async_con_cb;//async_con_cb的生命周期由类成员_async_con_cb保证(续命)
}
//连接超时定时器 定时器堆上分配,生命周期由类成员_con_timer保证(续命)
_con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
return false;
}, _poller);
}
onConnected函数由connect函数调用,先删除可写事件监听,再调用attachEvent(正式进入工作状态了),初始化所有线程下共享的读缓存(_read_buffer,默认256KB),连接时的可写事件表明socket是否连接服务器成功。连接后的可读可写事件表明socket上确实有事件到来了,要调用onRead|onWrite|emitErr等方法
onRead
使用socket共享的读buffer调用recvfrom,调用_on_read,即同步触发onReadCB回调:接受到了哪个地址的多少数据
send/send_l
send(Buffer::Ptr buf, struct sockaddr *addr, socklen_t addr_len, bool try_flush)
{
if (!addr || !addr_len) {
return send_l(std::move(buf), false, try_flush); //没有sock地址的buf
}
return send_l(std::make_shared<BufferSock>(std::move(buf), addr, addr_len), true, try_flush);
}
send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush)
//
{
flushData
}
flushData
//一级缓存是多个buffer
//二级缓存是多个bufferlist,一个bufferlist是一个一级缓存,在创建时已经绑定对应的发送类型了BufferSendMsg/BufferSendMMsg
消费sock二级缓存中的数据,如果二级缓存为空就消费一级的,一级缓存清空是通过move操作,
如果一级缓存也为空,那么说明所有数据均写入socket了,poller停止监听sock可写事件,即使不可写也可以往多级缓存中写,返回
需要发数据,while循环中把二级缓存中的BufferList依次发送,同时设置sock的可写事件,有空间写了。udp发送失败,丢弃(pop_front),tcp触发异常,
回滚未发送出去的数据,写回二级缓存中
如果是poller线程就再flushData一次
startWriteAbleEvent 开始监听可写事件: send失败了,缓冲区不够了,监听以便下次接着写
stopWriteAbleEvent 停止监听可写事件: 多级buffer空了,即使不可写也能写buffer
onWriteAble: 多级缓存空了就停止监听可写事件,否则多级缓存写入socket缓冲区
listen/bindUdpSock
监听读事件,对应的回调是onAccept
onAccept
SockUtil::setNoSigpipe(fd);
SockUtil::setNoBlocked(fd);
SockUtil::setNoDelay(fd);
SockUtil::setSendBuf(fd);
SockUtil::setRecvBuf(fd);
SockUtil::setCloseWait(fd);
SockUtil::setCloExec(fd);
// tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
peer_sock = _on_before_accept(_poller);//?
// 监听都时间 利用自己传入deleter析构的时候执行 completed
//这样的目的是 用户处理完onAccept时间后 才能再收到onRead事件
//因为onAccept是可能异步处理的 所以不能触发事件后立即加入epoll监听onRead事件
shared_ptr<void> completed(nullptr, [peer_sock, peer_sock_fd](void *) {
try {
//然后把该fd加入poll监听(确保先触发onAccept事件然后再触发onRead等事件)
if (!peer_sock->attachEvent(peer_sock_fd, false)) {
//加入poll监听失败,触发onErr事件,通知该Socket无效
peer_sock->emitErr(SockException(Err_eof, "add event to poller failed when accept a socket"));
}
} catch (std::exception &ex) {
ErrorL << ex.what();
}
});
_socket->setOnAccept([this](Socket::Ptr &sock, shared_ptr<void> &complete) {
auto ptr = sock->getPoller().get();
auto server = getServer(ptr);
ptr->async([server, sock, complete]() {
//该tcp客户端派发给对应线程的TcpServer服务器
server->onAcceptConnection(sock);
});
});
bindUdpSock
udp没有listen_fd。就是直接监听这个端口的读写事件就行
bool Socket::bindUdpSock(uint16_t port, const string &local_ip, bool enable_reuse) {
closeSock();
int fd = SockUtil::bindUdpSock(port, local_ip.data(), enable_reuse);
if (fd == -1) {
return false;
}
auto sock = makeSock(fd, SockNum::Sock_UDP);
if (!attachEvent(sock, true)) {
return false;
}
LOCK_GUARD(_mtx_sock_fd);
_sock_fd = sock;
return true;
}
bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) {
weak_ptr<Socket> weak_self = shared_from_this();
weak_ptr<SockFD> weak_sock = sock;
_enable_recv = true;
_read_buffer = _poller->getSharedBuffer();
int result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self,weak_sock,is_udp](int event) {
auto strong_self = weak_self.lock();
auto strong_sock = weak_sock.lock();
if (!strong_self || !strong_sock) {
return;
}
if (event & EventPoller::Event_Read) {
strong_self->onRead(strong_sock, is_udp);
}
if (event & EventPoller::Event_Write) {
strong_self->onWriteAble(strong_sock);
}
if (event & EventPoller::Event_Error) {
strong_self->emitErr(getSockErr(strong_sock));
}
});
return -1 != result;
}
SockSender
子类send的实现!
virtual ssize_t send(Buffer::Ptr buf) = 0;
virtual void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) = 0;
//这么多发送类型,都依赖send(Buffer::Ptr buf)的重写实现!
//发送char *
SockSender &operator << (const char *buf);
ssize_t send(const char *buf, size_t size = 0);
//发送字符串
SockSender &operator << (std::string buf);
ssize_t send(std::string buf);
//发送Buffer对象
SockSender &operator << (Buffer::Ptr buf);
//发送其他类型是数据
template<typename T>
SockSender &operator << (T &&buf) {
std::ostringstream ss;
ss << std::forward<T>(buf);
send(ss.str());
return *this;
}
test_tcpClient.cpp 这样也可以发数据
auto buf = BufferRaw::create();
if(buf){
buf->assign("[BufferRaw]\0");
(*this) << _nTick++ << " "
<< 3.14 << " "
<< string("string") << " "
<<(Buffer::Ptr &)buf;
}
SocketHelper
has a sock
对于send的实现,是对Socket::send的封装
ssize_t SocketHelper::send(Buffer::Ptr buf) {
if (!_sock) {
return -1;
}
return _sock->send(std::move(buf), nullptr, 0, _try_flush);
}//nullptr判断是否是一个bufSock getBufferSockPtr,给sendmmsg指定地址用
对于async的实现,是对EventPoller::async的封装
总结
-
weak_ptr|shared_from_this|的使用场景,异步操作时,判断对象生命周期
-
decltype常用在<>中,不像auto一样需要定义,只是声明
-
lock_guard的使用,传入一个可以lock和unlock的对象
-
recursive_mutex的使用场景
-
二级缓存在发送数据时的作用,以及回滚的实现
-
写事件的监听的目的和时机
-
shared<void> xxx(nullptr, [](){} ) 用法,自己传入deleter