Spring Boot 整合 RabbitMQ

news2025/2/27 23:03:01

一、工程简介
1、生产者(test-11-rabbitmq-producer,spring boot 版本 2.4.1)
1)pom依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)application.yml 文件配置

server:
  port: 8012

spring:
  application:
    name: test-12-rabbitmq-consumer-01
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

2、消费者(test-12-rabbitmq-consumer-01,spring boot 版本 2.4.1)
1)pom依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)application.yml 文件配置

server:
  port: 8012

spring:
  application:
    name: test-12-rabbitmq-consumer-01
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

3、消费者(test-12-rabbitmq-consumer-02,spring boot 版本 2.4.1)
1)pom依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)application.yml 文件配置

server:
  port: 8013

spring:
  application:
    name: test-13-rabbitmq-consumer-02
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /

二、RabbitMQ常用工作模式简单实现
1、简单模式(Simple)
Ⅰ、模式特点
① 一个生产者,一个消费者,点对点消费消息
② 默认使用 direct 交换机(RabbitMQ中默认使用 direct 交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
③ 工作流程:生产者生产消息并将消息发送到交换机 --> 交换机将消息路由到绑定的队列 --> 队列收到消息 --> 消费者监听队列获取消息进行消费

Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下新增 RabbitMQ 的配置类 RabbitmqConfig,内容如下:

package com.test.rabbitmq_producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 消息生产者配置类
 * 配置类需要配置哪些东西:
 * 1、定义队列
 * 2、定义交换机
 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...)
 * 4、声明队列
 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

    //定义队列
    public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列

    //声明-简单模式测试队列
    @Bean(SIMPLE_QUEUE)
    public Queue SIMPLE_QUEUE() {
        return new Queue(SIMPLE_QUEUE);
    }

}

② 在生产者工程 test-11-rabbitmq-producer 下新增测试类 ModelProducerTest,内容如下:

package com.test.rabbitmq_consumer02.mq;

import com.test.rabbitmq_consumer02.config.RabbitmqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 常用模式测试类 -- 生产者
 */
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ModelProducerTest {

    /**
     * 简单模式(Simple)
     * 一对一消费,一条消息只有一个消费者可以消费
     * 备注:
     * 1、一个队列中的一条消息只会被一个消费者消费一次
     * 2、简单模式中,使用默认交换机direct
     */
    @Test
    public void simpleSendMessageTest() {
        for (int i = 1; i <= 5; i++) {
            String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
            //要发送的消息内容
            String message = "hello,简单模式(Simple),发送第 " + i + " 条消息,发送时间:" + currTime;
            rabbitTemplate.convertAndSend(RabbitmqConfig.SIMPLE_QUEUE,message);
            System.out.println("简单模式(Simple),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);
        }
    }

}

Ⅲ、消费者实现
① 将生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 复制一份到 消费者工程 test-12-rabbitmq-consumer-01 下,无需修改
② 在 消费者工程 test-12-rabbitmq-consumer-01 下新增一个消息监听类 ModelListener,内容如下:

package com.test.rabbitmq_consumer01.mq;

import com.test.rabbitmq_consumer01.config.RabbitmqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 常用模式测试类 -- 消费者
 */
@Configuration
public class ModelListener {

    /**
     * 简单模式(Simple)
     * 一对一消费,一条消息只有一个消费者可以消费
     * 备注:
     * 1、一个队列中的一条消息只会被一个消费者消费一次
     * 2、简单模式中,使用默认交换机direct
     */
    @RabbitListener(queues = {RabbitmqConfig.SIMPLE_QUEUE})//监听 RabbitmqConfig.SIMPLE_QUEUE 队列
    public void simpleReceive(String message) {
        System.out.println("consumer-01,简单模式(Simple),接收到的消息:" + message);
    }

}

Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 执行测试类 ModelProducerTest 中的 simpleSendMessageTest() 方法,发送消息
④ 消息监听结果, ModelListener 的 simpleReceive() 方法中收到消息如下:
在这里插入图片描述
2、工作队列模式(Work Queues)
Ⅰ、模式特点
① 一个生产者,多个消费者(默认使用轮询策略,可配置能者多劳模式,谁完成的快,谁多做一点)
② 虽然有多个消费者,但是只有一个队列,一条消息只能被一个消费者获取
② 默认使用 direct 交换机(RabbitMQ中默认使用 direct 交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
③ 工作流程:生产者生产消息并将消息发送到交换机 --> 交换机将消息路由到绑定的队列 --> 队列收到消息 --> 消费者监听队列获取消息进行消费

Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加一个队列,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:

package com.test.rabbitmq_producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 消息生产者配置类
 * 配置类需要配置哪些东西:
 * 1、定义队列
 * 2、定义交换机
 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...)
 * 4、声明队列
 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

    //定义队列
    public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列
    public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列

    //声明-简单模式测试队列
    @Bean(SIMPLE_QUEUE)
    public Queue SIMPLE_QUEUE() {
        return new Queue(SIMPLE_QUEUE);
    }

	//声明-工作队列模式测试队列
    @Bean(WORK_QUEUES)
    public Queue WORK_QUEUES() {
        return new Queue(WORK_QUEUES);
    }

}

② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 workQueuesSendMessageTest(),内容如下:

	/**
     * 工作队列模式(Work Queues)
     * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费(轮询策略),可配置能者多劳模式,谁完成的快,谁多做一点
     * 备注:一个队列中的一条消息只会被一个消费者消费一次
     */
    @Test
    public void workQueuesSendMessageTest() {
        for (int i = 1; i <= 10; i++) {
            String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
            //要发送的消息内容
            String message = "hello,工作队列模式(Work Queues),发送第 " + i + " 条消息,发送时间:" + currTime;
            rabbitTemplate.convertAndSend(RabbitmqConfig.WORK_QUEUES,message);
            System.out.println("工作队列模型(Work Queues),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);
        }
    }

Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 workQueuesReceive(),内容如下:

	/**
     * 工作队列模式(Work Queues)
     * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费(轮询策略),可配置能者多劳模式,谁完成的快,谁多做一点
     * 备注:一个队列中的一条消息只会被一个消费者消费一次
     */
    @RabbitListener(queues = {RabbitmqConfig.WORK_QUEUES})
    public void workQueuesReceive(String message) {
        System.out.println("consumer-01,工作队列模型(Work Queues),消费者,接收到的消息:" + message);
    }

② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 workQueuesReceive(),内容如下:

/**
     * 工作队列模式(Work Queues)
     * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费(轮询策略),可配置能者多劳模式,谁完成的快,谁多做一点
     * 备注:一个队列中的一条消息只会被一个消费者消费一次
     */
    @RabbitListener(queues = {RabbitmqConfig.WORK_QUEUES})
    public void workQueuesReceive(String message) {
        System.out.println("consumer-02,工作队列模型(Work Queues),消费者,接收到的消息:" + message);
    }

Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 workQueuesSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 workQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 workQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
Ⅴ、结论:
① 默认情况下,工作队列模式(Work Queues)使用轮询策略,多个消费者你一条我一条的公平消费
② 工作队列模式和简单模式的区别仅在于消费者数量

Ⅵ、问题:怎样取消工作队列模式的预取机制(轮询)?
解决:在生产者和所有消费者工程的 application.yml 文件中添加如下配置
在这里插入图片描述
application.yml 文件修改重启服务后,test-12-rabbitmq-consumer-01 的 workQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
application.yml 文件修改重启服务后,test-12-rabbitmq-consumer-02 的 workQueuesReceive() 方法中收到消息如下:
在这里插入图片描述

3、发布订阅模式(Publish/Subscribe)
Ⅰ、模式特点
① 一个生产者,多个消费者,一条消息可以被多个消费者同时消费
② 使用 fanout 交换机
③ 工作流程:生产者生产消息并将消息发送给交换机 --> 交换机收到消息后将消息转发给绑定该交换机的所有队列 --> 队列收到消息 --> 消费者监听队列获取消息进行消费

Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加两个队列和一个自定义交换机,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:

package com.test.rabbitmq_producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 消息生产者配置类
 * 配置类需要配置哪些东西:
 * 1、定义队列
 * 2、定义交换机
 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...)
 * 4、声明队列
 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

    //定义队列
    public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列
    public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列
    public static final String PUBLISH_SUBSCRIBE_QUEUES_1 = "publish.subscribe.queues.01";//发布订阅模式测试队列
    public static final String PUBLISH_SUBSCRIBE_QUEUES_2 = "publish.subscribe.queues.02";//发布订阅模式测试队列

	//定义交换机
	public static final String PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE = "fanout_exchange";//发布订阅模式测试交换机

    //声明-简单模式测试队列
    @Bean(SIMPLE_QUEUE)
    public Queue SIMPLE_QUEUE() {
        return new Queue(SIMPLE_QUEUE);
    }

	//声明-工作队列模式测试队列
    @Bean(WORK_QUEUES)
    public Queue WORK_QUEUES() {
        return new Queue(WORK_QUEUES);
    }
    
	//声明-发布订阅模式测试队列01
    @Bean(PUBLISH_SUBSCRIBE_QUEUES_1)
    public Queue PUBLISH_SUBSCRIBE_QUEUES_1() {
        return new Queue(PUBLISH_SUBSCRIBE_QUEUES_1);
    }

    //声明-发布订阅模式测试队列02
    @Bean(PUBLISH_SUBSCRIBE_QUEUES_2)
    public Queue PUBLISH_SUBSCRIBE_QUEUES_2() {
        return new Queue(PUBLISH_SUBSCRIBE_QUEUES_2);
    }

	/**
     * 配置交换机--fanout交换机
     * @return
     */
	@Bean(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE)
    public Exchange PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE).durable(true).build();
    }

	/**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_1(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_1) Queue queue,
        @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.01").noargs();
    }

    /**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_2(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_2) Queue queue,
        @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.02").noargs();
    }

}

② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 publishSubscribeSendMessageTest(),内容如下:

	/**
     * 发布订阅模式(Publish/Subscribe)
     * 发布订阅模式使用 fanout 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     */
    @Test
    public void publishSubscribeSendMessageTest() throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
            //要发送的消息内容
            String message = "hello,publish.subscribe.queues,发送第 " + i + " 条消息,发送时间:" + currTime;
            String routingKey = "";
            if (i % 2 == 0) {
                message = message.replace("queues", "queues.01");
                routingKey = "publish.subscribe.queues.01";
            } else {
                message = message.replace("queues", "queues.02");
                routingKey = "publish.subscribe.queues.02";
            }
            /**
             * convertAndSend(String exchange, String routingKey, Object object):
             * 参数一(String exchange):交换机名称
             * 参数二(String routingKey):路由规则
             * 参数三(Object object):发送的消息内容
             */
            rabbitTemplate.convertAndSend(RabbitmqConfig.PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE,routingKey,message);
            System.out.println("发布订阅模型(Publish/Subscribe),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);
//            Thread.sleep(2000);
        }
    }

Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 publishSubscribeQueuesReceive(),内容如下:

	/**
     * 发布订阅模式(Publish/Subscribe)
     * 发布订阅模式使用 fanout 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     */
    @RabbitListener(queues = {RabbitmqConfig.PUBLISH_SUBSCRIBE_QUEUES_1})
    public void publishSubscribeQueuesReceive(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("consumer-01,发布订阅模型(Publish/Subscribe),消费者一*绑定队列*PUBLISH_SUBSCRIBE_QUEUES_1,接收到的消息:" + message);
    }

② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 publishSubscribeQueuesReceive(),内容如下:

	/**
     * 发布订阅模式(Publish/Subscribe)
     * 发布订阅模式使用 fanout 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     */
    @RabbitListener(queues = {RabbitmqConfig.PUBLISH_SUBSCRIBE_QUEUES_2})
    public void publishSubscribeQueuesReceive(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("consumer-02,发布订阅模型(Publish/Subscribe),消费者二*绑定队列*PUBLISH_SUBSCRIBE_QUEUES_2,接收到的消息:" + message);
    }

Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 publishSubscribeSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 publishSubscribeQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 publishSubscribeQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
结论:在发布订阅模式(Publish/Subscribe)下一条消息是可以被多个消费者同时消费的(多个队列绑定同一个交换机)。

4、路由模式(Routing)
Ⅰ、模式特点
① 一个生产者,一个交换机,多个队列,多个消费者,消费者监听队列,队列跟交换机绑定
② 使用 direct 交换机
③ 队列绑定交换机的时候必须指定 RoutingKey,不支持通配符
④ 发送消息的时候必须指定 RoutingKey
⑤ RoutingKey 可以重复
⑥ 工作流程:生产者生产消息并将消息发送给交换机 --> 交换机收到消息后根据路由规则将消息转发给绑定该交换机且符合路由规则的队列 --> 队列收到消息 --> 消费者监听队列获取消息进行消费

Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加两个队列和一个自定义交换机,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:

