RabbitMQ工作模式-路由模式

news2024/12/23 16:23:41

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
在这里插入图片描述

使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。

样例使用日志分发为样例。即按日志不同的级别,分发到不同的队列。每个队列只处理自己的对应的级别日志。

创建生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

public class Product {

  private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明交换机,交换器和消息队列的绑定不需要在这里处理。
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        false,
        // 属性
        null);

    for (int i = 0; i < 30; i++) {
      String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
      String dataMsg = "[" + level + "] 消息发送 :" + i;
      // 发送消息
      channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));
    }
  }
}

创建ERROR的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class ErrorConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列并绑定
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        false,
        // 属性
        null);

    // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
    channel.queueDeclare(
        "log.error",
        // 永久
        false,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    //消费者享有绑定到交换器的权力。
    channel.queueBind("log.error", "ex.routing", "ERROR");

    // 通过chanel消费消息
    channel.basicConsume(
        "log.error",
        (consumerTag, message) -> {
          System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        },
        consumerTag -> {});
  }
}

创建INFO级的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class InfoConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列并绑定
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        true,
        // 属性
        null);

    // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
    channel.queueDeclare(
        "log.info",
        // 永久
        false,
        // 排他
        false,
        // 自动删除
        false,
        // 属性
        null);

    //消费者享有绑定到交换器的权力。
    channel.queueBind("log.info", "ex.routing", "INFO");

    // 通过chanel消费消息
    channel.basicConsume(
        "log.info",
        (consumerTag, message) -> {
          System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        },
        consumerTag -> {});
  }
}

创建WARN级别的消息者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class WarnConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列并绑定
    channel.exchangeDeclare(
        "ex.routing",
        BuiltinExchangeType.DIRECT,
        // 持久的标识
        false,
        // 自动删除的标识
        false,
        // 属性
        null);

    // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
    channel.queueDeclare(
        "log.warn",
        // 永久
        false,
        // 排他
        false,
        // 自动删除
        true,
        // 属性
        null);

    //消费者享有绑定到交换器的权力。
    channel.queueBind("log.warn", "ex.routing", "WARN");

    // 通过chanel消费消息
    channel.basicConsume(
        "log.warn",
        (consumerTag, message) -> {
          System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        },
        consumerTag -> {});
  }
}

首先启动三个消费者:

查看队列及交换机情况

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.info         │ queue            │ log.info    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.warn         │ queue            │ log.warn    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.error        │ queue            │ log.error   │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.error        │ queue            │ ERROR       │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.info         │ queue            │ INFO        │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.warn         │ queue            │ WARN        │           │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[root@nullnull-os ~]# 

可以发现,交换器ex.routing 绑定了三个队列log.errorlog.info log.warn并指定了路由键。

启动消费者,查看消息通否被正常消费。

ERROR的消费者控制台输出

ERROR收到的消息:[ERROR] 消息发送 :1
ERROR收到的消息:[ERROR] 消息发送 :2
ERROR收到的消息:[ERROR] 消息发送 :6
ERROR收到的消息:[ERROR] 消息发送 :8
ERROR收到的消息:[ERROR] 消息发送 :9
ERROR收到的消息:[ERROR] 消息发送 :11
ERROR收到的消息:[ERROR] 消息发送 :15
ERROR收到的消息:[ERROR] 消息发送 :16
ERROR收到的消息:[ERROR] 消息发送 :19
ERROR收到的消息:[ERROR] 消息发送 :20
ERROR收到的消息:[ERROR] 消息发送 :21
ERROR收到的消息:[ERROR] 消息发送 :23
ERROR收到的消息:[ERROR] 消息发送 :24
ERROR收到的消息:[ERROR] 消息发送 :27
ERROR收到的消息:[ERROR] 消息发送 :28

INFO的消费者控制台输出:

INFO收到的消息:[INFO] 消息发送 :0
INFO收到的消息:[INFO] 消息发送 :3
INFO收到的消息:[INFO] 消息发送 :4
INFO收到的消息:[INFO] 消息发送 :13
INFO收到的消息:[INFO] 消息发送 :14
INFO收到的消息:[INFO] 消息发送 :22
INFO收到的消息:[INFO] 消息发送 :25

