rabbitMQ(3)

news2024/12/29 8:53:05

RabbitMq 交换机

文章目录

  • 1. 交换机的介绍
  • 2. 交换机的类型
  • 3. 临时队列
  • 4. 绑定 (bindings)
  • 5. 扇形交换机(Fanout ) 演示
  • 6. 直接交换机 Direct exchange
    • 6.1 多重绑定
    • 6.2 direct 代码案例
  • 7. 主题交换机
    • 7.1 Topic 匹配案例
    • 7.2 Topic 代码案例
  • 8. headers 头交换机


前言:

在之前的文章中,我们一直都没有提到交换机,只是在介绍 rabbitmq 的时候 说过一嘴交换机 , 另外在 之前文章的代码案例中,我们其实

是使用到过交换机的 ,

比如:

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());


这里第一个参数就是用来知名交换机的,但是我们用 “” 来作为交换机的名,rabbitmq 就会默认使用 Direct Exchange(直连交换机) , >

既然 之前我们没有讲到过 交换机,下面我们就来学习一下交换机.

1. 交换机的介绍

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

在这里插入图片描述


简单一句话: 交换机是用来将 生产者生产的消息 转发到对应的队列中.


简单看完交换机的介绍,下面我们来看看 rabbitmq 中交换机的几种类型 .

2. 交换机的类型


引用:

  1. Direct Exchange(直连交换机):根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。 比如 一个队列绑定到该交换机上 要求 routing key (路由键) 为 abc ,那么只有被标记为 abc 的消息才能被转发,不会转发 abc.def , 也不会转发 aaa.bbb.ccc 只会转发 abc.

  1. Fanout Exchange(扇形交换机):将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。扇形交换机不处理路由键,我们只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。

  1. Topic Exchange(主题交换机):根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配,将消息路由到满足条件的队列。支持通配符匹配,比如 符号“#” 匹配一个或多个词,符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc.* 只会匹配到 abc.def。

  1. Headers Exchange(头交换机):根据消息的头部属性进行匹配,并将消息路由到满足条件的队列。

    解释: 不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送
    到RabbitMQ 时会 取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的

    匹配规则 x-match 有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息
  2. Default Exchange(默认交换机):它是一个特殊的直连交换机(类型为 direct),当没有指定交换机时,默认的交换机会将消息根据消息的 routing key 发送到同名的队列上


看完了 交换机的类型,这里先来 学习一下 临时队列 之后在演示绑定交换机 会使用到.

3. 临时队列

在 之前的文章中 ,我们每次创建的队列都是具有特定的名字 比如 hello , ack_queue , 可以说 队列的名字对我们来说是非常重要的 ,因为我们要指定 消费者 去那个队列消费消息 . 但是我们 每次都要去想名字,有时候我们愿 想名字 (取名字啥的最烦了) .为了解决这个问题,我们就可以创建一个具有随机名称的队列 , 或者让服务器为我们选择一个随机对立的名称. 另外 如果 我们 想要 在使用完队列 后 断开连接 就删除 (一次性的队列) 就可以 通过下面这种方式来创建 临时队列.

String queueName = channel.queueDeclare().getQueue();


创建出来的队列:

在这里插入图片描述


知道了如何 创建临时队列,下面我们来看看 绑定 (bindings) ,这里是 使用 交换机 最重要的 一环节 , 如果 没有绑定 交换机 就无法绑定到 队列中,交换机就无法 把 消息 转发给 队列.

4. 绑定 (bindings)

什么是 绑定 呢,绑定 其实是 exchange 和 queue 之间的桥梁,它告诉我们 交换机 和 那个队列进行了绑定关系。

比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

在这里插入图片描述

关于 临时队列 和 绑定 预备知识点看完后,我们就来 使用 交换机.

5. 扇形交换机(Fanout ) 演示

扇形交换机: 将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。

演示: 这里我们创建一个简单的日志系统来完成代码样式 .

创建一个消费者 用来 生产消息 ,创建两个消费者 ,一个消费者 将接收到的消息 显示到 控制台,另外一个将消息 存储到 本地磁盘 .


大致:

在这里插入图片描述


代码案例:


消费者:

ReceiveLogs01

package org.example.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者1 --> 将消息显示到控制台
public class ReceiveLogs01 {

