记一次hyperf框架封装swoole自定义进程

news2025/2/27 4:32:11
背景

公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。

自定义启动服务封装
<?php
/**
 * 进程启动服务【manager】
 */
declare(strict_types=1);

namespace App\Command;

use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use App\Process\BaseProcess;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;


/**
 * @Command
 */
#[Command]
class TaskProcessCommand extends HyperfCommand
{
    const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';

    /**
     * @var ContainerInterface
     */
    protected $container;

    protected $coroutine = false;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;

        parent::__construct('task');
    }

    public function configure()
    {
        parent::configure();
        $this->setDescription('自定义进程任务');
        $this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');
        $this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart 启动/关闭/重启');

    }

    public function handle()
    {
        $action = $this->input->getArgument('action');

        if ($action === 'start') {
            $this->start();
        } elseif ($action === 'stop') {
            $this->stop();
        } elseif ($action === 'restart') {
            $this->restart();
        } else {
            echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;
        }
    }

    /**
     * 重启:php bin/hyperf.php task restart
     */
    protected function restart()
    {
        $this->stop();
        $this->start();
    }

    /**
     * 停止:php bin/hyperf.php task stop
     */
    protected function stop()
    {

        if (file_exists(self::MANAGER_PROCESS_PID_PATH)) {
            //后期可以写入数据表,根据状态进行重启
            $managerPid = file_get_contents(self::MANAGER_PROCESS_PID_PATH);
            echo "stopping...\n";
            echo "kill pid $managerPid \n";
            $managerPid = intval($managerPid);
            $startTime = time();
            $timeout = config('server.settings.max_wait_time', 10);
            @Process::kill($managerPid);
            //等待主进程结束
            while (@Process::kill($managerPid, 0)) {
                //waiting process stop
                echo "waiting...\r";
                usleep(100000);
                echo "              \r";
                echo "waiting.\r";
                usleep(100000);
                echo "              \r";

                //超时 强杀所有子进程
                if ($managerPid > 0 && time() - $startTime >= $timeout) {
                    echo "wait timeout, kill -9 child process, pid: $managerPid \n";
                    echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;
                    echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;
                }
            }
            unlink(self::MANAGER_PROCESS_PID_PATH);
            echo "stopped. \n";

        } else {
            echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH;
        }

    }

    /**
     * 启动:php bin/hyperf.php task start
     * 守护进程启动:php bin/hyperf.php task start -d
     */
    protected function start()
    {
        $processConfig = config('processes');

        if ($processConfig) {

            echo "start now.\n";

            $daemonize = $this->input->getOption('daemonize');
            if ($daemonize) {
                //重定向标准输出到指定日志文件
                fclose(STDOUT);
                fclose(STDERR);
                $STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');
                $STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');
                Process::daemon(true, true);
            }

            //save pid
            file_put_contents(self::MANAGER_PROCESS_PID_PATH, getmypid());

            //TODO 后期可以根据需要写入配置或者数据表,开启多个主进程、挂载多个子进程
            BaseProcess::setProcessName('manager');//主进程

            $startFuncMap = [];

            foreach ($processConfig as $processClass) {
                $processObj = new $processClass;
                if ($processObj->isEnable && ($processObj instanceof BaseProcess) && isset($processObj->nums) && $processObj->nums > 0) {
                    for ($i = 0; $i < $processObj->nums; $i++) {
                        $startFuncMap[] = [
                            [$processObj, 'handle'],
                            $processObj->enableCoroutine ?? false,
                            $i,
                        ];
                    }
                }
            }

            $pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);

            $pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {
                [$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];
                if ($enableCoroutine) {
                    run(function () use ($func, $pool, $workerId, $idx) {
                        $pm = $func[0];//process下的类
                        $idx += 1;
                        BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");//多个子进程
                        call_user_func($func, $pool, $workerId);
                    });
                } else {
                    $func($pool, $workerId);//baseProcess下的handle
                }
            });

            $pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {
                echo 'process Message,data=' .json_encode($data). PHP_EOL;
            });

            //进程关闭
            $pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {
                echo "process WorkerId={$workerId} is stopped". PHP_EOL;
            });

            $pool->start();


        } else {
            printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);
        }
    }

    /**
     * 查看运行状态:php bin/hyperf.php task status
     */
    protected function status(){
        //TODO 查看任务执行状态
    }

    public function getProcess($pid = -1)
    {

        if ($pid === -1) {
            $pid = getmypid();
        }
        return static::$process[$pid] ?? null;
    }

    public function getAllProcess()
    {
        return static::$process;
    }
}

