RabbitMQ工作模式-主题模式

news2025/1/22 17:46:42

主题模式

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

使用topic类型的交换器,队列绑定到交换器、bingingKey时使用通配符,交换器将消息路由转发到具体队列时,会根据消息routingKey模糊匹配,比较灵活。

在Direct类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

这里再加入一个需求,不仅想根据日志级别进行划分,还想根据日志的来源分日志,如何来做呢?

使用topic类型的交换器, routingKey就不能随便写了,它必须是点分单词,单词可以随便写,一般按消息的特征,该点分单词字符串最长255字节。

bindingKey也必须是这种形式。top类型的交换器背后原理跟direct类型类似只要队列的bingingkey的值与消息的routingKey的匹配,队列就可以收到该消息。有两个不同

  1. * (star)匹配一个单词。
  2. # 匹配0到多个单词。

在这里插入图片描述

上报的数据的RoutingKey,格式如下

地区.业务.日志级别 如shanghai.busi.INFO 、 hangzhou.line.ERROR

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

public class Product {

  private static final String[] ADDRESS_ARRAYS = {"shanghai", "suzhou", "hangzhou"};

  private static final String[] BUSI_NAMES = {"product", "user", "schedule"};

  private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换机
    channel.exchangeDeclare(
        "ex.busi.topic",
        BuiltinExchangeType.TOPIC,
        // 持久化标识
        false,
        // 是否自动删除
        false,
        // 属性信息
        null);

    for (int i = 0; i < 50; i++) {

      String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
      String busiName = BUSI_NAMES[ThreadLocalRandom.current().nextInt(0, BUSI_NAMES.length)];
      String address =
          ADDRESS_ARRAYS[ThreadLocalRandom.current().nextInt(0, ADDRESS_ARRAYS.length)];
      String routingKey = address + "." + busiName + "." + level;

      String pushMsg = "地址[" + address + "],业务[" + busiName + "],级别[" + level + "],消息:" + i;

      channel.basicPublish(
          "ex.busi.topic", routingKey, null, pushMsg.getBytes(StandardCharsets.UTF_8));
    }
  }
}

上海的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 上海地区的消费都,获取所有的上海信息
 */
public class ShangHaiConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换机
    channel.exchangeDeclare(
        "ex.busi.topic",
        BuiltinExchangeType.TOPIC,
        // 持久化标识
        false,
        // 是否自动删除
        false,
        // 属性信息
        null);

    // 定义队列
    channel.queueDeclare(
        "shanghai.all.log",
        // 持久化存储
        true,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    // 将队列与交换机进行绑定
    channel.queueBind("shanghai.all.log", "ex.busi.topic", "shanghai.#", null);

    channel.basicConsume(
        "shanghai.all.log",
        (consumerTag, message) -> {
          String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
          System.out.println("shanghai consumer 收到数据:" + dataMsg);
        },
        consumerTag -> {});
  }
}

所有错误日志的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class ErrorLogConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换机
    channel.exchangeDeclare(
        "ex.busi.topic",
        BuiltinExchangeType.TOPIC,
        // 持久化标识
        false,
        // 是否自动删除
        false,
        // 属性信息
        null);

    // 定义队列
    channel.queueDeclare(
        "log.all.error",
        // 持久化存储
        true,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    // 将队列与交换机进行绑定
    channel.queueBind("log.all.error", "ex.busi.topic", "#.ERROR", null);

    channel.basicConsume(
        "log.all.error",
        (consumerTag, message) -> {
          String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
          System.out.println("错误日志 consumer 收到数据:" + dataMsg);
        },
        consumerTag -> {});
  }
}

苏州用户的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class SuZhouUserConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 定义交换机
    channel.exchangeDeclare(
        "ex.busi.topic",
        BuiltinExchangeType.TOPIC,
        // 持久化标识
        false,
        // 是否自动删除
        false,
        // 属性信息
        null);

    // 定义队列
    channel.queueDeclare(
        "suzhou.user.consumer",
        // 持久化存储
        true,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    // 将队列与交换机进行绑定
    channel.queueBind("suzhou.user.consumer", "ex.busi.topic", "suzhou.user.*", null);

    channel.basicConsume(
        "suzhou.user.consumer",
        (consumerTag, message) -> {
          String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);
          System.out.println("suzhou consumer 收到数据:" + dataMsg);
        },
        consumerTag -> {});
  }
}

