PHP + Laravel + RabbitMQ + Redis 实现消息队列 (三) 消费队列在RabbitMQ和redis中的发布和订阅

news2025/1/22 19:38:55

发布订阅(Pub/Sub)

对于消息队列传统的模式来说,一个消费者消费一条消息,这条消息被消费之后就不会再次被其它的消费者消费。但是在发布订阅模式中,一条消息是可以被多个消费者消费的,这些消费者其实相当于是订阅了这条队列的消息。当有新的消息出现在队列中,就会像广播一样让所有订阅者都获得这条消息。
来源于RabbitMQ官网

为什么要使用发布订阅模式

  • 解耦和异步通信: 发布订阅模式允许发布者(发布消息的一方)和订阅者(接收消息的一方)之间解耦。发布者不需要知道哪些订阅者会接收消息,订阅者也不需要了解消息的来源。这种解耦使系统更加灵活和可扩展。
  • 实时数据处理和通知: 当需要实时传输数据并且多个接收者需要收到同一数据时,发布订阅模式特别有用。例如,即时聊天应用程序中的消息传输,或者实时数据分析系统中的数据处理和通知。
  • 事件驱动架构: 在事件驱动架构中,发布订阅模式是核心机制之一。系统中的各个组件可以通过发布和订阅事件来响应特定的业务事件,从而使系统更加响应式和可维护。
  • 分布式系统协调: 在分布式系统中,不同节点之间可能需要进行协调和通信。通过发布订阅模式,可以实现跨节点的消息传递和事件处理,促进系统间的解耦和灵活性。
  • 解决竞态条件: 在多线程或多进程环境中,使用发布订阅模式可以避免竞态条件的发生。订阅者能够按照自己的速度和时间处理接收到的消息,不会因为速度不同而导致数据不一致或丢失。

举个例子

比如现在有一个订单系统,在用户下单以后,我们需要同步给用户发送下单成功的通知,同时也需要给商家发送用户已经下单的通知;
如果使用传统的模式,我们大概需要一个事务隔离的环境执行如下逻辑

  1. 用户成功下单
  2. 给用户发送短信、站内消息等
  3. 给商家发送有用户下单短信、站内消息等

如果使用发布/订阅模式的话则可以拆成两个部分;

  • 发布者
  1. 用户成功下单
  2. 发布者发布消息 publish
  • 订阅者
  1. 订阅者一,发送消息
  2. 订阅者二,发送消息

RabbitMQ实现

在 RabbitMQ 中,交换机(Exchange)是消息的分发中心,它决定了消息应该被发送到哪些队列。RabbitMQ 提供了几种不同的交换机类型,每种类型都有不同的消息分发规则,其中包括了发布订阅模式的实现方式。
其中 Fanout Exchange (扇出交换机)
它会把所有发送到它的消息广播到所有与它绑定的队列中。这种模式实现了典型的发布订阅(Publish-Subscribe)模式,其中:

  • 发布者将消息发送到 Fanout Exchange。
  • Fanout Exchange 接收到消息后,会将消息复制并发送到所有与之绑定的队列。
  • 订阅者分别从各自的队列中接收消息。

发布者代码(创建订单)

    // 发布订单创建
    public function orderCreate(){
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        // 定义交换机
        $channel->exchange_declare('orders', 'fanout', false, false, false);

        $data = '订单号:' . time();
        $msg = new AMQPMessage($data);

        // 注意,这里是指定的交换机,第三个参数还是队列名,之前普通队列我们指定的是第三个参数
        $channel->basic_publish($msg, 'orders');

        echo '[x] 发送消息 ', $data;

        $channel->close();
        $connection->close();
    }

