《RabbitMQ篇》交换机基本概览

news2024/10/9 11:38:04

生产者都是把消息给交换机,由交换机分发给消息队列。

routingKey:路由键,也可称为绑定,是交换机和队列之间的桥梁,交换机会根据routingKey来把消息转发到对应的队列。

Fanout

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

通用工具类:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtil {
    public static Channel getChannel() throws Exception {
        //得到工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //生成连接
        Connection connection = factory.newConnection();
        //获取信道
        return connection.createChannel();
    }
}

生产者代码:创建一个Fanout Exchange。

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class FanoutProducer {

    private final static String EXCHANGE_NAME = "LogExchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        }
    }
}

两个消费者代码:创建队列去绑定交换机

public class Consumer1 {
    private final static String FILE_QUEUE = "FileQueue";
    private final static String EXCHANGE_NAME = "LogExchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(FILE_QUEUE, false, false, false, null);
        channel.queueBind(FILE_QUEUE, EXCHANGE_NAME, "");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            System.out.printf("消息:%s,存储到磁盘%n", new String(delivery.getBody()));
        };
        channel.basicConsume(FILE_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

public class Consumer2 {
    private final static String CONSOLE_QUEUE = "ConsoleQueue";
    private final static String EXCHANGE_NAME = "LogExchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        channel.queueDeclare(CONSOLE_QUEUE, false, false, false, null);
        channel.queueBind(CONSOLE_QUEUE, EXCHANGE_NAME, "");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            System.out.printf("消息:%s,输出到控制台%n", new String(delivery.getBody()));
        };
        channel.basicConsume(CONSOLE_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

运行结果图如下。总结:在Fanout模式下,生产者生产消息发布到交换机后,交换机会把消息发送给所有与之绑定的队列,在该模式下,即使设置了routingKey也会无视它。

Direct

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。

生产者代码:创建一个Direct交换机,发送消息到交换机同时指定routingKey。

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class DirectProducer {
    private final static String EXCHANGE_NAME = "LogExchange-Direct";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "consoleRouteKey", null, message.getBytes());
        }
    }
}

