RabbitMQ-工作模式(Publish模式Routing模式)

news2024/12/26 10:50:44

在这里插入图片描述

文章目录

  • 发布/订阅(Publish/Subscribe)
    • 交换机
    • 临时队列
    • 绑定
    • 总体代码示例
  • 路由(Routing)
    • 绑定
    • 直连交换机
    • 多重绑定
    • 发送日志
    • 订阅
    • 总体代码示例

更多相关内容可查看

发布/订阅(Publish/Subscribe)

构建一个简单的日志系统

  • 我们将通过构建一个简单的日志系统来说明这个模式。它将包含两个程序 – 第一个程序将发出日志消息,第二个程序将接收并打印它们。
  • 在我们的日志系统中,每个运行中的接收程序都将收到这些消息。这样,我们就能够运行一个接收程序并将日志定向到磁盘;同时,我们也能够运行另一个接收程序,并在屏幕上看到日志。

基本上,发布的日志消息将被广播到所有的接收程序。

交换机

Rabbit中完整的消息模型:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

在RabbitMQ中消息模型的核心思想是,生产者从不直接发送任何消息到队列。实际上,很多时候生产者甚至不知道消息是否会被发送到任何队列。

相反,生产者只能将消息发送到一个交换机。交换机是一个非常简单的东西。它一边从生产者接收消息,另一边将它们推送到队列。交换机必须确切地知道如何处理收到的消息。它应该被附加到特定的队列吗?它应该被附加到多个队列吗?还是应该被丢弃。这些规则由交换机类型定义。
在这里插入图片描述

有几种可用的交换机类型:direct、topic、headers和fanout。我们将专注于最后一种类型 – fanout。让我们创建一个这种类型的交换机,并称其为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交换机非常简单。正如你可能从名称中猜到的那样,它只是将接收到的所有消息广播到它所知道的所有队列。这正是我们的日志记录器所需要的。

列出交换机
要列出服务器上的交换机,您可以运行非常有用的rabbitmqctl命令:

sudo rabbitmqctl list_exchanges

在这个列表中会有一些amq.*交换机和默认(未命名)交换机。这些是默认创建的,无需考虑

之前对交换机一无所知,但仍然能够将消息发送到队列。这是因为我们使用的是默认交换,通过空字符串("")来标识它。

回想一下之前发布消息的方式:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换机的名称空字符串表示默认或无名交换机:如果存在指定 routingKey 的队列,则消息会被路由到该队列。

现在,我们可以将消息发布到我们命名的交换机:

channel.basicPublish("logs", "", null, message.getBytes());

这样,我们将消息发布到名为 logs 的交换机中,而不是默认的无名交换机。

临时队列

在之前,我们使用的队列都有特定的名称。能够给队列命名对我们来说非常重要 - 我们需要将工作者指向同一个队列。在想要在生产者和消费者之间共享队列时,给队列命名非常重要。

但是对于我们的日志记录器来说情况并非如此。我们希望收到所有日志消息,而不仅仅是其中的一部分。我们也只对当前正在流动的消息感兴趣,而不是旧消息。为了解决这个问题,我们需要两件事情。

  • 首先,每当我们连接到 Rabbit 时,我们都需要一个全新的空队列。为此,我们可以创建一个具有随机名称的队列,或者更好的是让服务器为我们选择一个随机的队列名称。
  • 其次,一旦我们断开消费者连接,队列应该自动删除。

在 Java 中,当我们向 queueDeclare() 方法提供没有参数时,我们创建一个非持久化、独占、自动删除的队列,并且由服务器生成一个名称:

String queueName = channel.queueDeclare().getQueue();

在这一点上,queueName 包含一个随机的队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

在这里插入图片描述

我们已经创建了一个fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定

channel.queueBind(queueName, "logs", "");

从现在开始,logs 交换机将会将消息追加到我们的队列中。

列出绑定
您可以使用以下命令列出现有的绑定:

rabbitmqctl list_bindings

总体代码示例

在这里插入图片描述
发出日志消息的 producer 程序:logsroutingKeyfanoutEmitLog.java

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         //指定交换机类型-fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);
		//绑定交换机 发送消息到交换机EXCHANGE_NAME中
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