首先启动三个消费者,查看队列和交换器的信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ ex.busi.topic      │ topic   │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────────┬──────────────────┬──────────────────────┬───────────┐
│ source_name   │ source_kind │ destination_name     │ destination_kind │ routing_key          │ arguments │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.consumer │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ shanghai.all.log     │ queue            │ shanghai.all.log     │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ log.all.error        │ queue            │ log.all.error        │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ log.all.error        │ queue            │ #.ERROR              │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ shanghai.all.log     │ queue            │ shanghai.#           │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.*        │           │
└───────────────┴─────────────┴──────────────────────┴──────────────────┴──────────────────────┴───────────┘
[root@nullnull-os ~]# 

观察可以发现,此队列与消息的绑定已经成功。接下使用生产者发送消息。观察控制台输出:

错误日志消费者

错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:6
错误日志 consumer 收到数据:地址[suzhou],业务[product],级别[ERROR],消息:8
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:15
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:16
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:21
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
错误日志 consumer 收到数据:地址[hangzhou],业务[product],级别[ERROR],消息:28
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:33
错误日志 consumer 收到数据:地址[hangzhou],业务[schedule],级别[ERROR],消息:39
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46

上海地区的消费者

shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:0
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:1
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:2
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:5
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:32
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:35
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[INFO],消息:38
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:41
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:48

苏州用户的消费者

suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:37
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:45

至此topic模式操作成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/950326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[学习笔记] fhq Treap 平衡树

fhq Treap 也叫无旋Treap &#xff08;好像&#xff1f;我也不知道&#xff09; 反正我带旋 Treap 是不会滴&#xff0c;其他的平衡树也不会&#xff08;但是会平板电视&#xff09; fhq Treap 好写&#xff0c;码量小&#xff0c;缺点是常数比较大 定义 二叉搜索树 二叉搜…

为什么说模电难学?因为它至少是这27个基础知识的排列组合!

1、基尔1、基尔霍夫定理的内容是什么&#xff1f; 基尔霍夫电流定律&#xff1a;在电路任一节点&#xff0c;流入、流出该节点电流的代数和为零。 基尔霍夫电压定律&#xff1a;在电路中的任一闭合电路&#xff0c;电压的代数和为零。 2、戴维南定理 一个含独立源、线性电阻…

在抖音开店卖货的流程是什么?最全解答如下,建议新手认真看完!

我是王路飞。 同样是在抖音卖货&#xff0c;为何如今大多数人都是选择在抖音开店&#xff0c;而不再是选择做账号、开直播了呢&#xff1f; 原因很简单&#xff0c;因为门槛和变现方式。 相比短视频和直播带货的起号、变现难度越来越大&#xff0c;低门槛的抖音小店显然更适…

Origin热图的做法

1.数据准备 2.绘制-选择带标签热图 3.图表调整 右边标签- 下方标签-复制格式&#xff0c;再到左边或者右边选择 粘贴所有 图中的标签及颜色 直接双击在属性框更改 主框的其他特征在属性框选择 色阶的控制 直接选择色阶的属性框

Endnote中查看一个文献的分组的具体方法——以Endnote X8为例

Endnote中查看一个文献的分组的具体方法——以Endnote X8为例 一、问题 当Endnote中使用分类方法对文献进行分组管理后&#xff0c;有时需要重新调整该文献的分组&#xff0c;则需要找到这个文献在哪个分组中。本文阐述怎样寻找一个文献的分组的位置信息。 二、解决方法 1.选…

Spooling的原理

脱机技术 程序猿先用纸带机把自己的程序数据输入到磁带中&#xff0c;这个输入的过程是由一台专门的外围控制机实现的。之后CPU直接从快速的磁带中读取想要的这些输入数据。输出也类似。 假脱机技术&#xff08;Spooling技术&#xff09; 即用软件的方式来模拟脱机技术。要…

Kubernetes技术--k8s核心技术Controller控制器

1.Controller概述 Controller是在集群上管理和运行容器的对象。是一个实际存在的对象。 2.pod和Controller之间的关系 pod通过controller实现应用的运维,包括伸缩、滚动升级等操作。 这里pod和controller通过label标签来建立关系。如下所示: 3.Deployment控制器应用场景 -1:…

RabbitMQ工作模式-发布订阅模式

Publish/Subscribe&#xff08;发布订阅模式&#xff09; 官方文档&#xff1a; https://www.rabbitmq.com/tutorials/tutorial-three-python.html 使用fanout类型类型的交换器&#xff0c;routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange&#xff0c;每个…

win10底部任务栏开机后长时间未响应的解决办法

https://blog.csdn.net/hj960511/article/details/128746025?share_tokenAA088876-4477-44B8-B978-5C7C7726D552&tt_fromcopy_link&utm_sourcecopy_link&utm_mediumtoutiao_ios&utm_campaignclient_share win10底部任务栏开机后长时间未响应的解决办法-CSDN博…

