Hyperf 如何做到用两个端口 9501/9502 都能连接 Websocket 服务以及多 Worker 协作实现聊天室功能

news2025/1/22 21:05:57

为何 Hyperf 能够在两个端口上监听 WebSocket 连接?

源码角度来看,在配置了多个 Servers 时,实际上,只启动了一个 Server

注:我之前接触的代码都是启动一个服务绑定一个端口,之前也看过 swoole 扩展的文档,但是没留意服务和监听端口也是分离的,这启发了我一种思维,代码凡是能继续拆分的,就继续拆分,这样代码就会有更多的灵活,每个功能都能进行扩展,将服务和端口进行拆分之后,就可以在一个 Server 绑定多个 Port,每个 Port 又能有独立的事件。

/**
 * @param Port[] $servers
 * @return Port[]
 */
protected function sortServers(array $servers): array
{
    $sortServers = [];
    foreach ($servers as $server) {
        switch ($server->getType()) {
            case ServerInterface::SERVER_HTTP:
                $this->enableHttpServer = true;
                if (! $this->enableWebsocketServer) {
                    array_unshift($sortServers, $server);
                } else {
                    $sortServers[] = $server;
                }
                break;
            case ServerInterface::SERVER_WEBSOCKET:
                $this->enableWebsocketServer = true;
                array_unshift($sortServers, $server);
                break;
            default:
                $sortServers[] = $server;
                break;
        }
    }

    return $sortServers;
}

从源码看,排在第一个的服务配置,会被创建服务,之后都是增加监听

protected function initServers(ServerConfig $config)
{
    $servers = $this->sortServers($config->getServers());

    foreach ($servers as $server) {
        $name = $server->getName();
        $type = $server->getType();
        $host = $server->getHost();
        $port = $server->getPort();
        $sockType = $server->getSockType();
        $callbacks = $server->getCallbacks();

        if (! $this->server instanceof SwooleServer) {
            $this->server = $this->makeServer($type, $host, $port, $config->getMode(), $sockType);
            $callbacks = array_replace($this->defaultCallbacks(), $config->getCallbacks(), $callbacks);
            $this->registerSwooleEvents($this->server, $callbacks, $name);
            $this->server->set(array_replace($config->getSettings(), $server->getSettings()));
            ServerManager::add($name, [$type, current($this->server->ports)]);

            if (class_exists(BeforeMainServerStart::class)) {
                // Trigger BeforeMainServerStart event, this event only trigger once before main server start.
                $this->eventDispatcher->dispatch(new BeforeMainServerStart($this->server, $config->toArray()));
            }
        } else {
            /** @var bool|\Swoole\Server\Port $slaveServer */
            $slaveServer = $this->server->addlistener($host, $port, $sockType);
            if (! $slaveServer) {
                throw new \RuntimeException("Failed to listen server port [{$host}:{$port}]");
            }
            $server->getSettings() && $slaveServer->set(array_replace($config->getSettings(), $server->getSettings()));
            $this->registerSwooleEvents($slaveServer, $callbacks, $name);
            ServerManager::add($name, [$type, $slaveServer]);
        }

        // Trigger beforeStart event.
        if (isset($callbacks[Event::ON_BEFORE_START])) {
            [$class, $method] = $callbacks[Event::ON_BEFORE_START];
            if ($this->container->has($class)) {
                $this->container->get($class)->{$method}();
            }
        }

        if (class_exists(BeforeServerStart::class)) {
            // Trigger BeforeServerStart event.
            $this->eventDispatcher->dispatch(new BeforeServerStart($name));
        }
    }
}

从makeServer函数来看,如果服务中有SERVER_WEBSOCKET,则这个会被作为主服务启动,new SwooleWebSocketServer

protected function makeServer(int $type, string $host, int $port, int $mode, int $sockType): SwooleServer
{
    switch ($type) {
        case ServerInterface::SERVER_HTTP:
            return new SwooleHttpServer($host, $port, $mode, $sockType);
        case ServerInterface::SERVER_WEBSOCKET:
            return new SwooleWebSocketServer($host, $port, $mode, $sockType);
        case ServerInterface::SERVER_BASE:
            return new SwooleServer($host, $port, $mode, $sockType);
    }

    throw new RuntimeException('Server type is invalid.');
}

