简介
Acceptor作为brpc服务端接收网络连接请求
初始化
Server
用StartInternal
指定监听端口,BuildAcceptor
创建Acceptor
,会根据当前支持的协议构造InputMessageHandlerv
类型的handler,添加到Acceptor
中
Acceptor* Server::BuildAcceptor() {
std::set<std::string> whitelist;
for (butil::StringSplitter sp(_options.enabled_protocols.c_str(), ' ');
sp; ++sp) {
std::string protocol(sp.field(), sp.length());
whitelist.insert(protocol);
}
const bool has_whitelist = !whitelist.empty();
Acceptor* acceptor = new (std::nothrow) Acceptor(_keytable_pool);
if (NULL == acceptor) {
LOG(ERROR) << "Fail to new Acceptor";
return NULL;
}
InputMessageHandler handler;
std::vector<Protocol> protocols;
ListProtocols(&protocols);
for (size_t i = 0; i < protocols.size(); ++i) {
if (protocols[i].process_request == NULL) {
// The protocol does not support server-side.
continue;
}
if (has_whitelist &&
!is_http_protocol(protocols[i].name) &&
!whitelist.erase(protocols[i].name)) {
// the protocol is not allowed to serve.
RPC_VLOG << "Skip protocol=" << protocols[i].name;
continue;
}
// `process_request' is required at server side
handler.parse = protocols[i].parse;
handler.process = protocols[i].process_request;
handler.verify = protocols[i].verify;
handler.arg = this;
handler.name = protocols[i].name;
if (acceptor->AddHandler(handler) != 0) {
LOG(ERROR) << "Fail to add handler into Acceptor("
<< acceptor << ')';
delete acceptor;
return NULL;
}
}
if (!whitelist.empty()) {
std::ostringstream err;
err << "ServerOptions.enabled_protocols has unknown protocols=`";
for (std::set<std::string>::const_iterator it = whitelist.begin();
it != whitelist.end(); ++it) {
err << *it << ' ';
}
err << '\'';
delete acceptor;
LOG(ERROR) << err.str();
return NULL;
}
return acceptor;
}
使用StartAccept
来启动,指定读事件为OnNewConnections
,使用Socket::Create
创建Socket
得到SocketId
,ResetFileDescriptor
注册读事件添加到EventDispatcher
。在创建Socket
时,会设置为非阻塞,TCP_NODELAY
,根据参数设置SO_SNDBUF
,SO_RCVBUF
,SO_KEEPALIVE
SocketOptions options;
options.fd = listened_fd;
options.user = this;
options.bthread_tag = _bthread_tag;
options.on_edge_triggered_events = OnNewConnections;
if (Socket::Create(options, &_acception_id) != 0) {
// Close-idle-socket thread will be stopped inside destructor
LOG(FATAL) << "Fail to create _acception_id";
return -1;
}
int Socket::Create(const SocketOptions& options, SocketId* id) {
butil::ResourceId<Socket> slot;
Socket* const m = butil::get_resource(&slot, Forbidden());
if (m == NULL) {
LOG(FATAL) << "Fail to get_resource<Socket>";
return -1;
}
g_vars->nsocket << 1;
CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
m->_nevent.store(0, butil::memory_order_relaxed);
m->_keytable_pool = options.keytable_pool;
m->_tos = 0;
m->_remote_side = options.remote_side;
m->_on_edge_triggered_events = options.on_edge_triggered_events;
m->_user = options.user;
m->_conn = options.conn;
m->_app_connect = options.app_connect;
// nref can be non-zero due to concurrent AddressSocket().
// _this_id will only be used in destructor/Destroy of referenced
// slots, which is safe and properly fenced. Although it's better
// to put the id into SocketUniquePtr.
m->_this_id = MakeSocketId(
VersionOfVRef(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)), slot);
m->_preferred_index = -1;
m->_hc_count = 0;
CHECK(m->_read_buf.empty());
const int64_t cpuwide_now = butil::cpuwide_time_us();
m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->reset_parsing_context(options.initial_parsing_context);
m->_correlation_id = 0;
m->_health_check_interval_s = options.health_check_interval_s;
m->_is_hc_related_ref_held = false;
m->_hc_started.store(false, butil::memory_order_relaxed);
m->_ninprocess.store(1, butil::memory_order_relaxed);
m->_auth_flag_error.store(0, butil::memory_order_relaxed);
const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
if (rc2) {
LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
return -1;
}
m->_force_ssl = options.force_ssl;
// Disable SSL check if there is no SSL context
m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
m->_ssl_session = NULL;
m->_ssl_ctx = options.initial_ssl_ctx;
#if BRPC_WITH_RDMA
CHECK(m->_rdma_ep == NULL);
if (options.use_rdma) {
m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
if (!m->_rdma_ep) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to create RdmaEndpoint";
m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
berror(saved_errno));
return -1;
}
m->_rdma_state = RDMA_UNKNOWN;
} else {
m->_rdma_state = RDMA_OFF;
}
#endif
m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
m->_controller_released_socket.store(false, butil::memory_order_relaxed);
m->_overcrowded = false;
// May be non-zero for RTMP connections.
m->_fail_me_at_server_stop = false;
m->_logoff_flag.store(false, butil::memory_order_relaxed);
m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
return -1;
}
m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
m->_keepalive_options = options.keepalive_options;
m->_bthread_tag = options.bthread_tag;
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
m->_is_write_shutdown = false;
// Must be last one! Internal fields of this Socket may be access
// just after calling ResetFileDescriptor.
if (m->ResetFileDescriptor(options.fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
berror(saved_errno));
return -1;
}
*id = m->_this_id;
return 0;
}
int Socket::ResetFileDescriptor(int fd) {
// Reset message sizes when fd is changed.
_last_msg_size = 0;
_avg_msg_size = 0;
// MUST store `_fd' before adding itself into epoll device to avoid
// race conditions with the callback function inside epoll
_fd.store(fd, butil::memory_order_release);
_reset_fd_real_us = butil::gettimeofday_us();
if (!ValidFileDescriptor(fd)) {
return 0;
}
// OK to fail, non-socket fd does not support this.
if (butil::get_local_side(fd, &_local_side) != 0) {
_local_side = butil::EndPoint();
}
// FIXME : close-on-exec should be set by new syscalls or worse: set right
// after fd-creation syscall. Setting at here has higher probabilities of
// race condition.
butil::make_close_on_exec(fd);
// Make the fd non-blocking.
if (butil::make_non_blocking(fd) != 0) {
PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
return -1;
}
// turn off nagling.
// OK to fail, namely unix domain socket does not support this.
butil::make_no_delay(fd);
if (_tos > 0 &&
setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
}
if (FLAGS_socket_send_buffer_size > 0) {
int buff_size = FLAGS_socket_send_buffer_size;
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, sizeof(buff_size)) != 0) {
PLOG(ERROR) << "Fail to set sndbuf of fd=" << fd << " to "
<< buff_size;
}
}
if (FLAGS_socket_recv_buffer_size > 0) {
int buff_size = FLAGS_socket_recv_buffer_size;
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, sizeof(buff_size)) != 0) {
PLOG(ERROR) << "Fail to set rcvbuf of fd=" << fd << " to "
<< buff_size;
}
}
EnableKeepaliveIfNeeded(fd);
if (_on_edge_triggered_events) {
if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer(id(), fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
return -1;
}
}
return 0;
}
处理新连接
处理新连接是通过OnNewConnections
。有新连接到来得到新的套接字,设置读事件为OnNewMessages
,使用Socket::Create
创建Socket
注册到事件分发器中
void Acceptor::OnNewConnections(Socket* acception) {
int progress = Socket::PROGRESS_INIT;
do {
OnNewConnectionsUntilEAGAIN(acception);
if (acception->Failed()) {
return;
}
} while (acception->MoreReadEvents(&progress));
}
void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
while (1) {
struct sockaddr_storage in_addr;
bzero(&in_addr, sizeof(in_addr));
socklen_t in_len = sizeof(in_addr);
butil::fd_guard in_fd(accept(acception->fd(), (sockaddr*)&in_addr, &in_len));
if (in_fd < 0) {
// no EINTR because listened fd is non-blocking.
if (errno == EAGAIN) {
return;
}
// Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
// will be closed. Continue to consume all the events until EAGAIN
// instead.
// If the accept was failed, the error may repeat constantly,
// limit frequency of logging.
PLOG_EVERY_SECOND(ERROR)
<< "Fail to accept from listened_fd=" << acception->fd();
continue;
}
Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
if (NULL == am) {
LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
return;
}
SocketId socket_id;
SocketOptions options;
options.keytable_pool = am->_keytable_pool;
options.fd = in_fd;
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
options.user = acception->user();
options.force_ssl = am->_force_ssl;
options.initial_ssl_ctx = am->_ssl_ctx;
#if BRPC_WITH_RDMA
if (am->_use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
} else {
#else
{
#endif
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
}
options.use_rdma = am->_use_rdma;
options.bthread_tag = am->_bthread_tag;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
continue;
}
in_fd.release(); // transfer ownership to socket_id
// There's a funny race condition here. After Socket::Create, messages
// from the socket are already handled and a RPC is possibly done
// before the socket is added into _socket_map below. This is found in
// ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
// on machines with few cores) where the _messenger.ConnectionCount()
// may surprisingly be 0 even if the RPC is already done.
SocketUniquePtr sock;
if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
bool is_running = true;
{
BAIDU_SCOPED_LOCK(am->_map_mutex);
is_running = (am->status() == RUNNING);
// Always add this socket into `_socket_map' whether it
// has been `SetFailed' or not, whether `Acceptor' is
// running or not. Otherwise, `Acceptor::BeforeRecycle'
// may be called (inside Socket::OnRecycle) after `Acceptor'
// has been destroyed
am->_socket_map.insert(socket_id, ConnectStatistics());
}
if (!is_running) {
LOG(WARNING) << "Acceptor on fd=" << acception->fd()
<< " has been stopped, discard newly created " << *sock;
sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
"discard newly created %s", acception->fd(),
sock->description().c_str());
return;
}
} // else: The socket has already been destroyed, Don't add its id
// into _socket_map
}
}