消费者代码:让队列和交换机绑定,同时设置routingKey。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer1 {
    private final static String FILE_QUEUE = "FileQueue";
    private final static String EXCHANGE_NAME = "LogExchange-Direct";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(FILE_QUEUE, false, false, false, null);
        channel.queueBind(FILE_QUEUE, EXCHANGE_NAME, "fileRouteKey");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            System.out.printf("消息:%s,存储到磁盘%n", new String(delivery.getBody()));
        };
        channel.basicConsume(FILE_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer2 {
    private final static String CONSOLE_QUEUE = "ConsoleQueue";
    private final static String EXCHANGE_NAME = "LogExchange-Direct";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        channel.queueDeclare(CONSOLE_QUEUE, false, false, false, null);
        channel.queueBind(CONSOLE_QUEUE, EXCHANGE_NAME, "consoleRouteKey");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            System.out.printf("消息:%s,输出到控制台%n", new String(delivery.getBody()));
        };
        channel.basicConsume(CONSOLE_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

执行结果如图,总结:在Direct模式下,交换机会根据routingKey把消息给相关的队列,如果没有这样的队列,消息会被丢弃。

Topic

给定的routingKey,与 交换机和队列之间设置的routingKey 根据模式匹配(类似正则)转发到对应的队列。

该模式下的routingKey的命名是一个单词列表,以点号分隔开。例如:“log.error.console”。其中可以用 *(星号)可以代替一个位置,#(井号)可以替代零个或多个位置。

比如:现在队列Q1和交换机之间设置的routingKey为 *.orange.*

队列Q2和交换机之间设置的routingKey为 *.*.rabbit*lazy.#

例如说明
quick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2

实际测试代码

import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class TopicProducer {
    private final static String EXCHANGE_NAME = "LogExchange-Topic";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        HashMap<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit", "1.被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "2.被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "3.被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "4.被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "5.虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "6.不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "7.是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "8.是四个单词但匹配 Q2");
        for (Map.Entry<String,String> bindingKeyEntry : bindingKeyMap.entrySet()){
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumer1 {
    private final static String FILE_QUEUE = "FileQueue";
    private final static String EXCHANGE_NAME = "LogExchange-Topic";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(FILE_QUEUE, false, false, false, null);
        channel.queueBind(FILE_QUEUE, EXCHANGE_NAME, "*.orange.*");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            System.out.printf("消息:%s,存储到磁盘%n", new String(delivery.getBody()));
        };
        channel.basicConsume(FILE_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

public class Consumer2 {
    private final static String CONSOLE_QUEUE = "ConsoleQueue";
    private final static String EXCHANGE_NAME = "LogExchange-Topic";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        channel.queueDeclare(CONSOLE_QUEUE, false, false, false, null);
        channel.queueBind(CONSOLE_QUEUE, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(CONSOLE_QUEUE, EXCHANGE_NAME, "lazy.#");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            System.out.printf("消息:%s,输出到控制台%n", new String(delivery.getBody()));
        };
        channel.basicConsume(CONSOLE_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

Header

不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则 x-match 有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2198911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【业务场景】最全的购物车设计与实现

前言 博主最近在做一个购物商城&#xff0c;正好设计到购物车模块&#xff0c;于是乎全面的来聊一聊购物车模块实现的一些核心要点吧&#xff0c;很值得反复品味的设计&#xff0c;当需要实现购物车的时候&#xff0c;本文应该拿来就能用。 目录 1.需要解决的核心问题清单 2…

下一代电源管理:Modern Standby与S3睡眠的对比

Modern Standby与S3睡眠的对比 一、引言二、Modern Standby概述三、S3睡眠模式概述四、Modern Standby与S3睡眠的差异五、实际应用和适用场景六、测试Modern Standby的性能6.1、PowerCfg命令行工具6.2、Windows Performance Toolkit 七、总结 一、引言 电源管理在现代计算设备…

Midjourney中文版:解锁你的创意之旅

在创意与技术的交汇点&#xff0c;Midjourney中文版正等待着每一位热爱艺术、渴望表达的灵魂。这不仅仅是一款AI绘画工具&#xff0c;更是一个激发无限灵感、让创意自由翱翔的奇妙平台。 Midjourney AI超强绘画 (原生态系统&#xff09;用户端&#xff1a;Ai Loadinghttps://w…

Linux操作系统——软件包的管理(实验报告)

实验——软件安装的基本操作 一、实验目的 熟悉软件安装流程&#xff0c;掌握java的安装流程&#xff0c;熟悉相关命令的操作。 二、实验环境 硬件&#xff1a;PC电脑一台&#xff0c;网络正常&#xff1b; 配置&#xff1a;win10系统&#xff0c;内存大于8G 硬盘500G及以上…

机器学习实战27-基于双向长短期记忆网络 BiLSTM 的黄金价格模型研究

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下机器学习实战27-基于双向长短期记忆网络 BiLSTM 的黄金价格模型研究。本文针对黄金价格预测问题&#xff0c;展开基于改造后的长短期记忆网络BiLSTM的黄金价格模型研究。文章首先介绍了项目背景&#xff0c;随后详细…

LSTM的变体

一、GRU 1、什么是GRU 门控循环单元&#xff08;GRU&#xff09;是一种循环神经网络&#xff08;RNN&#xff09;的变体&#xff0c;它通过引入门控机制来控制信息的流动&#xff0c;从而有效地解决了传统RNN中的梯度消失问题。GRU由Cho等人在2014年提出&#xff0c;它简化了…

判断回文 python

题目&#xff1a; 输入一个四位数&#xff0c;判断该数是否为回文数&#xff0c;回文数是指正序&#xff08;从左向右&#xff09;和倒序&#xff08;从右向左&#xff09;读都是一样的整数&#xff0c;比如1221。 代码法1&#xff1a; ninput() nint(n) if n<1000 or n&g…

微积分复习笔记 Calculus Volume 1 - 2.2 The Limit of a Function

2.2 The Limit of a Function - Calculus Volume 1 | OpenStax

中控自动化测试实战和实车智能驾驶业务解析

一.中控自动化测试流程及环境搭建 1.中控自动化测试流程 2.中控自动化测试环境的搭建 1.JDK环境配置 安装 Java安装包.生成java\bin jre\bin JAVA_HOME: java目录 c:\java path:%JAVA_HOME%\bin jre\bin 为了后面appium server GUI客户端中的环境配置 2.SDK 配置 pal…

怎么编辑图片?这5款工具教你快速编辑

怎么编辑图片&#xff1f;编辑图片是一项既具创意又实用的技能&#xff0c;它不仅能够提升图片的视觉效果&#xff0c;增强信息的传达力&#xff0c;还能激发无限的创作灵感。通过编辑图片&#xff0c;我们可以轻松调整色彩、添加文字、裁剪构图&#xff0c;甚至创造出令人惊叹…

Oxygen Forensic Detective 17.0 发布,新增功能概览

Oxygen Forensic Detective 17.0 发布&#xff0c;新增功能概览 Oxygen Forensic Detective Windows 17 Multilingual - 领先的一体化数字取证软件 digital forensic software 请访问原文链接&#xff1a;https://sysin.org/blog/oxygen-forensic-detective/&#xff0c;查看…

【学习笔记】SquareLine Studio安装教程(LVGL官方工具)

一.简介与导航&#xff1a; SquareLine Studio是由LVGL官方开发的一款UI设计工具&#xff0c;采用图形化进行界面UI设计&#xff0c;轻易上手。 SquareLine Studio官方网址&#xff1a;https://squareline.io/SquareLine Studio官方文档&#xff1a;https://docs.squareline.io…

车牌检测系统源码分享

车牌检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Vision 研究…

3、Docker搭建MQTT及Spring Boot 3.x集成MQTT

一、前言 本篇主要是围绕着两个点&#xff0c;1、Docker 搭建单机版本 MQTT&#xff08;EMQX&#xff09;&#xff0c;2、Spring Boot 3.x 集成 MQTT&#xff08;EMQX&#xff09;&#xff1b; 而且这里的 MQTT&#xff08;EMQX&#xff09;的搭建也只是一个简单的过程&#x…

为什么现在的大学生很难真正学好LabVIEW编程?

学习LabVIEW编程对大学生来说可能存在以下挑战&#xff1a; 学习曲线陡峭&#xff1a;尽管LabVIEW提供直观的图形化编程环境&#xff0c;便于初学者入门&#xff0c;但要深入掌握其高级功能和复杂应用&#xff0c;仍需要投入大量时间和精力。随着学习的深入&#xff0c;概念和应…

CAN与CANFD的区别

CAN概念&#xff1a; CAN&#xff0c;全称为Controller Area Network&#xff0c;即控制器局域网络&#xff0c;是一种用于汽车电子系统中的串行通信协议。它由德国电气工程师协会&#xff08;Bosch&#xff09;在1983年开发&#xff0c;并在1986年正式推出。CAN协议主要用于汽…

牛客:Holding Two,Inverse Pair,Counting Triangles

Holding Two 题目描述 登录—专业IT笔试面试备考平台_牛客网 ​​运行代码 #include<bits/stdc.h> using namespace std; const int N3e45; string s1,s2; int main(){int n,m;cin>>n>>m;for(int i0;i<m;i){if(i&1){s10;s21;} else{s11;s20;} }fo…

架构师:Spring Cloud Gateway 的技术指南

1、简述 Spring Cloud Gateway 是 Spring Cloud 生态系统中的一个重要组件,作为微服务架构的 API 网关,它为路由、限流、安全、监控等功能提供了全面支持。相比传统的 Zuul 网关,Spring Cloud Gateway 使用了非阻塞的 WebFlux 框架,性能上有了显著提升,并且提供了更现代化…

BLE MESH学习2——自定义MESH网络架构思考

BLE MESH学习2——自定义MESH网络架构思考 基于对WCH CH582这款单片机的了解&#xff0c;其可以实现mesh配网、朋友节点、低功耗节点和中继节点的角色&#xff0c;基本功能无问题。在此基础上&#xff0c;考虑满足IoT需求的MESH架构设计&#xff0c;作为后续设计的“白皮书”。…

构建流媒体管道:利用 Docker 部署 Nginx-RTMP 从 FFmpeg RTMP 推流到 HLS 播放的完整流程

最近要实现一个类似导播台的功能&#xff0c;于是我先用 FFmpeg 实现一个参考对照的 Demo&#xff0c;我将其整理为一篇文章&#xff0c;方便后续大家或者和自己参考&#xff01; 1、软件工具介绍 本次部署相关软件 / 工具如下&#xff1a; FFmpeg&#xff1a;全称是 Fast Fo…