tp8 使用rabbitMQ(3)发布/订阅

news2024/12/29 9:21:18

发布/订阅

当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模式

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

下面的图才是 rabbitmq 的完整模式, 中间是有交换机的
在这里插入图片描述

交换机

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。

我们之前的 简单队列和工作队列中,没有提来交换机的概念。

默认交换机

当我们使用RabbitMQ时,如果不指定交换机的类型,那么Rabbit会使用默认的一个交换机,这个默认的交换机类型是一个直连交换机(direct),后续新建的队列(queue)都会自动绑定到这个默认交换机上,绑定的路由键就是队列的名称,注意这个默认交换机的名称是一个空字符串 " "

交换机的种类有多种

直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
头交换机的性能不好, 基本不用
用的最多的还是 扇形交换机 相当于是广播
前面两节中,我们只使用了下面的代码,其实是使用的默认交换机,没有定义,直接使用了

$channel->basic_publish($msg, '', 'hello');

发布订阅模式中,我们使用 扇形交换机 fanout 代码如下

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

上面两段代码比较,第一段,因为使用了默认的交换机,所以没有交换机的定义语句, 但是在发布的时候,中间那个参数是 “”,这样就指定了默认交换机的名称, 第二段,我们指明了要使用的交换机 fanout 所以在发布的时候,使用的是自定义的交换机名称


因为有了交换机,生产者代码中只需要把 message 发送给交换机就可以了, 所以生产者中不需要创建队列,创建队列放到 消费者中就可以了,(如果我们一定要把创建队列的时机放在生产者中,也是可以的, 个人根据需要灵活应用)

交换机和队列的绑定(这里应该是在消费者代码中出现的)

我们创建了交换机,并且有了N个队列,它们之间要建立绑定关系,才可以分发到相应有绑定的队列中

$channel->queue_bind($queue_name, 'hello');  //这样就把队列名称和交换机名称做了绑定

下面的 完整的代码示例

生产者

<?php
declare (strict_types = 1);

namespace app\command;

