(七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

news2025/1/22 17:42:47

发布者确认(Publisher Confirms)

发布者确认是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。

0、引言

先决条件

本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口(5672)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。

获取帮助

如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。

在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的利弊。

原文链接:https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html

1、在通道上启用发布者确认

发布者确认是 RabbitMQ 对 AMQP 0.9.1 协议的扩展,所以默认情况下它们是不启用的。使用 ConfirmSelect 方法可以在通道层级启用发布者确认:

var channel = connection.CreateModel();
channel.ConfirmSelect();

您必须在期望启用发布者确认的每个通道上调用该方法。确认只需要启用一次,而不是对每条发布的消息都启用。

策略 #1:单独发布消息

让我们从实现带确认的发布的最简单途径开始吧,那就是,发布一条消息并同步等待它确认

while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    IBasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    // uses a 5 second timeout
    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

在前面的示例中我们像往常一样发布消息并使用 Channel#WaitForConfirmsOrDie(TimeSpan) 方法等待它确认。该方法在消息确认后立即返回。如果在超时时间内消息未得到确认或者如果消息已 nack(Negative-Acknowledgement) 了(意味着代理由于某些原因无法处理它),方法会抛出一个异常。异常的处理通常包括记录一个错误消息日志 并/或 重新尝试发送消息。

不同的客户端库有不同的方式去同步处理发布者确认,所以确保仔细阅读您正在使用的客户端的文档。

这个技术非常简单但也有一个巨大的缺点:它会显著降低发布速度,因为某条消息的确认会堵塞后续消息的发布。这种方法提供的吞吐量不会超过每秒几百条已发布的消息。不过,这对于某些应用程序来说已经足够好了。

发布者确认是异步的吗

在开头我们提到代理是异步确认已发布的消息的,但在第一个例子中,代码是同步等待直至消息确认的。客户端实际上异步接收确认,并相应地解除对 WaitForConfirmsOrDie 的调用阻塞。可以将 WaitForConfirmsOrDie 看作是一个同步 helper,它依赖于底层的异步通知。

策略 #2:批量发布消息

为了改进上面的例子,我们可以发布一批消息并等待这一批消息全部得到确认。如下是一个使用 100 一批次的示例:

var batchSize = 100;
var outstandingMessageCount = 0;
while (ThereAreMessagesToPublish())
{
    byte[] body = ...;
    IBasicProperties properties = ...;
    channel.BasicPublish(exchange, queue, properties, body);
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize)
    {
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        outstandingMessageCount = 0;
    }
}
if (outstandingMessageCount > 0)
{
    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}

等待一批消息的确认比等待单个消息的确认大大提高了吞吐量(在远程 RabbitMQ 节点上最多可提高 20-30 倍)。一个缺点是,如果出现故障,我们不知道究竟是哪里出了问题,因此我们可能不得不在内存中保存整个批处理,以记录一些有意义的内容或重新发布消息。这个解决方案仍然是同步的,因此它阻止消息的发布。

策略 #3:异步处理发布者确认

代理异步确认已发布的消息,只需要在客户端上注册一个回调就可以收到这些确认的通知:

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
  // code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
  //code when message is nack-ed
};

这儿有两个回调:一个用于已确认的消息,一个用于已 nack 的消息(可以认为是代理丢失的消息)。两个回调都有相应的 EventArgs 参数(ea)包含:

delivery tag
标识已确认或已 nack 消息的序列号。我们将很快看到如何将其与发布的消息关联起来。
multiple
这是一个布尔值。如果为 false,则仅有一条消息确认/nack-ed;如果为 true,所有序列号 小于等于该序列号的消息都确认/nack-ed。

在发布前,可以通过 Channel#NextPublishSeqNo 获得消息的序列号:

var sequenceNumber = channel.NextPublishSeqNo;
channel.BasicPublish(exchange, queue, properties, body);

将消息与序列号关联起来的一种简单方法是使用字典。让我们假设我们想要发送字符串,因为它们很容易转换为用于发布的字节数组。下面是一个代码示例,它使用字典将发布序列号与字符串消息体关联起来:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// ... code for confirm callbacks will come later
var body = "...";
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, body);
channel.BasicPublish(exchange, queue, properties, Encoding.UTF8.GetBytes(body));

