RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式

news2025/4/5 22:30:21

一、RabbitMQ介绍

1. MQ介绍

1. 为什么要有MQ

同步调用与异步调用

程序里所有的通信,有两种形式:

  • 同步通信:通信时必须得到结果才会执行下一步

  • 异步通信:通信时不必等待结果,可以直接处理下一步

同步调用

解析:

同步调用的缺点

  • 业务链长,消耗时间增加,用户体验不好

  • 耦合性强

  • 流量洪峰服务器压力大

同步调用的好处:

  • 时效性强,可以立即得到结果

异步调用

解析:

异步调用的好处:

  • 异步调用,调用链短,用户等待时间短,体验好

  • 降低耦合,服务之间耦合性低了,任何一个服务出现问题,对其它服务的影响都非常小

  • 削峰填谷,中间件Broker具备一定的消息堆积与缓存能力,下游服务可以根据自己的能力慢慢处理

异步调用的坏处:

  • 业务复杂度增加:需要考虑数据一致性问题、消息丢失问题、消息重复消费问题、消息的顺序问题等等

  • 架构复杂度增加:对中间件Broker的依赖性增强了,必须保证Broker的高可用;一旦Broker出错,会对整个系统造成非常大的冲击

2. MQ介绍【面试】

什么是MQ

Message Queue,消息队列

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

MQ的作用

  • 异步:实现服务之间异步通信

  • 削峰:实现流量的削峰填谷

  • 解耦:实现服务之间的耦合性降低

3. 常见的MQ

 在使用MQ时:

  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

  • 追求可靠性:RabbitMQ、RocketMQ

  • 追求吞吐能力:RocketMQ、Kafka

  • 追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ介绍

2.1 AMQP协议

AMQP,Advanced Message Queuing Protocol,高级消息队列,是一种网络协议。它是应用层协议的一个开发标准,为面向消息的中间件而设计。

基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品、不同编程语言的限制。

  • Publisher:消息发布者

  • Exchange:交换机。用于分发消息

  • Routes:路由。消息被分发到目标队列上的过程

  • Queue:消息队列。用于存储消息数据

  • Consumer:消息消费者。从队列里取出消息、处理消息

2.2 RabbitMQ

Rabbit公司基于AMQP协议标准,开发了RabbitMQ1.0。RabbitMQ采用Erlang语言开发,Erlang是专门为开发高并发和分布式系统的一种语言,在电信领域广泛使用。

官网地址:Messaging that just works — RabbitMQ

 

  • Broker:消息中间件,指的就是RabbitMQ服务器

  • Virtual Host:虚拟主机。当多个不同用户使用同一个RabbitMQ服务时,可以划分出多个vhost,每个用户在自己的vhost内创建exchange和queue等。

  • Connection:消息生产者、消费者 与 RabbitMQ之间建立的TCP连接

  • Channel:Channel作为轻量级的Connection,可以极大的减少建立Connection的开销。

    如果每一次访问RabbitMQ都建立一个TCP Connection,将会造成巨大的性能开销,性能非常低下。

    Channel是在Connection内部建立的逻辑连接,Channel之间完全隔离。如果应用程序支持多线程,通常每个线程创建独立的Channel进行通讯, AMQP method包含了channel id,帮助客户端和broker识别channel,所以channel之间是完全隔离的

  • Exchange:message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point), topic(publish-subscribe),fanout(multicast)

  • Queue:消息最终被送到这里,等待Consumer取走

  • Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange的查询表中,用作message的分发依据

 

3. 小结

同步调用和异步调用
    同步调用:调用后必须得到结果,才会执行下一步
                     好处:

                              时效性强,每一步操作都能够及时得到结果
                     缺点:
                             耦合性太强,任何一个环节出错,都可能导致整个调用链出现问题
                             业务链太长,调用的时间花费比较长,用户体验差
                             难以处理流量洪峰,如果要增加并发能力,整个调用链所有环节都要增加
    异步调用:调用后不必等待结果,可以直接执行下一步
                     好处:
                            业务链短了,异步调用只要发出一条消息给broker,就可以执行下一步了,

                            不需要等等消息的消费

                     坏处:
                            业务复杂度增加了:需要考虑数据一致性问题,消息丢失的问题、消息重

                                                            复消费问题、消息的顺序问题
                            系统架构更复杂了:必须要保证Broker的高可用,否则一旦Broker出错,

                                                             对整个系统影响太大


