RabbitMQ-交换机

news2025/1/8 4:31:39

文章目录

    • 交换机
      • fanout
      • Direct
      • topic
      • Headers
      • RPC

交换机

**交换机 **是消息队列中的一个组件,其作用类似于网络路由器。它负责将我们发送的消息转发到相应的目标,就像快递站将快递发送到对应的站点,或者网络路由器将网络请求转发到相应的服务器或客户端一样。交换机的主要功能是提供转发消息的能力,根据消息的路由规则将消息投递到合适的队列或绑定的消费者。
我们可以理解为,如果说一个快递站已经承受不了那么多的快递了,就建多个快递站。

fanout

扇出,广播
特点:消息会被转发到所有绑定到该交换机的队列
场景:很适用于发布订阅的场景,比如写日志,可以多个系统间共享

示例场景:
image.png
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class FanoutProducer {
  // 交换机名字
  private static final String EXCHANGE_NAME = "fanout-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 创建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
  }
}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class FanoutConsumer {
  //交换机名字
  private static final String EXCHANGE_NAME = "fanout-exchange";

  public static void main(String[] argv) throws Exception {
    //建立连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    //创建频道
    Channel channel1 = connection.createChannel();
    // 声明交换机
    channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 创建队列1,连接到交换机上
    String queueName = "xiaowang_queue";
    channel1.queueDeclare(queueName, true, false, false, null);
    channel1.queueBind(queueName, EXCHANGE_NAME, "");
    // 创建队列2,连接到交换机上
    String queueName2 = "xiaoli_queue";
    channel1.queueDeclare(queueName2, true, false, false, null);
    channel1.queueBind(queueName2, EXCHANGE_NAME, "");
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    // 创建交付回调函数1
    DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [小王] Received '" + message + "'");
    };
    // 创建交付回调函数2
    DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
      System.out.println(" [小李] Received '" + message + "'");
    };
    // 开始消费消息队列1
    channel1.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
    // 开始消费消息队列2
    channel1.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
  }
}

Direct

官方教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
特点:消息会根据路由键转发到指定的队列
场景:特定的消息只交给特定的系统(程序)来处理

注意:不同队列可以绑定相同的路由键

示例场景:
老板在发送消息同时会带上路由键,根据路由键找对应的队列来发送
image.png
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class DirectProducer {

  private static final String EXCHANGE_NAME = "direct-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        //声明交换机是direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //输入消息 和 路由键
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String userInput = scanner.nextLine();
            String[] strings = userInput.split(" ");
            if (strings.length < 1) {
                continue;
            }
            String message = strings[0];
            String routingKey = strings[1];
            //发布消息的时候注意指定路由键
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
        }

    }
  }
}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class DirectConsumer {

    private static final String EXCHANGE_NAME = "direct-exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机,不过生成者已经声明过了,消费者声不声明都可以
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 创建队列
        String queueName = "xiaoyu_queue";
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "xiaoyu"); //指定2交换机和路由键

        // 创建队列,随机分配一个队列名称
        String queueName2 = "xiaopi_queue";
        channel.queueDeclare(queueName2, true, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "xiaopi");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [xiaoyu] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [xiaopi] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };

        channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {
        });
        channel.basicConsume(queueName2, true, xiaopiDeliverCallback, consumerTag -> {
        });
    }
}

topic

官方教程:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
特点:消息会根据一个模糊的路由键转发到指定的队列
场景:特定的一类消息可以交给特定的一类系统(程序)来处理
规则:

  1. :匹配一个单词,比如.orange,那么 abc.orange、ikun.orange 都能匹配
  2. #:匹配0个或多个单词,比如orange.#,那么orange,orange.abc.ikun都能匹配


应用场景:
老板要下发一个任务,让多个组来处理
image.png
生产者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class TopicProducer {

  private static final String EXCHANGE_NAME = "topic-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String userInput = scanner.nextLine();
            String[] strings = userInput.split(" ");
            if (strings.length < 1) {
                continue;
            }
            String message = strings[0];
            String routingKey = strings[1];

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
        }
    }
  }
}

消费者代码:

