RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

news2025/1/16 9:13:31

文章目录

  • RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)
    • 一、引言
    • 二、简介
    • 三、准备工作
      • 3.1 说明
      • 3.2 生成项目
    • 四、实战
      • 4.1 交换机(Exchanges)
      • 4.2 临时队列(Temporary Queues)
      • 4.3 绑定(Bindings)
      • 4.4 整合代码
        • 发布程序
        • 订阅程序
      • 4.5 验证一:先广播后订阅
      • 4.6 验证二:先订阅后广播
    • 五、结论

RabbitMQ教程:发布/订阅模式(Publish/Subscribe)(三)

一、引言

在快节奏的软件开发世界中,我们经常面临需要将消息发送给多个接收者的场景。例如,在构建日志监控系统、实时通知系统等场景时,我们希望一个事件的发生能够被多个服务同时感知和处理。这时,发布/订阅模式(Publish/Subscribe)就显得尤为重要。在本教程中,我们将通过一个简单的例子来学习如何使用RabbitMQ实现发布/订阅模式。

二、简介

在上一篇教程中,我们学习了如何使用RabbitMQ实现工作队列(Work Queues)。今天,我们将探索工作队列的进阶应用——发布/订阅模式,这是一种允许多个接收者(Subscribers)监听同一个消息通道,并在消息发布时接收通知的机制。发布/订阅模式的核心在于解耦消息的发送者(Publisher)和接收者(Subscribers),发送者不需要知道有哪些接收者,只需要将消息发送到一个交换机(Exchange),而接收者则订阅这个交换机来接收消息。

三、准备工作

3.1 说明

在本教程中,我们将使用RabbitMQ的.NET客户端来创建一个简单的发布/订阅系统。我们将创建一个名为logs的fanout类型的交换机,并将所有日志消息广播给所有订阅了该交换机的队列。

3.2 生成项目

首先,我们需要生成两个项目:

EmitLogApp:用于模拟日志消息的发布者。
ReceiveLogsApp:用于接收并打印日志消息的订阅者。
我们可以使用以下命令来创建这两个项目:

dotnet new console --name EmitLog
cd EmitLog
dotnet add package RabbitMQ.Client
cd ..
dotnet new console --name ReceiveLogs
cd ReceiveLogs
dotnet add package RabbitMQ.Client

这些命令创建了两个新的控制台应用程序,一个用于发送日志消息,另一个用于接收并打印日志消息。

四、实战

4.1 交换机(Exchanges)

在之前的教程中,我们直接将消息发送到队列。现在,我们需要引入交换机(Exchange)的概念。在RabbitMQ中,生产者从不直接向队列发送消息,而是发送到交换机,然后由交换机将消息推送到一个或多个队列。交换机的行为由交换机类型定义。

我们将创建一个名为logsfanout类型的交换机,它将广播所有接收到的消息给所有绑定到它的队列。
在这里插入图片描述

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

4.2 临时队列(Temporary Queues)

在我们的日志系统中,我们希望每个运行的接收者程序都能接收到所有日志消息。因此,我们不需要为队列指定名称,而是让服务器为我们生成一个随机名称。同时,我们希望在消费者断开连接后队列能自动删除。

在.NET客户端中,我们可以创建一个非持久性、独占的、自动删除的队列,并让服务器为我们生成一个名称:

var queueName = channel.QueueDeclare().QueueName;

4.3 绑定(Bindings)

我们已经创建了一个fanout交换机和一个队列。现在,我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定(Binding)。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: string.Empty);

在这里插入图片描述

4.4 整合代码

发布程序
using RabbitMQ.Client;
using System.Text;