$this->registerSwooleEvents($this->server, $callbacks, $name); 这句代码会将 Websocket 的各种事件都注册进去,于是主服务器拥有 websocket 的各种事件,而后 http 服务器挂载 9501 端口上,绑定了 onrequest 事件,但是如果有 websocket 连接9501 端口上时,默认该服务器是自动开启 websocket 自动升级的,又因为监听 9501 端口绑定的主服务器是 WebSocketServer,因此,WebSocketServer 默认的 onmessage,onopen事件就会被拿来用。

推测,如果开启 9503 WebSocket服务器,那么理论上用 WebSocket连接 9501 端口,应该就是连接的 9503 的回调事件。如果不想让 http 监听端口自动开启 websocket 协议,则将open_websocket_protocol=false

<?php
return [
// 这里省略了该文件的其它配置
'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 9501,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_REQUEST => [Hyperf\HttpServer\Server::class,'onRequest'],
             ],
             'settings' => ['open_websocket_protocol' => false,]
         ],
     ]
 ];

关于很多SWOOLE中出现的常量来看,这些东西在执行脚本时,会被自动设置好,通过实际代码运行发现

define('SWOOLE_HTTP2_ERROR_COMPRESSION_ERROR', 9);
define('SWOOLE_HTTP2_ERROR_CONNECT_ERROR', 10);
define('SWOOLE_HTTP2_ERROR_ENHANCE_YOUR_CALM', 11);
define('SWOOLE_HTTP2_ERROR_INADEQUATE_SECURITY', 12);
define('SWOOLE_BASE', 1);
define('SWOOLE_PROCESS', 2);
define('SWOOLE_IPC_UNSOCK', 1);
define('SWOOLE_IPC_MSGQUEUE', 2);
define('SWOOLE_IPC_PREEMPTIVE', 3);

以下,随便设置的一个test.php中输出SWOOLE_BASE,都能输出1,我之前都以为这些常量是运行时设置的呢,看来这种理解是错误的。

<?php
echo SWOOLE_BASE."\n";
echo "hello\n";

// 输出
1
hello

Hyperf-skeleton 给的默认配置就是进程模式,这个竟然没有发现,这样就比较明确了,使用的都是PROCESS模式,那么在websocket连接时,所有的连接都是由Manager来控制

SWOOLE_PRECESS 和 SWOOLE_BASE 两种模式

Server 的两种运行模式介绍

在 Swoole\Server 构造函数的第三个参数,可以填 2 个常量值 -- SWOOLE_BASE或 SWOOLE_PROCESS,下面将分别介绍这两个模式的区别以及优缺点

SWOOLE_PROCESS

SWOOLE_PROCESS 模式的 Server 所有客户端的 TCP 连接都是和主进程建立的,内部实现比较复杂,用了大量的进程间通信、进程管理机制。适合业务逻辑非常复杂的场景。Swoole 提供了完善的进程管理、内存保护机制。 在业务逻辑非常复杂的情况下,也可以长期稳定运行。

Swoole 在 Reactor线程中提供了 Buffer 的功能,可以应对大量慢速连接和逐字节的恶意客户端。

进程模式的优点:

  • 连接与数据请求发送是分离的,不会因为某些连接数据量大某些连接数据量小导致 Worker 进程不均衡

  • Worker 进程发生致命错误时,连接并不会被切断

  • 可实现单连接并发,仅保持少量 TCP 连接,请求可以并发地在多个 Worker 进程中处理

进程模式的缺点:

  • 存在 2 次 IPC 的开销,master 进程与 worker 进程需要使用 unixSocket进行通信

  • SWOOLE_PROCESS 不支持 PHP ZTS,在这种情况下只能使用 SWOOLE_BASE 或者设置 single_thread为 true

SWOOLE_BASE

SWOOLE_BASE 这种模式就是传统的异步非阻塞 Server。与 Nginx 和 Node.js 等程序是完全一致的。

worker_num参数对于 BASE 模式仍然有效,会启动多个 Worker 进程。

