一文读懂RabbitMQ消息队列

news2025/1/4 6:07:37

一.什么是消息队列

1.简介

在介绍消息队列之前,应该先了解什么是 AMQP(Advanced Message Queuing Protocol, 高级消息队列协议,点击查看)
消息(Message)是指在应用间 传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;而 消息队列(Message Queue)是一种 应用间通信方式,消息发送后可以 立即返回,由 消息系统来确保消息的 可靠传递消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在,它是典型的 生产者-消费者模型,生产者不断向消息队列生产消息,消费者不断的从队列获取消息。因为消息的生产和消费都是 异步的,并且只关心消息的发送和接收,没有业务逻辑的浸入,这样就实现了生产者和消费者的 解耦

2.总结

(1).消息队列是队列结构的 中间件
(2).消息发送后,不需要立即处理,而是由消息系统来处理
(3).消息处理是 消息使用者(消费者) 按顺序处理的

3.结构图

二.为什么要使用消息队列

消息队列是一种应用间的异步协作机制,是分布式系统中的重要的组件,主要目的是为了解决应用 藕合异步消息流城削锋, 冗余,扩展性,排序保证等问题,实现 高性能高并发可伸缩最终一致性架构,下面举例说明
  1. 业务解耦

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知,商品配送等业务;在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,或者单独拆分出来作为一个独立的系统,比如生成相应单据为订单系统,扣减库存为库存系统,发放红包独立为红包系统、发短信通知为短信系统,商品配送为配送系统等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信或商品配送之类的消息时,执行相应的业务系统逻辑,这样各个业务系统相互独立,就很方便进行分离部署,防止某一系统故障引起的连锁故障

  1. 流量削峰

流量削峰一般在秒杀或者团抢活动中广泛使用

(1).由来

主要是还是来自于互联网的业务场景,例如:春节火车票抢购,大量的用户需要同一时间去抢购;以及双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如:200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量缺是有限的100-500件左右。这样真实能购买到该件商品的用户也只有几百人左右, 但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品。但是,在抢购时间达到后,用户开始真正下单时,秒杀的服务器后端却不希望同时有几百万人同时发起抢购请求。因为服务器的处理资源是有限的,所以出现峰值的时候,很容易导致服务器宕机,用户无法访问的情况出现。这就好比出行的时候存在早高峰和晚高峰的问题,为了解决这个问题,出行就有了错峰限行的解决方案。同理,在线上的秒杀等业务场景,也需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题,这就是流量削峰的由来。

(2).怎样来实现流量削峰方案

削峰从本质上来说就是更多地延缓用户请求,以及层层过滤用户的访问需求,遵从“最后落地到数据库的请求数要尽量少”的原则。

1).消息队列解决削峰

要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。

消息队列中间件主要解决应用耦合,异步消息, 流量削锋等问题;常用消息队列系统:目前在生产环境,使用较多的消息队列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。

在这里,消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。

2).流量削峰漏斗:层层削峰

针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。分层过滤其实就是采用“漏斗”式设计来处理请求的,如下图所示:

这样就像漏斗一样,尽量把数据量和请求量一层一层地过滤和减少了
I.分层过滤的核心思想
  • 通过在不同的层次尽可能地过滤掉无效请求

  • 通过CDN过滤掉大量的图片,静态资源的请求

  • 再通过类似Redis这样的分布式缓存,过滤请求等就是典型的在上游拦截读请求

II.分层过滤的基本原则
  • 对写数据进行基于时间合理分片,过滤掉过期的失效请求

  • 对写请求做限流保护,将超出系统承载能力的请求过滤掉

  • 涉及到的读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题

  • 对写数据进行强一致性校验,只保留最后有效的数据

最终,让“漏斗”最末端(数据库)的才是有效请求,例如:当用户真实达到订单和支付的流程,这个是需要数据强一致性的。

(3).总结

1).对于秒杀这样的高并发场景业务,最基本的原则就是将请求拦截在系统上游,降低下游压力。如果不在前端拦截很可能造成数据库(mysql、oracle等)读写锁冲突,甚至导致死锁,最终还有可能出现雪崩等场景。

2).划分好动静资源,静态资源使用CDN进行服务分发

3).充分利用缓存(redis等),增加QPS,从而加大整个集群的吞吐量

4).高峰值流量是压垮系统很重要的原因,所以需要RabbitMQ等消息队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去

  1. 异步处理

用户注册后,需要发送注册邮件和注册短信

三.RabbitMQ介绍

RabbitMQ是一个由 erlang语言开发的,实现了AMQP协议的标准的开源消息代理和队列服务器( 消息队列中间件)

1.常见的消息队列中间件

2.RabbitMQ特性

  • 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认

  • 灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange

  • 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker

  • 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用

  • 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等

  • 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby ,PHP等

  • 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面

  • 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么

  • 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件

3.RabbitMQ工作原理

内部实际上也是 AMQP 中的基本概念

上图各个模块的说明:

  • Message: 消息,消息是不具名的,它由消息头消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)priority(相对于其他消息的优先权)delivery-mode(指出该消息可能需要持久性存储)

  • Publisher: 消息的生产者,也是一个向交换器发布消息客户端应用程序

  • Consumer:消息的消费者,表示一个从消息队列中取得消息客户端应用程序

  • Broker: 接收和分发消息的应用,表示消息队列服务器实体,RabbitMQ Server就是Message Broker

  • Virtual host: 虚拟主机(共享相同的身份认证和加密环境的独立服务器域),表示一批交换器、消息队列和相关对象,类似于mysql的数据库,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等.每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制,vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

  • Connection: publisher、consumer和broker之间的网络连接,比如:TCP连接,断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障broker服务出现问题

  • Channel: 管道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内虚拟连接(逻辑连接),如果应用程序支持多线程,通常每个多线程创建单独的channel进行通讯, 因为AMQP 方法中包含了channel id帮助客户端和broker识别channel,所以channel之间是完全隔离的,AMQP 命令都是通过管道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低,所以引入了管道的概念,目的是为了减少操作系统建立TCP 连接的开销,以复用一条 TCP 连接

  • Exchange: 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,message到达broker的第一站,根据分发规则,匹配查询表中的路由键(routing key),分发消息到queue中去,常用的类型有:直连交换机-direct (point-to-point), 主题交换机-topic (publish-subscribe),扇型交换机-fanout (multicast), 头交换机-headers(amq.match (and amq.headers in RabbitMQ))

  • Queue: 消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点,一个消息可投入一个或多个队列,消息最终被送到这里等待消费者连接到这个队列将其取走

  • Binding: 绑定,消息队列(queue)和交换器(exchange)之间的虚拟连接, binding中可以包含routing key, Binding信息被保存到exchange中的查询表中,用于message的分发依据,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

