安装
在此我就不再略过TP6的项目创建过程了,大致就是安装composer工具,安装成功以后,再使用composer去创建项目即可。
think-queue 安装
composer require topthink/think-queue
项目中添加驱动配置
我们需要在安装好的config下找到 queue.php
<?phpreturn [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 4,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
生产者
<?php
namespace app\controller;
use app\BaseController;
use think\facade\Queue;
class Index extends BaseController
{
publicfunction queue()
{
//当前任务将由哪个类来负责处理。
//当轮到该任务时,系统将生成一个该类的实例,并默认调用其 fire 方法$jobHandlerClassName = 'app\Job\Order';
//当前任务归属的队列名称,如果为新队列,会自动创建
//php think queue:work --queue orderJobQueue
//php think queue:work --queue orderJobQueue --daemon$jobQueueName = "orderJobQueue";
//数组数据$orderData = [
'id' => uniqid(),
'time' => time(),
'message' => 'later message83'
];
//将该任务推送到消息队列,等待对应的消费者去执行
//这里只是负责将数据添加到相应的队列名称的队列里,消费者与生产者并无联系
//立即执行$isPushed = Queue::push($jobHandlerClassName, $orderData, $jobQueueName);
//延迟10秒后执行
//$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName);if ($isPushed !== false) {
echodate('Y-m-d H:i:s') . " 队列添加成功";
} else {
echo '队列添加失败';
}
}
}
消费者
<?php
namespace app\Job;
use think\facade\Log;
use think\queue\Job;
/**
* @Title: app\task\job$Order
* @Package package_name
* @Description: todo(测试订单消费者)
* @author Jack
*/class Order
{
/**
* @Title: fire
* @Description: todo(fire方法是消息队列默认调用的方法)
* @param Job $job
* @param array $data
* @author Jack
* @throws
*/publicfunction fire(Job $job, array$data)
{
//有些消息在到达消费者时,可能已经不再需要执行了$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
}
$jobId = $job->getJobId();
$isJobDone = $this->orders($data, $jobId);
if ($isJobDone) {
//如果任务执行成功,记得删除任务$job->delete();
} else {
//通过这个方法可以检查这个任务已经重试了几次了if ($job->attempts() > 3){
Log::error('试了3次了');
$job->delete();
//也可以重新发布这个任务
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 }
}
}
/**
* @Title: checkDatabaseToSeeIfJobNeedToBeDone
* @Description: todo(有些消息在到达消费者时,可能已经不再需要执行了)
* @param array $data
* @return boolean
* @author Jack
* @throws
*/privatefunction checkDatabaseToSeeIfJobNeedToBeDone($data)
{
returntrue;
}
/**
* @Title: orders
* @Description: todo(数据处理)
* @param array $data
* @author Jack
* @throws
*/publicfunction orders(array$data, $jobId)
{
//对订单进行数据库操作或其他等等Log::info(date('Y-m-d H:i:s') . ' - data:' . json_encode($data));
returntrue;
}
}
服务器执行常驻命令
php think queue:work --queue orderJobQueue