package com.test.rabbitmq_producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 消息生产者配置类
 * 配置类需要配置哪些东西:
 * 1、定义队列
 * 2、定义交换机
 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...)
 * 4、声明队列
 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

    //定义队列
    public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列
    public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列
    public static final String PUBLISH_SUBSCRIBE_QUEUES_1 = "publish.subscribe.queues.01";//发布订阅模式测试队列
    public static final String PUBLISH_SUBSCRIBE_QUEUES_2 = "publish.subscribe.queues.02";//发布订阅模式测试队列
    public static final String ROUTING_QUEUES_SMS = "routing.queues.sms";//路由模式测试队列
    public static final String ROUTING_QUEUES_EMAIL = "routing.queues.email";//路由模式测试队列

	//定义交换机
	public static final String PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE = "fanout_exchange";//发布订阅模式测试交换机
	public static final String ROUTING_DIRECT_TEST_EXCHANGE = "direct_exchange";//路由模式测试交换机

    //声明-简单模式测试队列
    @Bean(SIMPLE_QUEUE)
    public Queue SIMPLE_QUEUE() {
        return new Queue(SIMPLE_QUEUE);
    }

	//声明-工作队列模式测试队列
    @Bean(WORK_QUEUES)
    public Queue WORK_QUEUES() {
        return new Queue(WORK_QUEUES);
    }
    
	//声明-发布订阅模式测试队列01
    @Bean(PUBLISH_SUBSCRIBE_QUEUES_1)
    public Queue PUBLISH_SUBSCRIBE_QUEUES_1() {
        return new Queue(PUBLISH_SUBSCRIBE_QUEUES_1);
    }

    //声明-发布订阅模式测试队列02
    @Bean(PUBLISH_SUBSCRIBE_QUEUES_2)
    public Queue PUBLISH_SUBSCRIBE_QUEUES_2() {
        return new Queue(PUBLISH_SUBSCRIBE_QUEUES_2);
    }

	//声明-路由模式短信测试队列
    @Bean(ROUTING_QUEUES_SMS)
    public Queue ROUTING_QUEUES_SMS() {
        return new Queue(ROUTING_QUEUES_SMS);
    }

    //声明-路由模式邮件测试队列
    @Bean(ROUTING_QUEUES_EMAIL)
    public Queue ROUTING_QUEUES_EMAIL() {
        return new Queue(ROUTING_QUEUES_EMAIL);
    }

	/**
     * 配置交换机--fanout交换机
     * @return
     */
	@Bean(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE)
    public Exchange PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE).durable(true).build();
    }

	/**
     * 配置交换机--direct交换机
     * @return
     */
    @Bean(ROUTING_DIRECT_TEST_EXCHANGE)
    public Exchange ROUTING_DIRECT_TEST_EXCHANGE() {
        return ExchangeBuilder.directExchange(ROUTING_DIRECT_TEST_EXCHANGE).durable(true).build();
    }

	/**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_1(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_1) Queue queue,
        @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.01").noargs();
    }

    /**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_2(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_2) Queue queue,
        @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.02").noargs();
    }

	/**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_ROUTING_QUEUES_SMS(@Qualifier(ROUTING_QUEUES_SMS) Queue queue,
        @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("routing.queues.sms").noargs();
    }

    /**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_ROUTING_QUEUES_EMAIL(@Qualifier(ROUTING_QUEUES_EMAIL) Queue queue,
        @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("routing.queues.email").noargs();
    }

}

② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 routingSendMessageTest(),内容如下:

	/**
     * 路由模式(Routing)
     * Routing模式 使用 Direct 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     * 3、路由模式不支持使用通配符,convertAndSend() 方法中指定的 routingKey 和队列绑定交换机时 with() 方法中配置的 routingKey 必须完全匹配才能将消息发送到具体的队列上
     */
    @Test
    public void routingSendMessageTest() throws InterruptedException {
        for (int i = 1; i <= 10; i++) {
            String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
            //要发送的消息内容
            String message = "hello,routing.queues,发送第 " + i + " 条消息,发送时间:" + currTime;
            String routingKey = "";
            if (i % 2 == 0) {
                //发送短信
                message = message.replace("queues", "queues.sms");
                routingKey = "routing.queues.sms";
            } else {
                //发送邮件
                message = message.replace("queues", "queues.email");
                routingKey = "routing.queues.email";
            }
            /**
             * convertAndSend(String exchange, String routingKey, Object object):
             * 参数一(String exchange):交换机名称
             * 参数二(String routingKey):指定的路由,Routing模式下指定的路由必须和 RabbitmqConfig 配置类中队列和交换机绑定时 with() 中配置的 routingKey 要一致才行,不支持使用通配符
             * 参数三(Object object):发送的消息内容
             */
            rabbitTemplate.convertAndSend(RabbitmqConfig.ROUTING_DIRECT_TEST_EXCHANGE,routingKey,message);
            System.out.println("路由模式(Routing),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);
//            Thread.sleep(1000);
        }
    }

Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 routingQueuesReceive(),内容如下:

	/**
     * 路由模式(Routing)
     * Routing模式 使用 Direct 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     * 3、路由模式不支持使用通配符,convertAndSend() 方法中指定的 routingKey 和队列绑定交换机时 with() 方法中配置的 routingKey 必须完全匹配才能将消息发送到具体的队列上
     */
    @RabbitListener(queues = {RabbitmqConfig.ROUTING_QUEUES_SMS})//queues需要手动先创建队列
    public void routingQueuesReceive(String message) {
        System.out.println("consumer-01,路由模式(Routing),消费者一*绑定队列*ROUTING_QUEUES_SMS,接收到的消息:" + message);
    }

② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 routingQueuesReceive(),内容如下:

	/**
     * 路由模式(Routing)
     * Routing模式 使用 Direct 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     * 3、路由模式不支持使用通配符,convertAndSend() 方法中指定的 routingKey 和队列绑定交换机时 with() 方法中配置的 routingKey 必须完全匹配才能将消息发送到具体的队列上
     */
    @RabbitListener(queues = {RabbitmqConfig.ROUTING_QUEUES_EMAIL})
    public void routingQueuesReceive(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("consumer-02,路由模式(Routing),消费者二*绑定队列*ROUTING_QUEUES_EMAIL,接收到的消息:" + message);
    }

Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 routingSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 routingQueuesReceive() 方法中收到消息如下:
在这里插入图片描述

⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 routingQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
问题:
1、路由模式(Routing)和 发布订阅模式(Publish/Subscribe)的区别在哪?
交换机不同
路由模式使用的是 direct 交换机,direct 交换机会把消息路由到那些 binding key 与 routing key 完全匹配的队列中。(不仅要队列绑定交换机,还要验证 binding key 与 routing key 是否匹配,两个条件均满足才能收到消息)
发布订阅模式使用的是 fanout 交换机,fanout 交换机会把所有发送到该交换机的消息路由到所有与它绑定的队列中。(只要队列绑定了交换机就能收到消息,不会验证 binding key 与 routing key 是否匹配)
2、什么是 routing key ?什么是 binding key ?
routing key:convertAndSend(String exchange, String routingKey, Object object)
binding key:BindingBuilder.bind(queue).to(exchange).with(“publish.subscribe.queues.02”).noargs(); – with() 中的就是 binding key

5、主题模式(Topic,也叫通配符模式)
Ⅰ、模式特点
① 使用 topic 交换机
② 其他和路由模式基本一样,只是在路由模式的基础上增加了 使用通配符来进行 RoutingKey 匹配的功能

Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加两个队列和一个自定义交换机,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:

package com.test.rabbitmq_producer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 消息生产者配置类
 * 配置类需要配置哪些东西:
 * 1、定义队列
 * 2、定义交换机
 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...)
 * 4、声明队列
 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
 */