四.RabbitMQ安装与启动

见linux下,docker-compose 安装nignx,php,mysql,redis,rabbitmq,mongo

端口说明:安装完rabbitmq后,有几个常见端口:
4369: epmd(Erlang Port Mapper Daemon),erlang服务端口
5672: client端通信端口
15672:http Api客户端,管理UI(仅在启用了管理插件的情况下使用)
25672:用于节点间通信(Erlang分发服务器端口)

五.RabbitMQ几个重要特性概念讲解

  • 队列模式-简单队列模式,工作队列模式

  • ACK&NACK消费确认机制&重回队列机制

  • 消息持久化

  • 公平调度(限流机制)

  • 幂等性

  • return机制

  • 消息的可靠性投递

下面是RabbitMQ和消息所涉及到的一些 术语
  • 生产(Producing)的意思就是发送:发送消息的程序就是一个生产者(producer),一般用"P"来表示:

  • 队列(queue)就是存在于RabbitMQ中邮箱的名称:虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。队列可以绘制成这样(图上是队列的名称):

  • 消费(Consuming)和接收(receiving)是同一个意思:一个消费者(consumer)就是一个等待获取消息的程序。把它绘制为"C":

以php框架yii2为参照
  1. 简单队列模式(simple queue)

发送 单个消息的生产者,以及 接收消息并将其 打印出来的消费者。将忽略RabbitMQ API中的一些细节。 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)

(1).生产者发布消息步骤

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建连接
$connection = new AMQPStreamConnection($host, $port, $user, $pass, $v_host="/");

// 创建channel
$channel = $connection->channel();

// 初始化队列,并持久化(声明队列)
$channel->queue_declare($queue_name, false, true, false, false);

//消息内容
$data = "this is message2";
// 声明消息,并持久化(创建消息)
$mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

// 把消息推到队列里(发布消息)
$channel->basic_publish($mes, '', $queue_name);

//关闭通道和连接
$channel->close();
$connection->close();
上面声明队列方法queue_declare()参数详解

(2).消费者消费消息步骤

//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false,$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
上面消费消息方法basic_consume()参数详解

(3).具体代码展示

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 账户
        'pass' => 123456,  // 密码
        "v_host" => "order",  // 对应Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 队列名称
    ],
]

生产者代码

<?php
/**
 * 生产者生产消息
 */

namespace console\controllers\simple;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        //消息
        $data = "this is message2";
        // 声明消息,并持久化
        $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        // 把消息推到队列里
        $channel->basic_publish($mes, '', $queue_name);
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者代码

<?php
/**
 * 消费者消费消息
 */

namespace console\controllers\simple;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消费消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
        };
        $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
        // 监控
        while ($channel->is_open()){
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}
  1. 工作队列模式(worker queue)

创建一个 工作队列(Work Queue),它会发送一些 耗时的任务多个工作者(Worker),工作队列(又称: 任务队列——Task Queues)是为了 避免等待一些占用大量资源、时间的操作,当把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理,当运行多个工作者(workers),任务就会在它们之间 共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务,使用工作队列的一个 好处就是它能够 并行的处理队列。如果堆积了很多任务,只需要添加 更多的工作者(workers)就可以了,这就是所谓的 循环调度,扩展很简单

(1).具体代码展示

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 账户
        'pass' => 123456,  // 密码
        "v_host" => "order",  // 对应Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 队列名称
        "name2" => "task_queue",  // 队列名称
    ],
]

生产者代码

<?php
/**
 * 生产者生产消息
 */

namespace console\controllers\worker;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);

        // 生产多条消息
        for ($i = 0; $i <= 10; ++$i) {
            //消息
            $data = "this is " . $i. " message";
            // 声明消息,并持久化
            $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            // 把消息推到队列里
            $channel->basic_publish($mes, '', $queue_name);
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者代码

当生产者生产了多条费时的消息时,一个消费者不能满足需要,可以添加多个消费者处理生产者的消息,多个消费者之间采用 轮询的方式获取队列的消息,并把该消息发送给对应的用户
比如:可以对一个队列的消息开多个消费者,这里我们开了两个消费者,里面的代码都是一致的
<?php
/**
 * 消费者消费消息
 */

namespace console\controllers\worker;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消费消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
        };
        $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
        // 监控
        while ($channel->is_open()){
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}
<?php
/**
 * 消费者2消费消息
 */

namespace console\controllers\worker;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer2Controller extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消费消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
        };
        $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
        // 监控
        while ($channel->is_open()){
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}
  1. ACK消费确认机制&NACK&重回队列机制

ACK消费确认机制

当处理一个 比较耗时得任务的时候,想知道消费者(consumers) 运行到一半就挂掉时, 正在处理的消息/发送给当前工作者的消息会怎样,当消息在队列中 没有进行持久化操作时,消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中 移除。这种情况,只要把一个 工作者(worker)停止,正在处理的消息就会 丢失。同时,所有发送到这个工作者的还没有处理的消息 都会丢失。所以,如果不想消息丢失,当一个工作者(worker)挂掉了,希望任务会重新发送给其他的工作者(worker),RabbitMQ就提供了 消息响应acknowledgments)。消费者会通过一个 ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会 释放并删除这条消息。如果消费者(consumer) 挂掉了, 没有发送响应,RabbitMQ就会认为消息 没有被完全处理,然后 重新发送其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。消息是没有超时这个概念的,当工作者与它断开连的时候,RabbitMQ会重新发送消息,这样在处理一个耗时非常长的消息任务的时候就不会出问题了。在该讲解中,将使用手动消息确认,通过为 no_ack参数传递 false,一旦有任务完成,使用d.ack()(false)向RabbitMQ服务器发送消费完成的确认(这个
确认消息是单次传递的)
//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false, $no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消费ack
$msg->ack();
};
// 第四个参数: 需要ack确认,这里我们在callback手动确认
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 账户
        'pass' => 123456,  // 密码
        "v_host" => "order",  // 对应Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 队列名称
        "name2" => "task_queue",  // 队列名称
        "name3" => "task_ack",  // 队列名称
    ],
]

生产者代码

<?php
/**
 * 生产者生产消息
 */

