RabbitMQ消息可靠性保证机制6--可靠性分析

news2025/1/22 18:51:44

在使用消息中间件的过程中,难免会出现消息错误或者消息丢失等异常情况。这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。

在RabbitMQ中可以使用Firehose实现消息的跟踪,Firehose可以记录每一次发送或者消息的记录,方便RabbitMQ的使用都进行调试、排错等。

FireHose的原理是将生产者投递给RabbitMQ的消息,或者RabbitMQ投递给消费者的消息按照指定的格式,发送到默认交换器上,这个默认交换器的名称是:amq.rabbitmq.trace它是一个topic类型的交换器。发送到交换器的消息的路由键为publis.{exchangename}deliver.{queuename}。其中exchangename和queuename为交换器和队列名字。分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

在这里插入图片描述

上图是一个样例。生产者将消息发送至trace.ex交换器,交换器将消息路由至trace.qu这个队列,然后由消息者将消息取走。当消息到达trace.ex这个队列后,消息就会投递一份到名称为amq.rabbitmq.trace的交换器,按收到交换器的名称加上一个前缀变更publish.trace.ex作为路由的KEY,投递一份至publishtrace这个队列中;接收消息也样如此,当消费都取走消息时,会将消息发送一份到名称为amq.rabbitmq.trace的交换器,按消费者队列的名称加一个前缀变成deliver.trace.qu作为路由的KEY,投递至delivertrace这个队列中。

Firehose命令:

# 开启命令
rabbitmqctl trace_on [-p vhose]
# [-p vhose]是可选参数,用来指定虚拟主机的vhose

# 关闭命令
rabbitmqctl trace_off [-p vhose]

Firehose默认情况下处于关闭状态,并且Firehose的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose开启之后会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储 。

7.9.1 Firehose验证

首先开启追溯