use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class PubSubMQProduce extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('pubsubmqproduce')
            ->setDescription('发布订阅模式的生产者');
    }

    protected function execute(Input $input, Output $output)
    {
        //获取连接
        $connection = $this->getConnectRabbitMQ();
        //创建通道
        $channel = $connection->channel();
        //创建交换机
        /**
         * params exchange  自定义交换机名称
         * params type  交换机的类型, 一般都会使用 扇形(fanout)
         * params passive 是否消极声名
         * params durable 是否持久化
         * params auto_delete 是否自动删除
         * params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
         * params  nowait 相当于做一个异步版的声明,不等待返回,就让程序继续执行
         */
        $channel->exchange_declare("exchangeName","fanout",false,false,false,false,false);
        //现在生产者只需要把消息发给交换机就可以了,所以不用在生产者中创建队列了(当然,想创建也是可以的)

        for ($i = 0; $i < 20; $i++) {
            $msgArr = [
                "name"=>"haha".$i,
                "age"=>'10'.$i,
                "sex"=>"female".$i
            ];
            $msg = new AMQPMessage(json_encode($msgArr),[
                "delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]);
            sleep(1);
            $channel->basic_publish($msg,"exchangeName");
        }

        $channel->close();
        $connection->close();
    }

    protected function getConnectRabbitMQ(){
        try{
            $connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");
            return $connection;
        }catch(Exception $e){
            throw new Exception("队列连接失败");
        }

    }
}

消费者

<?php
declare (strict_types = 1);

namespace app\command;

use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class PubSubMQConsumer extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('pubsubmqconsumer')
            ->setDescription('发布订阅模式的消费者');
    }

    protected function execute(Input $input, Output $output)
    {
        $connection = $this->connectRabbitMQ();
        $channel = $connection->channel();
        //创建两个队列
        $channel->queue_declare("queueName1",false,false,false,false,false);
        $channel->queue_declare("queueName2",false,false,false,false,false);
        //绑定交换机和队列,交换机的名称是在生产者中定义的
        $channel->queue_bind("queueName1","exchangeName");
        $channel->queue_bind("queueName2","exchangeName");
        //设置消息处理函数
        $callback1 = function($msg){
            $msgArr = json_decode($msg->body,true);
            echo "这是(显示)处理数据的队列NO1  ".$msgArr["name"]."-11-".$msgArr["age"]."-11-".$msgArr["sex"].PHP_EOL;
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了
        };
        $callback2 = function($msg){
            $msgArr = json_decode($msg->body,true);
            echo "这是(保存)处理数据的队列NO2  ".$msgArr["name"]."-22-".$msgArr["age"]."-22-".$msgArr["sex"].PHP_EOL;
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了
        };
        $channel->basic_consume("queueName1","",false,false,false,false,$callback1);
        $channel->basic_consume("queueName2","",false,false,false,false,$callback2);
        while(count($channel->callbacks)){
            $channel->wait();
        }
    }

    protected function connectRabbitMQ(){
        try{
            $connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");
            return $connection;
        }catch(Exception $e){
            throw new Exception("队列连接失败");
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1245429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode刷题日志-70爬楼梯

假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 示例 1&#xff1a; 输入&#xff1a;n 2 输出&#xff1a;2 解释&#xff1a;有两种方法可以爬到楼顶。 1 阶 1 阶2 阶 示例 2&#xff1a; …

牛气霸屏-快抖云推独立版V1.6.7

介绍 快抖云推全插件独立版是最近很火的牛气霸屏系统独立版&#xff0c;牛气霸屏系统就是商家通过系统在线创建抖音或快手霸屏活动&#xff0c;并生成该活动的爆客二维码&#xff0c;用户通过扫二维码即可参加活动&#xff08;活动可以是领取卡劵&#xff0c;抽奖等&#xff0…

RT-Thread 线程间通信【邮箱、消息队列、信号】

线程间通信 一、邮箱1. 创建邮箱2. 发送邮件3. 接收邮件4. 删除邮箱5. 代码示例 二、消息队列1. 创建消息队列2. 发送消息3. 接收消息4. 删除消息队列5. 代码示例 三、信号1. 安装信号2. 阻塞信号3. 解除信号阻塞4. 发送信号5. 等待信号6. 代码示例 一、邮箱 RT-Thread 操作系…

6个PPT素材网站,让你快速做出好看的PPT

找PPT模板一定要收藏好这6个网站&#xff0c;能让你快速做出好看的PPT&#xff0c;重点十可以免费下载&#xff0c;赶紧收藏&#xff01; 1、菜鸟图库 https://www.sucai999.com/search/ppt/0_0_0_1.html?vNTYwNDUx 菜鸟图库网有非常丰富的免费素材&#xff0c;像设计类、办公…

Linux系统介绍及文件类型和权限

终端:CtrlAltT 或者桌面/文件夹右键,打开终端 切换为管理员:sudo su 退出:exit 查看内核版本号:uname -a 内核版本号含义:5 代表主版本号;13代表次版本号;0代表修订版本号;30代表修订版本的第几次微调;数字越大表示内核越新. 目录结构 /bin:存放常用命令(即二进制可执行程序…

c++|类和对象(上)

目录 一、面向过程和面向对象初步认识 二、类的引入和定义 2.1类的引入 2.2类的定义 三、类的访问限定符及封装 3.1访问限定符 3.2封装 四、类的作用域 五、类的实例化 六、类的对象大小的计算 6.1如何计算对象的大小 6.2类对象的存储方式 七、类成员函数的thi…

OpenCV快速入门:图像分析——傅里叶变换、积分图像

文章目录 前言一、傅里叶变换1.1 离散傅里叶变换1.1.1 离散傅里叶变换原理1.1.2 离散傅里叶变换公式1.1.3 代码实现1.1.4 cv2.dft 函数解析 1.2 傅里叶变换进行卷积1.2.1 傅里叶变换卷积原理1.2.2 傅里叶变换卷积公式1.2.3 代码实现1.2.4 cv2.mulSpectrums 函数解析 1.3 离散余…

LangChain的简单使用介绍

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

解决kubernetes中微服务pod之间调用失败报错connection refused的问题

现象&#xff1a; 从这里可以看到是当前服务在调用product service服务是出现了连接拒绝connection refused 走读一下原始代码&#xff1a; 可以看到请求是由FeignClient代理发出的 &#xff0c;但问题在于为什么Feign请求的时候会产生connection refused错误&#xff1f; 上…

Python+jieba+wordcloud实现文本分词、词频统计、条形图绘制及不同主题的词云图绘制

目录 序言&#xff1a;第三方库及所需材料函数模块介绍分词词频统计条形图绘制词云绘制主函数 效果预览全部代码 序言&#xff1a;第三方库及所需材料 编程语言&#xff1a;Python3.9。 编程环境&#xff1a;Anaconda3&#xff0c;Spyder5。 使用到的主要第三方库&#xff1a;…

飞书+ChatGPT搭建智能AI助手并实现公网访问web界面

文章目录 前言环境列表1.飞书设置2.克隆feishu-chatgpt项目3.配置config.yaml文件4.运行feishu-chatgpt项目5.安装cpolar内网穿透6.固定公网地址7.机器人权限配置8.创建版本9.创建测试企业10. 机器人测试 前言 在飞书中创建chatGPT机器人并且对话&#xff0c;在下面操作步骤中…

时间序列分析算法的概念、模型检验及应用

时间序列分析是一种用于研究随时间变化的数据模式和趋势的统计方法。这类数据通常按照时间顺序排列&#xff0c;例如股票价格、气温、销售额等。时间序列分析的目标是从过去的观测中提取信息&#xff0c;以便预测未来的趋势。 以下是关于时间序列分析的一些重要概念、模型检验…

【深度学习】学习率及多种选择策略

学习率是最影响性能的超参数之一&#xff0c;如果我们只能调整一个超参数&#xff0c;那么最好的选择就是它。相比于其它超参数学习率以一种更加复杂的方式控制着模型的有效容量&#xff0c;当学习率最优时&#xff0c;模型的有效容量最大。本文从手动选择学习率到使用预热机制…

插入排序(形象类比)

最近在看riscv手册的时候&#xff0c;里面有一段代码是插入排序&#xff0c;但是单看代码的时候有点迷&#xff0c;没看懂咋操作的&#xff0c;后来又查资料复习了一下&#xff0c;最终才把代码看明白&#xff0c;所以写篇博客记录一下。 插入排序像打扑克牌 这是我听到过比较形…

RubyMine 2023:提升Rails/Ruby开发效率的强大利器

在Rails/Ruby开发领域&#xff0c;JetBrains RubyMine一直以其强大的功能和优秀的性能而备受开发者的青睐。现如今&#xff0c;我们迎来了全新的RubyMine 2023版本&#xff0c;它将为开发者们带来更高效的开发体验和无可比拟的工具支持。 首先&#xff0c;RubyMine 2023提供了…

IDEA安装教程

文章目录 1 下载IntelliJ IDEA2 安装3 IDEA配置4 创建项目 1 下载IntelliJ IDEA ​ 官方网站上下载最新版本的IntelliJ IDEA。官方网站提供了两个版本&#xff1a;Community版和Ultimate版。 Community版是免费的&#xff0c;适用于个人和非商业用途。Ultimate版则需要付费购…

ESP32之避障

ESP32之避障 图片 程序 int Led27;//定义LED 接口 int buttonpin4; //定义光遮断传感器接口 int val;//定义数字变量val void setup() { pinMode(Led,OUTPUT);//定义LED 为输出接口 pinMode(buttonpin,INPUT);//定义避障传感器为输出接口 } void loop() {Serial.begin(9600);…

JVMj之console Java监视与管理控制台

jconsole Java监视与管理控制台 1、jconsole介绍 jconsole (java monitoring and management console)是一款基于JMX (Java Management Extensions) 的可视化监视和管理工具。 2、启动jconsole 1、在linux和windwos下通过jconsole启动即可。 2、然后会自动搜索本机运行的…

【栈】不同字符的最小子序列

题目&#xff1a; /*** 思路&#xff1a;栈,使用数组记录每个字母出现的次数&#xff0c;再用一个数组标记字符是否在栈中* 遍历栈&#xff0c;存储字符时比较栈顶字符&#xff0c;若小于栈顶字符并且后面有重复的字符则* 栈顶元素出栈&#xff0c;否则入栈。** au…

超级利器!Postman自动化接口测试让你提升测试效率,节省宝贵时间!

Postman自动化接口测试 该篇文章针对已经掌握 Postman 基本用法的读者&#xff0c;即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求的操作。 当前环境&#xff1a; Window 7 - 64 Postman 版本&#xff08;免费版&#xff09;&#xff1a;Chrome App v5.5.3 …