亚马逊云科技re:Inforce大会:为企业提供端到端的安全防护能力

2023年&#xff0c;生成式AI带来了无数的创新&#xff0c;并将会在行业应用中产生更多的新能力、新场景。与此同时&#xff0c;关于生成式AI的风险管控成为各方关注焦点&#xff0c;数据隐私、合规保护、防欺诈等&#xff0c;已成为生成式AI时代的安全合规的新话题。 随着云上业…

(一)SpringBoot 整合WebSocket 前端 uniapp 访问

第一次使用WebSocket&#xff0c;所以最需要一个及其简单的例子&#xff0c;跑通之后&#xff0c;增加自己对该技术的理解。&#xff08;技术基础介绍就免掉了&#xff0c;后面再补&#xff09; 案例逻辑&#xff1a;目前只有一个用户&#xff0c;而且是一个用户给服务器发送数…

linux系统中详解u-boot之网络移植与调试

​今天给大家讲一讲如何完善u-boot网络部分的移植和调试。 一、前章回顾 上一章&#xff0c;已经讲过如何讲uboot.2022.10版本移植到我们自己的imx6ull开发板上&#xff0c;但是最后编译下载后网络部分未能正确识别&#xff0c;今天我们就来讲一讲网络部分的调试。 上一篇ub…

静力触探数据智能预处理(2)

静力触探数据智能预处理&#xff08;2&#xff09; 前言 数据处理方式已由手工1.0、计算机辅助2.0到人工智能3.0的趋势发展。现场采集的静力触探数据通常是由仪器厂家开发的数据采集软件保存&#xff0c;将原始数据导入Excel中&#xff0c;数据格式需要花费一定的时间整理&am…

Git管理本地代码

一、Git配置 当安装完 Git 应该做的第一件事就是设置你的用户名称与邮件地址。 这样做很重要&#xff0c;因为每一个 Git 的提交都会使用这些信息&#xff0c;并且它会写入到你的每一次提交中&#xff0c;不可更改 – global全局配置 通过 --global 选项可以设置全局配置信息 …

第6篇:ESP32连接无源喇叭播放音乐《涛声依旧》

第1篇:Arduino与ESP32开发板的安装方法 第2篇:ESP32 helloword第一个程序示范点亮板载LED 第3篇:vscode搭建esp32 arduino开发环境 第4篇:vscodeplatformio搭建esp32 arduino开发环境 第5篇:doit_esp32_devkit_v1使用pmw呼吸灯实验 D5连接喇叭正极&#xff0c;GND连接喇叭负…

推荐一本AI+医疗书:《机器学习和深度学习基础以及医学应用》,附21篇精选综述

当代医学仍然存在许多亟待解决的问题&#xff0c;比如日益增加的成本、医疗服务水平的下降...但近几年AI技术的发展却给医疗领域带来了革命性的变化&#xff0c;因此AI医疗迅速兴起。 从目前已知的成果来看&#xff0c;人工智能在医学领域的应用已经相当广泛&#xff0c;智能诊…

【pyqt5界面化工具开发-14】初始牛刀-登录工具

目录 0x00 前言&#xff1a; 一、准备好ui的加载 二、获取对应的触发事件 三、触发事件绑定 三、输入内容的调用 三、完善登录逻辑 0x00 前言&#xff1a; 在逻辑代码的处理添加数据包的请求&#xff0c;返回数据包的判断&#xff0c;就可以完整实现登录检测的一个界面化…

zookeeper介绍、zookeeper的安装与配置

1、zookeeper介绍 1.1 官网说明 官方地址&#xff1a;http://zookeeper.apache.org/ 它是拿来管理 Hadoop、Hive、Pig的管理员&#xff0c; Apache Hbase和Apache Solr以及阿里的Dubbo等项目中都采用到了Zookeeper。 一句话&#xff1a;ZooKeeper是一个分布式协调技术、高性…

基于Kohonen网络的聚类算法

1.案例背景 1.1 Kohonen网络 Kohonen网络是自组织竞争型神经网络的一种,该网络为无监督学习网络,能够识别环境特征并自动聚类。Kohonen神经网络是芬兰赫尔辛基大学教授Teuvo Kohonen 提出的,该网络通过自组织特征映射调整网络权值,使神经网络收敛于一种表示形态。在这一形态中…

OLAP学习

OLAP又叫联机分析处理&#xff0c;联机分析处理(OLAP)的概念最早是由关系数据库之父E.F.Codd于1993年提出的。 当今的数据处理大致可以分成两大类&#xff1a;联机事务处理OLTP&#xff08;on-linetransactionprocessing&#xff09;、联机分析处理OLAP&#xff08;On-LineAna…