【MQ02】基础简单消息队列应用

news2025/1/15 20:43:51

基础简单消息队列应用

在上一课中,我们已经学习到了什么是消息队列,有哪些消息队列,以及我们会用到哪个消息队列。今天,就直接进入主题,学习第一种,最简单,但也是最常用,最好用的消息队列模式。

最简单的队列功能

最简单的队列功能,无非就是将我们在数据结构与算法中学过的那个队列结构,变成一个外部功能组件。让各种语言和各种应用程序都可以通过这个队列来进行数据操作。这样的一个队列系统就称之为“消息队列中间件”。

一般,我们会将生产消息的程序,或者说,将数据放入到队列的一方称为 P (生产者,Producer);然后将队列称为Q(Queue);最后,将守候在队列前,等待从队列中获取数据的应用、程序或者代码段称为 C(消费者/客户端,Consumer)。

27021c06f480119d7c4c2696a2109768.png

这是 RabbitMQ 官网手册上的图,后面的相关图示我们也将直接使用它们的。在这个图中,有字母的部分就不多解释了。中间红色的一格一格的部分代表的就是 Q 。

通常来说,P 只管将数据放到 Q 中,Q 负责中间存储数据,然后 C 取出数据。数据的进出方式遵循经典数据结构队列中的规则,也就是 FIFO 先进先出。

是不是就和我们之前说过的一样,最简单的理解,它就是将队列这个数据结构抽成了一个第三方组件。这样就可以跨平台、跨应用、跨语言地进行数据存取了。

RebbitMQ 实现

好了,先来看 RabbitMQ 的实现。你需要先安装好 RabbitMQ ,我这里是使用的 Docker 安装的。

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

然后,也要安装好 PHP 的 Composer 组件,用于操作 RabbitMQ 消息队列,PHP 需要开启 sockets 扩展。

composer require php-amqplib/php-amqplib

5672 是 RabbitMQ 的服务端口,15672 则是它自带的一个管理工具的访问端口。具体的内容大家可以到官方文档中进行更加深入的学习。当然,也可以使用虚拟机方式来搭建测试环境,这个大家看自己的喜好吧。

接下来,我们先实现 P 端的代码,也就是生产者向消息队列中添加数据。

// 2.rq.p.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道

// 定义队列
$channel->queue_declare('hello', false, false, false, false);

// 创建消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello'); // 将消息放入队列中

echo "生产者向消息队列中发送信息:Hello World!";

$channel->close();
$connection->close();

注释很清晰了吧,如果你之前没有用 PHP 操作过 RabbitMQ 的话,那么直接复制这段代码就可以了。其中比较特殊的是  channel ,它是共享单个 TCP 连接的轻量级信道。这个概念可能是 RabbitMQ 相较于其它消息队列系统比较特别的。然后就是消息内容是通过一个 AMQPMessage 对象承载的,这个 AMQP 其实就是 RabbitMQ 的核心协议。协议是通信双方能够相互看明白对方的基础,能够建立通信的基础,就像我们之前在 Redis 中学习过的 RESP 协议一样。这里大家只需要知道 RabbitMQ 是使用这个协议就好了,而且它也支持其它的一些协议。发送完消息之后,记得关闭连接哦。

好了,接下来是我们的消费者/客户端实现。

// 2.rq.c.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道

// 定义队列
$channel->queue_declare('hello', false, false, false, false);

echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;

// 定义接收数据的回调函数
$callback = function ($msg) {
    echo '接收到数据: ', $msg->body, PHP_EOL;
};
// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 频道是开启状态时,挂起程序,不停地执行
while ($channel->is_open()) {
    // 等待并监听频道中的队列信息
    // 发现上方 basic_consume 定义的队列有消息后
    // 就调用它对应的 callback
    $channel->wait();
}

看着好像多很多东西呀?其实上面一半和生产者是一样的。从中间开始,我们使用的是 Channel 对象的 basic_consume() 方法,这个方法最后有一个回调函数参数。然后在下面通过 wait() 方法持续监听队列中是否有数据。如果有数据了,就调用指定的回调函数。并将消息内容交给回调函数的参数。

注意哦,一般来说,消息队列的消费者,或者说是客户端,或者说是 C 端。大部分情况下可能都会是这样通过一个死循环挂起的。目的就是当我们运行起程序之后,可以不停地,不间断地一直处理队列中的消息数据。之前在学习 Swoole 时,另外如果你学习过 Go 语言的话,也会发现它们的 Http 服务中也是有类似的死循环代码来实现服务端挂起的。这个大家可以到我的 Swoole 系列中看看哦。

测试一下吧,运行一下生产者代码。

➜  source git:(master) ✗ php 2.rq.p.php 
生产者向消息队列中发送信息:Hello World!%

执行结束了,只是输出了一句话,没啥别的效果。那么我们再来运行一下消费者代码。

> php 2.rq.c.php 
等待消息,或者使用 Ctrl+C 退出程序。
接收到数据: Hello World!

