【MQ05】异常消息处理

news2024/12/25 9:58:40

异常消息处理

上节课我们已经学习到了消息的持久化和确认相关的内容。但是,光有这些还不行,如果我们的消费者出现问题了,无法确认,或者直接报错产生异常了,这些消息要怎么处理呢?直接丢弃?这就是丢消息了呀。再次处理?一直继续报错怎么办?这条消息就永远都在不停报错的死循环中了。

通常,消息队列系统都会提供一套对于异常消息的处理机制,比如 RabbitMQ 的死信队列。

RabbitMQ死信队列

死信队列,其实就是在满足一定规则的前提下,将消息发送到指定的一个交换机队列中。这些规则包括:

  • 使用者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。

  • 消息过期,根据队列的消息 TTL 过期时间而定。

  • 消息被丢弃,因为队列超过了长度限制。

前面两个好测试,最后一个要修改 RabbitMQ 的配置稍微麻烦点,那么咱们就用前面两个来测。

首先,要定义一个用于接收死信消息的交换机和队列,我们顺便也直接做一个客户端消费者,专门读取死信队列里的消息。这个就相当于是正规队列消费者处理出现问题之后,再由这个消费者来做善后。

// 5.rq.c.deadletter.php
// ……………………
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道

// 死信队列及交换机
$channel->exchange_declare('dead_letter', 'direct', false,true,false); // 定义交换机
$channel->queue_declare('dead_letter_queue', false, true); // 定义队列
$channel->queue_bind('dead_letter_queue', 'dead_letter'); // 队列绑定交换机


echo "等待死信队列消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;

// 定义接收数据的回调函数
$callback = function ($msg) {
    echo '死信队列接收到数据: ', $msg->body, PHP_EOL;
};

// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('dead_letter_queue', '', false, true, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

交换机名字和队列名字自己起就好了。前面已经说过了,这个消费者获取到的死信队列数据都是正常消费有问题的,那么善后工作咱们就可以将这些数据记录日志或者记录到数据库,顺便发邮件、发短信提醒,或者做任何你想做的通知及记录工作。出于测试的目的,咱们就是简单打印了一下。

> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl+C 退出程序。

启动之后就等着死信数据的到来吧。接下来,我们要修改一下正常的队列,增加一些参数。

// 5.rq.c.php
// ……………………
$channel->queue_declare('hello', false, true, false, false, false, new AMQPTable([
    'x-message-ttl'=>10000, // 10秒过期
    'x-dead-letter-exchange'=>'dead_letter', // 死信到某个交换机
    'x-dead-letter-routing-key'=>'', // 死信路由
]));

// ……………………

$callback = function ($msg) {
    echo '接收到数据: ', $msg->body, PHP_EOL;
    $msg->nack();
};

// ……………………

x-message-ttl 是消息的过期时间,我们后面要用它来测过期时间进入死信队列的情况。x-dead-letter-exchange 用于定义出现问题后,将这个队列的数据放到哪个死信队列交换机中。x-dead-letter-routing-key 这个是指定进入死信队列的哪个路由。关于 RabbitMQ 交换机和路由的内容,如果有不清楚的小伙伴,可以在深入地学习一下 RabbitMQ 的官方文档和示例哦。

然后,在回调函数中,我们直接调用 $msg->nack() 。这个表示的就是手动取消确认,还记得上节课我们学过的是 $msg->ack() 吧,这个表示的是确认处理完成,没问题了,而 nack() 就是反过来的,说明当前的队列处理有问题,没有正常 ACK 。这对应的就是死信队列的第一条规则。

现在,向普通的 hello 消息队列中发送消息,结果死信队列中会接收到数据。

> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl+C 退出程序。
死信队列接收到数据: Hello World!

过期时间

好了,上面测试的结果就是死信队列的第一条规则。接下来我们测试第二条规则。

在 hello 队列的配置中,我们加上的 x-message-ttl 是 10 秒,也就是说,这条消息 10 秒不处理就会进入到死信队列。这个非常好测,直接关掉消费者,然后生产者发送数据,等到过期时间后,死信队列就会显示接收到数据了。

这个规则可以帮我们实现一个非常重要的功能,就是:延时队列。RabbitMQ 中没有直接的延时队列相关的功能,但可以通过死信的这个规则来实现,具体内容我们下节课再说。

Redis 队列在 Laravel 框架中处理异常消息

好了,看完 RabbitMQ 的相关异常处理功能之后,我们马上会联想到,Redis 有这样的功能吗?

