发布订阅(Pub/Sub)
对于消息队列传统的模式来说,一个消费者消费一条消息,这条消息被消费之后就不会再次被其它的消费者消费。但是在发布订阅模式中,一条消息是可以被多个消费者消费的,这些消费者其实相当于是订阅了这条队列的消息。当有新的消息出现在队列中,就会像广播一样让所有订阅者都获得这条消息。
为什么要使用发布订阅模式
- 解耦和异步通信: 发布订阅模式允许发布者(发布消息的一方)和订阅者(接收消息的一方)之间解耦。发布者不需要知道哪些订阅者会接收消息,订阅者也不需要了解消息的来源。这种解耦使系统更加灵活和可扩展。
- 实时数据处理和通知: 当需要实时传输数据并且多个接收者需要收到同一数据时,发布订阅模式特别有用。例如,即时聊天应用程序中的消息传输,或者实时数据分析系统中的数据处理和通知。
- 事件驱动架构: 在事件驱动架构中,发布订阅模式是核心机制之一。系统中的各个组件可以通过发布和订阅事件来响应特定的业务事件,从而使系统更加响应式和可维护。
- 分布式系统协调: 在分布式系统中,不同节点之间可能需要进行协调和通信。通过发布订阅模式,可以实现跨节点的消息传递和事件处理,促进系统间的解耦和灵活性。
- 解决竞态条件: 在多线程或多进程环境中,使用发布订阅模式可以避免竞态条件的发生。订阅者能够按照自己的速度和时间处理接收到的消息,不会因为速度不同而导致数据不一致或丢失。
举个例子
比如现在有一个订单系统,在用户下单以后,我们需要同步给用户发送下单成功的通知,同时也需要给商家发送用户已经下单的通知;
如果使用传统的模式,我们大概需要一个事务隔离的环境执行如下逻辑
- 用户成功下单
- 给用户发送短信、站内消息等
- 给商家发送有用户下单短信、站内消息等
如果使用发布/订阅模式的话则可以拆成两个部分;
- 发布者
- 用户成功下单
- 发布者发布消息 publish
- 订阅者
- 订阅者一,发送消息
- 订阅者二,发送消息
RabbitMQ实现
在 RabbitMQ 中,交换机(Exchange)是消息的分发中心,它决定了消息应该被发送到哪些队列。RabbitMQ 提供了几种不同的交换机类型,每种类型都有不同的消息分发规则,其中包括了发布订阅模式的实现方式。
其中 Fanout Exchange (扇出交换机)
它会把所有发送到它的消息广播到所有与它绑定的队列中。这种模式实现了典型的发布订阅(Publish-Subscribe)模式,其中:
- 发布者将消息发送到 Fanout Exchange。
- Fanout Exchange 接收到消息后,会将消息复制并发送到所有与之绑定的队列。
- 订阅者分别从各自的队列中接收消息。
发布者代码(创建订单)
// 发布订单创建
public function orderCreate(){
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 定义交换机
$channel->exchange_declare('orders', 'fanout', false, false, false);
$data = '订单号:' . time();
$msg = new AMQPMessage($data);
// 注意,这里是指定的交换机,第三个参数还是队列名,之前普通队列我们指定的是第三个参数
$channel->basic_publish($msg, 'orders');
echo '[x] 发送消息 ', $data;
$channel->close();
$connection->close();
}
订阅者(发送短信和邮件)
// 发送短信
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('orders', 'fanout', false,false,false);
// 使用空队列名,由 RabbitMQ 生成随机队列名
// 这里使用了 解构赋值 PHP 版本在 7.1
// 以上可以这样使用 queue_declare 方法返回的数组中提取第一个和第二个元素,分别赋给 $queue_name 和一个没有具体变量名的占位符
[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
// 队列绑定到 orders 交换机
$channel->queue_bind($queue_name, 'orders');
echo "[x] 等待数据,退出请按 CTRL+C\n";
$callback = function($msg) {
echo '[x] 接收到 ', $msg->body, ",开始向相关方发送短信....\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
// 发送邮件
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('orders', 'fanout', false,false,false);
// 使用空队列名,由 RabbitMQ 生成随机队列名
// 这里使用了 解构赋值 PHP 版本在 7.1
// 以上可以这样使用 queue_declare 方法返回的数组中提取第一个和第二个元素,分别赋给 $queue_name 和一个没有具体变量名的占位符
[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
// 队列绑定到 orders 交换机
$channel->queue_bind($queue_name, 'orders');
echo "[x] 等待数据,退出请按 CTRL+C\n";
$callback = function($msg) {
echo '[x] 接收到 ', $msg->body, ",开始向相关方发送邮件....\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
我们可以发现其实在这两个代码中并没有太大差距,唯一的差距就是在模拟发送消息的时候echo的数据不一致,现在开始测试
测试
- 订阅者订阅
- 命令行1(SMS)
[x] 等待数据,退出请按 CTRL+C - 命令行2 (Email)
[x] 等待数据,退出请按 CTRL+C
- 创建订单
- 发送消息 订单号:1723028165
- 订阅者接收
- 命令行1(SMS)
[x] 接收到 订单号:1723028165,开始向相关方发送短信… - 命令行2 (Email)
[x] 接收到 订单号:1723028165,开始向相关方发送邮件…
Redis实现
// 订单创建
public function redisOrderCreate(){
$data = '订单号:' . time();
Redis::publish('orders', $data);
echo '[x] 发送消息 ', $data;
}
// 发布者订阅
// 发送短信
echo "[x] 等待数据,退出请按 CTRL+C\n";
// 订阅频道 'orders'
Redis::subscribe(['orders'], function ($message, $channel) {
echo "[x] 接收到 {$message},开始向相关方发送短信....\n";
});
// 发送邮件
echo "[x] 等待数据,退出请按 CTRL+C\n";
// 订阅频道 'orders'
Redis::subscribe(['orders'], function ($message, $channel) {
echo "[x] 接收到 {$message},开始向相关方发送邮件....\n";
});
注意:
subscribe() 方法,而且这个方法是直接就会挂起当前应用程序的,不需要我们再使用 while 来做死循环挂起。一个 subscribe() 方法可以监听多个发布频道,所以它的第一个参数是数组。第二个参数就是一个回调函数,这个函数有三个参数,分别是 redis实例、频道名称、消息内容 。
在使用 subscribe() 挂起程序的时候,要设置一下连接超时时间,要不过一会超过默认的连接超时时间后就会断开连接了。
在 Laravel 中,对于 Redis 的操作,特别是在使用 illuminate/redis 组件时,并没有直接支持设置 OPT_READ_TIMEOUT 这样的常量选项。
如果你需要在 Laravel 中控制 Redis 的读取超时,你可以考虑通过 Redis 客户端的其他方式来实现,例如使用 Predis 库。
composer require predis/predis
use Predis\Client;
// 创建 Predis 客户端实例
$redis = new Client();
// 设置读取超时时间
$redis->getConnection()->setReadTimeout(-1);
// 订阅频道 'orders'
$redis->pubSubLoop()->subscribe('orders', function ($message) {
echo "[x] 接收到 {$message->payload},开始向相关方发送xx....\n";
});