await PublishMessagesAsync(10);
/// <summary>
/// 发布指定次数的消息到RabbitMQ队列
/// </summary>
/// <param name="loopCount">消息发送的次数</param>
/// <returns>Task对象,表示异步操作</returns>
async Task PublishMessagesAsync(int loopCount)
{
    // 循环发送指定次数的消息
    for (int i = 1; i <= loopCount; i++)
    {
        // 调用SendMessageToQueue方法发送消息,并包含当前迭代次数
        await SendMessageToQueue($"Iteration {i} - Hello World");
        // 这里可以添加延迟,如果需要的话
        // await Task.Delay(1000);
    }
    Console.ReadLine();
}
/// <summary>
/// 向RabbitMQ队列发送一条消息
/// </summary>
/// <param name="message">要发送的消息内容</param>
/// <returns>Task对象,表示异步操作</returns>
async Task SendMessageToQueue(string message)
{
    // 创建连接工厂,并设置RabbitMQ服务器地址为localhost
    var factory = new ConnectionFactory { HostName = "localhost" };
    // 使用异步方式创建连接
    using var connection = await factory.CreateConnectionAsync();
    // 使用异步方式创建通道
    using var channel = await connection.CreateChannelAsync();
    //声明名为"logs"的fanout类型的交换机
    await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);
    // 将消息内容编码为字节数组
    var body = Encoding.UTF8.GetBytes(message);

    // 异步发布消息到队列
    await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);
    Console.WriteLine($" [x] Sent {message}");
}
订阅程序
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂,并设置RabbitMQ服务器地址为localhos
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用异步方式创建连接
using var connection = await factory.CreateConnectionAsync();
// 使用异步方式创建通道
using var channel = await connection.CreateChannelAsync();

// 异步声明一个名为"logs"的fanout类型的交换机
// 交换机会将所有接收到的消息广播给所有绑定到它的队列
await channel.ExchangeDeclareAsync(exchange: "logs",
    type: ExchangeType.Fanout);

// 声明一个由服务器命名的队列,这样每个消费者都会有一个唯一的队列
// 这使得我们可以有多个消费者同时接收消息,而不会相互干扰
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

// 将由服务器创建的队列绑定到"logs"交换机
// 这样,交换机就会将消息发送到这个队列
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
// 输出提示信息,表示消费者正在等待日志消息
Console.WriteLine(" [*] Waiting for logs.");

// 创建一个异步事件驱动的消费者
var consumer = new AsyncEventingBasicConsumer(channel);

// 设置当消费者接收到消息时的事件处理程序
consumer.ReceivedAsync += (model, ea) =>
{
    // 从接收到的消息中提取消息体并转换为字符串
    byte[] body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    // 打印接收到的消息
    Console.WriteLine($" [x] {message}");
    // 返回Task.CompletedTask以满足异步事件处理的签名要求
    return Task.CompletedTask;
};
// 开始消费指定队列的消息
// 这个调用会告诉RabbitMQ服务器,我们有一个消费者准备好接收"logs"交换机绑定的队列中的消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

4.5 验证一:先广播后订阅

运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe,即先发布广播;再运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe进行订阅广播。可以发现,没有接收到任何内容。原因是需要先启动订阅者(消费者),再启动广播(发布者/生产者)才可以接收到消息
在这里插入图片描述

4.6 验证二:先订阅后广播

先运行ReceiveLogsApp\bin\Debug\net8.0\ReceiveLogsApp.exe
再运行EmitLogApp\bin\Debug\net8.0\EmitLogApp.exe;可以发现每个消费者(订阅者)都收到了相同信息。
在这里插入图片描述

五、结论

在本教程中,我们深入探讨了RabbitMQ发布/订阅模式的概念和实现。通过构建一个简单的日志系统,我们学习了如何创建fanout类型的交换机,以及如何发送和接收消息。以下是我们从本教程中获得的关键要点:

  1. 解耦发送者和接收者:发布/订阅模式允许发送者和接收者之间没有直接的联系,发送者只需要将消息发送到交换机,而接收者则订阅交换机来接收消息。

  2. 消息广播fanout类型的交换机会将所有接收到的消息广播给所有绑定到它的队列,这对于日志系统、事件通知等场景非常有用。

  3. 临时队列:我们使用了临时队列来接收消息,这样每个订阅者都会有自己的队列,并且在订阅者断开连接后,队列会自动删除。

  4. 动态订阅:订阅者可以随时订阅或取消订阅交换机,这使得系统具有很高的灵活性和动态性。

通过这些机制,我们能够建立一个高效的发布/订阅系统,它不仅能够提高系统的响应速度,还能够在面对各种异常情况时保持消息的可靠性和持久性。这些知识为我们在实际开发中实现复杂的事件驱动架构提供了坚实的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244062.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