如果还没有队列绑定到交换机,则消息将丢失, 但这对我们来说没关系,如果还没有消费者在获取,我们可以安全地丢弃该消息。

代码为:ReceiveLogs.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
	//指定交换机类型-fanout
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    //绑定交换机跟队列
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

如果你想将日志保存到文件中,只需打开控制台并输入以下命令:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕上看到日志,开启一个新的终端并运行:

java -cp $CP ReceiveLogs

当然,要发出日志,只需输入:

java -cp $CP EmitLog

使用 rabbitmqctl list_bindings 命令,你可以验证代码实际上是否按我们想要的方式创建了绑定和队列。如果有两个 ReceiveLogs.java 程序正在运行,你应该会看到类似如下的输出:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

结果的解释很简单:来自 exchange logs 的数据发送到两个带有服务器分配名称的队列中。这正是我们想要的。

路由(Routing)

绑定

在前面的示例中,我们已经创建了绑定。你可能还记得 代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自该交换机的消息感兴趣。

绑定可以接受一个额外的routingKey 参数。为了避免与basic_publish参数混淆,我们将其称为绑定键。以下是我们如何使用键创建绑定的示例:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键的含义取决于交换机类型。我们先前使用的fanout 交换机简单地忽略了它的值。

直连交换机

在我们之前的教程中,我们的日志系统将所有消息广播给所有消费者。我们希望扩展其功能,以允许根据消息的严重性进行过滤。例如,我们可能希望一个将日志消息写入磁盘的程序只接收关键错误,而不浪费磁盘空间来记录警告或信息日志消息。

我们之前使用的是fanout 交换机,它并没有提供太多的灵活性 - 它只能进行无脑广播。

相反,我们将使用直连交换机。直连交换机背后的路由算法很简单 - 消息将被发送到绑定键与消息的路由键完全匹配的队列中。

为了说明这一点,考虑以下设置:

橙黑绿P直接问₁Q₂C₁

在这个设置中,我们可以看到直连交换机 X 与两个绑定到它的队列。第一个队列绑定的绑定键是 orange,而第二个队列有两个绑定,一个绑定键为 black,另一个为 green

在这样的设置中,使用路由键 orange发布到交换机的消息将被路由到队列 Q1。具有路由键 black 或 green的消息将发送到 Q2。所有其他消息将被丢弃。

多重绑定

黑黑P直接问₁Q₂C₁C₂

将多个队列与相同的绑定键绑定是完全合法的。在我们的示例中,我们可以添加一个在交换机 X 和队列 Q1 之间的绑定,绑定键为black。在这种情况下,直连交换机将像fanout 交换机一样行为,将消息广播到所有匹配的队列。具有路由键 black 的消息将被传递到 Q1 和 Q2。

发送日志

我们将使用这个模型来构建我们的日志系统。与使用fanout 交换机不同,我们将消息发送到一个直连交换机。我们将日志严重程度作为路由键提供。这样,接收程序就能够选择它想要接收的严重程度。让我们先专注于发送日志。

和往常一样,我们首先需要创建一个交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接下来,我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

我们假设 ‘severity’ 可以是 'info'、'warning' 或 'error' 中的一个。

订阅

接收消息的方式与之前的教程类似,只有一个例外 - 我们将为每个我们感兴趣的严重程度创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

总体代码示例

错误信息警告错误P直接amq.gen-S9b...amq.gen-Ag1...C₁C₂

生产者类的代码:EmitLogDirect.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
         //指定交换机类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		//指定日志类型
        String severity = getSeverity(argv);
        String message = getMessage(argv);
		//发送日志
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

消费者代码为:ReceiveLogsDirect.java

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
	//指定交换机类型
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //获取交换机随机名字
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
    	//指定日志类型
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

如果你只想将 ‘warning’ 和 ‘error’(而不是 ‘info’)日志消息保存到文件中,只需打开控制台并输入以下命令:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在屏幕上看到所有的日志消息,打开一个新的终端并执行以下命令:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发出一个错误日志消息,只需输入以下命令:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1800985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Playwright框架入门