当有 TCP 连接请求进来的时候,所有的 Worker 进程去争抢这一个连接,并最终会有一个 worker 进程成功直接和客户端建立 TCP 连接,之后这个连接的所有数据收发直接和这个 worker 通讯,不经过主进程的 Reactor 线程转发。

  • BASE 模式下没有 Master 进程的角色,只有 Manager进程的角色。

  • 每个 Worker 进程同时承担了 SWOOLE_PROCESS模式下 Reactor线程和 Worker 进程两部分职责。

  • BASE 模式下 Manager 进程是可选的,当设置了 worker_num=1,并且没有使用 Task 和 MaxRequest 特性时,底层将直接创建一个单独的 Worker 进程,不创建 Manager 进程

BASE 模式的优点:

  • BASE 模式没有 IPC 开销,性能更好

  • BASE 模式代码更简单,不容易出错

BASE 模式的缺点:

  • TCP 连接是在 Worker 进程中维持的,所以当某个 Worker 进程挂掉时,此 Worker 内的所有连接都将被关闭

  • 少量 TCP 长连接无法利用到所有 Worker 进程

  • TCP 连接与 Worker 是绑定的,长连接应用中某些连接的数据量大,这些连接所在的 Worker 进程负载会非常高。但某些连接数据量小,所以在 Worker 进程的负载会非常低,不同的 Worker 进程无法实现均衡。

  • 如果回调函数中有阻塞操作会导致 Server 退化为同步模式,此时容易导致 TCP 的 backlog队列塞满问题。

BASE 模式的适用场景:

如果客户端连接之间不需要交互,可以使用 BASE 模式。如 Memcache、HTTP 服务器等。

BASE 模式的限制:

在 BASE 模式下,Server 方法除了 send和 close以外,其他的方法都不支持跨进程执行。

Reactor 线程和 Worker 进程

Reactor 线程

  • Reactor 线程是在 Master 进程中创建的线程

  • 负责维护客户端 TCP 连接、处理网络 IO、处理协议、收发数据

  • 不执行任何 PHP 代码

  • 将 TCP 客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包

Worker 进程

  • 接受由 Reactor 线程投递的请求数据包,并执行 PHP 回调函数处理数据

  • 生成响应数据并发给 Reactor 线程,由 Reactor 线程发送给 TCP 客户端

  • 可以是异步非阻塞模式,也可以是同步阻塞模式

  • Worker 以多进程的方式运行

他们之间的关系可以理解为 Reactor 就是 nginx,Worker 就是 PHP-FPM。Reactor 线程异步并行地处理网络请求,然后再转发给 Worker 进程中去处理。Reactor 和 Worker 间通过 unixSocket进行通信。

在 PHP-FPM 的应用中,经常会将一个任务异步投递到 Redis 等队列中,并在后台启动一些 PHP 进程异步地处理这些任务。Swoole 提供的 TaskWorker 是一套更完整的方案,将任务的投递、队列、PHP 任务处理进程管理合为一体。通过底层提供的 API 可以非常简单地实现异步任务的处理。另外 TaskWorker 还可以在任务执行完成后,再返回一个结果反馈到 Worker。

Swoole 的 Reactor、Worker、TaskWorker 之间可以紧密的结合起来,提供更高级的使用方式。

一个更通俗的比喻,假设 Server 就是一个工厂,那 Reactor 就是销售,接受客户订单。而 Worker 就是工人,当销售接到订单后,Worker 去工作生产出客户要的东西。而 TaskWorker 可以理解为行政人员,可以帮助 Worker 干些杂事,让 Worker 专心工作。

结论

  1. 无论是SWOOLE_PRECESS 还是 SWOOLE_BASE 模式,Websocket 的连接对象都是直接分配给具体的 Worker 进程的,Worker 进程Pack 消息后,返回给对应 Reactor,不同的 Reactor 可能绑定不同的 Worker,因此在碰到的聊天室这种需要群发的情况,就涉及到当前 Worker 并没有直接关联其他的群成员的问题,但可以确定的是,其他群成员肯定都分布在所有的 Worker 上

  2. 鉴于 Reactor 也不止一个,Reactor 与 Worker 也可能是多对多,Reactor 要尽可能简单的,只负责收发消息,所以当 Reactor1-10 与 Worker1 建立了链接后,Reactor1-10 的所有连接的收发都由Reactor1-10 控制,同时返回的数据都有 Worker1 控制,Worker1 内部也会分发出来 1-10 个协程来对应 10 个请求,此时,群里另外 10-20 个人连接的是 Worker2,则 Worker2 中 13 号要给群里 20 个人同时回复消息,那么让 Worker2 就无法给 1-10 人发消息,需要中转给 Worker1,Worker1 收到 Worker2 的任务后,帮忙代理转发,这是一种Websocket 实现思路呀,分布式的情况下,还能实现集中化管理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/906230.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