基础process封装

此处可以用hyperf框架自带的,也可以自己封装

<?php

declare (strict_types = 1);

namespace App\Process;

use Swoole;
use Swoole\Process\Pool;

abstract class BaseProcess {

    /**
     * 进程数
     * @var integer
     */
    public $nums = 0;

    /**
     * 进程名称
     * @var string
     */
    public $name = '';

    /**
     * 是否启用协程
     * @var bool
     */
    public $enableCoroutine = true;

    /**
     * 是否随进程启动服务
     * @var bool
     */
    public $isEnable = true;

    protected $isRunning = true;

    protected $process;



    static $signal = 0;

    function __construct() {
        //进程自动命名
        if (empty($this->name)) {
            $this->name = trim(str_replace('\\', '.', str_replace(__NAMESPACE__, '', get_called_class())), '.');
        }
    }

    final public function handle(Pool $pool, int $workerId): void {

        try {

            $this->processInit($pool->getProcess());

            $this->beforeRun();

            while (true) {
                //进程结束信号
                if (BaseProcess::$signal === SIGTERM) {
                    $this->onProcessExit();
                    break;
                }

                $this->run();
            }
        } catch (\Throwable $e) {
            throw $e;
        }

    }

    protected function onProcessExit() {
        $this->isRunning = false;
    }

    protected function processInit($process) {

        $this->process = $process;

        echo "process {$this->name} start, pid: " . getmypid().PHP_EOL;

        //注册信号处理器,实现优雅重启(等待任务执行完后或者等待超时)
        pcntl_signal(SIGTERM, function () {
            BaseProcess::$signal = SIGTERM;
            $maxWaitTime = config('server.settings.max_wait_time', 5);
            $sTime = time();
            //检查进程任务状态
            Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {

                $coStat = \Swoole\Coroutine::stats();
                //如果主循环结束,且其它协程任务执行完,清理定时器以退出进程
                if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {
                    Swoole\Timer::clearAll();
                    $this->process->exit();
                }
                //等待超时,强制结束进程
                elseif (time() - $sTime >= $maxWaitTime) {

                    Swoole\Timer::clearAll();
                    if ($this->isRunning) {
                        $this->onProcessExit();
                    }
                    $this->process->exit();
                }
            });
        });

    }

    public static function setProcessName(string $name) {
        swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);
    }

    /**
     * 事件循环前调用
     * @return [type] [description]
     */
    abstract function beforeRun();

    /**
     * 事件循环,注意这里不能使用死循环
     * @return [type] [description]
     */
    abstract function run();

}
使用demo

demo1

<?php

declare (strict_types = 1);

namespace App\Process;


/**
 * test
 */
class TestProcess extends BaseProcess {

    /**
     * 进程数
     * @var integer
     */
    public $nums = 5;

    public $enableCoroutine = true;

    /**
     * 不随服务启动进程
     * @var bool 
     */
    public $isEnable = false;

    public function beforeRun() {
        //事件循环前执行,比如一些初始化工作
    }

    public function run() {
        //事件循环主体
        echo date('Y-m-d H:i:s').PHP_EOL;
        usleep(1000);
    }



}

demo2

<?php


namespace App\Process;


use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;

class UpdateZeroStock extends BaseProcess
{
    const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';
    /**
     * 进程数
     * @var integer
     */
    public $nums = 1;

    public $enableCoroutine = true;
    /**
     * 随服务启动进程
     * @var bool 
     */
    public $isEnable=true;


    public function beforeRun() {
        //事件循环前执行,比如一些初始化工作
    }

    public function run() {
        //事件循环主体
        $this->updateZeroStock();
        echo date('Y-m-d H:i:s').PHP_EOL;
        sleep(300);
    }