    // 交换机名字
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机 --> 扇形 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        // 接受消息 -->  消费者接受消息时的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, deliverCallback, tag -> {
        });
    }
}


ReceiveLogs02

package org.example.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;

// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {

    // 交换机名字
    private static final String EXCHANGE_NAME = "logs";

    private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";

    public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机 --> 扇形 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""
        channel.queueBind(queueName, EXCHANGE_NAME, "");


        // 接受消息 -->  消费者接受消息时的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            // 文件操作
            File file = new File(path + "/test.txt");

            // 文件不存在创建文件
            if (!file.exists()) {
                file.createNewFile();
            }

            // 使用 Files.newOutputStream 共创创建 outputStream 对象
            try (OutputStream outputStream = Files.newOutputStream(file.toPath());) {
                outputStream.write(message.getBody());
            } catch (IOException e) {
                throw new RuntimeException("写入文件时发生错误: " + e.getMessage());
            }

        };
        channel.basicConsume(queueName, true, deliverCallback, tag -> {
        });
    }
}


生产者: EmitLog

package org.example.five;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeoutException;

// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {

    // 交换机名字
    private static final String EXCHANGE_NAME = "logs";

    private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";

    public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机 --> 扇形 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除
        String queueName = channel.queueDeclare().getQueue();

        // 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""
        channel.queueBind(queueName, EXCHANGE_NAME, "");


        // 接受消息 -->  消费者接受消息时的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            // 文件操作
            File file = new File(path + "/test.txt");

            // 文件不存在创建文件
            if (!file.exists()) {
                file.createNewFile();
            }

            // 使用 Files.newOutputStream 共创创建 outputStream 对象 , ture 表示 写文件不覆盖之前的内容
            try (OutputStream outputStream = new FileOutputStream(file, true)) {
                outputStream.write(message.getBody());
            } catch (IOException e) {
                throw new RuntimeException("写入文件时发生错误: " + e.getMessage());
            }

        };
        channel.basicConsume(queueName, true, deliverCallback, tag -> {
        });
    }
}


注意:

先启动两个消费者再启动生产者。

生产者生产消息后,如果没有对应的消费者接收,则该消息是遗弃的消息


启动看看效果:

在这里插入图片描述


可以看到 一个 生产者生产者的消息被多个消费者消费 ,到此 fanout 交换机 就 演示完成了 .

6. 直接交换机 Direct exchange

直接交换机: 根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。


在使用 Fanout 交换机 ,完成了对日志系统的构造 ,但是还不够,因为 日志 有很多 ,我们并不好 一下找到 严重错误 , 我们可以让一个消费者去操作文件存入全部日志, 让 另外一个消费者 消费严重错误 (如果是严重的错误 才发送给消费者) 将 严重错误的消息存盘 . 此时就好找错误了.


我们要实现这个功能 扇形交换机就不好完成,因为扇形交换机会将消息传播给全部绑定的队列中 ,这里我们就可以使用直接交换机 (direct)


使用 queueBind 方法 绑定路由键

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");


绑定玩路由键后 ,消息只会去 它绑定的 路由键队列中.

在这里插入图片描述


在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.


在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black ,green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

6.1 多重绑定

在这里插入图片描述


当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

6.2 direct 代码案例


这里我们 实现 让 生产者发送多个消息 ,多个消费者 消费不同的消息.

图:

在这里插入图片描述


C1 消费者:绑定 console 队列,routingKey 为 info、warning

C2 消费者:绑定 disk 队列,routingKey 为 error


当生产者生产消息到 direct_logs 交换机里,该交换机会检测消息的 routingKey 条件,然后分配到满足条件的队列里,最后由消费者从队列消费消息。


代码: 这里就不写文件操作了,直接将消费者拿到的消息 放到 控制到上.


生产者

package org.example.five;

import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class DirectLogs {

    // 交换机
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        
        Scanner sc = new Scanner(System.in);

        while (sc.hasNext()) {
            String message = sc.next();
            // 路由键为 info
            channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));

            System.out.println("生产者发出消息: " + message);
        }
    }
}


启动后 先用 路由键为 info 发送多个消息 ,然后更改 为 error 和 warning 发送消息.


消费者 c1