常见的MQ有哪些:
    RabbitMQ:追求高性能
    RocketMQ:追求高稳定
    Kafka:追求高吞吐量
AMQP协议:是一种消息通信协议,规定了消息中间件要有以下结构
    Publisher:生产者,发布消息的服务是生产者
    Consumer:消费者,接收处理消息的服务是消费者
    Exchange:交换机,用于分发路由消息,把一条消息分发到对应的队列上
    Queue:队列,真正缓存堆积消息的队列,等待消费者进行处理
RabbitMQ:是根据AMQP协议设计出来的消息中间件
    Producer:生产者,发送消息的服务
    Consumer:消费者,接收消息的服务
    Connection:连接对象,无论是生产者还是消费者,都必须和RabbitMQ建立连接才可以通信
    Channel:通道,是一种轻量的连接,使用Channel进行通信,更灵活,消耗更小
    VirtualHost:虚拟主机,是对Exchange和Queue的逻辑分组,实现不同环境的隔离。不同VirtualHost之间是互相隔离的
    Exchange:交换机
    Queue:队列

二、RabbitMQ安装

1. 拉取RabbitMQ镜像

1. 拉取RabbitMQ镜像

  • 方式一:在线拉取镜像

    docker pull rabbitmq:3.8-management

    方式二:从本地加载镜像【我们采用这种方式】

    把资料中的《mq.tar》上传到虚拟机CentOS里

    在CentOS里执行命令加载镜像:

  • #先切换到mq.tar所在的目录
    
    #再执行命令加载镜像
    docker load -i mq.tar
    
    #加载后,查看一下镜像。找一下有没有rabbitmq这个镜像
    docker images

2. 安装RabbitMQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 --restart=always \
 rabbitmq:3-management
  • Java程序连接RabbitMQ:使用端口5672

  • RabbitMQ控制台: http://ip:15672, 登录帐号:itcast, 密码:123321

3. RabbitMQ控制台

打开浏览器输入地址:http://ip:15672, 登录帐号:itcast, 密码:123321

在控制台里,可以查看、管理 交换机、队列等等

三、RabbitMQ入门

目标

使用RabbitMQ的简单模式,发送消息、接收消息

实现

RabbitMQ的简单模式,是所有消息模式里最简单、最基础的一种。如下图所示:

  • P:Producer或者Publisher,消息的生产者(或者叫 消息发布者)

  • C:Consumer,消息的消费者

  • queue:消息队列,图中红色部分即是

我们使用这种模式,实现入门案例:

  • 生产者把消息投递到消息队列里

  • 消费者从消息队列里获取消息

1. 创建工程

  • 创建project,作为父工程使用

  • 删除src文件夹

  • 修改pom.xml添加依赖坐标

<dependencies>
    <!-- RabbitMQ的客户端依赖包 -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>

2. 消息生产者

  • 在工程里创建Module:demo01-producer

  • 编写测试类

public class Demo01ProducerSimple {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 连接RabbitMQ服务器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.137");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");

        //2. 获取通道Channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //3. 声明队列
        String queueName = "test_simple.queue";
        //  参数1:队列名称
        //  参数2:是否 持久化队列(持久化队列:重启RabbitMQ服务后,队列仍然存在)
        //  参数3:是否 独占队列(第一个消费者独占队列)
        //  参数4:是否 自动删除队列(自动删除队列:当所有消费者都断开连接后,队列自动删除)
        //  参数5:队列的附加参数
        channel.queueDeclare(queueName, true, false, false, null);

        //4. 发送消息:简单模式,交换机设置为空,路由key设置为队列名称
        //  参数1:交换机名称。简单模式时,设置为空字符串
        //  参数2:路由key。简单模式时,设置为队列名称
        //  参数3:消息的附加参数
        //  参数4:消息内容
        channel.basicPublish("", queueName, null, "这是一条简单消息".getBytes());

        //5. 释放资源,关闭连接
        channel.close();
        connection.close();
    }

3. 消费消费者

  • 在工程里创建Module:demo01-consumer

  • 编写测试类

    注意:消费者的代码要放到main方法里,不要写到单元测试方法内