抱歉,真的没有,但是,Laravel 和 TP 框架的队列功能都通过业务代码的形式实现了类似的功能。我们还是以 Laravel 为例进行学习。

在 Laravel 中,异常的消息队列数据最后会保存到 MySQL 数据库中,我们需要执行数据迁移来创建表,使用下面这两个命令。

php artisan queue:failed-table
php artisan migrate

操作成功之后,会在数据库中创建一个名为 failed_jbs 的表。接下来,还是继续拿上次课创建的那个最后会报异常的 Job 来进行测试,直接调用生产者的命令插入队列。

> php artisan q:p4

然后,我们不使用 --tries ,这样就不会进行重试了,一次失败就会进入到异常处理流程中,也就是插入到数据库中。

> php artisan queue:work
[2023-01-03 01:58:12][gAfPH03MvGWQXjjjk5nAFWtU7b3gkhqD] Processing: App\Jobs\Queue4
接收到了消息:测试 1672711092
[2023-01-03 01:58:22][gAfPH03MvGWQXjjjk5nAFWtU7b3gkhqD] Failed:     App\Jobs\Queue4

好了,去数据库看一眼吧,数据是不是插入进来了。

36aa3a7d22ca85b13e1dd118f614099f.png

从截图上可以看到,不仅有原始的队列信息,还有异常信息、队列使用的连接以及队列名、uuid 和失败时间这些字段。在结构上,还为 uuid 创建了一个唯一索引,这个 uuid 的作用我们后面马上就会看到。

接下来,使用命令行,我们还可以看到所有失败队列的信息。

> php artisan queue:failed

+--------------------------------------+------------+---------+-----------------+---------------------+
| ID                                   | Connection | Queue   | Class           | Failed At           |
+--------------------------------------+------------+---------+-----------------+---------------------+
| c8774e27-1284-4657-a13a-7e5665789d74 | redis      | default | App\Jobs\Queue4 | 2023-01-03 01:58:22 |
| 4a38d37b-86e7-4755-a29a-f6843b7289cc | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:57:23 |
| f4fa4cfd-cee2-4199-b048-c21dbbc188ca | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:57:03 |
| 210c9716-89f1-438b-b252-32cc6552a68f | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:52:57 |
| c971ddef-9098-4399-a404-bbce08bd97af | redis      | default | App\Jobs\Queue4 | 2022-12-31 03:52:41 |
+--------------------------------------+------------+---------+-----------------+---------------------+

然后,可以利用 uuid ,也就是上面命令中返回的 ID ,来让数据进行消息重试,使用 queue:retry 命令。

> php artisan queue:retry c8774e27-1284-4657-a13a-7e5665789d74 
The failed job [c8774e27-1284-4657-a13a-7e5665789d74] has been pushed back onto the queue!

执行之后,这条失败的数据又塞回之前的队列里了,消费者又会开始对它进行消费。这就是 uuid 的作用。另外,我们还可以批量执行重试,直接在命令后面写多个 uuid 就行。也可以一次性全部执行重试,只需要使用 all 参数即可,这个大家可以去官方文档再详细看一下。

我们还可以删除或者整个清除所有的失败任务数据,其实也就是删除 failed_jobs 中的数据。queue:forget 用于根据指定的 uuid 进行删除,而 queue:flush 则会清除所有数据。除了这两个命令之外,还有一个根据时间来清除失败任务的命令 queue:prune-failed 。它默认是默认 24 小时,可以用 --hours=xxx 来设置具体的时间。

> php artisan queue:prune-failed
4 entries deleted!

最后,我们还可以禁用失败存储。直接通过 .env 配置文件进行配置就行了,设置对应的属性值为 null 即可。

QUEUE_FAILED_DRIVER=null

任务错误处理

除了上面的失败处理之外,在 Laravel 中,还可以在出现错误的时候马上去执行一个方法,就像是失败事件后的回调函数一样。通过这个方法,我们可以在任务失败的时候马上就进行邮件、短信通知,或者也可以记录错误日志,甚至也可以不使用上面默认的异常处理功能以及相关的表,直接在这里用我们自己自定义的表来存储失败任务的信息。总之就是,任务失败后你想怎么处理都行。

只需要在任务类中实现 failed() 方法。

// /app/Jobs/Queue4.php
// ……………………
// ……………………
public function failed($exception = null)
{
  echo '如果发生错误就进入到这里了,错误信息是:'.$exception->getMessage(), PHP_EOL;
}

然后看一下效果。