package org.example.five;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者 c1 -- 消费 路由键为 info 和 warning 的消息
public class ReceiveLogsDirect01 {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明一个队列
        channel.queueDeclare("console", false, false, false, null);

        // 绑定 --> 队列 console , 交换机 direct_logs , 路由键 info
        channel.queueBind("console", EXCHANGE_NAME, "info");

        // 绑定 --> 队列 console , 交换机 direct_logs , 路由键 warning
        channel.queueBind("console", EXCHANGE_NAME, "warning");

        // 接受消息
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume("console", true, deliverCallback, (tag) -> {
        });
    }
}


消费者 c2

package org.example.five;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// 消费者 c2 -- 消费 路由键为 error
public class ReceiveLogsDirect02 {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);


        // 声明一个队列
        channel.queueDeclare("disk", false, false, false, null);

        // 绑定 --> 队列 console , 交换机 direct_logs , 路由键 info
        channel.queueBind("disk", EXCHANGE_NAME, "error");



        // 接受消息
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));
        };

        channel.basicConsume("disk", true, deliverCallback, (tag) -> {});
    }
}


效果:


图一:

在这里插入图片描述


图二:

在这里插入图片描述

7. 主题交换机


主题交换机: 根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配,将消息路由到满足条件的队列。支持通配符匹配,

上面我们使用 直接交换机改进了 日志记录系统 ,。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。


尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型


另外 使用 topic 需要注意:

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表以点号分隔开。这些单词可以是任意单词

比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit” 这种类型的。

当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的:

  • *(星号)可以代替一个位置
  • #(井号)可以替代零个或多个位置

7.1 Topic 匹配案例


绑定关系图

在这里插入图片描述

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串 (*.orange.*)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
    • 第一个单词是 lazy 的多个单词 (lazy.#)


对上面 q1 和 q2 绑定的路由键 , 举几个例子

例子说明
uick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2


如果一个队列绑定的路由键为 # , 此时队列可以接收到所有的数据,此时就相当于绑定了 一个 fanout (扇形) 交换机了 ,如果 一个队列 绑定的路由键 没有 # 和 * 此时就可以认为 绑定了一个 direct(直接) 交换机了.


看完这些,下面就来看看代码案例

7.2 Topic 代码案例

创建一个生产者 ,生产多个消息到交换机,交换机按照通配符分配消息到不同的队列中,队列由消费者进行消费


生产者 EmitLogTopic:

package org.example.six;

import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class EmitLogTopic {

    // 交换机的名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();


        /**
         * Q1-->绑定的是
         *      中间带 orange 带 3 个单词的字符串(*.orange.*)
         * Q2-->绑定的是
         *      最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
         *      第一个单词是 lazy 的多个单词(lazy.#)
         */

        HashMap<String, String> bindingKeyMap = new HashMap<>();

        bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        // 遍历 map 发送消息
        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息: " + message);
        }
    }
}


消费者c1

package org.example.six;


// 消费者 c1 --> 绑定的路由键为 *.orange.*

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic01 {

    // 交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 声明队列
        String queueName = "Q1";

        channel.queueDeclare(queueName, false, false, false, null);

        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接受消息.....");

        // 接收到消息的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println(new String(message.getBody(), "UTF-8"));
            System.out.println("接收队列:" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());
        };

        // 接受消息
        channel.basicConsume(queueName, true, deliverCallback, (message) -> {});
    }
}


消费者2

package org.example.six;


// 消费者 c2 绑定的路由键为 *.*.rabbit  , lazy.#

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic02 {

    // 交换机名称
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 声明队列
        String queueName = "Q2";

        channel.queueDeclare(queueName, false, false, false, null);

        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

        System.out.println("c2 等待接受消息.....");

        // 接收到消息的回调
        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println(new String(message.getBody(), "UTF-8"));
            System.out.println("接收队列:" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());
        };

        // 接受消息
        channel.basicConsume(queueName, true, deliverCallback, (message) -> {});
    }
}


效果展示:

在这里插入图片描述

8. headers 头交换机


headers 皮皮额 AMQP 消息的 headr 而不是 路由键 ,此外 headers 交换机和 direct 交换机完全一致 ,但是性能差很多 ,目前几乎 用不到 ,了解即可.


消费方指定的 headers 中必须包含一个 “x-match” 的键 。

