交换机-Exchanges

news2025/1/10 21:15:25

交换机

Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

Exchanges 的类型

  • 直接:direct 路由模式
  • 主题:topic
  • 标题:headers(不常用)
  • 扇出:fanout 广播模式,发布订阅模式
无名exchange

第一个参数是交换机的名称,空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
绑定bindings

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。

在这里插入图片描述

Fanout模式介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。
在这里插入图片描述

代码示例
package com.vmware.rabbit.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

import java.util.Scanner;

public class Producer {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明扇出类型交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //向交换机发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.printf("消息:%s发送成功!",message);
        }
    }
}
package com.vmware.rabbit.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});
    }
}
package com.vmware.rabbit.demo5;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        channel.basicConsume(queueName,true,deliverCallback,(arg1,arg2)->{});
    }
}

Direct模式介绍

Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct(直接) 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去
在这里插入图片描述

  • 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green
  • 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 black/green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃
多重绑定

在这里插入图片描述

如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多

代码实战
package com.vmware.rabbit.demo6;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;
import java.util.UUID;

public class Producer {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.232");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明路由模式交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //向交换机发送消息
        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNext()){
            String message = UUID.randomUUID().toString();
            String routingKey = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.printf("发送消息:%s成功!RoutingKey:%s\n",message,routingKey);
        }
    }
}
package com.vmware.rabbit.demo6;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"error");
        channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});
    }
}
package com.vmware.rabbit.demo6;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "log";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,EXCHANGE_NAME,"info");
        channel.queueBind(queueName,EXCHANGE_NAME,"warn");
        channel.basicConsume(queueName,(tag,msg)-> System.out.println(new String(msg.getBody())),(tag,msg)->{});
    }
}

Topics主题模式介绍

存在的问题:尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节

  • *星号可以代替一个单词

  • #井号可以替代零个或多个单词
    在这里插入图片描述

  • 当一个队列绑定键是#那么这个队列将接收所有数据,就有点像fanout了

  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了

代码实现
package com.vmware.rabbit.demo7;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.vmware.rabbit.utils.RabbitUtil;

import java.util.HashMap;
import java.util.Map;

public class Producer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtil.getConnection();
        System.out.println("连接RabbitMQ服务器成功!");
        Channel channel = connection.createChannel();
        //声明主题模式交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        System.out.println("交换机创建成功!");
        Thread.sleep(15*1000);
        //发布消息
        HashMap<String,String> msgMap = new HashMap<>();
        msgMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        msgMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        msgMap.put("quick.orange.fox","被队列 Q1 接收到");
        msgMap.put("lazy.brown.fox","被队列 Q2 接收到");
        msgMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        msgMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        msgMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        msgMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
        for (Map.Entry<String, String> entry : msgMap.entrySet()) {
            channel.basicPublish(EXCHANGE_NAME,entry.getKey(),null,entry.getValue().getBytes("UTF-8"));
        }
    }
}
package com.vmware.rabbit.demo7;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "Q1";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //绑定交换机和队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");

        DeliverCallback deliverCallback = (tag,msg)->{
            String message = new String(msg.getBody());
            System.out.println(message+"\tRouting Key:"+msg.getEnvelope().getRoutingKey());
        };

        CancelCallback cancelCallback = (tag)->{

        };
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.vmware.rabbit.demo7;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.vmware.rabbit.utils.RabbitUtil;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "Q2";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //绑定交换机和队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

        DeliverCallback deliverCallback = (tag,msg)->{
            String message = new String(msg.getBody());
            System.out.println(message+"Routing Key:"+msg.getEnvelope().getRoutingKey());
        };

        CancelCallback cancelCallback=(tag)->{

        };
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/484364.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Threejs进阶之十:让模型移动到鼠标点击的指定位置

上一节中我们实现了物体沿指定轨迹移动的动画效果&#xff0c;这一节我们来实现让模型移动到鼠标点击的制定位置的动画效果。 先看下实现后的最终效果 要实现上面的动画效果&#xff0c;我们需要通过以下步骤来实现 第一步&#xff0c;监听鼠标事件 我们需要监听鼠标的点击事…