    public function updateZeroStock()
    {
        // 1.全量更新
        $list_hq = HqchipGoodsModel::select(['id','spu','stock','manufacturer'])->where('excute_time','<',8)->limit(1000)->get();
        $container = ApplicationContext::getContainer();
        $redis = $container->get(Redis::class);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $today = date('Y-m-d');
        if($list_hq){
            foreach ($list_hq as $item){
                $spu = trim($item['spu']);
                $zeroStockKey =  $this->getZeroStockKey($today,'hqchip',$item['manufacturer']);
                if($redis->exists($zeroStockKey) && !$redis->hGet($zeroStockKey,$spu)){
                    $sendData = $item;
                    $sendData['appKey'] = $this->appSecretKey();
                    $sendData['platform'] = 'hqchip';
                    $message = new UpdateZeroStockProducer($sendData);
                    $res = $producer->produce($message);
                    echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' .$res . PHP_EOL;
                }
            }
        }
    }

    /**
     * 零库存缓存KEY
     * @param $brand
     * @param $sku
     * @return string
     */
    private function getZeroStockKey($day,$platfrom,$brand)
    {
        return self::ZERO_STOCK_KEY .":". $platfrom .":" . $day . ":" . $brand;
    }


    /**
     * 密钥生产
     * @return string
     */
    private function appSecretKey()
    {
        $a = 'chipmall-spider&V2&' . date('Y-m-d');
        $appKey = base64_encode(md5($a)   .'||'. base64_encode(time() . '|' . $a));
        return $appKey;
    }
}
在配置中进程需要执行的服务

在这里插入图片描述

以守护进程方式启动服务
php bin/hyperf.php task start -d

在这里插入图片描述
查看进程命令

ps -ef|grep taskProcess

在这里插入图片描述

疑惑

这次封装还存在两个点需要完善!!!
1.重复执行:

php bin/hyperf.php task start -d

会启动多个manager进程

2、没有封装查看进程状态的status方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1037033.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软考知识产权基础知识

商标权可以根据需要无限延长 根据《商标法》的规定&#xff0c;商标的有效期为10年&#xff0c;自商标注册之日起计算。有效期届满后&#xff0c;可以递交商标续展申请。每次续展的有效期为10年。但是&#xff0c;商标续展仅限于最后一年有效期也就是期满前六个月内提交申请。…

服务注册发现_actuator微服务信息完善

SpringCloud体系里的&#xff0c;服务实体向eureka注册时&#xff0c;注册名默认是IP名:应用名:应用端口名。 问题&#xff1a; 自定义服务在Eureka上的实例名怎么弄呢 在服务提供者pom中配置Actuator依赖 <!-- actuator监控信息完善 --> <dependency><groupId…

011_第一代软件开发(三)

第一代软件开发(三) 文章目录 第一代软件开发(三)项目介绍带下知识点系统日志滤波器陷波滤波器带通滤波器 打印初始化调用打印机打印文件保存到PDF 总结一下 关键字&#xff1a; Qt、 Qml、 日志、 打印、 滤波器 项目介绍 欢迎来到我们的 QML & C 项目&#xff01;这…

排序算法:非比较排序(计数排序)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关排序算法的相关知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通…

2018 国际AIOps挑战赛单指标数据集分析

关于数据集 2018年国际AIOps 由中国建设银行、清华大学以及必示科技公司联合举办&#xff0c;尽管已经过去了这么长时间&#xff0c;其提供的比赛数据依然被用于智能运维相关算法的研究。这里我们对此数据集做简单的分析&#xff0c;把一些常用的数据分析方法在这里进行略微地…

Spring面试题13:Spring中ApplicationContext实现有哪些?Bean工厂和Applicationcontext有什么区别

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:Spring中ApplicationContext实现有哪些? 在Spring框架中,有以下几种ApplicationContext的实现: ClassPathXmlApplicationContext:从类路径下的…

基于springboot消防员招录系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

​Segment-and-Track Anything——通用智能视频分割、跟踪、编辑算法解读与源码部署

一、 万物分割 随着Meta发布的Segment Anything Model (万物分割)的论文并开源了相关的算法&#xff0c;我们可以从中看到&#xff0c;SAM与GPT-4类似&#xff0c;这篇论文的目标是&#xff08;零样本&#xff09;分割一切&#xff0c;将自然语言处理&#xff08;NLP&#xff…