package com.yupi.springbootinit.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class TopicConsumer {

  private static final String EXCHANGE_NAME = "topic-exchange";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");

      // 创建队列
      String queueName = "frontend_queue";
      channel.queueDeclare(queueName, true, false, false, null);
      channel.queueBind(queueName, EXCHANGE_NAME, "#.前端.#");

      // 创建队列
      String queueName2 = "backend_queue";
      channel.queueDeclare(queueName2, true, false, false, null);
      channel.queueBind(queueName2, EXCHANGE_NAME, "#.后端.#");

      // 创建队列
      String queueName3 = "product_queue";
      channel.queueDeclare(queueName3, true, false, false, null);
      channel.queueBind(queueName3, EXCHANGE_NAME, "#.产品.#");

      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

      DeliverCallback xiaoaDeliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println(" [xiaoa] Received '" +
                  delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
      };

      DeliverCallback xiaobDeliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println(" [xiaob] Received '" +
                  delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
      };

      DeliverCallback xiaocDeliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println(" [xiaoc] Received '" +
                  delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
      };

      channel.basicConsume(queueName, true, xiaoaDeliverCallback, consumerTag -> {
      });
      channel.basicConsume(queueName2, true, xiaobDeliverCallback, consumerTag -> {
      });
      channel.basicConsume(queueName3, true, xiaocDeliverCallback, consumerTag -> {
      });
  }
}

这样生产者发消息:前端.后端
image.png
就可以匹配到前端和后端两个队列
image.png

Headers

可以根据headers中的内容来指定发送到哪个队列,由于性能差,比较复杂,一般不推荐使用

RPC

支持用消息队列来模拟RPC的调用,但是一般没必要,直接用 Dubbo、GRPC 等 RPC 框架就好了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1601911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

杀死那个名为360安全的软件

背景 2023年底&#xff0c;闲来没事想起了xjun师傅2021年发的procexp驱动利用帖子时在群里讨论的&#xff0c;通过procexp驱动突破PPL后注入到csrss进程中&#xff0c;再通过csrss来结束那些个安全防护软件。于是在当时就有了如下成果&#xff1a; 这些弄完之后&#xff0c;觉…

康谋技术 | 深入探讨:自动驾驶中的相机标定技术

随着自动驾驶技术的快速发展&#xff0c;多传感器的数据采集和融合可以显著提高系统的冗余度和容错性&#xff0c;进而保证决策的快速性和正确性。在项目开发迭代过程中&#xff0c;传感器标定扮演着至关重要的角色&#xff0c;它位于数据采集平台与感知融合算法之间&#xff0…

关于外网后端服务访问内网minio中间件,因连接minio超时,启动失败问题

注&#xff1a;服务器情况&#xff1a;2台服务器&#xff0c;内网服务器包含&#xff08;activemq、minio、nginx、redis、mysql、后端java服务&#xff09;。外网服务器只有后端java服务&#xff0c;访问内网的中间件&#xff08;内网服务器开放了部分指定端口&#xff09; 问…

01_FreeRTOS移植

FreeRTOS移植 FreeRTOS移植FreeRTOS移植-中断文件修改 FreeRTOS移植 内核移植时用到的文件 复制工程模板&#xff0c;不需要的源文件全部删除 复制到工程文件中 放到这里 注释掉这两个重复定义的中断 替换掉工程中的裸机延时 FreeRTOS移植-中断文件修改

React脚手架的搭建与使用

React脚手架是开发现代Web应用的必备&#xff0c;其充分利用Webpack、Babel、ESlint等工具辅助项目的开发&#xff0c;当然这些工具也无需手动配置即可使用&#xff0c;脚手架的意义更多的是关注的是业务而不是工具的配置&#xff1b;项目的整体技术架构为&#xff1a;react w…

OpenHarmony实战开发-如何通过分割swiper区域,实现指示器导航点位于swiper下方的效果。

介绍 本示例介绍通过分割swiper区域&#xff0c;实现指示器导航点位于swiper下方的效果。 效果预览图 使用说明 1.加载完成后swiper指示器导航点&#xff0c;位于显示内容下方。 实现思路 1.将swiper区域分割为两块区域&#xff0c;上方为内容区域&#xff0c;下方为空白区…

git clone报错:error invalid path ‘dorisdockerthirdpartiesdocker-composexxxx‘

git clone报错&#xff1a;error: invalid path ‘doris/docker/thirdparties/docker-compose/xxxx’ 在周日晚上&#xff0c;我尝试从GitHub上克隆Doris的代码库&#xff0c;以便进行学习。在使用IntelliJ IDEA进行克隆时&#xff0c;我遇到了一个Git错误。具体操作如下&…

selenium反反爬虫,隐藏selenium特征

一、stealth.min.js 使用 用selenium爬网页时&#xff0c;常常碰到被检测到selenium &#xff0c;会被服务器直接判定为非法访问&#xff0c;这个时候就可以用stealth.min.js 来隐藏selenium特征&#xff0c;达到绕过检测的目的 from selenium import webdriver from seleniu…

KDTree索引(K近邻搜索,半径R内近邻搜索)——PCL