@Slf4j
@Configuration
public class RabbitmqConfig {

    //定义队列
    public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列
    public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列
    public static final String PUBLISH_SUBSCRIBE_QUEUES_1 = "publish.subscribe.queues.01";//发布订阅模式测试队列
    public static final String PUBLISH_SUBSCRIBE_QUEUES_2 = "publish.subscribe.queues.02";//发布订阅模式测试队列
    public static final String ROUTING_QUEUES_SMS = "routing.queues.sms";//路由模式测试队列
    public static final String ROUTING_QUEUES_EMAIL = "routing.queues.email";//路由模式测试队列
    public static final String TOPIC_QUEUES_SMS = "topic_queues_sms";//主题模式短信队列
    public static final String TOPIC_QUEUES_EMAIL = "topic_queues_email";//主题模式邮件队列

	//定义交换机
	public static final String PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE = "fanout_exchange";//发布订阅模式测试交换机
	public static final String ROUTING_DIRECT_TEST_EXCHANGE = "direct_exchange";//路由模式测试交换机
	public static final String TOPIC_TEST_EXCHANGE = "topic_exchange";//主题模式测试交换机

    //声明-简单模式测试队列
    @Bean(SIMPLE_QUEUE)
    public Queue SIMPLE_QUEUE() {
        return new Queue(SIMPLE_QUEUE);
    }

	//声明-工作队列模式测试队列
    @Bean(WORK_QUEUES)
    public Queue WORK_QUEUES() {
        return new Queue(WORK_QUEUES);
    }
    
	//声明-发布订阅模式测试队列01
    @Bean(PUBLISH_SUBSCRIBE_QUEUES_1)
    public Queue PUBLISH_SUBSCRIBE_QUEUES_1() {
        return new Queue(PUBLISH_SUBSCRIBE_QUEUES_1);
    }

    //声明-发布订阅模式测试队列02
    @Bean(PUBLISH_SUBSCRIBE_QUEUES_2)
    public Queue PUBLISH_SUBSCRIBE_QUEUES_2() {
        return new Queue(PUBLISH_SUBSCRIBE_QUEUES_2);
    }

	//声明-路由模式短信测试队列
    @Bean(ROUTING_QUEUES_SMS)
    public Queue ROUTING_QUEUES_SMS() {
        return new Queue(ROUTING_QUEUES_SMS);
    }

    //声明-路由模式邮件测试队列
    @Bean(ROUTING_QUEUES_EMAIL)
    public Queue ROUTING_QUEUES_EMAIL() {
        return new Queue(ROUTING_QUEUES_EMAIL);
    }

	//声明-主题模式短信队列
    @Bean(TOPIC_QUEUES_SMS)
    public Queue TOPIC_QUEUES_SMS() {
        return new Queue(TOPIC_QUEUES_SMS);
    }

    //声明-主题模式邮件队列
    @Bean(TOPIC_QUEUES_EMAIL)
    public Queue TOPIC_QUEUES_EMAIL() {
        return new Queue(TOPIC_QUEUES_EMAIL);
    }