public class Demo01ConsumerSimple {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 连接RabbitMQ服务器
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.137");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");

        //2. 获取连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //3. 声明消息队列(如果消息队列不存在,则创建消息队列;如果已存在,则不创建)
        String queueName = "test_simple.queue";
        channel.queueDeclare(queueName, true, false, false, null);

        //4. 监听消息
        //  参数1:队列名称。要从哪个队列里获取消息
        //  参数2:是否自动ack。当成功消费消息之后,会自动给RabbitMQ返回一个标识。RabbitMQ收到后,就知道消息已经被成功消费了,不会重新推送这条消息
        //  参数3:处理消息的回调        
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到消息:" + new String(body));
            }
        });

        //等待接收消息。不要关闭Channel通道和Connection连接
        System.out.println("等待消息中……");
    }

}

4. 测试

  1. 运行生产者,发送消息

  2. 启动消费者,监听消息

  3. 可以看到消费者已经收到消息了

四、SpringAMQP

1. 准备代码环境

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:Spring AMQP

AMQP全称:Advanced Message Queuing Protocol

AMQP翻译:高级消息队列协议

是一个:进程间传递异步消息的网络协议

SpringAMQP提供了三个功能

  • 自动声明队列交换机及其绑定关系

  • 基于注解监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

为了演示SpringAMQP的功能,我们需要先准备代码环境,步骤如下:

  1. 创建project,删除其src目录,然后添加依赖

  2. 创建生产者模块

  3. 创建消费者模块

1.1 创建project

创建maven类型的project,不选择骨架,直接设置工程坐标然后下一步

创建后,删除src目录

修改pom.xml添加依赖坐标如下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
</parent>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

1.2 创建生产者模块

在project里创建module,起名称为demo-producer

依赖

不需要添加依赖,父工程里已经导入了依赖,子模块直接继承父工程里的依赖就足够了

配置

修改配置文件application.yaml

spring:
  application:
    name: demo-producer
  rabbitmq:
    host: 192.168.200.137 #RabbitMQ服务的ip
    port: 5672            #RabbitMQ服务的端口
    username: itcast	  #RabbitMQ的帐号
    password: 123321	  #RabbitMQ的密码

引导类

创建引导类,没有什么特殊的要求



import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoProducerApplication.class, args);
    }
}

1.3 创建消费者模块

在project里创建module,起名称为demo-consumer

依赖

不需要添加依赖,父工程里已经导入了依赖,子模块直接继承父工程里的依赖就足够了

配置

修改配置文件application.yaml

spring:
  application:
    name: demo-producer
  rabbitmq:
    host: 192.168.200.137 #RabbitMQ服务的ip
    port: 5672            #RabbitMQ服务的端口
    username: itcast	  #RabbitMQ的帐号
    password: 123321	  #RabbitMQ的密码

引导类



import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoConsumerApplication.class, args);
    }
}

 

2. RabbitMQ工作模式

RabbitMQ提供了6种工作模式,参考:RabbitMQ Tutorials — RabbitMQ

  • basic queue:简单模式

  • work queues:工作队列集群消费

  • Publish/Subscribe:发布订阅模式,也称为Fanout,是一种消息广播模式

  • Routing:路由模式,也称为Direct模式

  • Topics:主题模式

  • RPC远程调用模式:远程调用,其实算不上MQ,这里不做介绍

无论哪种模式,可能都需要用到交换机、队列、交换机与队列的绑定。如何声明这些队列和交换机?

  • 在控制台页面上直接创建。不经常用

  • 使用@Bean的方式,声明交换机、队列、绑定关系

  • 使用注解方式,可以在消费者一方直接声明交换机、队列、绑定关系,适合于复杂的绑定关系声明

3. basic queue简单队列 

3.1 模式说明

basic queue:基本 队列

basic queue是RabbitMQ中最简单的一种队列模式:生产者把消息直接发送到队列queue消费者从queue里直接接收消息

 解析:

3.2 使用示例

生产者

生产者向某一队列发送消息时必须先声明队列;否则消息会发送失败。

我们使用@Bean的方式声明队列

声明队列

  1. 创建一个配置类Demo01SimpleConfig,类上添加@Configuration注解

  2. 类里定义一上方法,返回值为Queue,在方法里定义队列。方法上加注解@Bean

  3.  

    /**
     * 生产者-->simple.queue-->消费者
     * 需要:声明队列simple.queue
     */
    @Configuration
    public class Demo02SimpleConfig {
        /**
         * 声明 一个队列,队列名称为simple.queue
         * 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了
         */
        @Bean
        public Queue simpleQueue(){
            return QueueBuilder.durable("producer.simple.queue").build();
        }
    }

    发送消息