[root@nullnull-os rabbitmq]# rabbitmqctl trace_on -p / 
Starting tracing for vhost "/" ...
Trace enabled for vhost /

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TraceProduce {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {
      // 定义交换机
      channel.exchangeDeclare("trace.ex", BuiltinExchangeType.DIRECT, false, true, null);
      // 定义队列
      channel.queueDeclare("trace.qu", false, false, true, null);
      // 队列绑定
      channel.queueBind("trace.qu", "trace.ex", "");

      // 定义保留数据队列
      channel.queueDeclare("publishtrace", false, false, false, null);
      // 绑定
      channel.queueBind("publishtrace", "amq.rabbitmq.trace", "publish.trace.ex");

      for (int i = 0; i < 100; i++) {
        String msg = "这是发送的消息:" + i;
        channel.basicPublish("trace.ex", "", null, msg.getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (TimeoutException e) {
      e.printStackTrace();
    }
  }
}

检查队列的信息:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name         │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ publishtrace │ 10001000         │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ trace.qu     │ 10001000         │
└──────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# 

这样生产者发送的消息就已经被保存至publishtrace中了,后缀便可以通过检查队列中的消息,检查消息内容。

消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import java.nio.charset.StandardCharsets;

public class TraceConsumer {
  public static void main(String[] args) throws Exception {

    // 资源限制
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    try (Connection connection = factory.newConnection();
        Channel channel = connection.createChannel(); ) {

      // 定义交换机
      channel.exchangeDeclare("trace.ex", BuiltinExchangeType.DIRECT, false, true, null);
      // 定义队列
      channel.queueDeclare("trace.qu", false, false, true, null);
      // 队列绑定
      channel.queueBind("trace.qu", "trace.ex", "");

      // 定义队列
      channel.queueDeclare("delivertrace", false, false, true, null);
      // 队列绑定
      channel.queueBind("delivertrace", "amq.rabbitmq.trace", "deliver.trace.qu");

      // 接收消息
      for (int i = 0; i < 25; i++) {
        GetResponse getResponse = channel.basicGet("trace.qu", true);
        String msg = new String(getResponse.getBody(), StandardCharsets.UTF_8);
        System.out.println("收到的消息:" + msg);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

此处采用的是拉模式,从队列中获取了25条记录,也就是说队列中还剩余75条记录。

首先查看控制台输出:

收到的消息:这是发送的消息:0
收到的消息:这是发送的消息:1
收到的消息:这是发送的消息:2
收到的消息:这是发送的消息:3
......
收到的消息:这是发送的消息:22
收到的消息:这是发送的消息:23
收到的消息:这是发送的消息:24

检查队列的情况:

[root@nullnull-os rabbitmq]#  rabbitmqctl list_queues name,messages_ready,messages_unacknowledged,messages,consumers  --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌──────────────┬────────────────┬─────────────────────────┬──────────┬───────────┐
│ name         │ messages_ready │ messages_unacknowledged │ messages │ consumers │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ delivertrace │ 250250         │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ publishtrace │ 10001000         │
├──────────────┼────────────────┼─────────────────────────┼──────────┼───────────┤
│ trace.qu     │ 750750         │
└──────────────┴────────────────┴─────────────────────────┴──────────┴───────────┘
[root@nullnull-os rabbitmq]# 

可以发现,拉的消息,都已经被推送到了delivertrace中了。

最后关闭Tracehose

[root@nullnull-os rabbitmq]# rabbitmqctl trace_off -p / 
Stopping tracing for vhost "/" ...
Trace disabled for vhost /

使用Firehose验证完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253735.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RAG评估指南:从检索到生成,全面解析LLM性能评估方法

前言 这一节我们将从时间线出发对RAG的评估方式进行对比&#xff0c;这些评估方式不仅限于RAG流程之中&#xff0c;其中基于LLM的评估方式更加适用于各行各业。 RAG常用评估方式 上一节我们讲了如何用ROUGE 这个方法评估摘要的相似度&#xff0c;由于篇幅限制&#xff0c;没…

高危端口汇总(Summary of High-Risk Ports)

高危端口汇总 能关闭就关闭 &#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解…

电子病历静态数据脱敏路径探索

一、引言 数据脱敏&#xff08;Data Masking&#xff09;&#xff0c;屏蔽敏感数据&#xff0c;对某些敏感信息&#xff08;比如patient_name、ip_no、ad、no、icd11、drug等等 &#xff09;通过脱敏规则进行数据的变形&#xff0c;实现隐私数据的可靠保护。电子病历作为医疗领…

黑马微服务开发与实战学习笔记_导论

系列博客目录 文章目录 系列博客目录为什么学微服务&#xff1f;定义 为什么学微服务&#xff1f; 从下图搜索指数可以看出&#xff0c;微服务热度不减 公司中很多微服务的应用。 公司岗位要求中很多微服务的身影。 定义 微服务是一种软件架构风格&#xff0c;它是以专注于…

Python从入门到入狱

Python是从入门到入狱&#xff1f;这个充满调侃意味的说法在程序员圈子里流传甚广。表面看&#xff0c;它似乎是在嘲笑这门语言从简单易学到深陷麻烦的巨大反差&#xff0c;实际上却隐藏着很多值得深思的问题。要解读这个话题&#xff0c;得从Python的特点、使用场景以及潜在风…

网安瞭望台第9期:0day 情报,OAuth 2.0授权流程学习

国内外要闻 Veeam 修补服务提供商控制台关键 RCE 漏洞 Veeam 发布了安全更新以解决影响服务提供商控制台&#xff08;VSPC&#xff09;的一个关键漏洞&#xff0c;该漏洞可能为在易受攻击的实例上执行远程代码创造条件。此漏洞被追踪为 CVE-2024-42448&#xff0c;其 CVSS 评分…

Qt复习学习

https://www.bilibili.com/video/BV1Jp4y167R9/?spm_id_from333.999.0.0&vd_sourceb3723521e243814388688d813c9d475f https://subingwen.cn/qt/qt-primer/#1-4-Qt%E6%A1%88%E4%BE%8B https://subingwen.cn/qt/ https://download.qt.io/archive/qt/1.1Qt的特点 1.2QT中的…

视频监控集中管理方案设计:Liveweb视频汇聚方案技术特点与应用

随着科技的发展&#xff0c;视频监控平台在各个领域的应用越来越广泛。然而&#xff0c;当前的视频监控平台仍存在一些问题&#xff0c;如视频质量不高、监控范围有限、智能化程度不够等。这些问题不仅影响了监控效果&#xff0c;也制约了视频监控平台的发展。 为了解决这些问…

【算法】图论——树的重心

目录 题目解析 算法原理 图的存储 算法实现 题目解析 题目解析 给定一颗树&#xff0c;树中包含n个结点&#xff08;编号&#xff09;和n-1条无向边。请找到树的重心&#xff0c;并输出将重心删除后&#xff0c;剩余各个连通块中点数的最大值。 什么是重心&#xff1f; 重…

STM32 进阶 定时器 2基本定时器 基本定时器中断案例:LED闪烁

基本定时器 基本定时器TIM6和TIM7各包含一个16位自动装载计数器&#xff0c;由各自的可编程预分频器驱动。 这2个定时器是互相独立的&#xff0c;不共享任何资源。 这个2个基本定时器只能向上计数&#xff0c;由于没有外部IO&#xff0c;所以只能计时&#xff0c;不能对外部…

51单片机(STC89C52RC版本)学习笔记(更新中...)

文章目录 参考资料1. 准备工作1.1 win10配置51单片机开发环境1.1 Ubuntu配置51单片机开发环境问题1&#xff1a;mcs51/8051.h依赖于mcs51/lint.h问题2&#xff1a;提示找不到头文件mcs51/8051.h 2. 认识51单片机2.1 STC89C52单片机2.2 管脚图2.3 原理图2.4 按键抖动2.5 头文件说…

USB 声卡全解析:提升音频体验的得力助手

在当今数字化的时代&#xff0c;音频领域的追求愈发多元。无论是热衷聆听高品质音乐的爱好者&#xff0c;还是在专业音频工作中精雕细琢的人士&#xff0c;亦或是在游戏世界里渴望极致音效沉浸的玩家&#xff0c;都始终在寻觅能让音频体验更上一层楼的妙法。而 USB 声卡&#x…

计算机的错误计算(一百七十四)

摘要 探讨 MATLAB 关于计算机的错误计算&#xff08;一百七十三&#xff09;中多项式的秦九韶&#xff08;或Horner&#xff09;形式的计算误差。 在计算机的错误计算&#xff08;一百七十三&#xff09;中&#xff0c;我们讨论了一个多项式的计算误差。本节探讨其对应秦九韶&…

Magento2如何创建CRUD Models

Mageno2 Model的创建不同于其他框架&#xff0c;需要3个不同目录层级的文件 例如需要为表hello_test创建model&#xff1a; 1、app/code/Hello/Test/Model/Test.php <?phpnamespace Hello\Test\Model;class Test extends \Magento\Framework\Model\AbstractModel {protec…

Visual Studio 2022 项目配置常用选项

作为一名C++开发者,经常需要配置第三方库,今天来跟大家截图一下,方便大家快速配置: 头文件包含目录: 或者: 库文件包含目录:

基于Vue实现的移动端手机商城项目 电商购物网站 成品源码

&#x1f4c2;文章目录 一、&#x1f4d4;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站演示 &#x1f4f8;部分截图 &#x1f3ac;视频演示 五、⚙️网站代码 &#x1f9f1;项目结构 &#x1f492;vue代码预览 六、&#x1f527;完整…

PHP使用RabbitMQ(正常连接与开启SSL验证后的连接)

代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法&#xff08;我这边消费队列是使用接口请求的方式&#xff0c;每次只从中取出一条&#xff09; 安装amqp扩展 PHP使用RabbitMQ前&#xff0c;需要安装amqp扩展&#xff0c;之前文章中介绍了Windows环…

uniapp h5 vue3 m3u8 和 mp4 外链视频播放

m3u8视频播放 使用mui-player 和hls.js。 安装npm install mui-player hls.js我的版本是"hls.js": "^1.5.17"和"mui-player": "^1.8.1"使用 页面标签&#xff1a; 引用&#xff1a; 点击目录播放视频&#xff1a; m3u8视频播放&a…

给el-table表头添加icon图标,以及鼠标移入icon时显示el-tooltip提示内容

在你的代码中&#xff0c;你已经正确地使用了 el-tooltip 组件来实现鼠标划过加号时显示提示信息。el-tooltip 组件的 content 属性设置了提示信息的内容&#xff0c;placement 属性设置了提示信息的位置。 你需要确保 el-tooltip 组件的 content 属性和 placement 属性设置正…

node.js实现分页,jwt鉴权机制,token,cookie和session的区别

文章目录 1. 分⻚功能2. jwt鉴权机制1.jwt是什么2.jwt的应用3.优缺点 3. cookie&#xff0c;token&#xff0c;session的对比 1. 分⻚功能 为什么要分页 如果数据量很⼤&#xff0c;⽐如⼏万条数据&#xff0c;放在⼀个⻚⾯显⽰的话显然不友好&#xff0c;这时候就需要采⽤分⻚…