	/**
     * 配置交换机--fanout交换机
     * @return
     */
	@Bean(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE)
    public Exchange PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE).durable(true).build();
    }

	/**
     * 配置交换机--direct交换机
     * @return
     */
    @Bean(ROUTING_DIRECT_TEST_EXCHANGE)
    public Exchange ROUTING_DIRECT_TEST_EXCHANGE() {
        return ExchangeBuilder.directExchange(ROUTING_DIRECT_TEST_EXCHANGE).durable(true).build();
    }

	/**
     * 配置交换机--topic交换机
     * @return
     */
    @Bean(TOPIC_TEST_EXCHANGE)
    public Exchange TOPIC_TEST_EXCHANGE() {
        return ExchangeBuilder.topicExchange(TOPIC_TEST_EXCHANGE).durable(true).build();
    }

	/**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_1(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_1) Queue queue,
        @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.01").noargs();
    }

    /**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_2(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_2) Queue queue,
        @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.02").noargs();
    }

	/**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_ROUTING_QUEUES_SMS(@Qualifier(ROUTING_QUEUES_SMS) Queue queue,
        @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("routing.queues.sms").noargs();
    }

    /**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_ROUTING_QUEUES_EMAIL(@Qualifier(ROUTING_QUEUES_EMAIL) Queue queue,
        @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("routing.queues.email").noargs();
    }

	/**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_TOPIC_QUEUES_SMS(@Qualifier(TOPIC_QUEUES_SMS) Queue queue,
        @Qualifier(TOPIC_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("topic.queues.sms.*").noargs();
    }

    /**
     * 队列绑定到交换机
     * @param queue         队列
     * @param exchange      交换机
     * @return
     */
    @Bean
    public Binding BINDING_TOPIC_QUEUES_EMAIL(@Qualifier(TOPIC_QUEUES_EMAIL) Queue queue,
        @Qualifier(TOPIC_TEST_EXCHANGE) Exchange exchange) {
        //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符
        return BindingBuilder.bind(queue).to(exchange).with("topic.queues.*.email").noargs();
    }

}

② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 topicSendMessageTest(),内容如下:

	/**
     * 主题模式(Topic,也叫通配符模式)
     * Topic模式 使用 topic 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     * 3、主题模式 相比于 路由模式 增加了通配符的使用
     * RabbitMQ 通配符有哪些:
     * 1、 # :代表没有或一个或多个单词(不是一个字符哦!单词与单词之间用“.”分割);
     * 2、 * :代表没有或一个英文单词(不是一个字符哦);
     * 通配符具体使用案例:
     * 1、topic.queues.sms.#     能匹配     topic.queues.sms、topic.queues.sms.111(111 可以换成任何字符)
     * 2、topic.queues.#.email       能匹配     topic.queues.email、topic.queues.111.email(111 可以换成任何字符)
     */
    @Test
    public void topicSendMessageTest() throws InterruptedException {
        for (int i = 1; i <= 9; i++) {
            String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
            //要发送的消息内容
            String message = "hello,topic.queues,发送第 " + i + " 条消息,发送时间:" + currTime;
            String routingKey = "";
            if (i % 3 == 0) {//3、6、9
                //发送短信
                message = message.replace("queues", "queues.sms");
                routingKey = "topic.queues.sms";
            }  else if (i % 3 == 1){//1、4、7
                //发送邮件
                message = message.replace("queues", "queues.email");
                routingKey = "topic.queues.email";
            } else {//2、5、8
                //短信和邮件都发送
                message = message.replace("queues", "queues.sms.email");
                routingKey = "topic.queues.sms.email";
            }
            /**
             * convertAndSend(String exchange, String routingKey, Object object):
             * 参数一(String exchange):交换机名称
             * 参数二(String routingKey):指定的路由,Topic模式下支持使用通配符进行匹配
             * 参数三(Object object):发送的消息内容
             */
            rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_TEST_EXCHANGE,routingKey,message);
            System.out.println("主题模式(Topic,也叫通配符模式),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);
//            Thread.sleep(500);
        }
    }

Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 topicQueuesReceive(),内容如下:

	/**
     * 主题模式(Topic,也叫通配符模式)
     * Topic模式 使用 topic 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     * 3、主题模式 相比于 路由模式 增加了通配符的使用
     * RabbitMQ 通配符有哪些:
     * 1、 # :代表没有或一个或多个单词(不是一个字符哦!单词与单词之间用“.”分割);
     * 2、 * :代表没有或一个英文单词(不是一个字符哦);
     * 通配符具体使用案例:
     * 1、topic.queues.sms.#     能匹配     topic.queues.sms、topic.queues.sms.111(111 可以换成任何字符)
     * 2、topic.queues.#.email       能匹配     topic.queues.email、topic.queues.111.email(111 可以换成任何字符)
     */
    @RabbitListener(queues = {RabbitmqConfig.TOPIC_QUEUES_SMS})
    public void topicQueuesReceive(String message) {
        System.out.println("consumer-01,主题模式(Topic,也叫通配符模式),消费者一*绑定队列*TOPIC_QUEUES_SMS,接收到的消息:" + message);
    }

② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 routingQueuesReceive(),内容如下:

	/**
     * 主题模式(Topic,也叫通配符模式)
     * Topic模式 使用 topic 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列)
     * 备注:
     * 1、生产者将消息发送给交换机,并指定路由规则
     * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费)
     * 3、主题模式 相比于 路由模式 增加了通配符的使用
     * RabbitMQ 通配符有哪些:
     * 1、 # :代表没有或一个或多个单词(不是一个字符哦!单词与单词之间用“.”分割);
     * 2、 * :代表没有或一个英文单词(不是一个字符哦);
     * 通配符具体使用案例:
     * 1、topic.queues.sms.#     能匹配     topic.queues.sms、topic.queues.sms.111(111 可以换成任何字符)
     * 2、topic.queues.#.email       能匹配     topic.queues.email、topic.queues.111.email(111 可以换成任何字符)
     */
    @RabbitListener(queues = {RabbitmqConfig.TOPIC_QUEUES_EMAIL})
    public void topicQueuesReceive(String message) {
        System.out.println("consumer-02,主题模式(Topic,也叫通配符模式),消费者二*绑定队列*TOPIC_QUEUES_EMAIL,接收到的消息:" + message);
    }

Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 topicSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 topicQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 topicQueuesReceive() 方法中收到消息如下:
在这里插入图片描述
问题:
1、主题模式(Topic,也叫通配符模式)和 路由模式(Routing)的区别在哪?
主题模式只是在路由模式的基础上增加了通配符匹配,其他没区别。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/94480.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

外汇天眼:分分飞艇──谎称33倍高收益,入金投资获利要不回

在这个万物皆涨、薪水不涨的年代&#xff0c;大多数人都知道投资的重要性&#xff0c;但因为受限于本身的知识与技巧不足&#xff0c;经常看错市场方向或选错标的而亏损&#xff0c;并因此感到苦恼不已。此时若看到人宣称有无风险高获利的赚钱管道&#xff0c;不免会跃跃欲试。…

SM59 事物码里的错误消息 SECSTORE035

系统无法访问全局键值&#xff0c;其存储位置 在配置文件参数 rsec/securestorage/keyfile 中指定。 使用事物码 RZ11&#xff0c;输入 rsec/securestorage/keyfile&#xff0c;点击 Display&#xff1a; 当这个参数路径指向的 .pse 文件包含非法字符或者文件内容小于 48 个字…

Matlab实现|多元宇宙算法求解电力系统多目标优化问题(期刊论文复现)

结果和这几种算法进行比较&#xff1a; 目录 1 概述 2 Matlab完整代码实现 3 结果 1 概述 提出了一种求解电力系统环境经济调度的新方法,该方法利用宇宙空间在随机创建过程中高膨胀率的物体随虫洞在空间移动物体的规律,通过对白洞和黑洞间随机传送物体来实现最优搜索. 算法…

5.1 自然语言处理综述

文章目录致命密码&#xff1a;一场关于语言的较量一、自然语言处理的发展历程1.1 兴起时期1.2 符号主义时期1.3 连接主义时期1.4 深度学习时期二、自然语言处理技术面临的挑战2.1 语言学角度2.1.1 同义词问题2.1.2 情感倾向问题2.1.3 歧义性问题2.1.4 对话/篇章等长文本处理问题…

猿如意中【ndm】助你轻松管理你的 NPM包

目录 一、ndm 简介 1.1、下载 ndm-1.exe 版本&#xff08;v1.2.0&#xff09; 1.2、安装 1.3、版本迭代更新记录 1.3.1、ndm v0.1.4 已发布https://github.com/720kb/ndm/releases/tag/v0.1.4 1.3.2、ndm v1.0.0 发布&#xff0c;现已完全跨平台Windows、Mac、Linux 1.3.3、…

cad 怎么取消绘图界限?cad怎么调整图形界限

1、在CAD中&#xff0c;如何设置图形界限&#xff1f; 1、电脑打开CAD&#xff0c;输入limits命令&#xff0c;空格键确定。 2、确定命令后&#xff0c;选择格式中的图形界限。 3、点击图形界限后&#xff0c;会出现重新设置模型空间界限&#xff0c;接着再点击键盘上的回车键…

gcexcel:GrapeCity Documents for Excel v6/NET/Crack

高速 .NET 6 Excel 电子表格 API 库 使用此快速电子表格 API&#xff0c;以编程方式在 .Net 6、.Net 5、.NET Core、.NET Framework 和 Xamarin 跨平台应用程序中创建、编辑、导入和导出 Excel 电子表格。 创建、加载、编辑和保存 Excel .xlsx 电子表格 保存为 .XLSX、PDF、HTM…

C#基于ASP.NET的人事薪资管理系统

ASP.NET20003人事薪资管理系统,SQL数据库&#xff1a;VS2010开发环境,包含员工管理,部门管理,工资管理,绩效管理等功能,并且包含五险一金的计算 3.3 功能需求 3.3.1 员工部分 1&#xff1a;查看工资&#xff1a;以列表的形式查看系统现存的员工工资信息。 2&#xff1a;查看个…

SpringBoot自定义banner—卡塔尔世界杯吉祥物