@SpringBootTest
public class Demo02SimpleTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        //参数1:队列名称
        //参数2:消息内容
        rabbitTemplate.convertAndSend("producer.simple.queue","生产者,制造中...");
    }
}

消费者

  1. 创建一个类,用于监听消息。类上需要添加@Component注解

  2. 类里定义一个方法,用于处理消息。

    方法上需要添加注解 @RabbitListener(queues = "队列名称")

    方法上需要添加一个String类型的形参:是接收到的消息内容

    

/**
 *监听器  消费者 接收者 收听广播
 */
@Slf4j
@Component
public class Demo02SimpleListener {
    @RabbitListener(queues = "producer.simple.queue")
    public void handleSimpleQueueMsg(String msg){
        log.info("从{}队列里接收到消息:{}","producer.simple.queue",msg);

    }
}

4. work queues工作队列

假如只有一个消费者处理消息,那么处理消息的速度就有可能赶不上发送消息的速度。该如何同时处理更多的消息呢?

可以在同一个队列创建多个竞争的消费者,以便消费者可以同时处理更多的消息

4.1 模式说明

多个消费者相互竞争从同一个队列里获取消息生产者发送的消息将被所有消费者分摊

消费

注意: 一个队列里一条消息只能被消费一次不可能多个消费者同时消费处理

           对于任务过重,或者任务较多的情况,使用工作队列可以提高任务处理的速度

例如:短信通知服务 订单完成后要发短信通知

4.2 示例代码

生产者

声明队列

创建配置类Demo02WorkQueueConfig里使用@Bean声明队列

/**
 *                  ↗消费者1
 * 生产者-->work.queue
 *                  ↘消费者2
 * 需要:声明队列:work.queue
 */
@Configuration
public class Demo03WorkQueuesConfig {

/**
 * 声明 一个队列,队列名称是 work.queue
 * 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了
*/
    @Bean
    public Queue workQueue(){
        return QueueBuilder.durable("producer.work.queue").build();
    }
}

发送消息

创建测试类,添加测试方法,发送消息:

@SpringBootTest
public class Demo02WorkQueueTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("producer.work.queue", "这是work队列消息" + i);
        }
    }
}

测试:

消费者

@Slf4j
@Component
public class Demo03WorkQueueListener {
    @RabbitListener(queues = "producer.work.queue")
    public void handleWorkQueueMsg1(String msg){
        log.info("消费者1从{}队列里接收到消息{}","producer.work.queue",msg);
    }
    @RabbitListener(queues = "producer.work.queue")
    public void handleWorkQueueMsg2(String msg){
        log.info("消费者2从{}队列里接收到消息{}","producer.work.queue",msg);
    }
}

测试:

 

4.3 注意事项

在WorkQueues模式的默认情况下,一个队列里的所有消息,将平均分配给每个消费者

这种情况并没有考虑到消费者的实际处理能力,显然是有问题的。

例如:生产者发送了50条消息,有两个消费者,各接收到了25条消息。假如

  • 消费者1,每秒能处理100条消息。 很快就能处理完消息

  • 消费者2,每秒能处理10条消息。 消息堆积越来越多

解决这个问题其实非常简单:让每个消费者一次性只拉取1条消息

修改消费者的配置文件application.yaml:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #消费者一次抓取几条消息

5. Publish/Subscribe发布订阅(Fanout)

在上一章节中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只传递给一个消费者。在这一节中,我们将做一些完全不同的事情——我们将向多个消费者传递一条消息。这种模式称为“发布/订阅”

5.1 模式说明

                

  • P:生产者,用于发送消息。但是生产者要把消息发给交换机(X)

  • X:Exchange交换机

    • 接收消息,接收生产者发送的消息

    • 处理消息,把消息投递给某个或某些队列,或者把消息丢弃。具体会如何操作,由Exchange类型决定:

      • Fanout:广播,把消息交给绑定的所有队列(交换机绑定了哪些队列,就把消息投递给这些队列)

      • Direct:定向,把消息交给符合指定routing key的队列

      • Topic:通配符,把消息交给符合routing pattern的队列

    • 注意:交换机只负责转发消息,不具备存储消息的能力。所以如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,消息将会丢失

  • Queue:消息队列,接收消息、缓存消息

  • C:消费者,等待消息、处理消息

5.2 示例代码

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:

  • 生产者程序将发出日志消息

  • 消费者程序将接收日志消息

    • 第一组消费者,接收到日志消息并保存到磁盘上

    • 第二组消费者,接收到日志消息并打印到控制台

