简介
muduo用TcpClient发起连接。TcpClient内部持有一个Connector(连接器),并通过Connector发起连接;连接建立成功后,用得到的socket创建TcpConnection来管理该连接,并为其设置相应的回调函数。每个TcpClient对象只管理一个TcpConnection。也就是说,TcpClient负责管理客户端连接,真正的连接建立过程交给Connector完成。
主要成员及属性解析
主要接口
回调setters
这些回调函数会在新连接建立时,通过内部的newConnection方法传递给TcpConnection对象
核心实现:newConnection
在构造时将这个函数作为回调注册给connector_对象
在Connector中的Channel执行本回调后,创建一个新的TcpConnection对象
connect
调用Connector的start接口
stop
调用Connector的stop接口
主要成员
loop
所属workloop
connector
TcpClient所维护的一个连接器
retry_
重连标志
TcpConnectionPtr connection_
TcpClient所维护的一个TCP连接对象
关于连接中回调的传递,参考下面的简图:
源码剖析
代码已编写完整注释。
TcpClient.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
#ifndef MUDUO_NET_TCPCLIENT_H
#define MUDUO_NET_TCPCLIENT_H
#include "muduo/base/Mutex.h"
#include "muduo/net/TcpConnection.h"
namespace muduo
{
namespace net
{
// Connector is only forward-declared here so that this public header does not
// need to include Connector.h.
class Connector;
// Shared-ownership handle to a Connector.
using ConnectorPtr = std::shared_ptr<Connector>;
/// TCP client that owns one Connector and at most one TcpConnection at a time.
///
/// The Connector performs the connecting phase. Once it reports a connected
/// socket through newConnection(), the client wraps the socket in a
/// TcpConnection, installs the user callbacks on it and keeps it in
/// connection_ until removeConnection() releases it.
class TcpClient : noncopyable
{
 public:
  // TcpClient(EventLoop* loop);
  // TcpClient(EventLoop* loop, const string& host, uint16_t port);

  /// @param loop        event loop this client belongs to.
  /// @param serverAddr  address of the server to connect to.
  /// @param nameArg     client name, used as the prefix of connection names
  ///                    and in log messages.
  TcpClient(EventLoop* loop,
            const InetAddress& serverAddr,
            const string& nameArg);
  ~TcpClient();  // force out-line dtor, for std::unique_ptr members.

  /// Starts connecting to the server through the Connector.
  void connect();
  /// Shuts down the current connection, if there is one.
  void disconnect();
  /// Stops the Connector (the connecting phase).
  void stop();

  /// Returns the currently managed connection (may be empty).
  /// Reads connection_ under mutex_.
  TcpConnectionPtr connection() const
  {
    MutexLockGuard lock(mutex_);
    return connection_;
  }

  EventLoop* getLoop() const { return loop_; }
  /// Whether reconnecting after a connection is removed is enabled.
  bool retry() const { return retry_; }
  void enableRetry() { retry_ = true; }

  const string& name() const
  { return name_; }

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(ConnectionCallback cb)
  { connectionCallback_ = std::move(cb); }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(MessageCallback cb)
  { messageCallback_ = std::move(cb); }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(WriteCompleteCallback cb)
  { writeCompleteCallback_ = std::move(cb); }

 private:
  /// Not thread safe, but in loop
  /// Callback invoked once a new connection is established: wraps the socket
  /// in a TcpConnection that this TcpClient then manages.
  void newConnection(int sockfd);
  /// Not thread safe, but in loop
  /// Releases the managed connection.
  void removeConnection(const TcpConnectionPtr& conn);