> php artisan queue:work
[2023-01-03 02:19:00][d5MlKJkjnez3MBnqzrYCUkG1HSg1LJtC] Processing: App\Jobs\Queue4
接收到了消息:测试 1672712340
如果发生错误就进入到这里了,错误信息是:
[2023-01-03 02:19:10][d5MlKJkjnez3MBnqzrYCUkG1HSg1LJtC] Failed:     App\Jobs\Queue4

这个方法的执行是同步的,不是异步的,就像我们前面说的,任务失败了马上就会调用这个方法。因此,就可以安全有效地在这个方法中进行我们任何想要善后处理的操作了。

总结

好了,到现在为止,我们的队列系统其实已经相当安全可靠了。上一篇文章通过持久化和 ACK 机制解决了消息丢失的问题,这次即使是消费者出现了异常,我们也可以保证消息能够通过死信队列或者框架机制保存下来。从这里我们也可以看出,即使原生的系统不支持,也可以通过框架代码的形式来实现类似的功能,这也是队列这种系统灵活性的表现。补充一点,BLMOVE 这类 Redis 命令其实也可以实现消息备份,但和上面死信那种触发条件还是有区别,这是主动备份。再有,Redis 的 Stream 类型其实也已经是很完备的一套消息队列功能机制了,未应答 ACK 的数据是可以重复执行的,这也可以当成是一种异常处理的形式,只不过也一样需要我们自己编码干预进行转移,可以参考我们之前 Redis 系列中相关的内容:【Redis09】Redis基础:Stream操作 https://mp.weixin.qq.com/s/DsSnyU9xuJEijm9t-DVs2A 。

接下来,我们再看两种常见的队列形式,分别是延时队列和优先级队列,它们在 RabbitMQ 和 Laravel+Redis 中的实现又是怎样的呢?

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.c.deadletter.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/5.rq.p.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1472888.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入理解计算机系统学习笔记

第三章 程序的机器级表示 3.2.1 机器级代码 对于机器级编程来说,其中两种抽象尤为重要。第一种是由捍令集体系结构或指令集架构(Instruction Set Architecture, ISA)来定义机器级程序的 格式和行为,它定义了处理器状态、指令的格式&#xf…

在Ubuntu上为ARM 8处理器安装Python 3.10.4虚拟环境指南