生产者

声明队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 *                       ↗ producer.fanout.queue1---消费者1
 * 生产者-->fanout.exchange
 *                       ↘ producer.fanout.queue1---消费者2
 *
 * 需要:
 *      1. 声明Fanout类型的交换机:fanout.exchange
 *      2. 声明队列1:producer.fanout.queue1
 *         声明队列2:producer.fanout.queue2
 *      3. 把交换机与队列1绑定
 *         把交换机与队列2绑定
 */

@Configuration
public class Demo04FanoutConfig {
    @Bean
    public Queue fanoutQueue1() {
        return QueueBuilder.durable("producer.fanout.queue1").build();
    }

    @Bean
    public Queue fanoutQueue2() {
        return QueueBuilder.durable("producer.fanout.queue2").build();
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("producer.fanout.exchange").build();
    }
    /**
     * 注意形参的名称。Spring在注入对象时,首先根据形参类型,从容器里查找对象。现在根据类型找到多个,根据形参名称byName注入
     * @param fanoutQueue1
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding fanoutQueue1Biding(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @Bean
    public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

        发送消息

@SpringBootTest
public class Demo04FanoutTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        //参数1:交换机名称
        //参数2:路由key。fanout类型的交换机不需要使用路由key,把这个值写成空的
        //参数3:消息内容
        rabbitTemplate.convertAndSend("producer.fanout.exchange", "", "是一条Fanout广播消息");
    }
}

消费者

@Slf4j
@Component
public class Demo04Fanoutlistener {
    @RabbitListener(queues = "producer.fanout.queue1")
    public void handleFanoutQueue1Msg(String msg) {
        log.info("消费者1从{}队列里收到消息:{}","producer.fanout.queue1",msg);
    }
    @RabbitListener(queues = "producer.fanout.queue2")
    public void handleFanoutQueue2Msg(String msg) {
        log.info("消费者2从{}队列里收到消息:{}","producer.fanout.queue2",msg);
    }

}

测试:

 

 

6. Direct(Routing)

在上一个章节中,我们构建了一个简单的日志系统。我们能够向许多消费者广播日志消息。

在本节中,我们将向其添加一个功能:我们将使消费者能够仅订阅消息的子集。例如:

  • 只能将关键错误消息定向到日志文件(以节省磁盘空间)

  • 同时仍然能够在控制台上打印所有日志消息。

6.1 模式说明

  • 队列在绑定交换机时,需要给队列指定一个Routing Key(路由key)

  • 生产者在发送消息时,必须指定消息的Routing Key

  • 交换机根据消息的RoutingKey进行判断:只有队列的RoutingKey 与 消息的RoutingKey完全相同,才会收到消息

 

6.2 示例代码

生产者

声明队列

/**
 *                      ↗ error-->direct.error.queue-->消费者1
 * 生产者-->direct.exchange
 *                      ↘ info---->direct.all.queue---->消费者2
 *                      ↘ error--->direct.all.queue---->消费者2
 * 需要:
 *      1. 声明Direct类型的交换机:direct.exchange
 *      2. 声明队列1:direct.error.queue,用于接收错误日志信息
 *         声明队列2:direct.all.queue,  用于接收所有日志信息
 *      3. 把交换机与队列1绑定:把 RoutingKey为error的消息,投递到队列1
 *         把交换机与队列2绑定:把 RoutingKey为error的消息,投递到队列2
 *         把交换机与队列2绑定:把 RoutingKey为info的消息, 投递到队列2
 */
@Configuration
public class Demo05DirectConfig {
    @Bean
    public Queue directQueue1() {
        return QueueBuilder.durable("producer.direct.queue1").build();
    }

    @Bean
    public Queue directQueue2() {
        return QueueBuilder.durable("producer.direct.queue2").build();
    }

    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange("producer.direct.exchange").build();
    }

    @Bean
    public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("error");
    }

    @Bean
    public Binding directQueue2BindingInfo(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("info");
    }

    @Bean
    public Binding direcQueue2BindingError(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("error");
    }
}

发消息:

@SpringBootTest
public class Demo05DirectTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test(){
        //rabbitTemplate.convertAndSend("producer.direct.exchange","error","这是一条错误日志");
        rabbitTemplate.convertAndSend("producer.direct.exchange","info","这是一条正常日志");
    }
}

消费者