可以看到,消费者先是输出了接收到的数据,这个数据其实是上一步我们运行生产者插入到队列中的数据。现在,这条数据打印出来了,其实就是相当于已经被我们消费了。当然,在实际业务中,你可能会对这些数据进行更复杂的业务操作。但在演示时,我这里只是打印了一下。然后,消费者会继续挂在这里等待下一条消息的到来。这时,你可以再次运行生产者代码,然后就会看到消费者这边直接就已经消费了。

Redis 实现

对于 Redis 的实现,其实非常简单,我们之前也已经学过的,那就是使用 List 这个数据结构。先来看生产者,直接 push 数据就好了。

// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

echo "生产者向消息队列中发送信息:Hello World!";

$redis->lpush('hello', 'Hello World!');

消费者呢?当然就是我们之前已经学过的 pop 啦。

// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;

while(1){
    $data = $redis->rpop('hello');
    if ($data){
        echo '接收到数据: ', $data, PHP_EOL;
    }
}

在这里,我们使用的是 lpush() + rpop() 的方式,当然你也可以使用 rpush() + lpop() 的方式,大家思考一下,如果是 lpush() + lpop() ,是实现的什么数据结构呢?

同样地,在 Redis 的消费者中,我们也需要通过一个死循环挂起消费者,然后不停地获取数据进行处理。剩下的测试过程就和上面的 RabbitMQ 一样了。

我的实践

之前我就说过,我的消息队列实践不多。唯一的业务场景下实现的高并发消息队列应用其实就是上面的 Redis 这两段代码。真的,核心就是这点东西。但为了抗高并发,我是使用的 Swoole ,生产者是在 Hyperf 框架中通过控制器接收到数据后,直接就放到 Redis 里。然后消费者就是一个命令行,接着开 100 个协程,将获取到的消息数据丢给协程处理。

业务应用是游戏的日志上报,最高并发 20000+ ,入库日志量 3000万+ 。使用的是阿里云最便宜的那个 Redis 服务,4G 大小单实例的那款。

是不是见到了消息队列的恐怖能力。这个量级,对于任何消息队列应用来说问题都不大,RabbitMQ 被认为是比较慢的,但是,它的处理能力是每秒几万次请求。而 Redis ,就是以 Redis 的读写性能为基础的,大概每秒11万的读和8万的写。这个在之前的 Redis 学习中都已经说过了。

总结

今天通过代码,我们其实就已经学习到了整个消息队列中最核心的内容。没错,消息队列就是这么地简单,但又这么地实用。我的业务例子其实是异步解耦的一种实现。而对于另一种常见的场景,秒杀,大家想一想,是不是也可以直接通过这样一个简单的队列就能够实现了。当然,可能还比较简陋,也需要考虑更多的东西,但是,一秒内处理几万条请求和一秒内让几万条请求入队,这个差别可大了去了。

其实,从队列的思想就可以看出,我们用数据库也可以实现队列,插入数据是入队,然后倒序查询出来一条就可以视为出队。但是呢,数据库的性能往往和专业的消息队列以及 NoSQL 工具都是有很大的差距的。因此,其实还是那句话,把握本质和思想,工具用啥都好说。

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1417010.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【书籍分享 • 第一期】基于GPT-3、ChatGPT、GPT-4等Transformer架构的自然语言处理

文章目录 一、内容简介二、前言2.1 Transformer 模型标志着AI 新时代的开始2.2 Transformer 架构具有革命性和颠覆性2.3 Google BERT 和OpenAI GPT-3 等Transformer 模型将AI 提升到另一个层次2.4 本书将带给你的“芝士”2.5 本书面向的读者 三、本书内容简介3.1 第一章3.2 第二…

看了《如果奔跑是我的人生》,你有感触么?

♥ 为方便您进行讨论和分享,同时也为能带给您不一样的参与感。请您在阅读本文之前,点击一下“关注”,非常感谢您的支持! 文 |猴哥聊娱乐 编 辑|徐 婷 校 对|侯欢庭 亲爱的,你知道吗?今年的国产剧市场好像…

[SpingBoot] 3个扩展点

初始化器ApplicationContextInitializer监听器ApplicationListenerRunner: Runner的一般应用场景就是资源释放清理或者做注册中心, 因为执行到Runner的时候项目已经启动完毕了, 这个时候可以注册进注册中心。 文章目录 1.初始化器ApplicationContextInitializer2.监听器Applica…

JVM-字节码文件的组成

Java虚拟机的组成 Java虚拟机主要分为以下几个组成部分: 类加载子系统:核心组件类加载器,负责将字节码文件中的内容加载到内存中。 运行时数据区:JVM管理的内存,创建出来的对象、类的信息等等内容都会放在这块区域中。…

面了中邮消金算法岗、开水团数据挖掘岗,做个系统性总结

最近技术群的同学,分享了面试数据挖掘/算法岗(实习)的经验。 今天整理后分享给大家,如果你对这块面试感兴趣,可以文末加入我们的面试、技术群 1、中邮消费金融AI算法工程师 面试官是nlp方向的,主要是问nlp相关 首先自我介绍 …