金山云Q3调整后EBITDA率提升至9.8% 经营效率和盈利能力强劲增长

11月19日&#xff0c;金山云公布了2024年第三季度业绩。 季度内&#xff0c;公司在收入规模、盈利能力、经营现金流方面都取得了扎实的进展。财报显示&#xff0c;金山云Q3营收18.9亿元&#xff0c;同比回归两位数快速增长&#xff0c;达16.0%&#xff1b;公有云实现收入11.8亿…

Python轴承故障诊断 (19)基于Transformer-BiLSTM的创新诊断模型

往期精彩内容&#xff1a; Python-凯斯西储大学&#xff08;CWRU&#xff09;轴承数据解读与分类处理 Pytorch-LSTM轴承故障一维信号分类(一)-CSDN博客 Pytorch-CNN轴承故障一维信号分类(二)-CSDN博客 Pytorch-Transformer轴承故障一维信号分类(三)-CSDN博客 三十多个开源…

Linux 安装 jdk8

将原有的 JDK 卸载干净&#xff08;可选&#xff09; # 查找并显示出系统所有已安装的与 JDK 相关的 rpm 软件包名称 rpm -qa | grep jdk # 删除 jdk rpm -e --nodeps 要卸载的JDK 安装 一、方法一&#xff1a;yum 包管理器安装 1&#xff09;检索可用包 yum search java |…

ESLint的简单使用(js,ts,vue)

一、ESLint介绍 1.为什么要用ESLint 统一团队编码规范&#xff08;命名&#xff0c;格式等&#xff09; 统一语法 减少git不必要的提交 减少低级错误 在编译时检查语法&#xff0c;而不是等js引擎运行时才检查 2.eslint用法 可以手动下载配置 可以通过vue脚手架创建项…

11.19机器学习_逻辑回归

十二 逻辑回归 1.概念 逻辑回归(Logistic Regression)是机器学习中的一种分类模型&#xff0c;逻辑回归是一种分类算法&#xff0c;虽然名字中带有回归&#xff0c;但是它与回归之间有一定的联系。由于算法的简单和高效&#xff0c;在实际中应用非常广泛。 逻辑回归一般用于…

数据结构-树状数组专题(2)

一、前言 接上回树状数组专题&#xff08;1&#xff09;&#xff0c;这次主要介绍差分跟树状数组联动实现区间更新 二、我的模板 重新放了一遍&#xff0c;还是提一嘴&#xff0c;注意下标从0开始&#xff0c;区间左闭右开 template <typename T> struct Fenwick {in…

SAM-Med2D 训练完成后boxes_prompt没有生成mask的问题

之前对着这这篇文章去微调SAM_Med2D(windows环境),发现boxes_prompt空空如也。查找了好长时间问题SAM-Med2D 大模型学习笔记&#xff08;续&#xff09;&#xff1a;训练自己数据集_sam训练自己数据集-CSDN博客 今天在看label2image_test.json文件的时候发现了一些端倪: 官方…

从源头保障电力安全:输电线路动态增容与温度监测技术详解

在电力系统中&#xff0c;输电线路是电能传输的关键环节。然而&#xff0c;当导线温度过高时&#xff0c;会加速导线老化&#xff0c;降低绝缘性能&#xff0c;甚至引发短路、火灾等严重事故&#xff0c;对电网安全运行构成巨大威胁。近日&#xff0c;某地区因持续高温和用电负…

第02章 CentOS基本操作

2.文件基本操作【文件操作&#xff08;一&#xff09;】 目标 理解Linux下路径的表示方法能够使用命令(mkdir和touch)在指定位置创建目录和文件能够使用命令(rm)删除指定的目录和文件能够使用命令(ls)列出目录里的文件能够使用命令(cat,head,tail,less,more)查看文件内容理解标…

leetcode400第N位数字

