brpc之Acceptor

news2025/1/7 16:27:55

简介

Acceptor作为brpc服务端接收网络连接请求

初始化

ServerStartInternal指定监听端口,BuildAcceptor创建Acceptor,会根据当前支持的协议构造InputMessageHandlerv类型的handler,添加到Acceptor

Acceptor* Server::BuildAcceptor() {
    std::set<std::string> whitelist;
    for (butil::StringSplitter sp(_options.enabled_protocols.c_str(), ' ');
         sp; ++sp) {
        std::string protocol(sp.field(), sp.length());
        whitelist.insert(protocol);
    }
    const bool has_whitelist = !whitelist.empty();
    Acceptor* acceptor = new (std::nothrow) Acceptor(_keytable_pool);
    if (NULL == acceptor) {
        LOG(ERROR) << "Fail to new Acceptor";
        return NULL;
    }
    InputMessageHandler handler;
    std::vector<Protocol> protocols;
    ListProtocols(&protocols);
    for (size_t i = 0; i < protocols.size(); ++i) {
        if (protocols[i].process_request == NULL) {
            // The protocol does not support server-side.
            continue;
        }
        if (has_whitelist &&
            !is_http_protocol(protocols[i].name) &&
            !whitelist.erase(protocols[i].name)) {
            // the protocol is not allowed to serve.
            RPC_VLOG << "Skip protocol=" << protocols[i].name;
            continue;
        }
        // `process_request' is required at server side
        handler.parse = protocols[i].parse;
        handler.process = protocols[i].process_request;
        handler.verify = protocols[i].verify;
        handler.arg = this;
        handler.name = protocols[i].name;
        if (acceptor->AddHandler(handler) != 0) {
            LOG(ERROR) << "Fail to add handler into Acceptor("
                       << acceptor << ')';
            delete acceptor;
            return NULL;
        }
    }
    if (!whitelist.empty()) {
        std::ostringstream err;
        err << "ServerOptions.enabled_protocols has unknown protocols=`";
        for (std::set<std::string>::const_iterator it = whitelist.begin();
             it != whitelist.end(); ++it) {
            err << *it << ' ';
        }
        err << '\'';
        delete acceptor;
        LOG(ERROR) << err.str();
        return NULL;
    }
    return acceptor;
}

使用StartAccept来启动,指定读事件为OnNewConnections,使用Socket::Create创建Socket得到SocketId,ResetFileDescriptor注册读事件添加到EventDispatcher。在创建Socket时,会设置为非阻塞,TCP_NODELAY,根据参数设置SO_SNDBUF,SO_RCVBUF,SO_KEEPALIVE

SocketOptions options;
options.fd = listened_fd;
options.user = this;
options.bthread_tag = _bthread_tag;
options.on_edge_triggered_events = OnNewConnections;
if (Socket::Create(options, &_acception_id) != 0) {
    // Close-idle-socket thread will be stopped inside destructor
    LOG(FATAL) << "Fail to create _acception_id";
    return -1;
}

