发布/订阅
当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模式
- 发布者(producer)是发布消息的应用程序。
- 队列(queue)用于消息存储的缓冲。
- 消费者(consumer)是接收消息的应用程序。
下面的图才是 rabbitmq 的完整模式, 中间是有交换机的
交换机
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
我们之前的 简单队列和工作队列中,没有提来交换机的概念。
默认交换机
当我们使用RabbitMQ时,如果不指定交换机的类型,那么Rabbit会使用默认的一个交换机,这个默认的交换机类型是一个直连交换机(direct),后续新建的队列(queue)都会自动绑定到这个默认交换机上,绑定的路由键就是队列的名称,注意这个默认交换机的名称是一个空字符串 " "
交换机的种类有多种
直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
头交换机的性能不好, 基本不用
用的最多的还是 扇形交换机 相当于是广播
前面两节中,我们只使用了下面的代码,其实是使用的默认交换机,没有定义,直接使用了
$channel->basic_publish($msg, '', 'hello');
发布订阅模式中,我们使用 扇形交换机 fanout 代码如下
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
上面两段代码比较,第一段,因为使用了默认的交换机,所以没有交换机的定义语句, 但是在发布的时候,中间那个参数是 “”,这样就指定了默认交换机的名称, 第二段,我们指明了要使用的交换机 fanout 所以在发布的时候,使用的是自定义的交换机名称
因为有了交换机,生产者代码中只需要把 message 发送给交换机就可以了, 所以生产者中不需要创建队列,创建队列放到 消费者中就可以了,(如果我们一定要把创建队列的时机放在生产者中,也是可以的, 个人根据需要灵活应用)
交换机和队列的绑定(这里应该是在消费者代码中出现的)
我们创建了交换机,并且有了N个队列,它们之间要建立绑定关系,才可以分发到相应有绑定的队列中
$channel->queue_bind($queue_name, 'hello'); //这样就把队列名称和交换机名称做了绑定
下面的 完整的代码示例
生产者
<?php
declare (strict_types = 1);
namespace app\command;
use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
class PubSubMQProduce extends Command
{
protected function configure()
{
// 指令配置
$this->setName('pubsubmqproduce')
->setDescription('发布订阅模式的生产者');
}
protected function execute(Input $input, Output $output)
{
//获取连接
$connection = $this->getConnectRabbitMQ();
//创建通道
$channel = $connection->channel();
//创建交换机
/**
* params exchange 自定义交换机名称
* params type 交换机的类型, 一般都会使用 扇形(fanout)
* params passive 是否消极声名
* params durable 是否持久化
* params auto_delete 是否自动删除
* params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
* params nowait 相当于做一个异步版的声明,不等待返回,就让程序继续执行
*/
$channel->exchange_declare("exchangeName","fanout",false,false,false,false,false);
//现在生产者只需要把消息发给交换机就可以了,所以不用在生产者中创建队列了(当然,想创建也是可以的)
for ($i = 0; $i < 20; $i++) {
$msgArr = [
"name"=>"haha".$i,
"age"=>'10'.$i,
"sex"=>"female".$i
];
$msg = new AMQPMessage(json_encode($msgArr),[
"delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
sleep(1);
$channel->basic_publish($msg,"exchangeName");
}
$channel->close();
$connection->close();
}
protected function getConnectRabbitMQ(){
try{
$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");
return $connection;
}catch(Exception $e){
throw new Exception("队列连接失败");
}
}
}
消费者
<?php
declare (strict_types = 1);
namespace app\command;
use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
class PubSubMQConsumer extends Command
{
protected function configure()
{
// 指令配置
$this->setName('pubsubmqconsumer')
->setDescription('发布订阅模式的消费者');
}
protected function execute(Input $input, Output $output)
{
$connection = $this->connectRabbitMQ();
$channel = $connection->channel();
//创建两个队列
$channel->queue_declare("queueName1",false,false,false,false,false);
$channel->queue_declare("queueName2",false,false,false,false,false);
//绑定交换机和队列,交换机的名称是在生产者中定义的
$channel->queue_bind("queueName1","exchangeName");
$channel->queue_bind("queueName2","exchangeName");
//设置消息处理函数
$callback1 = function($msg){
$msgArr = json_decode($msg->body,true);
echo "这是(显示)处理数据的队列NO1 ".$msgArr["name"]."-11-".$msgArr["age"]."-11-".$msgArr["sex"].PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //这里让就是消息的应答了
};
$callback2 = function($msg){
$msgArr = json_decode($msg->body,true);
echo "这是(保存)处理数据的队列NO2 ".$msgArr["name"]."-22-".$msgArr["age"]."-22-".$msgArr["sex"].PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //这里让就是消息的应答了
};
$channel->basic_consume("queueName1","",false,false,false,false,$callback1);
$channel->basic_consume("queueName2","",false,false,false,false,$callback2);
while(count($channel->callbacks)){
$channel->wait();
}
}
protected function connectRabbitMQ(){
try{
$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");
return $connection;
}catch(Exception $e){
throw new Exception("队列连接失败");
}
}
}