WARN的消费都控制台输出:

warn收到的消息:[WARN] 消息发送 :5
warn收到的消息:[WARN] 消息发送 :7
warn收到的消息:[WARN] 消息发送 :10
warn收到的消息:[WARN] 消息发送 :12
warn收到的消息:[WARN] 消息发送 :17
warn收到的消息:[WARN] 消息发送 :18
warn收到的消息:[WARN] 消息发送 :26
warn收到的消息:[WARN] 消息发送 :29

至此,验证已经完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/950279.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Node.js /webpack DAY6

一、Node.js 入门 1. 什么是 Node.js&#xff1f; 2. 什么是前端工程化&#xff1f; 3. Node.js 为何能执行 JS&#xff1f; 4. Node.js 安装 5. 使用 Node.js 总结 6. fs 模块 - 读写文件 /*** 目标&#xff1a;基于 fs 模块 读写文件内容* 1. 加载 fs 模块对象* 2. 写入文件…

Java并发(十五)----synchronized解决共享的问题

为了避免临界区的竞态条件发生&#xff0c;有多种手段可以达到目的。 阻塞式的解决方案&#xff1a;synchronized&#xff0c;Lock 非阻塞式的解决方案&#xff1a;原子变量 此次介绍使用阻塞式的解决方案&#xff1a;synchronized&#xff0c;来解决上述问题&#xff0c;即…

Ubuntu入门03——Ubuntu用户操作

1.Ubuntu如何进入root用户 进入ROOT用户的指令&#xff1a; Linux用su命令来切换用户&#xff1a; su root执行命令后&#xff0c;会提示你输入密码&#xff0c;而Ubuntu是没有设置root初始密码的。 若su命令不能切换root&#xff0c;提示su: Authentication failure&#x…

弹窗、抽屉、页面跳转区别 | web交互入门

当用户点击或触发浏览页面的某个操作&#xff0c;有很多web交互方式&#xff0c;可以大致分为弹窗、抽屉、跳转新页面三种web交互方式。虽然这三种web交互方式看起来没什么不同&#xff0c;但实际上弹窗、抽屉、跳转新页面对交互体验有蛮大的影响。 这需要UI\UX设计师针对不同…

【拾枝杂谈】从游戏开发的角度来谈谈原神4.0更新

君兮_的个人主页 勤时当勉励 岁月不待人 C/C 游戏开发 Hello,米娜桑们&#xff0c;这里是君兮_&#xff0c;结合最近的学习内容和以后自己的目标&#xff0c;今天又开了杂谈这个新坑&#xff0c;分享一下我在学习游戏开发的成长和自己的游戏理解&#xff0c;当然现在还是一枚…

Kotlin入门1. 语法基础

Kotlin入门1. 语法基础 一、简介二、在Idea创建一个示例项目三、基本语法1. 第一个程序2. 基本数据类型(1) 数字(2) 类型转换(3) 数学运算位运算 &#xff08;4&#xff09;可空类型 3. 函数4. 字符串(1) 字符串拼接(2) 字符串查找(3) 字符串替换(4) 字符串分割 5. null 安全的…

系统架构师---系统规划

目录 前言&#xff1a; 项目的提出与选择 项目立项的目标和动机 进行基础研究并获取技术 进行应用研发并获得产品 提供技术服务 信息技术产品的使用者 项目的选择和确定 选择有核心价值的产品、项目或可开发方向 评估项目风险、收益和代价 评估项目的多种实施方式 平…

HDFS文件删除后,HIVE元数据还存在的问题

一.背景 手动在hdfs上删除了一个表的分区数据(inc_day2023-08-30)&#xff0c;当查询这个表这个分区的数据时报错文件不存在 二.原因 即HDFS数据删除了&#xff0c;但是hive metastore元数据却没有更新&#xff0c;使用show partitions tablename 发现该分区还存在 三.解决办法…

python 笔记(1)——基础和常用部分

目录 1、print 输出不换行 2、格式化输出字符串 3、浮点数的处理 4、进制转换和ASCII与字符间的转换 5、随机数 6、字符串截取和内置方法 6-1&#xff09;字符串截取 6-2&#xff09;字符串内置方法 7、元组、列表&#xff0c;及其遍历方式 7-1&#xff09;列表常用内…