int Socket::Create(const SocketOptions& options, SocketId* id) {
    butil::ResourceId<Socket> slot;
    Socket* const m = butil::get_resource(&slot, Forbidden());
    if (m == NULL) {
        LOG(FATAL) << "Fail to get_resource<Socket>";
        return -1;
    }
    g_vars->nsocket << 1;
    CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
    m->_nevent.store(0, butil::memory_order_relaxed);
    m->_keytable_pool = options.keytable_pool;
    m->_tos = 0;
    m->_remote_side = options.remote_side;
    m->_on_edge_triggered_events = options.on_edge_triggered_events;
    m->_user = options.user;
    m->_conn = options.conn;
    m->_app_connect = options.app_connect;
    // nref can be non-zero due to concurrent AddressSocket().
    // _this_id will only be used in destructor/Destroy of referenced
    // slots, which is safe and properly fenced. Although it's better
    // to put the id into SocketUniquePtr.
    m->_this_id = MakeSocketId(
            VersionOfVRef(m->_versioned_ref.fetch_add(
                    1, butil::memory_order_release)), slot);
    m->_preferred_index = -1;
    m->_hc_count = 0;
    CHECK(m->_read_buf.empty());
    const int64_t cpuwide_now = butil::cpuwide_time_us();
    m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->reset_parsing_context(options.initial_parsing_context);
    m->_correlation_id = 0;
    m->_health_check_interval_s = options.health_check_interval_s;
    m->_is_hc_related_ref_held = false;
    m->_hc_started.store(false, butil::memory_order_relaxed);
    m->_ninprocess.store(1, butil::memory_order_relaxed);
    m->_auth_flag_error.store(0, butil::memory_order_relaxed);
    const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
    if (rc2) {
        LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
        m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
        return -1;
    }
    m->_force_ssl = options.force_ssl;
    // Disable SSL check if there is no SSL context
    m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
    m->_ssl_session = NULL;
    m->_ssl_ctx = options.initial_ssl_ctx;
#if BRPC_WITH_RDMA
    CHECK(m->_rdma_ep == NULL);
    if (options.use_rdma) {
        m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
        if (!m->_rdma_ep) {
            const int saved_errno = errno;
            PLOG(ERROR) << "Fail to create RdmaEndpoint";
            m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
                         berror(saved_errno));
            return -1;
        }
        m->_rdma_state = RDMA_UNKNOWN;
    } else {
        m->_rdma_state = RDMA_OFF;
    }
#endif
    m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
    m->_controller_released_socket.store(false, butil::memory_order_relaxed);
    m->_overcrowded = false;
    // May be non-zero for RTMP connections.
    m->_fail_me_at_server_stop = false;
    m->_logoff_flag.store(false, butil::memory_order_relaxed);
    m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
    m->_error_code = 0;
    m->_error_text.clear();
    m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
    m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
    m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
    // NOTE: last two params are useless in bthread > r32787
    const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
    if (rc) {
        LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
        m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
        return -1;
    }
    m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
    m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
    m->_keepalive_options = options.keepalive_options;
    m->_bthread_tag = options.bthread_tag;
    CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
    m->_is_write_shutdown = false;
    // Must be last one! Internal fields of this Socket may be access
    // just after calling ResetFileDescriptor.
    if (m->ResetFileDescriptor(options.fd) != 0) {
        const int saved_errno = errno;
        PLOG(ERROR) << "Fail to ResetFileDescriptor";
        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", 
                     berror(saved_errno));
        return -1;
    }
    *id = m->_this_id;
    return 0;
}
int Socket::ResetFileDescriptor(int fd) {
    // Reset message sizes when fd is changed.
    _last_msg_size = 0;
    _avg_msg_size = 0;
    // MUST store `_fd' before adding itself into epoll device to avoid
    // race conditions with the callback function inside epoll
    _fd.store(fd, butil::memory_order_release);
    _reset_fd_real_us = butil::gettimeofday_us();
    if (!ValidFileDescriptor(fd)) {
        return 0;
    }
    // OK to fail, non-socket fd does not support this.
    if (butil::get_local_side(fd, &_local_side) != 0) {
        _local_side = butil::EndPoint();
    }

    // FIXME : close-on-exec should be set by new syscalls or worse: set right
    // after fd-creation syscall. Setting at here has higher probabilities of
    // race condition.
    butil::make_close_on_exec(fd);

    // Make the fd non-blocking.
    if (butil::make_non_blocking(fd) != 0) {
        PLOG(ERROR) << "Fail to set fd=" << fd << " to non-blocking";
        return -1;
    }
    // turn off nagling.
    // OK to fail, namely unix domain socket does not support this.
    butil::make_no_delay(fd);
    if (_tos > 0 &&
        setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
        PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
    }

    if (FLAGS_socket_send_buffer_size > 0) {
        int buff_size = FLAGS_socket_send_buffer_size;
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buff_size, sizeof(buff_size)) != 0) {
            PLOG(ERROR) << "Fail to set sndbuf of fd=" << fd << " to "
                        << buff_size;
        }
    }

    if (FLAGS_socket_recv_buffer_size > 0) {
        int buff_size = FLAGS_socket_recv_buffer_size;
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buff_size, sizeof(buff_size)) != 0) {
            PLOG(ERROR) << "Fail to set rcvbuf of fd=" << fd << " to "
                        << buff_size;
        }
    }

    EnableKeepaliveIfNeeded(fd);

    if (_on_edge_triggered_events) {
        if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer(id(), fd) != 0) {
            PLOG(ERROR) << "Fail to add SocketId=" << id() 
                        << " into EventDispatcher";
            _fd.store(-1, butil::memory_order_release);
            return -1;
        }
    }
    return 0;
}