自从2023年底playwright框架火起来之后,很多小伙伴咨询我们这个框架,甚至问我们什么时候出这个课程. 这步这个课程在我们千呼万唤中出来了.具体的课程大纲和试听可以联系下方二维码获取. 今天给大家分享一下playwright的安装和一些常用API,为后续的学习做好准备工作. Playwrig…

批量重命名大解放!自定义取文本左侧长度,轻松实现文件名焕新之旅!

文件管理是我们日常工作和生活中不可或缺的一部分。然而&#xff0c;面对成千上万的文件&#xff0c;手动重命名无疑是一项繁琐且耗时的任务。今天&#xff0c;我们为您推荐一款高效便捷的批量文件重命名工具——文件批量改名高手&#xff0c;让您轻松实现取文本左的长度来进行…

【PythonCode】力扣Leetcode21~25题Python版

【PythonCode】力扣Leetcode21~25题Python版 前言 力扣Leetcode是一个集学习、刷题、竞赛等功能于一体的编程学习平台&#xff0c;很多计算机相关专业的学生、编程自学者、IT从业者在上面学习和刷题。 在Leetcode上刷题&#xff0c;可以选择各种主流的编程语言&#xff0c;如C…

【会议征稿,IEEE出版】EEI 2024,6月28-30

第六届电子工程与信息学国际学术会议&#xff08;EEI 2024&#xff09;将于2024年6月28日至6月30日在中国重庆召开。EEI 2024将围绕“电子工程”、“信息学”与“计算机科学”等相关最新研究领域 &#xff0c;为来自国内外高等院校、科学研究所、企事业单位的专家、教授、学者、…

海康威视综合安防管理平台 多处 FastJson反序列化RCE漏洞复现

0x01 产品简介 海康威视综合安防管理平台是一套“集成化”、“智能化”的平台,通过接入视频监控、一卡通、停车场、报警检测等系统的设备。海康威视集成化综合管理软件平台,可以对接入的视频监控点集中管理,实现统一部署、统一配置、统一管理和统一调度。 0x02 漏洞概述 由于…

javacv ffmpeg使用笔记 (补充中...)

javacv ffmpeg使用笔记 一、maven依赖二、示例代码1. 获取视频时长 三、小技巧 一、maven依赖 使用javacv ffmpeg并指定classifier之后&#xff0c;就不需要额外安装ffmpeg软件&#xff08;jar包中已经内置&#xff09;了。 全量依赖包&#xff08;不推荐&#xff09;安装包总大…

Docker 管理 | 代理配置、内网共享和 Harbor 部署

唠唠闲话 在现代软件开发和运维中&#xff0c;容器技术已经成为构建、部署和管理应用程序的标准工具。然而&#xff0c;在实际操作中&#xff0c;我们常常需要面对一些常见的挑战&#xff0c;如容器访问外部资源的代理配置、内网环境下的镜像共享以及企业级镜像管理。 本教程…

Transformer动画讲解:注意力计算Q、K、V

暑期实习基本结束了&#xff0c;校招即将开启。 不同以往的是&#xff0c;当前职场环境已不再是那个双向奔赴时代了。求职者在变多&#xff0c;HC 在变少&#xff0c;岗位要求还更高了。提前准备才是完全之策。 最近&#xff0c;我们又陆续整理了很多大厂的面试题&#xff0c…

自动化Reddit图片收集:Python爬虫技巧

引言 Reddit&#xff0c;作为一个全球性的社交平台&#xff0c;拥有海量的用户生成内容&#xff0c;其中包括大量的图片资源。对于数据科学家、市场研究人员或任何需要大量图片资源的人来说&#xff0c;自动化地从Reddit收集图片是一个极具价值的技能。本文将详细介绍如何使用…

GCB | 全球草地和森林土壤呼吸对降水量变化的不对称响应