订阅者(发送短信和邮件)

		// 发送短信
    	$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->exchange_declare('orders', 'fanout', false,false,false);
        // 使用空队列名,由 RabbitMQ 生成随机队列名
         // 这里使用了 解构赋值   PHP 版本在 7.1  
        // 以上可以这样使用 queue_declare 方法返回的数组中提取第一个和第二个元素,分别赋给 $queue_name 和一个没有具体变量名的占位符
        [$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
        // 队列绑定到 orders 交换机
        $channel->queue_bind($queue_name, 'orders');

        echo "[x] 等待数据,退出请按 CTRL+C\n";

        $callback = function($msg) {
            echo '[x] 接收到 ', $msg->body, ",开始向相关方发送短信....\n";
        };

        $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while($channel->is_open()){
            $channel->wait();
        }

        $channel->close();
        $connection->close();

		// 发送邮件
 		$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        $channel->exchange_declare('orders', 'fanout', false,false,false);
        // 使用空队列名,由 RabbitMQ 生成随机队列名
        // 这里使用了 解构赋值   PHP 版本在 7.1  
        // 以上可以这样使用 queue_declare 方法返回的数组中提取第一个和第二个元素,分别赋给 $queue_name 和一个没有具体变量名的占位符
        [$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
        // 队列绑定到 orders 交换机
        $channel->queue_bind($queue_name, 'orders');

        echo "[x] 等待数据,退出请按 CTRL+C\n";

        $callback = function($msg) {
            echo '[x] 接收到 ', $msg->body, ",开始向相关方发送邮件....\n";
        };

        $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while($channel->is_open()){
            $channel->wait();
        }

        $channel->close();
        $connection->close();

我们可以发现其实在这两个代码中并没有太大差距,唯一的差距就是在模拟发送消息的时候echo的数据不一致,现在开始测试

测试

  1. 订阅者订阅
  • 命令行1(SMS)
    [x] 等待数据,退出请按 CTRL+C
  • 命令行2 (Email)
    [x] 等待数据,退出请按 CTRL+C
  1. 创建订单
  • 发送消息 订单号:1723028165
  1. 订阅者接收
  • 命令行1(SMS)
    [x] 接收到 订单号:1723028165,开始向相关方发送短信…
  • 命令行2 (Email)
    [x] 接收到 订单号:1723028165,开始向相关方发送邮件…

Redis实现

	// 订单创建
    public function redisOrderCreate(){
        $data = '订单号:' . time();

        Redis::publish('orders', $data);

        echo '[x] 发送消息 ', $data;
    }
	// 发布者订阅
	
	// 发送短信
	 echo "[x] 等待数据,退出请按 CTRL+C\n";
    // 订阅频道 'orders'
     Redis::subscribe(['orders'], function ($message, $channel) {
         echo "[x] 接收到 {$message},开始向相关方发送短信....\n";
     });
     
	// 发送邮件
    echo "[x] 等待数据,退出请按 CTRL+C\n";
    // 订阅频道 'orders'
     Redis::subscribe(['orders'], function ($message, $channel) {
         echo "[x] 接收到 {$message},开始向相关方发送邮件....\n";
     });
	

注意:

subscribe() 方法,而且这个方法是直接就会挂起当前应用程序的,不需要我们再使用 while 来做死循环挂起。一个 subscribe() 方法可以监听多个发布频道,所以它的第一个参数是数组。第二个参数就是一个回调函数,这个函数有三个参数,分别是 redis实例、频道名称、消息内容 。
在使用 subscribe() 挂起程序的时候,要设置一下连接超时时间,要不过一会超过默认的连接超时时间后就会断开连接了。
在 Laravel 中,对于 Redis 的操作,特别是在使用 illuminate/redis 组件时,并没有直接支持设置 OPT_READ_TIMEOUT 这样的常量选项。
如果你需要在 Laravel 中控制 Redis 的读取超时,你可以考虑通过 Redis 客户端的其他方式来实现,例如使用 Predis 库。

composer require predis/predis
use Predis\Client;

// 创建 Predis 客户端实例
$redis = new Client();

// 设置读取超时时间
$redis->getConnection()->setReadTimeout(-1);

// 订阅频道 'orders'
$redis->pubSubLoop()->subscribe('orders', function ($message) {
    echo "[x] 接收到 {$message->payload},开始向相关方发送xx....\n";
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1990958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SOMEIP_ETS_021:echoINT8

测试目的: 验证DUT在发送和接收INT8参数时,是否能够保持参数的值和顺序不变。 描述 本测试用例旨在检验DUT在处理包含INT8类型参数的SOME/IP消息时,是否能够正确地发送和接收这些参数,并且确保返回的方法响应消息中的INT8参数值…

QT(C#)-QTabWidget修改字体后Tab页显示不完整的解决方法

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 1、 前言2、问题示例3、解决方法 1、 前言 最近一段时间学习QT的程序开发,遇到了如标题所说的问题,经过查询和摸索找到了解决方法&#xff…

CSP-J 复赛 模拟题6

1.大小写字母互换: 题目描述 由输入给定一个字符串,你的任务是将原字符串中的大写字母转换成其对应的小写字母,还要将原字符串中的小写字母转换成对应的大写字母,其余字符不变。 输出转换之后得到的新字符串。 输入格式 一行…

Flink开发语言大比拼:Java与Scala怎么选好?

在选择Apache Flink的开发语言时,Java和Scala各有优劣,最合适的选择取决于项目需求、团队技能和偏好。 Apache Flink是一个开源流处理框架,广泛应用于实时数据处理场景,如金融交易监控、网络流量分析和用户行为分析等。Flink支持J…

水泵性能参数详解

水泵性能参数之流量 水泵流量是指水泵单位时间内输送液体的体积或重量,用符号Q来表示,常用单位有m/h、m/s、L/s或t/h。 水泵铭牌上标注的流量是这台泵的设计流量,又称额定流量,水泵在额定流量下运行效率最高。 水泵流量计算公式为…

代码随想录训练营 Day23打卡 回溯算法part02 39. 组合总和 40. 组合总和II 131. 分割回文串

代码随想录训练营 Day23打卡 回溯算法part02 一、 力扣39. 组合总和 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target ,找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 ,并以列表形式返回。你可以按 任意顺序 返回…

李飞飞亲自撰文,数十名科学家签署联名信,反对加州AI限制法案

AI真的已经危险到要如此监管的地步了吗? 点击访问我的技术博客https://ai.weoknow.comhttps://ai.weoknow.com 在创新的热土硅谷,李飞飞、吴恩达等 AI 科学家正在与监管部门展开一场关于安全与创新的拉锯战。 这场拉锯战的核心是一个名叫 SB-1047 的法案…

云平台部署 FunAudioLLM 语音天花板

FunAudioLLM FunAudioLLM 是阿里开源的语音处理模型,包含 SenseVoice 和 CosyVoice 两个模型。可以实现 5 种语言生成,以及 50 种语言无缝翻译,还能识别语音情绪。 FunAudioLLM:https://github.com/FunAudioLLM CosyVoice开源仓…

【Material-UI】按钮组:垂直按钮组详解

文章目录 一、按钮组概述1. 组件介绍2. 基本用法 二、垂直按钮组的应用场景1. 导航菜单2. 表单操作3. 选项切换 三、按钮组的样式定制1. 变体(Variants)2. 颜色(Colors) 四、垂直按钮组的优势1. 空间利用2. 可读性与易用性3. 视觉…

【网络基础一】几乎不讲任何网络协议细节,搭建网络基本结构

文章目录 问题认识“协议”计算机通信问题技术问题应用问题 协议分层 统编程帮助我们处理数据,网络编程帮助我们获取数据,网络配上我们写的线程池模块很快就搭建起来了。 问题 网卡是文件吗? 是的,所以未来网络通信的本质反馈到编…

Obsidian插件安装与开发

大概背景 事情的起因还是因为做笔记,我喜欢利用插件Obsidian Git自动同步笔记到Gitee,写md文档有个问题就是关于图片如何存储。 我个人习惯是将所有图片都保存到指定的文件夹下,如图👇 由于Obsidian对粘贴图片默认格式为这样的&…

ESXI加入VMware现有集群提示常规性错误

集群内有vSphere6.5和6.7的版本,都开启了EVC 这台老服务器是DELL R710添加时报错,网上查了些资料说要重装ESXI或者关闭EVC等等 最终解决方法是,给这台ESXI配置一个NTP服务器,同步系统时间,之后即可正常加入集群 往期文…

【安卓】文件存储

文章目录 将数据存储到文件中从文件中读取数据 文件存储是Android中最基本的数据存储方式,它不对存储的内容进行任何格式化处理,所有数据都是原封不动地保存到文件当中的,因而它比较适合存储一些简单的文本数据或二进制数据。如果你想使用文件…

家庭教育—情绪教育:塑造孩子情绪智力的金钥匙

文章目录 1. 背景介绍2. “1310镇静”方法的介绍3. 方法的科学依据4. 实施步骤5. 总结 1. 背景介绍 在快节奏的现代生活中,儿童面临着越来越多的情绪挑战。情绪教育作为素质教育的重要组成部分,越来越受到家长和教育者的重视。情绪教育不仅关乎儿童的心…

第100+20步 ChatGPT学习:R实现Lasso回归

基于R 4.2.2版本演示 一、写在前面 花了好几期分享了使用R语言实现机器学习分类,基本把常见模型都讲完了。 最后就以Lasso回归收尾得了。 Lasso回归应该很出名了,做特征变量筛选的,因此,不过多介绍。 二、R代码实现Lasso回归 …

OceanBase V4.2特性解析:MySQL模式下GIS空间表达式的场景及能力解析

1. 背景 1.1. OceanBase Mysql gis空间表达式的应用场景及能力 在OceanBase 4.1版本中,mysql模式下支持了gis数据类型以及部分空间对象相关的表达式,随着客户使用空间数据的需求日益增长,需要快速地补齐空间数据存储和计算分析的能力&#…

简单分享下Python进程

1. 单进程与多进程 理论讲解: 进程是操作系统中资源分配的基本单位,每个进程都有独立的内存空间。 多进程允许同时运行多个进程,提高CPU利用率和程序响应速度。 示例代码: import os print("当前进程ID:", os.getp…

【适配器模式】设计模式: 穿越接口的时空隧道(架起接口间的桥梁)

文章目录 Java 设计模式之适配器模式:理论与实践1. 引言1.1 结构型模式介绍1.2 为什么需要适配器模式? 2. 适配器模式概述2.1 定义2.2 关键概念2.3 适配器模式的类型 3. 适配器模式的参与者4. 适配器模式的工作原理4.1 类适配器模式的工作流程4.2 对象适…

CSS基础 - CSS3

目录 A. 简介 B. 基础用法 C. 总结 A. 简介 CSS3 是 CSS(层叠样式表)技术的升级版本。 一、新特性概述 选择器增强 CSS3 引入了更多强大的选择器,使得开发者能够更精确地选择和样式化网页元素。例如,属性选择器可以根据元素…