键 “x-match” 的值有两个

  1. x-match = all :表示所有的键值对都匹配才能接收到消息
  2. x-mathc = any : 表示只有键值对匹配就能接受到消息

在这里插入图片描述


代码演示:

生产者:

package org.example.six;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ProducerHeaders {

    public static String EXCHANGE = "header_exchange";

    public static String QUEUE = "header_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        // 声明一个交换机
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS, true, false, null);

        // 声明一个队列
        channel.queueDeclare(QUEUE, true, false, false, null);


        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("name", "abcdef");
        headerMap.put("sex", "男");
        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headerMap);

        // 发送消息
        String message = "hello_header";


        channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());
    }

}

消费者:

package org.example.six;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class ConsumerHeader {

    public static String EXCHANGE = "header_exchange";

    public static String QUEUE = "header_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        DeliverCallback deliverCallback = (tag, message) -> {
            System.out.println("接收到消息: " + new String(message.getBody()));
        };

        CancelCallback callback = (tag) -> {
            System.out.println("消息被中断");
        };

        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("x-match", "all");
        // 除了 all 还有 any 
        headerMap.put("name", "abcdef");
        headerMap.put("sex", "男");

        channel.queueBind(QUEUE, EXCHANGE, "", headerMap);


        channel.basicConsume(QUEUE, true, deliverCallback, callback);
    }
}

效果:

图一:

在这里插入图片描述

图二:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1120647.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【力扣每日一题】2023.10.22 做菜顺序

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 给我们一个数组表示每个菜的满意度&#xff0c;我们可以指定做哪些菜以及做的顺序&#xff0c;需要我们凑到一个系数的最大值&#xff0c…

第三章 内存管理 十五、内存映射文件

目录 一、传统的文件访问方式 二、内存映射文件 1、方便文件的访问 2、实现文件数据的共享 三、总结 一、传统的文件访问方式 二、内存映射文件 1、方便文件的访问 2、实现文件数据的共享 三、总结

教你注册chrome开发者账号,并发布chrome浏览器插件。

本篇文章主要讲解&#xff0c;注册chrome开发者账号&#xff0c;及发布chrome浏览器插件的流程。包含插件的打包和上传。 日期&#xff1a;2023年10月22日 作者&#xff1a;任聪聪 一、前提准备&#xff1a;注册chrome开发者账号 说明&#xff1a;注册需要5美元&#xff0c;一…

Qt界面容器:Widget、 Frame、分组框、滚动区、工具箱、选项卡小部件、堆叠小部件控件精讲

​Qt 界面设计容器简介 Qt中常用的容器控件, 包括: Widget, Frame, Group Box, Scroll Area, Tool Box, Tab Widget, Stacked Widget。 QWidget 这个类是所有窗口类的父类, 可以作为独立窗口使用, 也可以内嵌到其它窗口中使用。 头文件: #include <QWidget> qmake: QT…

按键控制LED灯亮灭

按键原理图&#xff1a;按键选用PE4 创建两个文件一个.c文件一个.h文件来写按键的代码 .c文件 #include "Key.h"void Key_Init(void) {RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOE,ENABLE);GPIO_InitTypeDef GPIO_InitStruct;GPIO_InitStruct.GPIO_Mode GPIO_M…

python astra相机驱动问题

报错问题&#xff1a; openni.utils.OpenNIError: (OniStatus.ONI_STATUS_ERROR, bDeviceOpen using default: no devices found, None) 解决办法&#xff1a; 1、从sdk中拷贝文件 2、修改openni源码 3、执行测试程序 from openni import openni2 import numpy as np impor…

基于驾驶训练优化的BP神经网络(分类应用) - 附代码

基于驾驶训练优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于驾驶训练优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.驾驶训练优化BP神经网络3.1 BP神经网络参数设置3.2 驾驶训练算法应用 4.测试结果…

【C/C++笔试练习】初始化列表、构造函数、析构函数、两种排序方法、求最小公倍数

文章目录 C/C笔试练习1. 初始化列表&#xff08;1&#xff09;只能在列表初始化的变量 2.构造函数&#xff08;2&#xff09;函数体赋值&#xff08;3&#xff09;构造函数的概念&#xff08;4&#xff09;构造函数调用次数&#xff08;5&#xff09;构造函数调用次数&#xff…