设计模式:简介及基本原则

简介 设计模式是一套被反复使用的、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为了重用代码、让代码更容易被他人理解、保证代码可靠性。 毫无疑问,设计模式于己于他人于系统都是多赢的,设计模式使代码编制真正工程化&#xff…

基于springboot校园台球厅人员与设备管理系统源码和论文

在Internet高速发展的今天,我们生活的各个领域都涉及到计算机的应用,其中包括校园台球厅人员与设备管理系统的网络应用,在外国管理系统已经是很普遍的方式,不过国内的管理网站可能还处于起步阶段。校园台球厅人员与设备管理系统具…

基于springboot+vue的医院管理系统(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 研究背景…

[RootersCTF2019]I_<3_Flask

打开界面,根据题目应该是个flask模板注入,但是参数不知道是什么,偷看了一手别人的wp,学到了一个工具Arjun 找到一个参数name,测试一下 模板注入成功 先看一下有没有os._wrap_close类 放到notepad里面排下序 132直接用…

鸿蒙开发基础案列001

1、开发需求 案例app一打开是“Hello world” 界面,开发者点击“Hello world”变成“Hello ArkUI”’ 2、源代码 Entry Component struct Hello {State person_name: string Worldbuild() {Row() {Column() {Text(Hello this.person_name).fontSize(50).fontWei…

牛客周赛 Round 30 解题报告 | 珂学家 | 树形DP + 期望DP

前言 整体评价 D是一道数学题,E是一道经典的入门树形DP,F题是一道期望DP,记忆化的方式更加简单一些。 ABC虽然偏简单,但是都是构造形态的,好像有CF风格了。 欢迎关注 珂朵莉 牛客周赛专栏 珂朵莉 牛客小白月赛专栏…

【Docker】数据持久化 挂载

Docker的镜像是只读的,但是容器是可写的,我们可以将数据写入到容器,不过一旦容器删除数据将会丢 失,那么有什么办法能将数据进行持久化存储呢? ——在宿主机上开辟一块地方,存储内容和docker容器的存储内…

C++中的 auto

一、auto的引入 随着程序越来越复杂&#xff0c;程序中使用的类型也越来越复杂。 例如std::map<std::string, std::string>::iterator是一个类型&#xff0c;但是该类型太长了&#xff0c;容易写错。 用auto声明的变量可以自动推导出变量的类型 二、auto的使用细则 1…

RabbitMQ快速上手

首先他的需求实在什么地方。我美哟明显的感受到。 它给我的最大感受就是脱裤子放屁——多此一举&#xff0c;的感觉。 他将信息发送给服务端中间件。在由MQ服务器发送消息。 服务器会监听消息。 但是它不仅仅局限于削峰填谷和稳定发送信息的功能&#xff0c;它还有其他重要…

Java中的Map和Set

在Java中Map和Set分别是两个不同的接口 对于Set来说&#xff0c;Set上面还有Collection这个接口&#xff0c;而对于Map来说&#xff0c;上面就没有接口了 在这两个接口下面分别实现了一个有序的接口&#xff0c;sortmap和sortset&#xff0c;而在这个接口下面又分别有两个实现…

第4章 python深度学习——(波斯美女)

第4章 机器学习基础 本章包括以下内容&#xff1a; 除分类和回归之外的机器学习形式 评估机器学习模型的规范流程 为深度学习准备数据 特征工程 解决过拟合 处理机器学习问题的通用工作流程 学完第 3 章的三个实例&#xff0c;你应该已经知道如何用神经网络解决分类问题和回归…

【GitHub项目推荐--常见的国内镜像】【转载】

由于国内网络原因&#xff0c;下载依赖包或者软件&#xff0c;对于不少互联网从业者来说&#xff0c;都有不小的挑战&#xff0c;时间浪费在这上边&#xff0c;实在可惜。这个项目介绍了常见依赖&#xff0c;软件的国内镜像&#xff0c;助力大家畅爽编码。 这是一个归纳梳理类…

【GitHub项目推荐--如何构建项目】【转载】

这是一个 138K Star 的开源项目&#xff0c;这个仓库汇集了诸多优质资源&#xff0c;教你如何构建一些属于自己的东西&#xff0c;内容主要分为增强现实、区块链、机器人、编辑器、命令行工具、神经网络、操作系统等几大类别。 开源地址&#xff1a;https://github.com/danist…

(N-141)基于springboot,vue网上拍卖平台

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;maven 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vueelementUI 服务端技术&#xff1a;springbootmybatis-plusredi…

Vue2.0+Element实现日历组件

(壹)博主介绍 &#x1f320;个人博客&#xff1a; 尔滨三皮⌛程序寄语&#xff1a;木秀于林&#xff0c;风必摧之&#xff1b;行高于人&#xff0c;众必非之。 (贰)文章内容 1、安装依赖 npm install moment2.29.4 --savenpm install lunar0.0.3 --savenpm install lunar-java…