全球变化导致地球水文循环的强化放大了降水的年际变化&#xff0c;这将显著影响陆地碳&#xff08;C&#xff09;循环。然而&#xff0c;在极端降水变化情况下&#xff0c;先前观测到的土壤呼吸&#xff08;Rs&#xff09;和降水之间的关系是否仍然适用&#xff0c;目前仍不清楚…

thinkphp6.0版本下子查询sql处理

目录 一&#xff1a;背景 二&#xff1a;查询实例 三&#xff1a;总结 一&#xff1a;背景 我们在实际业务的开发过程中&#xff0c;经常会碰到这样的场景&#xff0c;查询某些部门的客户信息&#xff0c;查询下过订单的客户信息。这里查询客户信息实际上就用到了子查询&…

PS的stable diffusion插件安装指南

PS的stable diffusion插件安装指南 1.首先要安装stable diffusion&#xff0c;具体安装方法&#xff0c;参考https://blog.csdn.net/sheji888/article/details/139196688 stable diffusion要求要启用API功能 2.安装ps2023以上版本&#xff0c;低于这个版本不能使用stable diff…

TCP攻击是怎么实现的,如何防御?

TCP&#xff08;Transmission Control Protocol&#xff09;是互联网协议族中的重要组成部分&#xff0c;用于在不可靠的网络上提供可靠的数据传输服务。然而&#xff0c;TCP协议的一些特性也使其成为攻击者的目标&#xff0c;尤其是DDoS&#xff08;Distributed Denial of Ser…

马斯克五步流程法在产品创新中的实践与应用

引言&#xff1a; 在科技创新的浪潮中&#xff0c;埃隆马斯克以其独到的思维方式和创新实践&#xff0c;引领着多个行业的前沿。他提出的“第一性原理”下的五步流程法&#xff0c;不仅是对创新过程的深刻洞见&#xff0c;也为产品经理和工程师们提供了一套行之有效的工作方法。…

【Redis】Redis经典问题:缓存穿透、缓存击穿、缓存雪崩

目录 缓存的处理流程缓存穿透解释产生原因解决方案1.针对不存在的数据也进行缓存2.设置合适的缓存过期时间3. 对缓存访问进行限流和降级4. 接口层增加校验5. 布隆过滤器原理优点缺点关于扩容其他使用场景SpringBoot 整合 布隆过滤器 缓存击穿产生原因解决方案1.设置热点数据永不…

强国机械制造有限公司引入先进制造技术,提升产品质量和生产效率

强国机械制造有限公司2024年6月3日宣布引入了一系列先进制造技术,包括机器学习、人工智能和物联网等,旨在提升其产品的质量和生产效率。这些前沿技术的应用,使得公司的制造过程更加智能化和数据驱动,显著提高了产品的精度和稳定性。 通过机器学习算法,强国机械能够分析和预测生…

【Mybatis】动态SQL标签3

foreach标签是使用举例 在实际应用中&#xff0c;我常常需要根据多个id批量的操作&#xff1a; 查询指定id的记录&#xff1a; 这时就可以用foreach标签&#xff1a; collection"ids" &#xff1a; 接口上传过来的数值或list集合或者map集合都可以 item"id&…

区块链技术:供应链金融的革新者与引领者

一、引言 在供应链金融领域,区块链技术以其独特的去中心化、不可篡改、透明公开等特性,正在逐步成为该领域的革新者与引领者。本文将深入探讨区块链技术在供应链金融中的应用特点、功能、使用场景,并结合具体案例和技术方案,展现其巨大的潜力和价值。 二、区块链在供应链金…

【动态规划-BM71 最长上升子序列(一)】

题目 BM71 最长上升子序列(一) 分析 dp[i] 考虑到下标i&#xff0c;其组成的最长上升子序列长度 可以用动态规划的原因&#xff1a; 到i的结果可以由到j &#xff08;j<i) 的结果推出&#xff0c;只需要判断下标j对应的数字是否比下标i 对应的字母小即可 注意&#xf…

Three.js——粒子效果、粒子水波、粒子组成立方体

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 ⚡开源项目&#xff1a; rich-vue3 &#xff08;基于 Vue3 TS Pinia Element Plus Spring全家桶 MySQL&#xff09; &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1…