发布代码现在使用字典跟踪出站消息。我们需要在确认到达时清理字典,并在消息已 nack 时做一些类似于记录警告的事情:

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void CleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
    if (multiple)
    {
        var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
        foreach (var entry in confirmed)
        {
            outstandingConfirms.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        outstandingConfirms.TryRemove(sequenceNumber, out _);
    }
}

channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
    Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
    CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

// ... publishing code

先前的示例中包含一个在确认到达时清理字典的回调。注意,这个回调处理单次和多次确认。这个回调会在确认到达(Channel#BasicAcks)时被使用。用于已 nack 消息的回调将检索消息体并发出警告。然后,它重用之前的回调来清除字典中未完成的确认(无论消息是已确认还是已 nack,都必须删除字典中对应的条目)。

如何跟踪未完成的确认?

我们的示例使用一个 ConcurrentDictionary 跟踪未完成的确认。由于几种原因,这个数据结构十分方便。它允许我们能够轻易地将序列号与消息关联起来(无论消息数据是什么)并允许我们能够通过一个给出的序列 id 轻易地清理条目(以处理多次确认/nack)。最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中被调用的,该线程应该与发布线程保持不同。

除了使用复杂的字典实现外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发哈希表和变量来跟踪发布序列的下界,但它们通常更复杂,不属于“教程”的范畴。

总而言之,异步处理发布者确认通常需要以下步骤:

  • 提供一个方法去关联发布序列号和消息。
  • 在通道上注册确认侦听器,以便在发布者 acks/nacks 到达时得到通知,并执行适当的操作,例如记录或者重新发布已 nack 的消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布已 nack 的消息?

在相应的回调中重新发布已 nack 的消息可能很诱人,但应该避免这样,因为确认回调是在(通道不应该执行操作的)I/O 线程中分配的。更好的方案是在内存队列中对消息进行排队,该队列由发布线程轮询。像 ConcurrentQueue 这样的类可以很好地在确认回调和发布线程之间传递消息。

总结

在某些应用程序中,确保已发布的消息到达代理是必要的。发布者确认(Publisher Confirms)是 RabbitMQ 的一个特性,可以帮忙满足这个需求。发布者确认本质上是异步的,但也可以同步处理它们。没有说只有一个绝对的方法来实现发布者确认,这通常取决于应用程序和整个系统中的约束。典型的技术有:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但在一些东西出现问题时很难推断。
  • 异步处理:最佳性能和资源使用,错误情况下的良好控制,但还需要参与正确实现的过程(无法直接用现成的)。

2、将所有的东西放到一起

PublisherConfirms.cs 类包含了我们所介绍的技术的代码。我们可以编译它,按原样执行它并看看每项技术的表现如何:

dotnet run

输出会看起来像下面这样:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

运行效果:
在这里插入图片描述

如果客户端和服务器位于同一台机器上,那么您的计算机的输出应当与之类似。不出所料单独发布消息表现十分糟糕;但出乎意料的是:与批量发布相比,异步处理的表现有些令人失望。

发布者确认十分依赖于网络,所以我们最好不要在远端节点上尝试,而在生产中,客户端和服务器通常不在同一台机器上却又是更现实的情况。PublisherConfirms.cs 可以很容易地更改为使用非本地节点:

private static IConnection CreateConnection()
{
    var factory = new ConnectionFactory { HostName = "remote-host", UserName = "remote-host", Password = "remote-password" };
    return factory.CreateConnection();
}

重新编译类,再次执行并等待结果:

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到单独发布现在的表现仍然非常糟糕。但是有了客户端和服务器之间的网络,批量发布和异步处理现在表现得差不多,同时异步处理在发布者确认方面还有一点小优势。

请记住,批量发布很容易实现,但是在 negative publisher acknowledgement 的情况下,不容易知道哪些消息不能发送到代理。异步处理发布者确认需要更多的参与实现,但提供了更好的粒度和对发布消息已 nack 时执行的操作的更好控制。

5、生产[非]适用性免责声明

请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。

在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/772665.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring、SpringBoot、SpringCloud、SpringCloud Alibaba、Elasticsearch版本对应,附下载地址

1、GitHub Alibaba 发布SpringCloud Alibaba和SpringCloud 、SpringBoot版本 Spring Cloud Alibaba VersionSpring Cloud VersionSpring Boot2022.0.0.0-RC2Spring Cloud 2022.0.03.02022.0.0.0-RC1Spring Cloud 2022.0.03.0.02021.0.5.0*Spring Cloud 2021.0.52.6.132021.0.4…

Linux查看某进程所部署的目录路径

1.首先查看系统中正在跑的进程都有什么 ps -ef 2.然后通过抓取你要看的进程名&#xff0c;比如哪些服务 ps -ef | grep xxxxx(服务名) Linux在启动一个进程时&#xff0c;系统会在 /proc 下创建一个以PID命名的文件夹&#xff1b; 在该文件夹下会有我们的进程的信息&#…

【蓝图】p28按键+鼠标点击实现开关门

p28&#xff0c;创建门的蓝图类 actor和组件的区别、门的轴心点修改 创建一个Actor 添加一个静态网格体组件 创建一个门框 同理创建一个门Door 注意&#xff08;当门的中心点不在边角上时&#xff09; 创建一个Scene组件 把物体变换位置 这时只需要旋转Scene就可以旋转…

开发者评价:Serverless 容器最值得推荐的能力是什么?

Kubernetes 作为云原生计算的基础项目&#xff0c;已经在开发者和企业中获得广泛支持。它可以帮助企业加快部署频率、提升应用弹性、优化资源利用率、改善系统可用性。然而其自身复杂性和陡峭的学习曲线依然让一些开发者望而生畏&#xff1b;与此同时&#xff0c;随着企业数字化…

RabbitMQ到底为什么要使用它?

导入 一个技术的衍生必然是为了解决现实出现的问题&#xff0c;在讲这个问题之前我们先了解一下传统开发中关于服务调用出现的问题&#xff08;痛点&#xff09;有哪些&#xff1f; 我们为什么要使用MQ&#xff1f; ①、同步——超时 在多服务体系架构中&#xff0c;必然存在…

掘金量化—Python SDK文档—5.API 介绍(2)

Python SDK文档 5.API介绍 5.6通用数据函数&#xff08;免费&#xff09; python 通用数据 API 包含在 gm3.0.148 版本及以上版本&#xff0c;不需要引入新库 get_symbol_infos - 查询标的基本信息 获取指定(范围)交易标的基本信息&#xff0c;与时间无关. 此函数为掘金公…

C# 动态字典(可以随机实时增删访问,保证先入先出的字典)

如果你有以下需求&#xff1a; 1. 需要对Dictionary进行遍历的同时移除或者添加元素 2. 需要按顺序遍历Dictionary并且保证先入先出 3. 需要即时的获取字典内的元素数量&#xff0c;即时增删 如果你觉得好&#xff0c;请给我的框架点一个免费的star&#xff0c;球球啦 Yueh0607…

如何将视频转换为AVI格式?3个方法轻松转换!

在数字化时代&#xff0c;视频成为了人们记录和分享重要时刻的主要方式之一。然而&#xff0c;不同设备和平台对视频格式的要求千差万别&#xff0c;有时您可能需要将视频转换为特定格式以便于播放或编辑。在本文中&#xff0c;我们将重点介绍将视频转换为AVI&#xff08;Audio…

python实现小波降噪

文章目录 小波分解小波系数小波降噪阈值确定的一些小知识点python 实现小波去噪小波分解 上图为对信号进行3层小波分解,其中,Approximation 为近似小波系数(信号的低频成分),Detail为细节小波系数(信号的高频成分),分解后得到四个小波系数分别为A3,D3,D2,D1。 小波系数 小…

易查分怎么上传成绩?

当使用易查分制作查询系统时&#xff0c;许多老师可能对于如何上传成绩感到困惑。有时候&#xff0c;导入成绩到易查分系统后&#xff0c;信息可能无法完全显示&#xff0c;而且也很难找到错误的原因。因此&#xff0c;今天我将与老师们分享一下易查分上传成绩的方法。这个技巧…

【软件测试】Git实战-分支的新建和合并(超细整理)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 新建分支 首先&a…

ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost:3306‘ (10061)

用压缩包安装MySQL&#xff0c;执行mysql -u root -p命令&#xff0c;报错&#xff1a; ERROR 2003 (HY000): Cant connect to MySQL server on localhost:3306 (10061) 这是因为MySQL服务器没有启动&#xff0c;我打开任务管理器看了一下&#xff0c;确实没有启动&#xff0c;…

携手航天·追求卓越|诚邀优秀企业加入航天采购平台供应商库

近日&#xff0c;第九届中国&#xff08;国际&#xff09;商业航天高峰论坛在武汉开幕&#xff0c;中国载人航天工程副总设计师张海联在《我国载人月球探测发展总体考虑》主旨报告中介绍了中国载人登月的初步方案。为此&#xff0c;我国科研人员正在研制长征十号运载火箭、新一…

科技云报道:大模型“百团大战”,容联云的机会在哪里?

科技云报道原创。 “大模型的迭代是一场‘暴力’填数据、拔规模而造就的‘美学盛宴’”&#xff0c;中金公司研究团队在“AI浪潮之巅”系列报告中如是说。 在大模型发展初期&#xff0c;大模型或许还称得上是“大厂的游戏”&#xff0c;但半年之后的今天&#xff0c;国内10亿…

【Linux】日志与守护进程

目录 一、预备知识 二、打印日志 三、守护进程 1、前置知识 2、守护进程 一、预备知识 日志是有等级的&#xff0c;表明该条日志的重要程度&#xff0c;一般分为以下几个级别&#xff1a; #define DEBUG 0 //调试信息 #define INFO 1 //正常运行 #define WARNING 2 //报…

【Python爬虫+可视化案例】采集电商网站商品数据信息,并可视化分析

爬虫可视化案例 &#xff1a;苏宁易购 案例所需要掌握的知识点&#xff1a; selenium的使用html标签数据解析方法 需要准备的环境&#xff1a; python 3.8pycharm 2022专业版selenium python里面的第三方库 可以用来操作浏览器 爬虫代码展示 所需模块 【代码领取 请看文末…

017 - STM32学习笔记 - SPI读写FLASH(二)-flash数据写入与读取

016 - STM32学习笔记 - SPI访问Flash&#xff08;二&#xff09; 上节内容学习了通过SPI读取FLASH的JEDEC_ID&#xff0c;在flash资料的指令表中&#xff0c;还看到有很多指令可以使用&#xff0c;这节继续学习使用其他指令&#xff0c;程序模板采用上节的模板。 为了方便起…

为何异地销号这么难?这些注意事项要熟记!

最近有不少小伙伴私信小编&#xff0c;他们在网上办理的大流量手机号卡&#xff0c;用了一段时间之后想换其他的卡&#xff0c;所以想注销当前用的卡&#xff0c;但是注销的时候确实屡屡碰壁&#xff0c;程序还比较繁琐&#xff0c;有的甚至申请注销了几个月还注销不掉&#xf…

在Microsoft Excel中如何合并多个表格

如果你问那些处理数据的人,你会知道合并 Excel 文件或合并工作簿是他们日常工作的一部分。 Power Query 是将多个 Excel 文件中的数据合并或组合到一个文件中的最佳方式。你需要将所有文件存储在一个文件夹中,然后使用该文件夹将这些文件中的数据加载到高级查询编辑器中。它…

了解kubernetes部署:namespace和Node设置

节点及namespace的设置 kubectlcreate-f/opt/kubernetes/namespaces.yaml 通过此命令我们创建了如下namespace: ns-elasticsearch:elasticsearch相关  ns-rabbitmq:rabbitmq相关  ns-javashop&#xff1a;javashop应用相关 接下来我们要根据具体情况安排各个节点的部署规划…