处理新连接

处理新连接是通过OnNewConnections。有新连接到来得到新的套接字,设置读事件为OnNewMessages,使用Socket::Create创建Socket注册到事件分发器中

void Acceptor::OnNewConnections(Socket* acception) {
    int progress = Socket::PROGRESS_INIT;
    do {
        OnNewConnectionsUntilEAGAIN(acception);
        if (acception->Failed()) {
            return;
        }
    } while (acception->MoreReadEvents(&progress));
}

void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
    while (1) {
        struct sockaddr_storage in_addr;
        bzero(&in_addr, sizeof(in_addr));
        socklen_t in_len = sizeof(in_addr);
        butil::fd_guard in_fd(accept(acception->fd(), (sockaddr*)&in_addr, &in_len));
        if (in_fd < 0) {
            // no EINTR because listened fd is non-blocking.
            if (errno == EAGAIN) {
                return;
            }
            // Do NOT return -1 when `accept' failed, otherwise `_listened_fd'
            // will be closed. Continue to consume all the events until EAGAIN
            // instead.
            // If the accept was failed, the error may repeat constantly, 
            // limit frequency of logging.
            PLOG_EVERY_SECOND(ERROR)
                << "Fail to accept from listened_fd=" << acception->fd();
            continue;
        }

        Acceptor* am = dynamic_cast<Acceptor*>(acception->user());
        if (NULL == am) {
            LOG(FATAL) << "Impossible! acception->user() MUST be Acceptor";
            acception->SetFailed(EINVAL, "Impossible! acception->user() MUST be Acceptor");
            return;
        }
        
        SocketId socket_id;
        SocketOptions options;
        options.keytable_pool = am->_keytable_pool;
        options.fd = in_fd;
        butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
        options.user = acception->user();
        options.force_ssl = am->_force_ssl;
        options.initial_ssl_ctx = am->_ssl_ctx;
#if BRPC_WITH_RDMA
        if (am->_use_rdma) {
            options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
        } else {
#else
        {
#endif
            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
        }
        options.use_rdma = am->_use_rdma;
        options.bthread_tag = am->_bthread_tag;
        if (Socket::Create(options, &socket_id) != 0) {
            LOG(ERROR) << "Fail to create Socket";
            continue;
        }
        in_fd.release(); // transfer ownership to socket_id

        // There's a funny race condition here. After Socket::Create, messages
        // from the socket are already handled and a RPC is possibly done
        // before the socket is added into _socket_map below. This is found in
        // ChannelTest.skip_parallel in test/brpc_channel_unittest.cpp (running
        // on machines with few cores) where the _messenger.ConnectionCount()
        // may surprisingly be 0 even if the RPC is already done.

        SocketUniquePtr sock;
        if (Socket::AddressFailedAsWell(socket_id, &sock) >= 0) {
            bool is_running = true;
            {
                BAIDU_SCOPED_LOCK(am->_map_mutex);
                is_running = (am->status() == RUNNING);
                // Always add this socket into `_socket_map' whether it
                // has been `SetFailed' or not, whether `Acceptor' is
                // running or not. Otherwise, `Acceptor::BeforeRecycle'
                // may be called (inside Socket::OnRecycle) after `Acceptor'
                // has been destroyed
                am->_socket_map.insert(socket_id, ConnectStatistics());
            }
            if (!is_running) {
                LOG(WARNING) << "Acceptor on fd=" << acception->fd()
                    << " has been stopped, discard newly created " << *sock;
                sock->SetFailed(ELOGOFF, "Acceptor on fd=%d has been stopped, "
                        "discard newly created %s", acception->fd(),
                        sock->description().c_str());
                return;
            }
        } // else: The socket has already been destroyed, Don't add its id
          // into _socket_map
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2271957.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

法拉利F80发布 360万欧元限量799辆 25年Q4交付

今日&#xff0c;法拉利旗下全新超级跑车——F80正式发布&#xff0c;新车将作为法拉利GTO和法拉利LaFerrari&#xff08;参数丨图片&#xff09; Aterta的继任者&#xff0c;搭载V6混合动力系统&#xff0c;最大综合输出功率高达1632马力。售价360万欧元&#xff0c;全球限量生…

【pytorch练习】使用pytorch神经网络架构拟合余弦曲线

在本篇博客中&#xff0c;我们将通过一个简单的例子&#xff0c;讲解如何使用 PyTorch 实现一个神经网络模型来拟合余弦函数。本文将详细分析每个步骤&#xff0c;从数据准备到模型的训练与评估&#xff0c;帮助大家更好地理解如何使用 PyTorch 进行模型构建和训练。 一、背景 …

电脑steam api dll缺失了怎么办?

电脑故障解析与自救指南&#xff1a;Steam API DLL缺失问题的全面解析 在软件开发与电脑维护的广阔天地里&#xff0c;我们时常会遇到各种各样的系统报错与文件问题&#xff0c;其中“Steam API DLL缺失”便是让不少游戏爱好者和游戏开发者头疼的难题之一。作为一名深耕软件开…

Conda 安装 Jupyter Notebook

文章目录 1. 安装 Conda下载与安装步骤&#xff1a; 2. 创建虚拟环境3. 安装 Jupyter Notebook4. 启动 Jupyter Notebook5. 安装扩展功能&#xff08;可选&#xff09;6. 更新与维护7. 总结 Jupyter Notebook 是一款非常流行的交互式开发工具&#xff0c;尤其适合数据科学、机器…

组合的能力

在《德鲁克最后的忠告》一书中&#xff0c;有这样一段话&#xff1a; 企业将由各种积木组建而成&#xff1a;人员、产品、理念和建筑。积木的设计组合至少和其供给一样重要。……对于一切程序、应用软件以及附件来说&#xff0c;重要的是掌握将已有的软件模块组合的能力&…

去掉el-table中自带的边框线

1.问题:el-table中自带的边框线 2.解决后的效果: 3.分析:明明在el-table中没有添加border,但是会出现边框线. 可能的原因: 由 Element UI 的默认样式或者表格的某些内置样式引起的。比如,<el-table> 会通过 border-collapse 或 border-spacing 等属性影响边框的显示。 4…

大模型与EDA工具

EDA工具&#xff0c;目标是硬件设计&#xff0c;而硬件设计&#xff0c;您也可以看成是一个编程过程。 大模型可以辅助软件编程&#xff0c;相信很多人都体验过了。但大都是针对高级语言的软件编程&#xff0c;比如&#xff1a;C&#xff0c;Java&#xff0c;Python&#xff0c…

【HarmonyOS之旅】基于ArkTS开发(一) -> Ability开发一

目录 1 -> FA模型综述 1.1 -> 整体架构 1.2 -> 应用包结构 1.3 -> 生命周期 1.4 -> 进程线程模型 2 -> PageAbility开发 2.1 -> 概述 2.1.1 ->功能简介 2.1.2 -> PageAbility的生命周期 2.1.3 -> 启动模式 2.2 -> featureAbility接…

BART:用于自然语言生成、翻译和理解的去噪序列到序列预训练

摘要&#xff1a; 我们提出了BART&#xff0c;一种用于预训练序列到序列模型的去噪自编码器。BART通过以下方式训练&#xff1a;(1) 使用任意的噪声函数对文本进行破坏&#xff0c;(2) 学习一个模型来重建原始文本。它采用了一种标准的基于Transformer的神经机器翻译架构&#…

Promise编码小挑战

题目 我们将实现一个 createImage 函数&#xff0c;该函数返回一个 Promise&#xff0c;用于处理图片加载的异步操作。此外&#xff0c;还会实现暂停执行的 wait 函数。 Part 1: createImage 函数 该函数会&#xff1a; 创建一个新的图片元素。将图片的 src 设置为提供的路径…

Dubbo扩展点加载机制

加载机制中已经存在的一些关键注解&#xff0c;如SPI、©Adaptive> ©Activateo然后介绍整个加载机制中最核心的ExtensionLoader的工作流程及实现原理。最后介绍扩展中使用的类动态编译的实 现原理。 Java SPI Java 5 中的服务提供商https://docs.oracle.com/jav…

【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历

明天开始考试周&#xff0c;百无聊赖开了一把CTF&#xff0c;还顺带体验了下二开工具&#xff0c;让无聊的Z3很开心&#x1f642; CachedVisitor这题 大概描述一下&#xff1a;从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…

在不到 5 分钟的时间内将威胁情报 PDF 添加为 AI 助手的自定义知识

作者&#xff1a;来自 Elastic jamesspi 安全运营团队通常会维护威胁情报报告的存储库&#xff0c;这些报告包含由报告提供商生成的大量知识。然而&#xff0c;挑战在于&#xff0c;这些报告的内容通常以 PDF 格式存在&#xff0c;使得在处理安全事件或调查时难以检索和引用相关…

vscode代码AI插件Continue 安装与使用

“Continue” 是一款强大的插件&#xff0c;它主要用于在开发过程中提供智能的代码延续功能。例如&#xff0c;当你在编写代码并且需要进行下一步操作或者完成一个代码块时&#xff0c;它能够根据代码的上下文、语法规则以及相关的库和框架知识&#xff0c;为你提供可能的代码续…

leetcode(hot100)4

解题思路&#xff1a;双指针思想 利用两个for循环&#xff0c;第一个for循环把所有非0的全部移到前面&#xff0c;第二个for循环将指针放在非0的末尾全部加上0。 还有一种解法就是利用while循环双指针条件&#xff0c;当不为0就两个指针一起移动 &#xff0c;为0就只移动右指针…

vulnhub——Earth靶机

使用命令在kali查看靶机ip arp-scan -l 第一 信息收集 使用 nmap 进行 dns 解析 把这两条解析添加到hosts文件中去&#xff0c;这样我们才可以访问页面 这样网站就可以正常打开 扫描ip时候我们发现443是打开的&#xff0c;扫描第二个dns解析的443端口能扫描出来一个 txt 文件…

k8s基础(1)—Kubernetes-Pod

一、Pod简介 Pod是Kubernetes&#xff08;k8s&#xff09;系统中可以创建和管理的最小单元&#xff0c;是资源对象模型中由用户创建或部署的最小资源对象模型‌。Pod是由一个或多个容器组成的&#xff0c;这些容器共享存储和网络资源&#xff0c;可以看作是一个逻辑的主机‌。…

【FlutterDart】页面切换 PageView PageController(9 /100)

上效果&#xff1a; 有些不能理解官方例子里的动画为什么没有效果&#xff0c;有可能是我写法不对 后续如果有动画效果修复了&#xff0c;再更新这篇&#xff0c;没有动画效果&#xff0c;总觉得感受的丝滑效果差了很多 上代码&#xff1a; import package:flutter/material.…

使用 NestJS 构建高效且模块化的 Node.js 应用程序,从安装到第一个 API 端点:一步一步指南

一、安装 NestJS 要开始构建一个基于 NestJS 的应用&#xff0c;首先需要安装一系列依赖包。以下是必要的安装命令&#xff1a; npm i --save nestjs/core nestjs/common rxjs reflect-metadata nestjs/platform-express npm install -g ts-node包名介绍nestjs/coreNestJS 框…

第07章 存储管理(一)

一、磁盘简介 1.1 名称称呼 磁盘/硬盘/disk是同一个东西&#xff0c;不同于内存的是容量比较大。 1.2 类型 机械&#xff1a;机械硬盘即是传统普通硬盘&#xff0c;主要由&#xff1a;盘片&#xff0c;磁头&#xff0c;盘片转轴及控制电机&#xff0c;磁头控制器&#xff0…