这种隐形行为,酒店管理需警惕!

当火灾爆发时&#xff0c;及早的警报和响应可以挽救生命并减少财产损失。烟感监控系统作为一种关键设施&#xff0c;通过检测空气中的烟雾&#xff0c;能够在火灾初期就发出警报&#xff0c;为人们提供宝贵的时间来采取紧急措施。 烟感监控在各种场所如住宅、商业建筑、工厂等都…

机器人制作开源方案 | 滑板助力器

我们可以用一块废滑板做些什么呢&#xff1f; 如今&#xff0c;越来越多的人选择电动滑板作为代步工具或娱乐方式&#xff0c;市场上也涌现出越来越多的电动滑板产品。 &#xff08;图片来源&#xff1a;Backfire Zealot X Belt Drive Electric Skateboard– Backfire Boards…

磁铁的吸力与磁吸器件

目前磁铁很多的应用是利用其同极相斥异极相吸以及对铁磁性物质吸附的原理&#xff0c;如各类磁吸器件、磁性连接结构、磁选设备、磁传动设备等。 对于磁吸类的应用&#xff0c;大家都非常关注磁铁的吸力。磁铁的吸力是可以计算的&#xff0c;有如下公式可以参考&#xff0c;但…

深入浅出 TCP/IP 协议栈

TCP/IP 协议栈是一系列网络协议的总和&#xff0c;是构成网络通信的核心骨架&#xff0c;它定义了电子设备如何连入因特网&#xff0c;以及数据如何在它们之间进行传输。TCP/IP 协议采用4层结构&#xff0c;分别是应用层、传输层、网络层和链路层&#xff0c;每一层都呼叫它的下…

【Spring Boot】JdbcTemplate数据连接模板 — 使用JdbcTemplate操作数据库

使用JdbcTemplate操作数据库 成功在Spring Boot项目中集成JdbcTemplate后&#xff0c;如何使用JdbcTemplate数据库连接模板操作数据库呢&#xff1f;接下来以示例演示JdbcTemplate实现学生信息的增、删、改、查等操作&#xff0c;让我们在实践中边学边用&#xff0c;更好地理解…

Maven之Servlet 版本问题

maven-archetype-webapp 骨架的 Servlet 版本问题 通过 maven-archetype-webapp 骨架去创建 java web 项目时&#xff0c;自动生成的 web.xml 配置文件所使用的 Servlet 的版本比较低&#xff08;2.3&#xff09;&#xff0c;而在低版本的 Servlet 中 EL 表达式默认是关闭的。…

stack和queue的模拟实现

