先说一下遇到的问题,使用的是beanstalk队列,有两个tube, 使用 supervisor 监控 beanstalk 消费队列(主进程A),主进程A产生两个子进程(子进程B,子进程C),每个子进程处理一个tube的数据。
supervisor配置如下:
[program:queue-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
处理消费队列的代码如下:
/**
* 启动消费队列
*
* @command php console.php queue worker
*/
public function wokerAction()
{
echo "------ worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
$beanstalk->addWorker(
'main',
function (BeanstalkJob $job) use ($config, $logger) {
$taskId = $job->getBody();
try {
$manager = new MainQueue();
$manager->handle($taskId);
} catch (\Throwable $e) {
$logger->error("tube:main, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
exit(0);
}
);
$beanstalk->addWorker(
'notice',
function (BeanstalkJob $job) use ($config, $logger) {
$taskId = $job->getBody();
try {
$manager = new NoticeQueue();
$manager->handle($taskId);
} catch (\Throwable $e) {
$logger->error("tube:notice, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
exit(0);
}
);
$beanstalk->doWork();
}
经常会出现下面的报错,子进程B或者C就退出了,但是主进程没事。
shmop_open(): unable to attach or create shared memory segment 'No such file or directory'
也查阅了supervisor的文档,里面有个针对组的配置,但是试了不起作用,这里的组配置应该是针对主进程的,主进程没事,子进程死活就不管了。
stopasgroup=true
killasgroup=true
这个事情折腾了好几天,子进程不定时的会死,总是在找为什么会出现共享内存错误的原因,进程的创建使用的是现成的类库,如果总找不到原因程序就不稳定了。
后来转变思路,反正只有两个tube,干脆搞两个独立进程算了,不纠结什么子进程了,这样就把监控的事情交给了supervisor管理了,其实不是supervisor的问题,是我们使用的姿势不对。
supervisor 配置如下:
[program:queue-main-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue main_worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
[program:queue-notice-worker]
command=/usr/local/bin/php /var/www/html/ctc/console.php queue notice_worker
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
startretries=30
startsecs=10
处理消费队列的代码如下:
(1)处理 main tube 队列
/**
* 启动main消费队列
*
* @command php console.php queue main_worker
*/
public function mainWorkerAction()
{
$tube = 'main';
echo "------{$tube} worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
while (true) {
$job = $beanstalk->reserveFromTube($tube);
if ($job instanceof BeanstalkJob) {
$taskId = $job->getBody();
try {
$manager = new MainQueue();
$manager->handle($taskId);
$job->delete();
} catch (\Throwable $e) {
$logger->error("tube:{$tube}, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
} else {
sleep(1);
}
}
}
(2)处理 notice tube 队列
/**
* 启动notice消费队列
*
* @command php console.php queue notice_worker
*/
public function noticeWorkerAction()
{
$tube = 'notice';
echo "------{$tube} worker start ------" . PHP_EOL;
$beanstalk = $this->getBeanstalk();
$logger = $this->getLogger('queue');
while (true) {
$job = $beanstalk->reserveFromTube($tube);
if ($job instanceof BeanstalkJob) {
$taskId = $job->getBody();
try {
$manager = new NoticeQueue();
$manager->handle($taskId);
$job->delete();
} catch (\Throwable $e) {
$logger->error("tube:{$tube}, task:{$taskId} exception " . kg_json_encode([
'file' => $e->getFile(),
'line' => $e->getLine(),
'message' => $e->getMessage(),
]));
}
} else {
sleep(1);
}
}
}
至于为什么当初会选择使用fork子进程的方式,说到底还是图方便,因为有现成的东西,拿来就用,但是出了问题,又迟迟搞不定。吃过亏才发现,还是用点简单稳定的方案,哪怕看上去没有那么美。
项目组件
- 后台框架:phalcon 3.4.5
- 前端框架:layui 2.8.2
- 全文检索:xunsearch 1.4.9
- 即时通讯:workerman 3.5.22
- 基础依赖:php7.3, mysql5.7, redis5.0
项目文档
- 运行环境搭建
- 系统服务配置
- 客户终端配置
意见反馈
- 码云平台
- 官方社区