最简单的队列功能
RabbitMQ和消息传递通常会使用一些术语:
- 生产者(Producer)意味着发送消息。一个发送消息的程序称为生产者。
- 队列(Queue)尽管消息通过RabbitMQ和您的应用程序流动,但它们只能存储在队列中。队列仅受主机内存和磁盘限制,它本质上是一个大的消息缓冲区。
- 消费者(Consumer)与接收类似。消费者是一个主要等待接收消息的程序。
如上图 来自于RabbitMQ官网 其中 hello 这一个区块就是 Queue 也可以称为 Q
需要注意的是:
- 多个生产者可以发送消息到一个队列,多个消费者可以尝试从一个队列接收数据。
- 生产者、消费者和代理不必驻留在同一主机上。
- 一个应用程序也可以既是生产者又是消费者。
RebbitMQ 实现
- 首先 5672 是 RabbitMQ 的服务端口,15672 则是它自带的一个管理工具的访问端口(也就是上一篇文章我们开启的web管理端);
- 其次我们需要安装PHP 的 Composer 组件,
composer require php-amqplib/php-amqplib
,安装的时候尽量版本选择 3.2 以上版本,最开始我安装的是2.8 会缺失一些函数和官网Demo对不上;用于操作 RabbitMQ 消息队列,同时需要开启 sockets 扩展。这里需要去对应的php.ini 中找到 sockets并且打开它,然后重载配置或者重启你的服务;
P端添加数据 ( send )
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道
// 定义队列
$channel->queue_declare('hello', false, false, false, false);
// 创建消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello'); // 将消息放入队列中
echo "生产者向消息队列中发送信息:Hello World!";
$channel->close();
$connection->close();
在上面这段代码中 我们需要注意的是
- channel 它是共享单个 TCP 连接的轻量级信道 这样的好处是不用开辟多个TCP 连接减少多个连接带来的资源损失;管理简便,这种粒度的控制允许您根据需要调整每个通道的最大消息数量、优先级、确认模式等,以及整个连接的最大资源使用情况,从而更好地优化和保护系统的稳定性和可用性。
- AMQPMessage RabbitMQ 中是用来封装和传递消息内容及相关信息的对象;
消费/接受消息(Receiving)
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道
// 定义队列
$channel->queue_declare('hello', false, false, false, false);
echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;
// 定义接收数据的回调函数
$callback = function ($msg) {
echo '接收到数据: ', $msg->body, PHP_EOL;
};
// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('hello', '', false, true, false, false, $callback);
// 处理接收消息
while ($channel->is_open()) {
$channel->wait();
}
// 关闭频道和连接
$channel->close();
$connection->close();
我们进行一个测试,可以看到消费者先是输出了接收到的数据,然后正常的打印出来了;
这里我们的测试环境使用的是larveal;其中P端代码我放到了,作为一个基础的路由进行访问,但是在因为这里消费的代码不能这样处理所以消费端的代码我使用了 larveal 提供的命令行
Redis 实现
send
public function redisSend(){
$time = time();
echo "生产者向消息队列中发送信息:Hello World!".$time;
Redis::lpush('hello', 'Hello World!'.$time);
}
Receiving
class RedisReceivingCommand extends Command
{
protected $signature = 'redis:receiving';
protected $description = '消费队列';
public function handle()
{
echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;
while(1){
$data = Redis::rpop('hello');
if ($data){
echo '接收到数据: ', $data, PHP_EOL;
}
}
}
}