@Slf4j
@Component
public class Demo05DirectListener {
    @RabbitListener(queues = "producer.direct.queue1")
    public void handleDirectQueue1(String msg) {
        log.info("消费者1从{}队列里收到消息:{}", "producer.direct.queue1", msg);
    }

    @RabbitListener(queues = "producer.direct.queue2")
    public void handleDirectQueue2(String msg) {
        log.info("消费者2从{}队列里收到消息:{}", "producer.direct.queue2", msg);
    }
}

测试:

 

 

 

7. Topic【重点】

在上一个章节中,我们改进了日志系统。我们没有使用仅能进行消息广播的FANOUT,而是使用了DIRECT,实现了了有选择地接收日志。

虽然使用DIRECT改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由,例如:

  • 第一组消费者,要接收所有系统的所有日志消息,打印到控制台

  • 第二组消费者,要接收所有系统的错误日志消息,和订单系统的所有日志消息,保存到磁盘

为了在日志系统中实现这一点,我们需要了解更复杂的TOPIC交换机。

7.1 模式说明

​​​​​​

  • RoutingKey:发送到TOPIC的消息不能有任意的routing键,它:

    • 必须是由点分隔的单词列表

    • 可以有任意多个单词,最多255个字节

    • 可使用* 星号,匹配一个单词

    • 可使用#,匹配0个或多个单词

  • 使用特定RoutingKey发送的消息,将被传递到使用匹配Key绑定的所有队列。

7.2 使用示例

生产者

/**
 *                      ↗#.error--->topic.queue1-->消费者1
 * 生产者-->topic.exchange
 *                      ↘order.*--->topic.queue2-->消费者2
 * 需要:
 *      1. 声明Topic类型的交换机:topic.exchange
 *      2. 声明队列1:topic.queue1
 *         声明队列2:topic.queue2
 *      3. 把交换机与队列1绑定:将RoutingKey为#.error的消息,投递到topic.queue1
 *         把交换机与队列2绑定:将RoutingKey为order.*的消息,投递到topic.queue2
 */
@Configuration
public class Demo05TopicConfig {
    @Bean
    public Queue topicQueue1(){
        return QueueBuilder.durable("topic.queue1").build();
    }
    @Bean
    public Queue topicQueue2(){
        return QueueBuilder.durable("topic.queue2").build();
    }

    @Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange("topic.exchange").build();
    }

    @Bean
    public Binding queue1Binding(TopicExchange topicExchange, Queue topicQueue1){
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("#.error");
    }

    @Bean
    public Binding queue2Binding(TopicExchange topicExchange, Queue topicQueue2) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("order.*");
    }
}

发送消息

@SpringBootTest
public class Demo05TopicTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        //路由为order.*的消息,投递到topic.queue2,将由消费者2接收
        rabbitTemplate.convertAndSend("topic.exchange", "order.create", "创建订单1");
        //路由为order.*的消息,投递到topic.queue2,将由消费者2接收
        rabbitTemplate.convertAndSend("topic.exchange", "order.cancel", "取消订单1");
        //路由为#.error的消息,投递到topic.queue1,将由消费者1接收
        //路由为order.*的消息,投递到topic.queue2,将由消费者2接收
        rabbitTemplate.convertAndSend("topic.exchange", "order.error", "订单1取消失败");
    }
}

消费者

@Slf4j
@Component
public class Demo05TopicListener {

    @RabbitListener(queues = "topic.queue1")
    public void handleTopicQueue1Msg(String msg) {
        log.info("消费者1从{}接收到消息:{}", "topic.queue1", msg);
    }

    @RabbitListener(queues = "topic.queue2")
    public void handleTopicQueue2Msg(String msg) {
        log.info("消费者2从{}接收到消息:{}", "topic.queue1", msg);
    }
}

 

 

8. 小结

1. 简单模式的使用:
  
 生产者
        声明队列:在一个配置类里,使用@Bean方式,声明了一个Queue对象
        发送消息:使用RabbitTemplate对象的convertAndSend("队列名称", "消息内容");
    
消费者
        在任意bean对象里增加一个方法
        @RabbitListener(queues="队列名称")
        public void 方法名(String msg){
            //msg的值,就是接收到的消息内容
        }


2. 工作队列模式:
       主要解决的问题单个消费者处理消息的能力有限

                                   增加消费者的数量,可以提升消费能力

             
生产者
        声明队列:在一个配置类里,使用@Bean方式,声明了一个Queue对象
        发送消息:使用RabbitTemplate对象的convertAndSend("队列名称", "消息内容");