namespace console\controllers\ack;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);

        // 生产多条消息
        for ($i = 0; $i <= 10; ++$i) {
            //消息
            $data = "this is " . $i. " message";
            // 声明消息,并持久化
            $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
            // 把消息推到队列里
            $channel->basic_publish($mes, '', $queue_name);
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者代码

<?php
/**
 * 消费者消费消息
 */

namespace console\controllers\ack;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation=false;
    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 初始化队列,并持久化
        $channel->queue_declare($queue_name, false, true, false, false);
        // 消费消息
        $callback = function ($msg) {
            echo "reviced: " . $msg->body . "\n";
            // 消费ack
            $msg->ack();
        };
        // 第二个参数:同一时刻服务器只会发送1条消息给消费者
        $channel->basic_qos(null, 1, null);
        // 第四个参数: 需要ack确认,这里我们在callback手动确认
        $channel->basic_consume($queue_name, "", false, false, false, false,$callback);
        // 监控
        while ($channel->is_open()){
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

NACK&重回队列机制

当设置了方法 basic_consume$no_ack = false 时,使用手工 ACK 方式,除了ACK外,其实还有 NACK 方式,当手工 AcK 时,会发送给Broker( 服务器)一个应答,代表消息处理成功,Broker就可回送响应给生产端 .
NACK 则表示消息处理失败,如果设设置了重回队列, Broker 端就会将没有成功处理的消息重新发送
通俗来讲:
手工ACK:消费成功了,向发起者确认
NACK:消费失败,让生产者重新发
一般在实际应用中,都会关闭重回队列,也就是设置为false

使用方式

  • 消费端消费时.如果由于业务异常,可以手工 NACK 记录日志,然后进行补偿

  • API :basic_nack($delivery_tag, $multiple = false, $requeue = false)

  • 如果由于服务器宕机等严里问题,就需要手工 ACK 保障消费端消费成功

  • API :basic_ack($delivery_tag, $multiple = false)

4.消息持久化

如果没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息,为了确保消息不会丢失,有两个事情是需要注意的:必须把“队列”“消息”设为持久化首先,为了不让队列消失,需要把队列声明为持久化(durable),但这里面会有一定的问题,它会返回一个错误,可以使用一个快捷的解决方法——用不同名字的队列,例如task_queue

代码如上面生产者/消费者所示:

// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
消息持久化配置: " delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT

5.公平调度(限流机制)

为什么要限流

  • 当工作者处理消息时,会出现这么一个问题:比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松,然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,这时因为RabbitMQ只管分发进入队列的消息不会关心有多少消费者(consumer)没有作出响应,它盲目的把第n-th条消息发给第n-th个消费者

  • 假设还有这样的场景:RabbitMQ服务器有上万条未处理的消息,随便打开一个消费者Client ,会造成巨量的消息瞬间全部推送过来,然而单个客户端无法同时处理这么多数据,此时很有可能导致服务器崩溃,严重的可能导致线上的故障

  • 还有一些其他的场景:比如说单个 生产端一分钟产生了几百条数据,但是单个消费端 一分钟可能只能处理 60 条,这个时候生产端-消费端肯定是不平衡的,通常生产端是没办法做限制的,所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致 消费端 性能下降,服务器卡顿甚至崩溃等一系列严重后果

RabbitMQ 提供了一种 qos (服务质质量保证)功能,即在 非自动确认消息的前提下,如果一定数目的消息 ( 通过基于 生产者或者 channel设置 Qos 的值) 未被确认前,不消费新的消息
不能设置自动签收功能( auto_ack = false ),如果消息未被确认,就不会到达 消费端 ,目的就是给 生产端 减压

这是可以设置预取计数值为1,告诉RabbitMQ一次只向一个worker发送一条消息,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息

如上面ACK消费确认机制中消费者代码:

// 第二个参数:同一时刻服务器只会发送1条消息给消费者
//basic_qos($prefetch_size, $prefetch_count, $a_global)
$channel->basic_qos(null, 1, null);
参数说明:
$prefetch_size:单条消息的大小限制, 通常设置为 0 ,表示不做限制
$prefetch_count:一次最多能处理多少条消息
$a_global:是否将上面设置: true 应用于 channel 级别, false 代表消费者级别
$prefetch_size,$a_global这两项, RabbitMQ 没有实现,暂且不研究.$prefetch_count 在auto_ack = f alse 的情况下生效,即在自动应答的情况下该值无效

6.幂等性概念

一句话概括: 用户对于同一操作发起的一次请求或者多次请求的结果是一致的
比如:对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version +1 where version = 1,数据库的乐观锁在执行更新操作前一先去数据库查询此version ,然后执行更新语句,以此version作为条件,如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种 乐观锁的机制来保障幕等性

消费端 - 幂等性保障

在海量订单产生的业务高峰期,如何避免 消息的重复消费问题?
在业务高峰期:容易产生 消息重复消费问题,当消费端消费完消息时,在给生产者端返回ack时由于 网络中断,导致生产端 未收到确认信息,该条消息就会 重新发送被消费端消费,但实际上该消费端已成功消费了该条消息,这就造成了重复消费.而 消费端实现 幂等性,就意味着:消息不会被多次消费,即使收到了很多一样的消息

业界主流的幂等性操作解决方案:

(1)唯一Id + 指纹码 机制,核心:利用数据库主键去重

  • 唯一Id: 业务表主键

  • 指纹码: 为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间截+业务编号或者标志位(具体视业务场景而定 )

select count(1) from t_order where id = 唯一Id + 指纹码

  • 优势:实现简单

  • 弊端:高并发下有数据库写入的性能瓶颈

  • 解决方案:根据ID进行分库分表进行算法路由

(2)利用Redis的原子性去实现

  • 第一:是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?

  • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?

7.return机制

  • Return Listener用于处理一些不可路由的消息,也是生产阶段添加的一个监听

  • 消息生产者通过指定一个Exchange和Routing Key,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作

  • 但是在某些情况下,如果在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果需要监听这种不可达的消息,就要使用Return Listener

  • 在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker(服务器)会自动删除该消息

8.消息的可靠性投递

(1).什么是生产端的可靠性投递?

  • 保障消息的成功发出

  • 保障MQ节点的成功接收

  • 发送端收到MQ节点(Broker)确认应答

  • 完善的消息进行补偿机制(在大厂一般都不会加事务,都是进行补偿操作)

在实际生产中,很难保障前三点的完全可靠,比如在 极端的环境中,生产者发送消息失败了,发送端在接受确认应答时突然发生网络闪断等等情况,很难保障可靠性投递,所以就需要有第四点完善的 消息补偿机制

(2).解决方案

方案一 消息信息落库,对消息状态进行打标(常见方案

将消息持久化到 DB中并设置状态值,收到 Consumer 的应答就改变当前记录的状态
再轮询重新发送没接收到应答的消息,注意这里要设置重试次数
方案实现流程
比如下单成功

步骤1

对订单数据入ORDER_DB 订单库,并对因此生成的业务消息入 MSG_DB 消息库,此处由于采用了两个数据库,需要两次持久化操作,为了保证数据的一致性,有人可能就想采用分布式事务,但在大厂实践中,基本都是采用补偿机制

这里一定要保证步骤1中消息都存储成功了,没有出现任问异常情况,然后生产端再进行消息发送.如果失败了就进行快速失败机制

对业务数据和消息入库完毕就进入步骤2

步骤2

发送消息到MQ服务上,如果一切正常无误消费者监听到该消息,进入步骤3

步骤3

生产端有一个 confi rm Listener ,异步监听 Broker(服务端) 回送的响应,从而判断消息是否投递成功

步骤4

如果成功,去数据库查询该消息.并将消息状态更新为 1

步骤5

如果出现意外情况,消费者未接收到或者Listener 接收确认时发生网络闪断,导致生产端的Listener 就永远收不到这条消息的 confi rm应答了,也就是说这条消息的状态就一直为0 了,这时候就需要用到分布式定时任务来从 MSG_DB 数据库抓取那些超时了还未被消费的消息,重新发送一遍。此时需要设置一个规则,比如说消息在入库时候设置一个临界值 timeout , 5 分钟之后如果还是0的状态那就需要把消息抽取出来。这里使用的是分布式定时任务,去定时抓取 MSG_DB中距离消息创建时间超过 5 分钟的且状态为0 的消息

步骤6

把抓取出来的消息进行重新投递( Retry Send ) ,也就是从第二步开始继续往下走

步骤7

当然有些消息可能就是由于一些实际的问题无法路由到 Broker ,比如Routing Key设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重复次数做限制,比如限制 3 次,如果投递次数大于3次,那么就将消息状态更新为 2 ,表示这个消息最终投递失败,然后通过补偿机制,人工去处理,实际生产中.这种情况还是比较少的,但是不能没有这个补偿机制,要不然就做不到可靠性了

缺点
在第一步需要更新或者插入操作数据库2次
优化
不需要消息进行持久化 只需要业务持久化

方案二 消息的延迟投递,做二次确认,回调检查(不常用,大厂在用的高并发方案)

方案实现流程

步骤1

(上游服务: Upstream service )业务入库,然后send 消息到broker,这两步是有先后顺序的

步骤2

进行消息延迟发送到新的队列(延迟时间为 5 分钟:业务决定)

步骤3

(下游服务: Downstream service )监听到消息然后处理消息

步骤4

下游服务 send confirm生成新的消息到 broker (这里是一个新的队列 )

步骤5

callback service 去监听这个消息,并且入库,如果监听到,表示这个消息已经消费成功

步骤6

callback service 去检查 步骤2投递的延迟消息是否 在msgDB里面是否消费成功,不存在或者消费失败就会 Resend command

如果在第 1 , 2 , 4 步失败,如果成功broker 会给一个 confirm ,失败当然没有,这是消息可靠性投递的里要保障

9.注意

关于队列大小: 如果所有的工作者都处理繁忙状态,队列就会被填满,需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略

六.RabbitMQ几种常见的交换器模式

1.消息模型基本介绍

前面的教程中,讲的是 发送消息到队列并从中取出消息,现在介绍RabbitMQ中 完整的消息模型

简单的概括一下之前讲的:

  • 发布者(producer)是发布消息的应用程序

  • 队列(queue)用于消息存储的缓冲

  • 消费者(consumer)是接收消息的应用程序

RabbitMQ消息模型的 核心理念是:发布者(producer) 不会直接发送任何消息给 队列,事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要 把消息发送给一个交换机(exchange),交换机非常简单,它一边从发布者方 接收消息,一边把消息 推送到队列,交换机 必须知道如何处理它接收到的消息,是应该 路由指定的队列还是是 多个队列,或者是直接 忽略消息,这些规则是通过交换机类型(exchange type)来定义的

有几个可供选择的交换器类型:direct, topic, headers和fanout

direct(直连/定向交换器)

消息与一个特定的路由键完全匹配

topic(主题交换器)

使用通配符*,#,让路由键和某种模式进行匹配

headers(头交换器)

不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配

fanout(扇型交换器)

发布/ 订阅模式可以理解为广播模式:即exchange会将消息转发到所有绑定到这个exchange的队列上,这种类型在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key

Routing Key(路由键): 生产者将消息发送给交换器,一般都会指定一个Routing Key,用来指定这个消息的路由规则,而这个Routing Key需要与交换器类型和绑定键(Binding Key)联合使用才能生效

Binding(绑定):它是Exchange与Queue之间的虚拟连接,通俗的讲就是交换器和队列之间的联系(这个队列(Queue)对这个交换机(Exchange)的消息感兴趣),实现了根据不同的Routing Key(路由规则),交换机将消息路由(发送)到对应的Queue上

2.交换器核心方法

//试探性申请一个交换器,若该交换器存在,则跳过,不存在则创建
exchange_declare($exchange,$type,$passive = false,$durable = false,$auto_delete = true,$internal = false,$nowait = false,$arguments = array(),$ticket = null)

参数名

默认值

解释

$exchange

交换器名称

$type

交换器类型:

’’ 默认交换机 匿名交换器 未显示声明类型都是该类型

fanout 扇形交换器 会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的

headers 头部交换器

direct 直连交换器,该交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配

topic 主题交换器 该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割) ,eg:user.key .abc.* 类型的key

$passive

false

只判断不创建(一般用于判断交换器是否存在)

true:

1.如果exchange已存在则直接连接并且不检查配置比如已存在的exchange是fanout,新需要建立的是direct,也不会报错;

2.如果exchange不存在则直接报错

false:

1.如果exchange不存在则创建新的exchange

2.如果exchange已存在则判断配置是否相同,如果配置不相同则直接报错,比如已存在的exchange是fanout,新需要建立的是direct,会报错。

$durable

false

设置是否持久化,设置为true,表示持久化,反之非持久化,持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息

$auto_delete

true

设置是否自动删除,设置为true时,表示自动删除。自动删除的前提:至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑

$internal

false

设置是否为内置的,设置为true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到这个交换器

$nowait

false

如果为true,表示不等待服务器回执信息,函数将返回null,可以提高访问速度

$arguments

array()

其他结构化参数

$ticket

null

3.fanout模式(广播模式)

广播模式可以理解为:发布/订阅模式,即exchange会将消息转发到所有绑定到这个exchange的队列上。针对这种广播模式,在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key

案例一

一个生产者生产消息并发布消息到交换器上,多个消费者订阅该交换器,并与之队列绑定,消费消息

rabbitmq配置

 "rabbitMq" => [
        "base" => [
            'host' => '192.168.0.5',  // host地址
            'port' => 5672,  // 端口
            "user" => "user",  // 账户
            'pass' => 123456,  // 密码
            "v_host" => "order",  // 对应Virtual Hosts
        ],
        "exchange_name" => [
            "name1" => "exch", // 交换器名称
        ],
]

生产者

<?php
/**
 * 交换器fanout(广播)模式: 生产者生产消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 声明一个数据
        $data = "this is a exchange message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 发布消息到交换器
        $channel->basic_publish($msg, $exchangeName);

        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者(多个)

消费者1

<?php
/**
 *  交换器fanout(广播)模式: 消费者消费消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
//        $queueName = $rabbitMqConfig["queue_name"]["name4"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明对应的交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 声明队列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交换机与队列绑定
        $channel->queue_bind($queueName, $exchangeName);
        // 消息回调处理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者2

<?php
/**
 *  交换器fanout(广播)模式: 消费者消费消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer2Controller extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
//        $queueName = $rabbitMqConfig["queue_name"]["name4"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明对应的交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 声明队列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交换机与队列绑定
        $channel->queue_bind($queueName, $exchangeName);
        // 消息回调处理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

案例二

举个实际应用的场景:比方说用户注册(注销,更改姓名等)新浪,同时需要开通微博、博客、邮箱等,如果不采用队列,按照常规的线性处理,可能注册用户会特别的慢,因为在注册的时候,需要调各种其他服务器接口,如果服务很多的话,可能客户端就超时了。如果采用普通的队列,可能在处理上也会特别的慢(不是最佳方案),如果采用订阅模式,则是最优的选择

处理过程如下:

  1. 用户提交username、pwd…等之类的基本信息,将数据提交register.php中

  1. register.php对数据进行校验,符合注册要求,生成uid,并将和基本信息json后,发布一条消息到对应的交换器中,同时直接显示用户注册成功

  1. exchange中的多个队列,如(queue.process、queue.boke、queue.weibo、queue.email)都订阅了这个消息,根据各业务自身的逻辑来处理

总结:

1.不申明队列,因为发布/订阅模式下,是可以随时添加新的订阅队列

2.exchange的Type指定为fanout(广播模式)

3.队列不需要指定route key,绑定exchange

代码如下:

生产者
<?php
/**
 * 交换器fanout(广播)模式: 生产者生产消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public function actionIndex()
    {
        /*
            用户注册逻辑
        */
        
        //发送消息逻辑
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = "register";
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 声明一个数据
        $data = "{uid:xxx,reg_time:xxx}";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 发布消息到交换器
        $channel->basic_publish($msg, $exchangeName);

        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}
消费者(多个)
可以创建多个不同类型的消费者(开通微博、博客、邮箱)等逻辑功能的消费者
<?php
/**
 *  交换器fanout(广播)模式: 消费者消费消息
 */

namespace console\controllers\exchange\fanout;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = "register";
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明对应的交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
        // 声明一个匿名队列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交换机与队列绑定
        $channel->queue_bind($queueName, $exchangeName);
        // 消息回调处理
        $callback = function ($meg) {
            //处理逻辑
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

4.Direct模式

Direct交换器将消息投递到路由参数 完全匹配的队列中

直接上代码

rabbitmq配置

 "rabbitMq" => [
        "base" => [
            'host' => '192.168.0.5',  // host地址
            'port' => 5672,  // 端口
            "user" => "user",  // 账户
            'pass' => 123456,  // 密码
            "v_host" => "order",  // 对应Virtual Hosts
        ],
        "queue_name" => [
            "name1" => "goods",  // 队列名称
            "name2" => "task_queue",  // 队列名称
            "name3" => "task_ack",  // 队列名称
            "name4" => "exchange_fanout_1",  // 队列名称
        ],
        "exchange_name" => [
            "name1" => "exch", // 交换器名称
            "name2" => "exch_direct_log", // 交换器名称
        ],
        "routing_key" => [
            "info_key" => "info",  // 路由键
            "error_key" => "error",  // 路由键
            "warn_key" => "warn",  // 路由键
        ],
】

生产者

<?php
/**
 * 交换器direct(routing_key-更详细的bind模式)模式: 生产者生产消息
 */

namespace console\controllers\exchange\direct;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation=false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
        $routingKey = $rabbitMqConfig["routing_key"]["error_key"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 声明一个数据
        $data = "this is a ". $routingKey . " message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 发布消息到交换器, 并和路由键匹配
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者

<?php
/**
 *  交换器direct(routing_key-更详细的bind模式)模式: 消费者消费消息
 */

namespace console\controllers\exchange\direct;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerWarnController extends Controller
{
   

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
        $routingKey = $rabbitMqConfig["routing_key"]["warn_key"]; //路由键可以修改为其他key,与生产者bind的关联
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明对应的交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 声明一个匿名队列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交换机与队列绑定,并指定routing_key
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        // 消息回调处理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

5.topic模式

发送到topic交换器的消息不可以携带随意routing_key,它的routing_key必须是一个由 .分隔开的 词语列表,这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇,以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",词语的个数可以随意,但是 不要超过255字节。binding key也必须拥有同样的格式,topic交换器背后的逻辑跟direct交换机很相似 : 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列,但是它的binding key和routing_key有两个特殊应用方式:
  • * (星号) 用来表示一个单词

  • # (井号) 用来表示任意数量(零个或多个)单词

下边用图说明:

这个例子里,发送的所有消息都是用来描述小动物的,发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开,路由键里的第一个单词

描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类,所以它看起来是这样的: <celerity>.<colour>.<species>。

创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbit 和 lazy.# 。

  • Q1-->绑定的是

  • 中间带 orange 带 3 个单词的字符串 (*.orange.*)

  • Q2-->绑定的是

  • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)

  • 第一个单词是 lazy 的多个单词 (lazy.#)

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣

  • Q2 则是对所有的兔子所有懒惰的动物感兴趣

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列,携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

注意:

如果违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢 失掉

但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的binding key为 "#"(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为

代码如下:

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 账户
        'pass' => 123456,  // 密码
        "v_host" => "order",  // 对应Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 队列名称
        "name2" => "task_queue",  // 队列名称
        "name3" => "task_ack",  // 队列名称
        "name4" => "exchange_fanout_1",  // 队列名称
    ],
    "exchange_name" => [
        "name1" => "exch", // 交换器名称
        "name2" => "exch_direct_log", // 交换器名称
        "name3" => "exch_topic_log", // 交换器名称
    ],
    "routing_key" => [
        "info_key" => "info",  // 路由键
        "error_key" => "error",  // 路由键
        "warn_key" => "warn",  // 路由键
        "all_key" => "#",  // 所有的路由键
        "user_info" => "user.info", // 路由键
        "user_warn" => "user.warn", // 路由键
        "user_all" => "user.*", // 匹配以user.开头的路由键
    ],
]
】

生产者

<?php
/**
 * 交换器topic(通配符)模式: 生产者生产消息
 */

namespace console\controllers\exchange\topic;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
  
    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
        $routingKey = $rabbitMqConfig["routing_key"]["user_info"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
        // 声明一个数据
        $data = "this is a ". $routingKey . " message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 发布消息到交换器, 并和路由键匹配
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者

<?php
/**
 *  交换器topic(通配符)模式: 消费者消费消息
 *
 */

namespace console\controllers\exchange\topic;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
        $routingKey = $rabbitMqConfig["routing_key"]["user_all"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明对应的交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
        // 声明队列
        list($queueName) = $channel->queue_declare('', false, false, true, false);
        // 交换机与队列绑定,并指定routing_key
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        // 消息回调处理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

七.死信队列,延时队列

1.死信队列

死信( Dead Letter )是RabbitMQ 中的一种 消息机制,当在消费消息时,如果队列里的消息出现以下情况:
  • 消息被拒绝

  • 消息在队列的存活时间超过设置的 TTL 时间

  • TTL (Time To Live),即生存时间

  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定

  • RabbitMQ 支持为每个队列设号消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会被自动清除

  • 消息队列的消息数量已经超过最大队列长度

那么该消息将成为“死信”,“死信”消息会被 RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead 一 Letter 一 Exchange ,可以称之为 死信交换器,当消息在一个队列中变成死信( dead message )消息之后,它会被重新发送到另外一个交换器中,这个交换器就是DLX , 绑定在 DLX上的队列就称之为死信队列, 程序就可以监听这个队列中的消息,并做相应的处理,该特性 可以弥补RabbitMQ3.0以前支持的immediate参数的功能

消息变成死信有以下几种情况:

  • 消息被拒绝消息

  • TTL 过期(延迟队列)

  • 队列达到最大长度

2.延时队列

延时队列就是用来存放需要在 指定时间被处理的元素的队列.
一般可以利用 死信队列的特性实现延迟队列:只要给消息设置一个过期时间,消息过期就会自动进入死信队列,消费者只要监听死信队列就可以实现延迟队列了

应用场景

  • 订单在十分钟之内未支付则自动取消

  • 账单在一周内未支付,则自动结算

  • 某个时间下发一条通知

  • 用户注册成功后,如果三天内没有登陆则进行短信提醒

  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员

下面通过一个案例来更进一步了解死信队列,延时队列

案例1

订单在一段时间内未支付则自动取消,步骤:
(1).创建订单操作
(2).订单创建成功后,订单相关数据json处理
(3).构建rabbitmq消息队列,并设置消息过期时间为60秒,把订单相关json数据发布到交换器, 并和路由键匹配,生产者生产消息60秒之后,消息会进入到死信队列,消费者监听死信队列,处理订单

rabbitmq配置

"rabbitMq" => [
    "base" => [
        'host' => '192.168.0.5',  // host地址
        'port' => 5672,  // 端口
        "user" => "user",  // 账户
        'pass' => 123456,  // 密码
        "v_host" => "order",  // 对应Virtual Hosts
    ],
    "queue_name" => [
        "name1" => "goods",  // 队列名称
        "name2" => "task_queue",  // 队列名称
        "name3" => "task_ack",  // 队列名称
        "name4" => "exchange_fanout_1",  // 队列名称
        "name5" => "queue_pay",  // 订单支付队列
    ],
    "exchange_name" => [
        "name1" => "exch", // 交换器名称
        "name2" => "exch_direct_log", // 交换器名称
        "name3" => "exch_topic_log", // 交换器名称
        "name4" => "exch_pay", // 订单支付, 交换器名称
    ],
    "routing_key" => [
        "info_key" => "info",  // 路由键
        "error_key" => "error",  // 路由键
        "warn_key" => "warn",  // 路由键
        "order_key" => "order_pay",  // 订单支付
        "all_key" => "#",  // 所有的路由键
        "user_info" => "user.info", // 路由键
        "user_warn" => "user.warn", // 路由键
        "user_all" => "user.*", // 匹配以user.开头的路由键
    ],
    "dead_letter" => [  // 死信队列
        "exchange_name" => [  // 死信队列交换机名称
            "pay" => "dead_exch_pay"
        ],
        "queue_name" => [ // 死信队列名称
            "pay" => "dead_queue_pay"
        ],
        "routing_key" => [  // 死信队列routing名称
            "pay" => "dead_routing_key_pay"
        ]
    ]
]

生产者

<?php
/**
 * 死信队列,延时队列: 生产者推送消息到队列,模拟订单支付
 */

namespace console\controllers\exchange\dead;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $deadConfig = Yii::$app->params["rabbitMq"]["dead_letter"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["exchange_name"]["name4"];
        $queueName = $rabbitMqConfig["queue_name"]["name5"];
        $routingKey = $rabbitMqConfig["routing_key"]["order_key"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器, 交换机类型: routing_key-更详细的bind模式
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 消息队列的额外参数
        $args = new AMQPTable([
            'x-message-ttl' => 2000, // 消息的过期时间
            "x-dead-letter-exchange" => $deadConfig["exchange_name"]["pay"], // 死信队列交换机名称
            "x-dead-letter-routing-key" => $deadConfig["routing_key"]["pay"]  // 死信队列routing名称
        ]);
        // 声明队列
        $channel->queue_declare($queueName, false, true, false, false, false, $args);
        // 交换机和队列绑定
        $channel->queue_bind($queueName, $exchangeName, $routingKey);

        // 声明死信交换机
        $channel->exchange_declare($deadConfig["exchange_name"]["pay"], AMQPExchangeType::DIRECT, false, false, false);
        // 声明死信队列
        $channel->queue_declare($deadConfig["queue_name"]["pay"], false, true, false, false, false);
         // 死信交换机和队列绑定
        $channel->queue_bind($deadConfig["queue_name"]["pay"], $deadConfig["exchange_name"]["pay"], $deadConfig["routing_key"]["pay"]);

        // 声明一个数据:里面可以是用户订单相关json数据
        $data = "this is a dead message";
        // 初始化消息并持久化
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
        // 发布消息到交换器, 并和路由键匹配
        $channel->basic_publish($msg, $exchangeName, $routingKey);

        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者

<?php
/**
 *  死信队列,延时队列: 模拟订单支付,消费者消费消息
 *
 */

namespace console\controllers\exchange\dead;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{  

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $config = $rabbitMqConfig["base"];
        $exchangeName = $rabbitMqConfig["dead_letter"]["exchange_name"]["pay"];
        $queueName = $rabbitMqConfig["dead_letter"]["queue_name"]["pay"];
        $routingKey = $rabbitMqConfig["dead_letter"]["routing_key"]["pay"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明对应的交换器
        $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
        // 交换机与队列绑定,并指定routing_key
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
        // 消息回调处理
        $callback = function ($meg) {
            //处理订单相关数据
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

问题

通过上面的案例就可以实现死信队列,延时队列操作,上面看上去似乎没什么问题,实测一下就会发现 消息不会“如期死亡 。当先生产一个TTL为60s的消息,再生产一个TTL为5s的消息,第二个消息并不会再5s后过期进入死信队列,而是需要等到第一个消息TTL到期后,与第一个消息一同进入死信队列, 这是因为RabbitMQ 只会判断队列中的第一个消息是否过期

那么怎么来解决这个问题呢?

通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决
此插件的原理是将消息在交换机处暂存储在mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息到期再投递到队列当中

rabbitmq_delayed_message_exchange插件安装

(1).下载地址:https://www.rabbitmq.com/community-plugins.html
注意: 要下载与rabbitmq相对应的版本
(2).把下载的插件放到指定位置
下载的文件为zip格式,将zip格式解压,插件格式为ez,将文件复制到插件目录:
  • Linux

/usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins
rabbitmq-plugins list
  • Windows

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\plugins
(3).启动插件
  • Linux

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • Windows

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动信息:

(4).查看
进入:http://localhost:15672/#/exchanges

重构代码

生产者

生产者实现的关键点:
1.在声明交换机时不在是direct类型,而是x-delayed-message类型,这是由插件提供的类型
2.交换机要增加"x-delayed-type": "direct"参数设置
3.发布消息时,要在 Headers 中设置x-delay参数,来控制消息从交换机过期时间
<?php
/**
 * 死信队列,延时队列插件使用: 模拟订单支付
 */

namespace console\controllers\exchange\delay;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PublisherController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $delayConfig = Yii::$app->params["rabbitMq"]["delay"];
        $config = $rabbitMqConfig["base"];
        // 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
        $channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
        // 消息队列的额外参数
        $args = new AMQPTable(["x-delayed-type" => "direct"]);
        // 声明队列
        $channel->queue_declare($delayConfig["queue_name"]["pay"], false, true, false, false, false, $args);
        // 交换机和队列绑定
        $channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);

        // 声明一个数据
        $data = "this is a dead message";
        // 初始化消息并持久化
        $arr = [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
            "application_headers" => new AMQPTable([
                "x-delayed" => 2000  // 过期时间
            ])
        ];
        $msg = new AMQPMessage($data, $arr);
        // 发布消息到交换器, 并和路由键匹配
        $channel->basic_publish($msg,  $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);

        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

消费者

没有啥特别的修改
<?php
/**
 *  死信队列,延时队列插件使用: 模拟订单支付
 *
 */

namespace console\controllers\exchange\delay;

use PhpAmqpLib\Exchange\AMQPExchangeType;
use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ConsumerController extends Controller
{
    public $enableCsrfValidation = false;

    public function actionIndex()
    {
        //rabbitmq相关配置
        $rabbitMqConfig = Yii::$app->params["rabbitMq"];
        $delayConfig = Yii::$app->params["rabbitMq"]["delay"];
        $config = $rabbitMqConfig["base"];// 创建连接
        $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
        // 创建channel
        $channel = $connection->channel();
        // 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
        $channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
        // 交换机和队列绑定
        $channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
        // 消息回调处理
        $callback = function ($meg) {
            echo "revince: " . $meg->body. "\n";
            $meg->ack();
        };
        // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
        $channel->basic_qos(null, 1, null);
        // 消费者消费消息: 第四个参数: 需要ack确认
        $channel->basic_consume($delayConfig["queue_name"]["pay"], '', false, false, false, false, $callback);
        // 监控
        while ($channel->is_open()) {
            $channel->wait();
        }
        //关闭通道和连接
        $channel->close();
        $connection->close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/467701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Golang开发入门】你真的会用Go写“Hello world“吗?

博主简介&#xff1a;努力学习的大一在校计算机专业学生&#xff0c;热爱学习和创作。目前在学习和分享&#xff1a;数据结构、Go&#xff0c;Java等相关知识。博主主页&#xff1a; 是瑶瑶子啦所属专栏: Go语言核心编程近期目标&#xff1a;写好专栏的每一篇文章 目录 一、Go项…

Zynq-7000、FMQL45T900的GPIO控制(六)---linux驱动层配置GPIO输入输出控制

本文使用的驱动代码 Zynq-7000、FMQL45T900的GPIO控制&#xff08;六&#xff09;-linux驱动层配置GPIO输入输出控制资源-CSDN文库 在Zynq-7000、FMQL45T900驱动层也时常会用到对GPIO的控制&#xff0c;这里就针对实际使用的情况进行说明&#xff0c;首先根据之前的帖子确实使…

监测HDD smart信息的脚本编写

最近需要完成一个测试HDD的项目&#xff0c;因为接的HDD太多&#xff0c;手动查看smart信息太麻烦&#xff0c;所以需要写一个自动帮我们检查smart信息的脚本。此遍文章只介绍直连或者JBOD模式下的信息监测&#xff0c;没有涉及到组RAID模式。 1 首先看下HDD的smart信息&#x…

谁在成为产业经济发展的推车人?

区域发展的新蓝图中&#xff0c;京东云能做什么&#xff1f;它的角色是什么&#xff1f;这个问题背后&#xff0c;隐藏的不仅是京东云自身的能力和价值&#xff0c;更是其作为中国互联网云厂商的代表之一&#xff0c;对“技术产业”的新论证。 作者|皮爷 出品|产业家 关于云…

饮用水中的六价铬去除工艺

铬是人体必需的微量元素&#xff0c;天然水不含铬&#xff0c;海水中铬的平均浓度为0.05μg/L&#xff0c;饮用水中铬含量更低。 铬在水中主要以三价和六价形式存在&#xff0c;三价的铬是对人体有益的元素&#xff0c;而六价铬是有毒的。由于其毒性之高&#xff0c;已被国家列…

固定转向和行进速度下的车辆轨迹计算方法

游戏车辆固定转向轨迹计算 概述 车辆游戏是我们经常接触到的一类游戏&#xff0c;这里游戏在只用键盘操作时&#xff0c;往往非常不方便。这是因为这一类游戏大部分都是按下按键时转向&#xff0c;释放按键时方向就会自动转正。这种控制方式在实现方面比较容易。但是缺点也很明…

如何实现Canvas图像的拖拽、点击等操作

上一篇Canvas的博文写完后&#xff0c;有位朋友希望能对Canvas绘制出来的图像进行点击、拖拽等操作&#xff0c;因为Canvas绘制出的图像能很好的美化。好像是想做炉石什么的游戏&#xff0c;我也没玩过。 Canvas在我的理解中就好像在一张画布上绘制图像&#xff0c;它只能看到…

MySQL用的在溜,不知道业务如何设计也白搭!!!

MySQL业务设计 作者: 博学谷狂野架构师GitHub&#xff1a;GitHub地址 &#xff08;有我精心准备的130本电子书PDF&#xff09; 只分享干货、不吹水&#xff0c;让我们一起加油&#xff01;&#x1f604; 逻辑设计 范式设计 范式概述 第一范式&#xff1a;当关系模式R的所有属…

浙江省CIO峰会|数据安全+数字化转型,美创特色实践获“年度数字化赋能服务商”

近日&#xff0c;浙江省CIO年度峰会暨数实融合创新发展大会在杭州成功举办。美创科技受邀参加本次峰会&#xff0c;与全省数字化领袖人才共话数字化发展。 对话数字化转型 美创分享能力实践 在本次峰会以“数字化转型的昨天 今天 明天”为主题的论坛对话环节&#xff0c;美创科…

Win11启用docker报错

这里写自定义目录标题 An unexpected error was encountered while executing a WSLcommand. An unexpected error was encountered while executing a WSLcommand. An unexpected error was encountered while executing a WSLcommand. provisioning docker WSL distros: se…

M2M场景之客户端凭证模式|OIDC OAuth2.0 认证协议最佳实践系列 【4】

在前两篇文章中&#xff0c;我们介绍了 OIDC 授权码以及授权码增强的 PKCE 模式&#xff0c;本次我们将重点围绕 &#xff08;Client Credentials&#xff09; 模式进行讲解 &#xff0c;Client Credentials 模式是 OIDC 授权模式之一&#xff0c;它是一种用于客户端&#xff0…

微信小程序开发一个多少钱

小程序开发是当前比较流行的一项技术服务&#xff0c;能够为企业和个人带来巨大的商业价值和社会价值&#xff0c;但是小程序开发费用也是潜在的成本之一。在选择小程序开发服务时&#xff0c;了解开发费用如何计算、影响价格的因素以及如何降低成本等方面的知识&#xff0c;可…

055:cesium两种方法加载天地影像图

第055个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中用两种方法加载天地影像图。一种是利用WebMapTileServiceImageryProvider,另一种是利用UrlTemplateImageryProvider. 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方…

bug记录:c++ mysql Connector:Lost connection to MySQL server during query

1.背景 使用mysql connector1.1.4版本&#xff0c;代码中有 mysql 连接池&#xff0c;每次执行 sql 时从连接池取出一个连接&#xff0c;先用isClosed()判断为false继续使用&#xff0c;否则创建新连接。     现在升级 mysql connector为1.1.13版本&#xff0c;业务代码未修…

Linux进程概念——其二

目录 环境变量 基本概念 常见环境变量 查看环境变量方法 测试PATH&#xff3b;重点&#xff3d; 测试HOME 和环境变量相关的命令 环境变量的组织方式 通过代码获取环境变量 通过系统调用获取或设置环境变量 环境变量通常是具有全局属性的&#xff3b;重点&#xff3d…

学会笔记本电脑录屏快捷键,轻松实现录屏!

案例&#xff1a;笔记本电脑录屏有快捷键吗&#xff1f; 【我每次打开笔记本电脑录屏都要耗费比较长的时间&#xff0c;这样会影响到我录屏的效率。在这里想问一下&#xff0c;有没有快速打开电脑录屏的方法&#xff1f;】 在日常的工作、学习、娱乐中&#xff0c;我们经常需…

算法训练第一周题解汇总

A - Sort the Subarray 大意&#xff1a;在s1找一个最大的 [l&#xff0c;r] 子区间&#xff0c;使其经过从小到大的排序后 能够变成 s2 题解&#xff1a;先确定最小的区间&#xff0c;然后慢慢扩大。 最小区间的确定&#xff1a;s1和s2第一个不相等的数开始&#xff0c;到最后…

浅谈测试用例设计 | 京东云技术团队

作者&#xff1a;京东物流 王莹莹 一、测试用例为什么存在 1.1 定义 测试用例(Test Case)是指对特定的软件产品进行测试任务的描述&#xff0c;体现测试方案、方法、技术和策略。测试用例内容包括测试目标、测试环境、输入数据、测试步骤、预期结果、测试脚本等&#xff0c;…

pytorch 计算网络模型的计算量FLOPs和参数量parameter之殊途同归

计算网络模型的计算量FLOPs和参数量parameter之殊途同归 参数量方法一&#xff1a;pytorch自带方法&#xff0c;计算模型参数总量参数量方法二&#xff1a; summary的使用&#xff1a;来自于torchinfo第三方库参数量方法三&#xff1a; summary的使用&#xff1a;来自于torchsu…

controlnet1.1预处理器功能详解

ControlNet 1.1 与 ControlNet 1.0 具有完全相同的体系结构,ControlNet 1.1 包括所有以前的模型&#xff0c;具有改进的稳健性和结果质量,但增加并细化了多个模型。 今天太忙了&#xff0c;有时间就把每个模型的测试样稿发出来 2023.4.27 分类预处理器备注模型黑白倒转invert边…