代码 class Solution {public int findNthDigit(int n) {int base 1;//位数int weight 9;//权重while(n>(long)base*weight){//300n-base*weight;base;weight*10;}//n111 base3 weight900;n--;int res (int)Math.pow(10,base-1)n/base;int index n%base;return String…

工业生产安全-安全帽第二篇-用java语言看看opencv实现的目标检测使用过程

一.背景 公司是非煤采矿业&#xff0c;核心业务是采选&#xff0c;大型设备多&#xff0c;安全风险因素多。当下政府重视安全&#xff0c;头部技术企业的安全解决方案先进但价格不低&#xff0c;作为民营企业对安全投入的成本很敏感。利用我本身所学&#xff0c;准备搭建公司的…

【AI人脸工具整合包及教程】Rope——重新定义你的数字形象!

引言 在这个数字时代&#xff0c;个人形象的重要性不言而喻。无论是社交媒体上的个人展示&#xff0c;还是商业活动中的品牌塑造&#xff0c;一个独特的形象都能让人眼前一亮。随着技术的发展&#xff0c;AI人脸技术逐渐从科幻走向现实&#xff0c;成为普通人也能轻松触及的技…

NLP论文速读(EMNLP 2024)|动态奖励与提示优化来帮助语言模型的进行自我对齐

论文速读|Dynamic Rewarding with Prompt Optimization Enables Tuning-free Self-Alignment of Language Models 论文信息&#xff1a; 简介: 本文讨论的背景是大型语言模型&#xff08;LLMs&#xff09;的自我对齐问题。传统的LLMs对齐方法依赖于昂贵的训练和人类偏好注释&am…

java CAS详解

java 中CAS是如何实现的&#xff1f; 在 Java 中&#xff0c;实现 CAS&#xff08;Compare-And-Swap, 比较并交换&#xff09;操作的一个关键类是Unsafe。 Unsafe类位于sun.misc包下&#xff0c;是一个提供低级别、不安全操作的类。由于其强大的功能和潜在的危险性&#xff0…

Gooxi受邀参加海通证券AI+应用生态大会,助力数智金融高质量发展

11月15日&#xff0c;由海通证券举办以”智算无界&#xff0c;共臻高远”为主题AI应用生态大会在上海圆满落幕。此次活动汇聚了众多人工智能领域的意见领袖、专家学者、优秀企业代表及资深投资人&#xff0c;共同探讨金融行业人工智能应用的前沿理论、最佳实践及发展趋势&#…

Python数据分析与可视化实验案例,所需数据已经绑定上传

大数据技术专业技能竞赛试卷 一、项目名称 农业肥料登记数据分析赛题 二、竞赛内容 赛项以大数据技术为核心内容&#xff0c;重点考查参赛选手数据清洗和数据分析的能力&#xff0c;结合Pandas和matplotlib图表展示数据。所有参赛学生在现场根据给定的项目任务&#xff0c;…

【竞技宝】LOL-传奇杯:姿态飞机TP绕后一锤定音

北京时间2024年11月19日,英雄联盟第二届传奇杯正在如火如荼的进行之中。昨天迎来小组赛第四个比赛日,本日一共进行了七场小组赛的对决,那么在昨日上半场的四场比赛中,登场的各支队伍都取得了什么样的表现呢?接下来小宝为大家带来小组赛day4上半场的比赛战报。 OP(宁王队) 0-1 …

qt之telnet连接目标设备在线调试功能

一、前言 在QT下使用telnet连接目标设备&#xff0c;进行在线命令调试&#xff0c;也可配合ftp或ssh使用。 telnet某些库在qt5下不可用&#xff0c;无法获取登录信息&#xff0c;只能获取到连接信息&#xff0c;这里我用自己的方式判断是否成功登录 二、环境 window qt5.7…

Android中常见内存泄漏的场景和解决方案

本文讲解Android 开发中常见内存泄漏场景及其解决方案&#xff0c;内容包括代码示例、原因分析以及最佳实践建议。 1. 静态变量导致的内存泄漏 静态变量的生命周期与应用进程一致&#xff0c;如果静态变量持有了对 Activity 或其他大对象的引用&#xff0c;就可能导致内存泄漏…

红外相机和RGB相机外参标定 - 无需标定板方案

1. 动机 在之前的文章中红外相机和RGB相机标定&#xff1a;实现两种模态数据融合_红外相机标定-CSDN博客 &#xff0c;介绍了如何利用标定板实现外参标定&#xff1b;但实测下来发现2个问题&#xff1a; &#xff08;1&#xff09;红外标定板尺寸问题&#xff0c;由于标定板小…