消费者
        在任意bean对象里增加多个方法,每个方法都是一个消费者,可以处理消息
        @RabbitListener(queues="队列名称")
        public void 方法名(String msg){
            //msg的值,就是接收到的消息内容
        }
为了避免不同消费者,消费能力不均衡,

导致:某些消费者很快处理完了,其它消费者处理的慢导致消息堆积
不采用平均分摊消息的方式(默认)
让消费者每次拉取1条消息,处理完再拉取:谁的性能强,谁就能拉取、处理更多的消息


五、其它内容

        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/535261.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IPWorks EDI 2022 .NET Edition 22.0.8 Crack

IPWorks EDI基于用于安全 EDI 通信&#xff08;AS2、SFTP、OFTP、RosettaNet、MLLP 等&#xff09;的领先 EDI-INT 协议&#xff0c;IPWorks EDI 库包含促进安全 EDI 消息传递以及 EDI 映射、翻译和验证&#xff08;X12、 EDIFACT、HL7、TRADACOMS、VDA、XML 和 JSON&#xff0…

Mybatis基础操作

文章目录 一. Mybatis单表操作删除操作查询操作#{} 与 ${}的区别更新操作新增操作 二. Mybatis多表操作 一. Mybatis单表操作 删除操作 我们接着使用昨天的表和程序&#xff0c;我们来实现通过id删除数据&#xff1a; 我们这样就可以实现将id 1的数据进行删除了&#xff0c;…

FE_JS对象的理解

对象是JS中的引用数据类型&#xff0c;对象是一种复合数据类型&#xff0c;在对象中可以保存多个不同数据类型的属性。使用typeof检查一个对象时&#xff0c;会返回object。 1 对象创建模式 - Object构造函数模式 套路: 先创建空Object对象, 再动态添加属性/方法 适用场景: 起…

【LeetCode: 1335. 工作计划的最低难度 | 暴力递归=>记忆化搜索=>动态规划 】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

又一个开源便斩获 7k star 的新模型「GitHub 热点速览」

作者&#xff1a;HelloGitHub-小鱼干 Star 并不能代表什么&#xff0c;但是绝对能表示一个项目的受欢迎程度。就像刚开源一周就有 7k star 的新模型&#xff0c;输入文本 / 图像就能获得 3D 对象。除了这个新模型&#xff0c;本周还有一款新的 Web 3D 渲染引擎 Orillusion&…

MySQL学习---17、MySQL8其它新特性

1、MySQL新增特性 1.1 更简便的NoSQL支持 NoSQL泛指非关系型数据库和数据存储。随着互联网平台的规模飞速发展&#xff0c;传统的关系型数据库已经越来越不能瞒住需求。从5.6版本开始&#xff0c;MySQL就开始支持简单的NoSQL存储功能。MySQL 8对这一功能做了优化&#xff0c;…

阿里P8强烈推荐的可伸缩服务架构:框架与中间件笔记

随着企业业务量的增加&#xff0c;流量洪峰在不断挑战着业务系统的承载能力&#xff0c;设计高并发、可伸缩的系统已成为软件架构师的紧迫任务&#xff0c;而分布式、可伸缩的架构模式已成为抵御洪峰的有效方案之一。本书汇集了作者在多年核心系统开发中的架构及实践经验&#…

【算法】--- 几分钟带你走进排序算法大门(上)

文章目录 前言&#x1f31f;一、常见的排序算法&#xff1a;&#x1f31f;二、直接插入排序排序&#xff1a;&#x1f4d2;2.1 基本思想&#xff1a;&#x1f4d2;2.2 代码&#xff1a;&#x1f4d2;2.3 时间复杂度&#xff1a;O(N^2)&#x1f4d2;2.4 流程图详解&#xff1a; …

聚观早报 | Midjourney官方在QQ开启内测;富士康印度新工厂动工

今日要闻&#xff1a;Midjourney官方在QQ开启内测&#xff1b;富士康印度新工厂动工&#xff1b;闲鱼将开收软件服务费&#xff1b;专家建议五年内禁售燃油车&#xff1b;笑果文化已被立案调查 Midjourney官方在QQ开启内测 5 月 15 日&#xff0c;据 Midjourney AI 微信公众号…

Unity之OpenXR+XR Interaction Toolkit实现 手枪模型的拆卸和组装