stack和queue的模拟实现 容器适配器什么是适配器STL标准库中stack和queue的底层结构deque的简单介绍deque的缺陷 stack模拟实现queue模拟实现priority_queuepriority_queue的使用priority_queue的模拟实现 容器适配器 什么是适配器 适配器是一种设计模式(设计模式是一套被反复…

深入解析:如何打造高效的直播视频美颜SDK

在当今数字化时代&#xff0c;视频直播已经成为人们交流、娱乐和信息传递的重要方式。然而&#xff0c;许多人在直播时都希望能够呈现出最佳的外观&#xff0c;这就需要高效的直播视频美颜技术。本文将深入解析如何打造高效的直播视频美颜SDK&#xff0c;以实现令人满意的视觉效…

磁盘格式化工具的详细指南!一文看懂五分钟搞定

什么是磁盘格式化工具&#xff1f; 磁盘格式化工具是一种软件&#xff0c;可让你擦除硬盘上的所有数据&#xff08;包括操作系统&#xff09;&#xff0c;并为新数据做好准备。格式化硬盘是提高电脑性能并消除你可能遇到的问题的好方法。 使用磁盘格式化工具有什么好处&am…

NineData x SelectDB 完成产品兼容互认证

近日&#xff0c;新一代实时数据仓库厂商 SelectDB 与云原生智能数据管理平台 NineData 完成产品兼容互认证。经过严格的联合测试&#xff0c;双方软件完全相互兼容、功能完善、整体运行稳定且性能表现优异。基于本次的合作&#xff0c;双方将进一步为数据管理与大数据分析业务…

Blazor前后端框架Known-V1.2.13

V1.2.13 Known是基于C#和Blazor开发的前后端分离快速开发框架&#xff0c;开箱即用&#xff0c;跨平台&#xff0c;一处代码&#xff0c;多处运行。 Gitee&#xff1a; https://gitee.com/known/KnownGithub&#xff1a;https://github.com/known/Known 概述 基于C#和Blazo…

微信ipad协议

前言 微信协议就是基于微信IPad协议的智能控制系统&#xff0c;利用人工智能AI技术、云计算技术、虚拟技术、边缘计算技术、大数据技术&#xff0c; 打造出智能桌面系统RDS、 智能聊天系统ACS 、智能插 件系统PLUGIN 、云计算服务CCS 、 任务管理系统TM、设备管理服务DM、 应…

Webpack 配置多入口

1、配置多入口 entry index 和 other 为入口名称&#xff0c;即页面名称 2、配置出口 output filename 中的 [name] 对应入口的文件名 contentHash 会命中缓存&#xff0c;提高性能 3、配置插件 htmlWebpackPlugin&#xff0c;生成多页面 htmlWebpackPlugin 插件会生成页…

【大虾送书第六期】搞懂大模型的智能基因,RLHF系统设计关键问答

目录 ✨1、RLHF是什么&#xff1f; ✨2、RLHF适用于哪些任务&#xff1f; ✨3、RLHF和其他构建奖励模型的方法相比有何优劣&#xff1f; ✨4、什么样的人类反馈才是好的反馈 ✨5、RLHF算法有哪些类别&#xff0c;各有什么优缺点&#xff1f; ✨6、RLHF采用人类反馈会带来哪些局…

Swagger2 使用

大家好 , 我是苏麟 , 今天带来Swagger的使用 . 官方文档 : 招摇文档 (swagger.io) 访问地址 : 在路径后加上doc.html 例如: http://localhost:8000/doc.html Swagger 使用 依赖 <!--Swagger依赖 核心--><dependency><groupId>io.springfox</groupId&g…

关于ubuntu下面安装cuda不对应版本的pyTorch

最近换了台新的linux的ubuntu的服务器&#xff0c;发现其实际安装的cuda版本为11.4&#xff0c;但是pytorch官方给出的针对cuda 11.4并没有具体的pytorch的安装指令&#xff0c;于是采用不指定pytorch版本直接安装让其自动搜索得到即可 直接通过&#xff1a; pip3 install tor…

series的数据对齐功能

Series 是一种类似于 Numpy 中一维数组的对象&#xff0c;它由一组任意类型的数据以及一组与之相关的数据标签&#xff08;即索引&#xff09;组成。举个最简单的例子&#xff1a; 上面的代码将打印出如下内容&#xff1a; 左边的是数据的标签&#xff0c;默认从 0 开始依次递增…

快解析Linux搭建FTP服务器:轻松实现文件传输

在Linux操作系统中&#xff0c;搭建FTP服务器是一种常见且重要的操作。快解析提供了便捷的解决方案&#xff0c;帮助用户快速搭建FTP服务器&#xff0c;实现高效的文件传输和共享。本文将介绍Linux搭建FTP服务器的定义、作用以及其独特的优势&#xff0c;助您了解并利用这一强大…

用手势操控现实:OpenCV 音量控制与 AI 换脸技术解析

基于opencv的手势控制音量和ai换脸 HandTrackingModule.py import cv2 import mediapipe as mp import timeclass handDetector():def __init__(self, mode False, maxHands 2, model_complexity 1, detectionCon 0.5, trackCon 0.5):self.mode modeself.maxHands max…