  // Event loop this client belongs to.
  EventLoop* loop_;
  // Connector that handles the connecting phase.
  ConnectorPtr connector_;  // avoid revealing Connector
  const string name_;
  ConnectionCallback connectionCallback_;        // connection up/down callback
  MessageCallback messageCallback_;              // message-arrived callback
  WriteCompleteCallback writeCompleteCallback_;  // data fully sent callback
  // Whether to reconnect after the connection is removed.
  bool retry_;    // atomic
  // Whether the client currently wants to be connected.
  bool connect_;  // atomic
  // always in loop thread
  int nextConnId_;
  mutable MutexLock mutex_;
  // The TcpConnection currently managed by this client.
  TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_TCPCLIENT_H
TcpClient.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
#include "muduo/net/TcpClient.h"
#include "muduo/base/Logging.h"
#include "muduo/net/Connector.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/SocketsOps.h"
#include <stdio.h> // snprintf
using namespace muduo;
using namespace muduo::net;
// TcpClient::TcpClient(EventLoop* loop)
// : loop_(loop)
// {
// }
// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port)
// : loop_(CHECK_NOTNULL(loop)),
// serverAddr_(host, port)
// {
// }
namespace muduo
{
namespace net
{
namespace detail
{
// Close callback that does not depend on a TcpClient instance: it only queues
// the connection's connectDestroyed() on the given loop. The closure keeps its
// own copy of the connection pointer until the queued task has run.
void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
{
  loop->queueInLoop([conn] { conn->connectDestroyed(); });
}
// Intentionally does nothing with its argument.
// NOTE(review): ~TcpClient binds its ConnectorPtr into a call to this function
// scheduled with runAfter(1, ...); presumably the only purpose is to keep the
// Connector alive until that timer fires -- confirm against Connector.
void removeConnector(const ConnectorPtr& connector)
{
//connector->
}
} // namespace detail
} // namespace net
} // namespace muduo
// Builds the client and its Connector for `serverAddr`. The client starts with
// the default connection/message callbacks, retry disabled, connect_ set and
// connection ids counting from 1. No connection attempt is made here.
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  // Have the Connector hand every newly connected socket to newConnection().
  connector_->setNewConnectionCallback(
      [this](int sockfd) { newConnection(sockfd); });
  // FIXME setConnectFailedCallback
  LOG_INFO << "TcpClient::TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
}
// Tears the client down.
//
// With a live connection: its close callback is replaced by
// detail::removeConnection, which does not refer to this (soon destroyed)
// client, and the connection is force-closed if the client was its only owner.
// Without a connection: the Connector is stopped and a delayed no-op holding
// the ConnectorPtr is scheduled on the loop.
TcpClient::~TcpClient()
{
  LOG_INFO << "TcpClient::~TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
  TcpConnectionPtr conn;
  bool unique = false;
  {
    MutexLockGuard lock(mutex_);
    // Is connection_ the only owner? This must be evaluated BEFORE `conn`
    // takes a second reference. use_count() == 1 is what the deprecated
    // (C++17) / removed (C++20) shared_ptr::unique() computed.
    unique = (connection_.use_count() == 1);
    conn = connection_;
  }
  if (conn)
  {
    assert(loop_ == conn->getLoop());
    // FIXME: not 100% safe, if we are in different thread
    // Swap in a close callback that captures loop_ by value, not `this`.
    CloseCallback cb = std::bind(&detail::removeConnection, loop_, _1);
    loop_->runInLoop(
        std::bind(&TcpConnection::setCloseCallback, conn, cb));
    // Nobody else holds the connection, so close it forcibly.
    if (unique)
    {
      conn->forceClose();
    }
  }
  else
  {
    connector_->stop();
    // FIXME: HACK
    loop_->runAfter(1, std::bind(&detail::removeConnector, connector_));
  }
}
// Requests a connection to the server: logs the target address, sets connect_
// (removeConnection() only reconnects while connect_ and retry_ are both set)
// and calls start() on the Connector.
void TcpClient::connect()
{
// FIXME: check state
LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
<< connector_->serverAddress().toIpPort();
connect_ = true;
connector_->start();
}
// Disconnects: clears connect_ and, if a connection is currently managed,
// calls shutdown() on it while holding mutex_.
void TcpClient::disconnect()
{
  connect_ = false;

  MutexLockGuard lock(mutex_);
  if (!connection_)
  {
    return;  // nothing to shut down
  }
  connection_->shutdown();
}
// Stops the connecting phase: clears connect_ (so removeConnection() will not
// reconnect) and calls stop() on the Connector. connection_ is not touched.
void TcpClient::stop()
{
connect_ = false;
connector_->stop();
}
// Callback run (in the loop thread) once the Connector has a connected socket:
// wraps `sockfd` in a TcpConnection named "<name_>:<peer ip:port>#<id>",
// installs the user callbacks, stores it in connection_ and finally calls
// connectEstablished() on it.
void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  // Server (peer) address of the socket; it becomes part of the name.
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  // Only the numeric id is formatted into a fixed buffer. The address is
  // appended as a string so that a long (e.g. IPv6) "ip:port" can no longer be
  // truncated and lose the "#id" suffix that keeps connection names unique.
  char idBuf[32];
  snprintf(idBuf, sizeof idBuf, "#%d", nextConnId_);
  ++nextConnId_;
  string connName = name_ + ":" + peerAddr.toIpPort() + idBuf;

  // Local (client side) address of the socket.
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  // Create the TcpConnection and install the callbacks on it.
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    connection_ = conn;
  }
  // Notify the connection that it is now established.
  conn->connectEstablished();
}
// Close callback of the managed connection (runs in the loop thread): drops
// the client's reference, queues the connection's connectDestroyed() on the
// loop, and restarts the Connector when both retry_ and connect_ are set.
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());

  {
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    // Give up the client's ownership of the TcpConnection.
    connection_.reset();
  }

  // The closure holds its own copy of `conn` until the queued task has run.
  loop_->queueInLoop([conn] { conn->connectDestroyed(); });

  const bool shouldReconnect = retry_ && connect_;
  if (!shouldReconnect)
  {
    return;
  }

  LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
           << connector_->serverAddress().toIpPort();
  connector_->restart();
}