前言 之前我们曾实现过PC端的模型的拆卸和组装&#xff0c;如果使用VR模式来实现物体的拆卸呢&#xff1f;如何使用双手手柄来控制物体&#xff0c;拆卸物体呢&#xff1f;今天我们就来实现一个VR小Demo&#xff0c;基于OpenXR &#xff0c;XR Interaction Toolkit插件来实现。…

程序员面试宝典

前言 编者: 大淘宝技术开发工程师 / 刘苏宏(淘苏) 淘苏(花名)目前是大淘宝技术的一名开发工程师。从国企 跳槽来到互联网&#xff0c;【职业规划】是他被问得最多&#xff0c;也思考得 最多的问题。 扫一扫&#xff0c;关注公众号【大淘宝技术】 了解更多大淘宝技术干货沉淀 …

技术架构演进之路-Docker【一】

技术架构演进之路 了解每种技术架构以及如何演进的&#xff0c;熟悉Docker在架构中的核心作用 八大架构演进 单机架构 当前服务由应用服务和数据库服务两个服务组成&#xff0c;应用服务由 用户模块、商品模块、交易模块三个模块组成&#xff0c;我们可以理解它为 淘宝。用户模…

互联网时代,自媒体宣发的概念、优势、策略及注意事项

自媒体宣发是指通过自己或者委托专业机构&#xff0c;运用自媒体平台传播宣传信息的一种方式。在互联网时代&#xff0c;自媒体已经成为了企业推广的一种重要手段。本文将为大家介绍自媒体宣发的概念、优势、策略及注意事项。#自媒体# 一、什么是自媒体宣发&#xff1f; 自媒体…

通配符SSL证书是什么?

通配符SSL证书可以对申请的域名保护以外&#xff0c;还可以对下一级子域名无限使用&#xff0c;适合存在很多二级域名项目的网站&#xff0c;这样不需要额外对子域名申请SSL证书&#xff0c;还可以进行同意管理及部署SSL证书避免跨站窜站的问题。 申请通配符SSL证书 一、申请通…

SSH远程终端神器,你在用哪一款

唠嗑部分 在我们日常开发中啊&#xff0c;不可避免的要与Linux打交道&#xff0c;虽然我们作为开发&#xff0c;不要求我们对Linux有多么的专业&#xff0c;但是基本的操作还是要会的&#xff0c;举几个常用的例子&#xff1a; 1、查看nginx配置&#xff0c;配置转发 2、清理…

蓝桥杯模块学习4——数码管

第一章 硬件部分 1.1 电路的组成部分 1.1.1 译码器和锁存器 具体可回顾之前LED灯的文章&#xff1a; https://blog.csdn.net/weixin_63568691/article/details/130660096 1.1.2 共阳极数码管&#xff1a; 原理图&#xff1a; 功能&#xff1a; &#xff08;1&#xff09;可…

[网鼎杯 2020 青龙组]bang 复现--frida-dexdump安卓脱壳工具的使用

一.前言 在NSSCTF练习安卓逆向,第一次遇到安卓脱壳题 大佬的题解只有一句话"frida-dexdump一把嗦" 听起来容易做起来难,还遇到了安卓虚拟机的玄学bug,折磨了我很久,好在最终使用真机成功dump并得到flag 题目来源:[网鼎杯 2020 青龙组]bang 如果直接用jadx打开会发现…

重大喜讯!国产讯飞星火大模型将超越chatGPT!

引言 近年来&#xff0c;随着人工智能技术的快速发展&#xff0c;自然语言处理成为了研究的热点。而在自然语言处理领域&#xff0c;ChatGPT是一个备受关注的模型&#xff0c;它的出现极大地推动了自然语言处理技术的发展。然而&#xff0c;最近国内一家公司——讯飞&#xff0…

记录--10个超级实用的Set、Map使用技巧

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 Set是一种类似于数组的数据结构&#xff0c;但是它的值是唯一的&#xff0c;即Set中的每个值只会出现一次。Set对象的实例可以用于存储任何类型的唯一值&#xff0c;从而使它们非常适用于去重。 Map是…

如何为研发团队打造专属的效能提升路径|QECon 演讲回顾

近日&#xff0c;ONES 受邀参加 2023 QECon 全球软件质量&效能大会&#xff08;深圳站&#xff09;。在会上&#xff0c;ONES 研发效能改进咨询顾问陈仪&#xff0c;发表了主题为《如何为研发团队打造专属的效能提升路径》的演讲。 陈仪有着丰富的咨询经验&#xff0c;曾带…