自然语言处理---RNN经典案例之使用seq2seq实现英译法

1 seq2seq介绍 1.1 seq2seq模型架构 seq2seq模型架构分析&#xff1a; seq2seq模型架构&#xff0c;包括两部分分别是encoder(编码器)和decoder(解码器)&#xff0c;编码器和解码器的内部实现都使用了GRU模型&#xff0c;这里它要完成的是一个中文到英文的翻译&#xff1a;欢迎…

数据库MongoDB

MongoDB记录是一个文档&#xff0c;由一个字段和值对组成的数据结构&#xff0c;文档类似于JSON对象。 一个文档认为就是一个对象&#xff0c;字段的数据类型是字符型&#xff0c;值除了使用基本类型外&#xff0c;还可以包括其他文档&#xff0c;普通数组和文档数组。 一、…

Python —— UI自动化之使用JavaScript进行元素点亮、修改、点击元素

1、JavaScript点亮元素 在控制台通过JavaScript语言中对元素点亮效果如下&#xff1a; 将这个语句和UI自动化结合&#xff0c;代码如下&#xff1a; locator (By.ID,"kw") # 是元组类型 web_element WebDriverWait(driver,5,0.5).until(EC.visibility_of_eleme…

Windows安装virtualenv虚拟环境

需要先安装好python环境 1 创建虚拟环境目录 还是在D:\Program\ 的文件夹新建 .env 目录&#xff08;你也可以不叫这个名字&#xff0c;一般命名为 .env 或者 .virtualenv &#xff0c;你也可以在其他目录中创建&#xff09; 2 配置虚拟环境目录的环境变量 3 安装虚拟环境 进…

网络原理之UDP协议

文章目录 前言应用层协议常见的几种数据格式1. xml2. JSON3. protobuffer 端口号传输层UDP 报文协议格式源端口号和目的端口号UDP 长度校验和 前言 前面我们学习了如何使用 UDP 数据报 和 TCP 流实现网络编程一个回显服务器&#xff0c;在知道了 UDP 和 TCP 协议的基本原理之后…

Arduino驱动BMA220三轴加速度传感器(惯性测量传感器篇)

目录 1、传感器特性 2、硬件原理图 3、驱动程序 BMA220的三轴加速度计是一款具有I2C接口的超小型三轴低g加速度传感器断路器,面向低功耗消费市场应用。它可以测量3个垂直轴的加速度,从而在手机、手持设备、计算机外围设备、人机界面、虚拟现实功能和游戏控制器中感知倾斜、…

MYSQL第五章节有关约束操作详解(附代码,例题详解)这一篇就够了

c知识点合集已经完成欢迎前往主页查看&#xff0c;点点赞点点关注不迷路哦 点我进入c第一章知识点合集 MYSQL第一章节DDL数据定义语言的操作----点我进入 MYSQL第二章节DDL-数据库操作语言 DQL-数据查询语言----点我进入 MYSQL第三章节DCL-管理用户&#xff0c;控制权限----点我…

【Lua语法】字符串

Lua语言中的字符串是不可变值。不能像在C语言中那样直接改变某个字符串中的某个字符&#xff0c;但是可以通过创建一个新字符串的方式来达到修改的目的 print(add2(1 , 2 ,15,3))a "no one"b string.gsub(a , "no" , "on1111")print(a) print…

英语——名言篇——成语

爱屋及乌 Love me, love my dog.百闻不如一见 (眼见为实 ) Seeing is believing.比上不足比下有余 worse off than some, better off than many; to fall short of the best, but be better than the worst.笨鸟先飞 A slow sparrow should make an early start.不眠之夜 white…

VSCode C/C++ 分目录+多文件编译配置2

前言&#xff1a;介绍 task.json 和 launch.json 文件 task.json 和 launch.json 是用于配置 VS Code 编辑器中的任务 和 调试功能的两个重要文件。 task.json 文件用于配置任务&#xff0c;它定义了执行特定操作的任务&#xff0c;并提供了相应的命令和参数。以下是 task.js…

BERT理解

参数计算&#xff08;沐神对于bert参数计算介绍&#xff09;

【趣味随笔】移动机器人基础(导航方式、自主导航、硬件系统结构分布)

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…