【数据结构】二叉排序树;平衡二叉树的知识点学习总结

目录 1、二叉排序树 1.1 定义 1.2 查找操作 1.3 插入操作 1.4 删除操作 1.5 C语言实现二叉排序树的基本操作 2、平衡二叉树的知识点总结 2.1 定义 2.2 插入操作 2.3 调整“不平衡” 2.4 删除操作 1、二叉排序树 1.1 定义 二叉排序树&#xff08;Binary Search …

云计算与大数据——部署Hadoop集群并运行MapReduce集群(超级详细!)

云计算与大数据——部署Hadoop集群并运行MapReduce集群(超级详细&#xff01;) Linux搭建Hadoop集群(CentOS7hadoop3.2.0JDK1.8Mapreduce完全分布式集群) 本文章所用到的版本号&#xff1a; CentOS7 Hadoop3.2.0 JDK1.8 基本概念及重要性 很多小伙伴部署集群用hadoop用mapr…

C++设计模式_06_Decorator 装饰模式

本篇将会介绍Decorator 装饰模式&#xff0c;它是属于一个新的类别&#xff0c;按照C设计模式_03_模板方法Template Method中介绍的划分为“单一职责”模式。 “单一职责”模式讲的是在软件组件的设计中&#xff0c;如果责任划分的不清晰&#xff0c;使用继承得到的结果往往是随…

HT for Web (Hightopo) 使用心得(2)- 2D 图纸、节点、连线 与基本动画

概括来说&#xff0c;用 HT for Web 做可视化主要分为两部分&#xff0c;也就是 2D 和 3D。这两部分需要单独创建。在它们被创建完成后&#xff0c;我们再把它们集成到一起。 HT for Web 的 2D 部分主要是指 ht.graph.GraphView (简称 GraphView&#xff0c;也就是 2D 图纸)。…

Java项目:SSM的食堂点餐系统

作者主页&#xff1a;Java毕设网 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 一、相关文档 系统中的核心用户是系统管理员&#xff0c;管理员登录后&#xff0c;通过管理员菜单来管理后台系统。主要功能有&#xff1a;个人中心、用户管理…

自动发现、zabbix_proxy代理

自动发现&#xff1a;自己去发现被监控的主机 它能够根据用户事先定义的规则自动添加监控的主机或服务等。 优点 加快Zabbix部署&#xff08;agent&#xff09; 简化管理 在快速变化的环境中使用Zabbix&#xff0c;而不需要过度管理 部署自动发现(新机子) rpm -Uvh https://re…

OSI 七层网络协议最全的图

OSI 七层网络协议最全的图 文章出处&#xff1a;https://www.shuzhiduo.com/A/RnJWawowdq/

DINO(ICLR 2023)

DINO&#xff08;ICLR 2023&#xff09; DETR with Improved deNoising anchOr box DINO发展&#xff1a; Conditional DETR->DAB-DETR&#xff08;4D,WH修正&#xff09; DN-DETR&#xff08;去噪训练&#xff0c;deNoising 稳定匹配过程&#xff09; Deformable DETR&…

后端大厂面试-16道面试题

1 java集合类有哪些&#xff1f; List是有序的Collection&#xff0c;使用此接口能够精确的控制每个元素的插入位置&#xff0c;用户能根据索引访问List中元素。常用的实现List的类有LinkedList&#xff0c;ArrayList&#xff0c;Vector&#xff0c;Stack。 ArrayList是容量…

基于同名面片的TLS测站点云配准

1、原理介绍 2、代码介绍 基于C++编写的程序代码如下,其依赖eigen矩阵运算库,在创建工程时包含库目录中使用了相对路径,因此其下载下来直接可以运行,不用单独在设置环境,非常方便。

Java项目:SpringBoot高校宿舍管理系统

作者主页&#xff1a;Java毕设网 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 一、相关文档 宿舍是大学生学习与生活的主要场所之一&#xff0c;宿舍管理是高校学工管理事务中尤为重要的一项。随着我国高校招生规模的进一步扩大&#xff0…

异步回调

Future 设计的初衷&#xff1a;对将来的某个事件的结果进行建模 package com.kuang.future;import com.kuang.pc.C;import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.uti…