1.首先查看项目中是否已经有think-queue目录:/vendor/topthink/
微信截图_20200909142126.png
如果没有,则用composer安装(安装composer参考:http://www.runoob.com/w3cnote/composer-install-and-usage.html
),安装think-queue先进入到项目根目录,运行
composer require topthink/think-queue=1.1.6
安装完成后,有think-queue文件夹,再运行
php think queue:work -h
查看是否安装成功,出现如下,则安装成功
微信截图_20200909142609.png
2.配置队列,queue内置了四种驱动,推荐使用redis驱动。配置文件在application/extra/queue.php,如下
<?php
use think\Env;
return [
'connector' => 'Redis', // Redis 驱动
'expire' => Env::get('queue.expire',60), // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default', // 默认的队列名称
'host' => Env::get('queue.host','127.0.0.1'), // redis 主机ip
'port' => Env::get('queue.port',6379), // redis 端口
'password' => Env::get('queue.password',''), // redis 密码
'select' => Env::get('queue.select',0), // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接
];
3.消息的创建和推送,首先要添加队列
<?php
namespace app\common\controller;
use think\Queue;
class OrderQueue
{
static $className = 'app\common\job';
public static function createOrderQueue($jobData)
{
// 1.当前任务将由哪个类来负责处理。
// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
$jobHandlerClassName = self::$className.'\Order';
// 2.当前任务归属的队列名称,如果为新队列,会自动创建
$jobQueueName = "createOrderJob";
// 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// ( jobData 为对象时,需要在此处手动序列化,否则只存储其public属性的键值对 )
// 4.将该任务推送到消息队列,等待对应的消费者去执行
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
if( $isPushed !== false ){
return true;
}else{
return false;
}
}
}
4.消息队列的执行
<?php
namespace app\common\job;
use app\common\model\ErrorLog;
use think\queue\Job;
class Order
{
/**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job,$data)
{
//业务处理代码,具体不贴出来了
$isJobDone = $this->create($data);
if ($isJobDone) {
// 如果任务执行成功, 记得删除任务
$job->delete();
}else{
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
$job->delete();
// 也可以重新发布这个任务
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
}
}
}
private function create($data)
{
ErrorLog::insertLog('执行队列',__METHOD__,json_encode($data));
return true;
}
}
到此,所有代码准备完毕,经历了一个消息的 创建 -> 推送 -> 消费 -> 删除 的基本流程,
上面添加队列的代码只要放在任意一个控制器内就行,部分目录机构如下
微信截图_20200909143940.png
5.启用队列的监听模式,队列的监听模式有两种,配置参数如下
微信截图_20200909144211.png
只需要到项目根目录执行
php think queue:listen --queue createOrderJob
即可,具体根据项目实际情况加上不同的参数。而且我们具体是不能要让命令以守护进程执行。所以我们可以换成下面这条命令
nohup php think queue:listen --queue createOrderJob &
到此,整个消息队列就完成了
6.测试:
public function createOrder()
{
$data = [
'order_id' => 1,
];
OrderQueue::createOrderQueue($data);
}