K近邻搜索&#xff08;K Nearest Neighbors&#xff09; K近邻搜索是一种基于点数量的搜索方法&#xff0c;它会找到指定点附近最接近的K个邻居点。K近邻搜索中的K值是一个参数&#xff0c;您需要指定要搜索的邻居数量。该方法适用于需要查找固定数量邻居点的情况&#xff0c;…

2W 3KVDC 隔离 稳压单输出 DC/DC 电源模块——TPK-SAR 系列介绍

TPK-SAR系列产品是专门针对PCB上分布式电源系统中需要与输入电源隔离且输出精度要求较高的电源应用场合而设计。该产品适用于&#xff1b;1&#xff09;输入电源的电压变化≤5%&#xff1b;2&#xff09;输入输出之前要求隔离电压≥3000VDC&#xff1b;3&#xff09;对输出电压…

使用两台主机实现博客的搭建

1.运行环境 这里的主机IP是自己虚拟器的IP。 主机主机名系统服务192.168.179.128Server-WebLinuxWeb192.168.179.129Server-NFSDNSLinuxNFS/DNS 2.基础配置 1.配置主机名&#xff0c;静态IP地址 2.开启防火墙并配置 3.部分开启SElinux并配置 4.服务器之间使用同ntp.aliyun.com…

正整数分解-第12届蓝桥杯省赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第54讲。 正整数分解&#…

Golang 开发实战day11 - Pass By Value

&#x1f3c6;个人专栏 &#x1f93a; leetcode &#x1f9d7; Leetcode Prime &#x1f3c7; Golang20天教程 &#x1f6b4;‍♂️ Java问题收集园地 &#x1f334; 成长感悟 欢迎大家观看&#xff0c;不执着于追求顶峰&#xff0c;只享受探索过程 Golang 开发实战day11 - 按值…

如何阻止访问您的网站

本周有一个客户&#xff0c;购买Hostease的HK Basic Linux虚拟主机&#xff0c;询问我们的在线客服&#xff0c;如何阻止部分地区或IP段访问他的网站。我们为用户提供教程&#xff0c;用户很快完成了设置。在此&#xff0c;我们分享这个操作教程&#xff0c;希望可以对您有帮助…

管理 nodejs 版本工具 nvm

nvm 方便切换不同版本的 node 及 对应的 npm 版本 一、安装nvm nvm官网 &#xff08;内含下载的文件&#xff0c;点击进去下载&#xff0c;并按照 网站文档步骤 操作即可&#xff09; 二、nvm 基础命令 nvm arch&#xff1a;显示node是运行在32位还是64位。nvm install <…

网上订餐系统|基于springboot的网上订餐系统设计与实现(源码+数据库+文档)

网上订餐系统目录 目录 基于springboot的网上订餐系统设计与实现 一、前言 二、系统功能设计 三、系统实现 1、用户功能模块的实现 &#xff08;1&#xff09;用户注册界面 &#xff08;2&#xff09;用户登录界面 &#xff08;3&#xff09;菜品详情界面 &#xff08…

随动系统同步性问题(跟随给定和跟随反馈的区别)

1、运动控制比例随动 运动控制比例随动系统_正运动随动系统-CSDN博客文章浏览阅读1.4k次,点赞2次,收藏5次。PLC如何测量采集编码器的位置数据,不清楚的可以参看我的另一篇博文:三菱FX3U PLC高速计数器应用(附代码)_RXXW_Dor的博客-CSDN博客本文主要以三菱FX3U系列的高速…

Groovy程序设计-【第一部分Groovy起步】-02-面向Java开发者的Groovy

前言&#xff1a; 知识点记录来源于【Groovy程序设计】一书中&#xff0c;本文仅作知识点记录供日后使用查询&#xff0c;不做教程使用。 groovy支持java语法&#xff0c;并且保留了java的语义&#xff0c;所以我们可以随心所欲的混用两种语言。 1.从Java到Groovy 先看一个…

vr兽医设备操作模拟仿真教学平台提升教学效果

在兽医教育的传统领域中&#xff0c;动物诊疗一直是一项不可或缺的实践环节。然而&#xff0c;传统的解剖教学方式受限于动物数量、种类以及安全隐患&#xff0c;无法充分满足学生的学习需求。随着VR虚拟仿真技术的不断精进&#xff0c;VR动物诊疗仿真实训系统为兽医教育带来了…

数字藏品app开发

数字藏品是指使用区块链技术&#xff0c;对应特定的作品、艺术品生成的唯一数字凭证。在保护其数字版权的基础上&#xff0c;数字藏品实现了真实可信的数字化发行、购买、收藏和使用。数字藏品是数字出版物的一种新形态&#xff0c;具有唯一的IP数字身份和所有权信息&#xff0…