2 ROS2话题通讯基础(2)

ROS2话题通讯基础 2.5 自定义话题通讯2.5.1 自定义话题通讯的一般流程2.5.2 创建自定义话题消息简单例子2.5.3 快速创建C/C和Python自定义话题通讯的Student.msg 2.6 使用C/C实现自定义话题通讯2.6.1 创建C/C自定义话题发布方功能包并编写节点文件2.6.2 配置C/C自定义话题发布方…

如何选择适合企业的网盘?必须要考虑这几个方面

随着云存储技术的发展&#xff0c;传统的文件存储服务已逐渐不能满足企业日益增长的文件应用、共享和存储需求。越来越多的企业开始将目光转移到企业网盘上。 在选择企业网盘工具时&#xff0c;比较重要的有两个方面&#xff0c;一个是数据的安全性&#xff0c;一个是协同办公。…

Java 将增加虚拟线程,挑战 Go 协程

Java19 正式发布&#xff0c;带来了一个 Java 开发者垂涎已久的新特性 —— 虚拟线程。在 Java 有这个新特性之前&#xff0c;Go 语言的协程风靡已久&#xff0c;在并发编程领域可以说是叱咤风云。随着国内 Go 语言的快速发展与推广&#xff0c;协程好像成为了一个世界上最好语…

K8S二进制单节点 一键部署K8S_V1.21.x

1、安装前注意事项 安装shell脚本在文章最后位置 1、提前配置静态IP 把脚本的IP 192.168.1.31 换成你的IP 2、创建安装包路径 /home/software/shell 所有的tar包 shell脚本 放在这里 3、免密登录配置所有节点 提前下载镜像如下&#xff1a; [rootmaster01 ~]# docker image…

Ubuntu搜狗输入法安装指南

Ubuntu搜狗输入法安装指南 Ubuntu搜狗输入法安装指南搜狗输入法已支持Ubuntu1604、1804、1910、2004、2010Ubuntu20.04及以上安装搜狗输入法步骤 Ubuntu搜狗输入法安装指南 下载地址&#xff1a;https://shurufa.sogou.com/ 计算为amd64的选择x86_64&#xff0c;以下教程来源…

ORBBEC(奥比中光)AstraPro相机在ROS2下的标定与D2C(配准与对齐)

文章目录 1.rgb、depth相机标定矫正1.1.标定rgb相机1.2.标定depth相机1.3.rgb、depth相机一起标定&#xff08;效果重复了&#xff09;1.4.取得标定结果1.4.1.得到的标定结果的意义&#xff1a;1.5.IR、RGB相机分别应用标定结果1.5.1.openCV应用标定结果1.5.2.ros2工程应用标定…

[stable-diffusion-art] 指北-2 如何为sd提出好的prompt

https://stable-diffusion-art.com/how-to-come-up-with-good-prompts-for-ai-image-generation/https://stable-diffusion-art.com/how-to-come-up-with-good-prompts-for-ai-image-generation/1.prompt可以促使模型生成以前不存在的高质量的图片&#xff0c;例如&#xff1a;…

windows如何使用脚本打开多个软件

文章目录 windows如何使用脚本打开多个软件问题缘由省流版本制作脚本步骤新建文本找到文件的安装位置方法一&#xff1a;方法二&#xff1a; 总结 windows如何使用脚本打开多个软件 问题缘由 因为强迫症&#xff0c;不想让软件自启&#xff0c;会导致开机变慢&#xff0c;电脑…

Lecture7 处理多维特征的输入(Multiple Dimension Input)

以实际代码出发&#xff0c;逐行讲解。 完整代码&#xff1a; import numpy as np import torch import matplotlib.pyplot as plt# load data xy np.loadtxt(C:\\Users\\14185\\Desktop\\diabetes.csv, delimiter,, dtypenp.float32) x_data torch.from_numpy(xy[:, :-1])…