自定义banner文件 SpringBoot项目在启动的时候&#xff0c;会有一个大大的Spring首先展示出来 . ____ _ __ _ _/\\ / ____ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | _ | _| | _ \/ _ | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) ) |____| .__|…

好书推荐:《Python编程:从入门到实践(第2版)》——写给Python入门者的最好教程

缘起 这段时间把图灵社区的《Python编程&#xff1a;从入门到实践&#xff08;第2版&#xff09;》看完了&#xff0c;在此做一个记录&#xff0c;先谈一下自己最直观的感受&#xff0c;这本书的定位是入门&#xff0c;在我看来&#xff0c;这个目的确实是达到了的&#xff0c…

98.(leaflet之家)leaflet态势标绘-分队战斗行动采集

听老人家说:多看美女会长寿 地图之家总目录(订阅之前建议先查看该博客) 文章末尾处提供保证可运行完整代码包,运行如有问题,可“私信”博主。 效果如下所示: 下面献上完整代码,代码重要位置会做相应解释 <!DOCTYPE html> <html>

Cantor表——洛谷(Java)

题目描述 现代数学的著名证明之一是 Georg Cantor 证明了有理数是可枚举的。他是用下面这一张表来证明这一命题的&#xff1a; 1/11/1 , 1/21/2 , 1/31/3 , 1/41/4, 1/51/5, … 2/12/1, 2/22/2 , 2/32/3, 2/42/4, … 3/13/1 , 3/23/2, 3/33/3, … 4/14/1, 4/24/2, … 5/1…

04. XSS漏洞原理

04. XSS漏洞原理 XSS漏洞原理&#xff08;上&#xff09; 弹窗是怎么实现的&#xff1f; 案例 攻击利用 什么是XSS&#xff1f; XSS&#xff08;Cross Site Scripting&#xff09;&#xff1a;跨站脚本攻击&#xff0c;为了不和层叠样式表&#xff08;Cascading Style Shee…

tensorflow 歌曲题材分类

librosa音频处理 Librosa是一个用于音频、音乐分析、处理的python工具包&#xff0c;一些常见的时频处理、特征提取、绘制声音图形等功能应有尽有&#xff0c;功能十分强大. 加载音频 import librosa x , sr librosa.load(music.au) #歌曲的时长 d librosa.get_duration(…

【Matlab】论文各种图例配色Matlab绘制

1. Matlab 绘图 1.1. Plot 函数 x-pi:pi/10:pi; %以pi/10为步长 ytan(sin(x))-sin(tan(x)); %求出各点上的函数值 plot(x,y,--rs,... %绘制红色的虚线&#xff0c;且每个转折点上用正方形表示。LineWidth,2,... % 设置线宽为2Marke…

Web(十)JavaScript知识训练-JS分支与循环

1、执行下面语句后c的值是&#xff08;D &#xff09;。 var a2,b1,c3; if(a<b){ c0; } else{ c&#xff1b; } A、 1 B、 2 C、 3 D、 4 2、var afalse; var x a?"A":"B"; 在上面的程序片段中&#xff0c;x的值是&#xff08;B &#xff09; A、 A …

XXL-Job分布式任务调度框架-- 介绍和服务搭建

一 xxl-job介绍 1.1 xxl-job介绍 xxl-job是轻量级的分布式任务调度框架&#xff0c;目标是开发迅速、简单、清理、易扩展; 老版本是依赖quartz的定时任务触发&#xff0c;在v2.1.0版本开始 移除quartz依 。 分布式任务调度平台XXL-JOB/ 二 xxl-job的服务搭建 2.1 软件包获…

【数据结构-JAVA】ArrayList

目录 1. 线性表 2. 顺序表(ArrayList) 2.1 什么是顺序表&#xff1f; 2.2 顺序表的使用 2.2.1 ArrayList 的构造方法 2.2.2 ArrayList 的常规操作 2.2.3 ArrayList 的遍历 2.3 顺序表的优缺点 3. 练习题 3.1 练习1 一道面试题 3.2 练习2 杨辉三角形 3.3 练习3 洗牌算法 3.4 …

第十四届蓝桥杯集训——switch——配套用法示例

第十四届蓝桥杯集训——switch——配套用法示例 示例题目&#xff1a; 计算某年某月某日有多少天&#xff1f; 输入三个变量&#xff0c;变量year代表年份&#xff0c;变量month代表月份&#xff0c;变量day代表当月的天数。 取值范围&#xff1a;1853>year<2050;0>…

高比例风电电力系统储能运行及配置分析(Matlab实现)

目录 0 概述 1 案例及分析及分析 2 Matlab实现 3 结论 运行结果 目录 0 概述 1 案例及分析及分析 2 Matlab实现 3 结论 0 概述 高比例风电电力系统储能运行及配置分析 1 案例及分析及分析 针对附件2所示的十五天负荷功率&#xff08;最大值1200MW&#xff09;、风电功…