在Ubuntu上为ARM 8处理器安装Python 3.10.4虚拟环境指南 安装Anaconda或Miniconda: 首先,您需要从官方网站下载适用于ARM架构的Anaconda或Miniconda安装包。下载完成后,在终端中使用bash Anaconda3-2019.10-Linux-armv8.sh(文件…

将仓库A中的部分提交迁移到仓库B中

结论: 使用git format-patchgit am即可实现 使用场景: 例如仓库A这里有5个提交记录,commitid1, commitid2, commitid3, commitid4,commitid5 仓库B想用仓库A中提交的代码,手动改比较慢,当改动较多的时候…

2.26 Qt day4+5 纯净窗口移动+绘画事件+Qt实现TCP连接服务+Qt实现连接数据库

思维导图 Qt实现TCP连接 服务器端&#xff1a; widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QTcpServer>//服务器端类 #include<QTcpSocket>//客户端类 #include<QMessageBox>//消息对话框类 #include<QList>//链…

2024-02-26(Spark,kafka)

1.Spark SQL是Spark的一个模块&#xff0c;用于处理海量结构化数据 限定&#xff1a;结构化数据处理 RDD的数据开发中&#xff0c;结构化&#xff0c;非结构化&#xff0c;半结构化数据都能处理。 2.为什么要学习SparkSQL SparkSQL是非常成熟的海量结构化数据处理框架。 学…

实践航拍小目标检测,基于轻量级YOLOv8n开发构建无人机航拍场景下的小目标检测识别分析系统

关于无人机相关的场景在我们之前的博文也有一些比较早期的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《deepLabV3Plus实现无人机航拍目标分割识别系统》 《基于目标检测的无人机航拍场景下小目标检测实践》 《助力环保河道水质监测&#xff0c;基于yolov…

掌握ChatGPT润色绝技:什么是人工智能写作以及如何使用它来完成写作任务

如对AI写论文感兴趣&#xff0c;欢迎添加作者wx讨论 : ryan_2982 人工智能 (AI) 的出现开创了技术进步的新时代&#xff0c;彻底改变了包括写作和内容创作在内的各个行业。人工智能写作和人工智能提示已成为可以简化和增强写作任务的强大工具。在这篇博文中&#xff0c;我们将…

C++多线程学习09:并发队列

参考 链接&#xff1a;恋恋风辰官方博客 并发队列&线程安全栈 代码结构&#xff1a; 并发队列ThreadSafeQueue.h&#xff1a; #pragma once#include <mutex> #include <queue>template<typename T> class threadsafe_queue { private:mutable std::m…

深入理解Python中的JSON模块:基础大总结与实战代码解析【第102篇—JSON模块】

深入理解Python中的JSON模块&#xff1a;基础大总结与实战代码解析 在Python中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;模块是处理JSON数据的重要工具之一。JSON是一种轻量级的数据交换格式&#xff0c;广泛应用于Web开发、API通信等领域。本文将…

linux操作系统期末练习题

背景&#xff1a; 一、远程登录 1&#xff0e;利用远程登录软件&#xff0c;以用户userManager(密码123456)&#xff0c;远程登录教师计算机&#xff08;考试现场给出IP地址&#xff09;&#xff0c;只有操作&#xff0c;没有命令。 2&#xff0e;以stu班级学生个人学号后3位…

goland配置新增文件头

参考&#xff1a; goland函数注释生成插件 goland函数注释生成插件_goland自动加函数说明-CSDN博客 GoLand 快速添加方法注释 GoLand 快速添加方法注释_goland批量注释-CSDN博客 goland 如何设置头注释&#xff0c;自定义author和data goland 如何设置头注释&#xff0c;自定…

spring boot 集成科大讯飞星火认知大模型

一、安装依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/…

springboot003图书个性化推荐系统的设计与实现(源码+调试+LW)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SpringBoot的图书个…

SOLIDWORKS 查找并修复装配体配合错误

我们在SOLIDWORKS 正版软件进行装配体装配时&#xff0c;时常会出现一些报错&#xff0c;例如在配合、装配体特征或被装配体参考引用的零部件和子装配体中。一些常见的错误&#xff0c;如一个零部件的过定义会引发更多其他错误信息&#xff0c;并导致装配体停止解析配合关系。下…

RestTemplate启动问题解决

⭐ 作者简介&#xff1a;码上言 ⭐ 代表教程&#xff1a;Spring Boot vue-element 开发个人博客项目实战教程 ⭐专栏内容&#xff1a;个人博客系统 ⭐我的文档网站&#xff1a;http://xyhwh-nav.cn/ RestTemplate启动问题解决 问题&#xff1a;在SpringCloud架构项目中配…

汽车大灯尾灯划痕裂缝破洞破损掉角崩角等如何修复?根本没必要换车灯换总成,使用无痕修UV树脂胶液即可轻松搞定。

TADHE车灯无痕修复专用UV胶是一种经过处理的UV树脂胶&#xff0c;主要成份是改性丙烯酸UV树脂。应用在车灯的专业无痕修复领域。 车灯修复UV树脂有以下优点&#xff1a; 1. 快速修复&#xff1a;此UV树脂是一种用UV光照射在10秒内固化的材料。 2. 高强度&#xff1a;UV树脂固…

【npm下载包报错:CERT_HAS_EXPIRED,问题解决】

npm下载包报错&#xff1a;CERT_HAS_EXPIRED npm安装依赖的时候出现报错 根据第三行报错的提示得知报错原因是证书已过期 上网一查&#xff0c;原来常用的淘宝镜像早就换新域名了&#xff0c; 之前的镜像域名在2024年1月22日https证书到期了 替换为最新的地址就可以了 npm …

蛋白结构预测模型评价指标

欢迎浏览我的CSND博客&#xff01; Blockbuater_drug …点击进入 文章目录 前言一、蛋白结构预测模型评价指标TM-scorelDDT 二、Alphafold中的评价指标pLDDTpTMPAE 三、AlphaFold-multimer 蛋白结构的评价指标DockQipTM 总结参考资料 前言 本文汇总了AlphaFold和AlphaFold-mul…

线性表——单链表的增删查改(下)

本节继续上节未完成的链表增删查改接口的实现。这是上节的地址:线性表——单链表的增删查改&#xff08;上&#xff09;-CSDN博客 上节实现的接口如下&#xff1a; //申请链表节点函数接口 SLNode* BuySListNode(SLTDataType x); //单链表的打印函数接口 void SListPrint(SLNod…

探索比特币现货 ETF 对加密货币价格的潜在影响

撰文&#xff1a;Sean&#xff0c;Techub News 文章来源Techub News&#xff0c;搜Tehub News下载查看更多Web3资讯。 自美国比特币现货交易所交易基金&#xff08;ETF&#xff09;上市以来&#xff0c;比特币现货 ETF 的相关信息无疑成为了影响比特币价格及加密货币市场走向…