前端开发学习路线

无前端基础学习路线&#xff1a; B站免费视频1 B站免费视频2 有HTML、CSS、JavaScript基础&#xff0c;可直接通过以上视频中Vue2Vue3中实战项目学习Vue。

【网络】多路转接——poll | epoll

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《网络》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 书接上文五种IO模型 | select。 poll | epoll &#x1f367;poll&#x1f9c1;认识接口&#x1f9c1;简…

<硬件设计> 阻抗设计(二) 使用Si9000计算阻抗

目录 01 阻抗相关参数 02 差分阻抗设计示例 确定阻抗线模型 明确对应差分线特征阻抗 获取板厂的板材、阻焊相关参数 使用Si9000软件通过调整线宽和线间距达到目标阻抗 03 文章总结 大家好&#xff0c;这里是程序员杰克。一名平平无奇的嵌入式软件工程师。 上篇已对阻抗…

【工具类】生成唯一编码

需求描述&#xff1a; 在一个项目中&#xff0c;很多的业务表都有一个唯一编码字段&#xff0c;如下&#xff1a; 同事把生成唯一编码字段的代码封装成了一个工具类&#xff0c;符合生成唯一编码规则的地方就可以使用啦&#xff01; 代码如下&#xff1a; Target(ElementTyp…

java定位问题工具

一、使用 JDK 自带工具查看 JVM 情况 在我的机器上运行 ls 命令&#xff0c;可以看到 JDK 8 提供了非常多的工具或程序&#xff1a; 接下来&#xff0c;我会与你介绍些常用的监控工具。你也可以先通过下面这张图了解下各种工具的基本作用&#xff1a; 为了测试这些工具&#x…

Java自定义捕获异常

需求分析 ElectricalCustomerVO electricalCustomerVO new ElectricalCustomerVO(); electricalCustomerVO.setElcNumber(chatRecordsLog.getDeviceNumber()); List<ElectricalCustomerVO> electricalCustomerlist electricalCustomerMapper.selectElectricalCustomer…

Understanding Black-box Predictions via Influence Functions阅读笔记

Understanding Black-box Predictions via Influence Functions阅读笔记 1.案例1----理解模型行为2.案例2----生成对抗训练样本3.案例3----调试域不匹配4.案例4----修正错误标注参考 1.案例1----理解模型行为 通过告诉我们对一个给定的预测“负责”的训练点&#xff0c;影响函数…

七大排序完整版

目录 一、直接插入排序 &#xff08;一&#xff09;单趟直接插入排 1.分析核心代码 2.完整代码 &#xff08;二&#xff09;全部直接插入排 1.分析核心代码 2.完整代码 &#xff08;三&#xff09;时间复杂度和空间复杂度 二、希尔排序 &#xff08;一&#xff09;对…

畅捷通T+用户中locked勒索病毒后该怎么办?勒索病毒解密数据恢复

Locked勒索病毒是一种近年来在全球范围内引起广泛关注的网络安全威胁程序。它是一种加密货币劫持病毒&#xff0c;专门用于加密用户的数据并要求其支付赎金。Locked勒索病毒通过攻击各种系统漏洞和网络薄弱环节&#xff0c;使用户计算机受到感染并被加密锁定时&#xff0c;无法…

Unity+讯飞星火大模型+Web api,实现二次元小姐姐AI聊天互动

1.简述 最近讯飞的星火大模型更新了2.0版本&#xff0c;增强了AI的语言生成能力。毕竟是国产大语言模型&#xff0c;我也尝试使用了一下星火大模型的应用广场&#xff0c;体验还是很不错的。应用广场提供了很多AI助手工具&#xff0c;也支持用户创建自己的AI助手&#xff0c;能…

算法基础第三章

算法基础第三章 1、dfs(深度搜索)1.1、 递归回溯1.2、递归剪枝&#xff08;剪枝就是判断接下来的递归都不会满足条件&#xff0c;直接回溯&#xff0c;不再继续往下无意义的递归&#xff09; 2、bfs(广度搜索)2.1、最优路径&#xff08;只适合于边权都相等的题&#xff09; 3、…