226. 翻转二叉树【58】

难度等级&#xff1a;容易 上一篇算法&#xff1a; 543. 二叉树的直径【71】 力扣此题地址&#xff1a; 226. 翻转二叉树 - 力扣&#xff08;Leetcode&#xff09; 1.题目&#xff1a;226. 翻转二叉树 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返…

DAY 52 LVS+Keepalived群集

Keepalived工具介绍 普通集群容易出现的问题 企业应用中&#xff0c;单台服务器承担应用存在单点故障的危险。 单点故障一旦发生&#xff0c; 企业服务将发生中断&#xff0c;造成极大的危害。 Keepalived工具 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&…

v2c - 从Verilog 转换到 C语言的工具

文章目录 一、如何安装1.下载二进制文件2.基准测试 二、如何使用v2c的应用描述工具流程使用 v2c 转换器的工作示例 三、注意事项情形一&#xff1a;拼接&#xff1a;{4{x}}情形1-1 y&{x&#xff0c;x&#xff0c;x&#xff0c;x}情形1-2 y&{x&#xff0c;x&#xff0c;…

【C++】string 类的实现

目录 构造函数赋值重载关于浅拷贝 迭代器容量相关reserveresize 修改push_backappendinserterase关于npos 流运算符重载流插入流提取 构造函数 无参数构造和传参构造 通过对参数设置缺省值为空串""同时满足无参构造和传参构造成员 _size 和 _capacity 均是针对有效…

自动驾驶—连续系统LQR最优控制的黎卡提方程推导

1. Why use the Riccati equation? 最优控制算法LQR是Linear Quadratic Regulator的缩写,Q、R就是需要设计的半正定矩阵和正定矩阵。考虑根据实车的情况去标定此参数,从理论和工程层面去理解,如果增大Q、减小R,则此时控制系统响应速度比较快速(比较剧烈),直观反映方向…

5月1日 9H45min|5.2 8H20min+30min|时间轴复盘

8:00 起床 8:00-8:30 洗漱吃饭 8:30-10:40 temporary pools阅读真题精读 (真的很慢了 不知道什么原因 感觉也没有彻底完全弄懂)【2h+10min】 10:40-11:10 午餐+酸奶(423+174KJ) 11:20-12:30 三篇阅读【1h+10min】 13:10-14:50 健身 14:50-15:45诵默写list…

Ae:画笔工具

画笔工具 Brush Tool 快捷键&#xff1a;Ctrl B 画笔工具 Brush Tool仅能工作在图层 Layer面板上。 双击纯色图层、像素图层等可打开图层面板。 在 Ae 中的每次画笔绘制都将新建一条路径&#xff0c;然后通过对路径的描边来显示绘制结果&#xff0c;故又称为“绘画描边”或“…

函数-实现交换两个变量的内容

用函数实现交换两个变量的内容&#xff0c;对于该问题我们该如何实现呢&#xff1f;在这里我就用整型变量来说明。 题目&#xff1a;写一个函数可以交换两个整形变量的内容。 我们先来看看如下代码&#xff1a; #include <stdio.h> void swap(int x, int y) {int tem…

Android进阶之光:Dagger2原理简要分析

Dagger2注入框架原理简要分析 使用Dagger2需要的依赖: implementation com.google.dagger:dagger-android:2.46 implementation com.google.dagger:dagger-android-support:2.46 annotationProcessor com.google.dagger:dagger-android-processor:2.46 annotationProcessor c…

第二十七章 碰撞体Collision(下)

本章节我们继续研究碰撞体&#xff0c;并且探索一下碰撞体与刚体之间的联系。我们回到之前的工程&#xff0c;然后给我们的紫色球体Sphere1也添加一个刚体组件。如下所示 此时&#xff0c;两个球体都具备了碰撞体和刚体组件。接下来&#xff0c;我们Play运行查看效果 我们发现&…