【RabbitMQ】RabbitMQ的下载安装及使用

news2025/2/10 17:31:31

安装RabbitMQ

下载网站:https://www.rabbitmq.com/docs/install-windows

在这里插入图片描述

点击后,会直接定位到依赖介绍位置,告诉你需要安装Erlang

在这里插入图片描述

下载Erlang

Erlang也是一种编程语言,只是比较小众,但其拥有极为出色的性能

在这里插入图片描述

这个网站是到GitHub上下载的,可能需要点魔法,也可以去Erlang官网下载(能下,但慢)

在这里插入图片描述

下载RabbitMQ

下载Erlang的同时,也顺便下载RabbitMQ吧

在这里插入图片描述

或者直接使用别人下载好的包,比如我这提供的包

安装Erlang

运行下载好的exe文件

  • 点击Next即可

在这里插入图片描述

  • 选择安装路径,点击Next继续

在这里插入图片描述

  • 点击Install安装

在这里插入图片描述

  • 安装完后,点击Close即可

在这里插入图片描述

安装RabbitMQ

运行下载好的exe文件

  • 点击Next即可

在这里插入图片描述

  • 选择安装路径,点击Install安装

在这里插入图片描述

  • 安装成功后,点击Next继续

在这里插入图片描述

  • 点击Finish完成安装

在这里插入图片描述

安装插件

找到RabbitMQ目录下的sbin目录,打开CMD控制台,输入rabbitmq-plugins.bat enable rabbitmq_management命令

在这里插入图片描述

重启RabbitMQ服务后访问http://localhost:15672

在这里插入图片描述

默认账号密码均为guest

在这里插入图片描述

使用RabbitMQ

  • 导入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.24.0</version>
</dependency>

一对一队列模型

  • 生产者发送消息
/**
 * 一对一消息队列模型:生产者
 */
public class SingleProducer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂,用于创建到RabbitMQ服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ服务器地址
        factory.setHost("localhost");
        // 创建一个连接,用于和RabbitMQ服务器建立通信通道
        try (Connection connection = factory.newConnection();
             // 创建一个通道
             Channel channel = connection.createChannel()) {
            // 声明一个队列,队列名为hello
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 将消息发布到指定队列中
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

运行后,RabbitMQ管理后台会增加一个队列

在这里插入图片描述

  • 消费者消费消息
/**
 * 一对一消息队列模型:消费者
 */
public class SingleConsumer {
   // 定义我们正在监听的队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂并配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 从工厂中获取一个新的连接
        Connection connection = factory.newConnection();
        // 创建一个新的通道
        Channel channel = connection.createChannel();
        // 声明一个队列,在该通道中声明我们要监听的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 创建一个回调函数,用于处理从队列中接收到的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // 获取消息体并转换为字符串
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // 在通道上开始消费队列中的消息,接收的消息会传递给deliverCallback进行处理,会持续阻塞
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

未运行代码前

在这里插入图片描述

运行代码后

在这里插入图片描述

此时在不中断消费者代码运行的情况下,再去运行生产者代码,会发现消费者会持续消费生产者增加的消息

在这里插入图片描述

一对多队列模型

  • 生产者发送消息
/**
 * 一对多消息队列:生产者
 */
public class MultiProducer {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // durable参数设置为 true,服务器重启后队列不丢失
         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		// 此处使用scanner循环输入消息,模拟生产者不定时多次生产
        Scanner scanner = new Scanner(System.in); 
        while (scanner.hasNext()){
            String message = scanner.nextLine();
            // 指定 MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
  }
}

运行代码,可以模拟发送多条数据

在这里插入图片描述

  • 消费者消费信息
/**
 * 一对多消息队列:消费者
 */
public class MultiConsumer {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
      // 设置持久化
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        try {
            System.out.println(" [x] Received '" + message + "'");
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            // 手动确认消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    // 开始消费消息,传入队列名称。关闭自动确认,投递回调和消费者取消回调
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  // 模拟消息处理,消息中每有一个“.”就让线程暂停10s,模拟复杂的耗时工程
  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

运行消费者代码

在这里插入图片描述

队列中的消息不是一次性全部接收的,而是需要等待消费者完成消费(处理完事务后)并主动确认完成后,才会继续发送下一条消息

在这里插入图片描述

这与一对多有什么关系呢?不急,我们停掉消费者代码运行先,然后让生产者进行生成消息

在这里插入图片描述

然后,直接拷贝一份消费者代码,命名为MultiConsumer2。此时运行两个消费者看效果

在这里插入图片描述

会发现,两个消费者会轮流接收队列消息,消费完(完成任务)后才会确认并接收新的队列消息,直至队列所有消息被消费完

在这里插入图片描述
下面交换机模型为初学者见解,可能存在理解错误,看看就好,我后面也会深入学习,但大概率不会再修改本文章,所以要专业的、准确的还是得看官方文档,这里会用就行,被误导了概不负责,谢谢

交换机模型

交换机是消息队列中的一个组件,有点类似于中转站,上面两个模型都是生产者创建消息队列,然后由消费者去接收指定消息队列中的消息,而交换机模型中,生产者不再创建指定的消息队列,而是创建一个交换机,由消费者去绑定交换机并创建消息队列,然后再接收生产者的消息。由直接变间接。

这就有点像网络路由一样,最初,两台电脑要互发消息,就必须各自开一个网口连接网线,三台电脑要交互就各开两个网口,随着电脑接入的越多,一台电脑上要的网口就越多,网线交错也就越复杂,这时为了更好梳理网线和减少网口,就有了集线器、交互器、路由器等,而RabbitMQ中的交换机也是同样道理,为了方便管理多个消息队列及其后续变动

在这里插入图片描述

交换机有direct, topic, headersfanout四种类型,因为headers交换机的性能较差,不太推荐使用,了解有该类型即可

Fanout交换机

fanout有扇出的意思,该类型交换机会把消息一次性扇出(发布)给所有与该交换机绑定的消息队列,适用于广播消息,如更新文章后,广播消息给所有订阅文章的用户

在这里插入图片描述

  • 生产者发送消息
/**
 * 交换机模型:生产者
 */
public class FanoutProducer {

    // 定义交换机的名称
    private static final String EXCHANGE_NAME = "exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置工厂的主机地址
        factory.setHost("localhost");
        // 创建一个连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //  声明一个交换机,交换机名称为exchange,类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 发布消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                // 获取用户输入
                String message = scanner.nextLine();
                // 将消息发送到指定的交换机上,交换机名称为exchange,路由键为空,消息属性为null,消息内容为用户输入的字符串
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

此时运行生产者代码,并输入内容,会发现队列表中没有新增队列

在这里插入图片描述

但可以再Exchanges中看到名为exchange、类型为fanout的交换机信息

在这里插入图片描述

  • 消费者消费消息
/**
 * 交换机模型:消费者
 */
public class FanoutConsumer {
  // 声明交换机的名称
  private static final String EXCHANGE_NAME = "exchange";

  public static void main(String[] argv) throws Exception {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置工厂的主机地址
    factory.setHost("localhost");
    // 创建一个连接和通道
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明一个交换机,交换机名称为exchange,类型为fanout
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    // String queueName = channel.queueDeclare().getQueue();
    // 创建队列,名称为fanout_queue,并绑定到交换机上
    String queueName = "yg1_queue";
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [员工1] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

运行消费者代码,会发现队列表中新增了yg1_queue队列

在这里插入图片描述

此时拷贝多一份消费者代码,并修改队列名为yg2_queue

在这里插入图片描述

运行后,队列表会新增yg2_queue队列

在这里插入图片描述

此时生产者发送消息后,会同时发送给两位消费者进行处理

在这里插入图片描述

需要注意的是,如果生产者先发送消息,再创建消费者,因为还没有创建存储的消息队列,所以是无法存储消息的,即消费者无法接收队列创建前的旧消息;但如果消费者已经启动过一次了(RabbitMQ中已有其消息队列),那么生产者发送消息后再启动消费者,还是能接收到消息的;

就比如你没有QQ号,那么别人发送不了消息给你,当你创建好QQ号后,无论你是否上线,别人都能发送消息给你

以一对多队列模型为例

  • 我们删除队列表中所有队列

在这里插入图片描述

  • 然后只运行生产者代码,会发现它直接创建好了一个消息队列

在这里插入图片描述

  • 此时发送消息后再启动消费者,消费者是能接收到队列消息的

在这里插入图片描述

因为消息只在消息队列中传递,交换机只是中间件。这里的生产者只创建交换机,不创建队列,队列有消费者创建

在这里插入图片描述

在这里插入图片描述

为什么要用交换机呢? (个人理解)

打个不恰当的比喻:消息队列就是Q群,此时你有n个Q群,你要给每个群都发送一个拼手气红包,让群友去争抢;你自然可以手动一个一个去发,但更好的方式是选择采用脚本(交换机),通过脚本(交换机)去给该账号下的每个群(消息队列)都去发送一个拼手气红包。这样的好处在于,后面不论是有新群还是有群被解散,你都无需理会,你只需在意是否是自己q号上的群(是否绑定在交换机上)

Direct交换机

fanout就像AOE技能,无差别的范围攻击,而Direct就像是指定性单体技能,即使有多个消息队列绑定在其上,也能根据路由键给指定消息队列发送消息,适用于指派任务,通过路由键分发任务给指定消息队列

  • 生产者发送消息
/**
 * direct交换机模型:生产者
 */
public class DirectProducer {
  // 定义交换机名称
  private static final String EXCHANGE_NAME = "direct_exchange";

  public static void main(String[] argv) throws Exception {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置连接工厂的主机地址为本地主机
    factory.setHost("localhost");
    // 建立连接并创建通道
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // 使用通道声明交换机,类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            // 读取用户输入内容,并以空格分隔
            String userInput = scanner.nextLine();
            String[] parts = userInput.split(" ");
            if(parts.length < 1){
                continue;
            }
            String message = parts[0];
            // 路由键,用于确定消息被路由到哪个队列
            String severity = parts[1];
            // 发布消息到交换机
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [生产者] 发送 '" + severity + "':'" + message + "'");
        }
    }
  }
}

运行生产者代码,同样不会生成消息队列而是创建类型为direct的交换机

在这里插入图片描述

  • 消费者消费消息
/**
 * direct交换机模型:消费者
 */
public class DirectConsumer {
    // 定义我们正在监听的交换机名称
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置连接工厂的主机地址为本地主机
        factory.setHost("localhost");
        // 建立与 RabbitMQ 服务器的连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 声明一个 direct 类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 声明一个匿名队列,并获取队列名称
        // String queueName = channel.queueDeclare().getQueue();
        // 手动声明队列名称
        String queueName = "cy_queue";
        // 创建队列,并设置队列的持久化属性为 true
        channel.queueDeclare(queueName, true, false, false, null);
        // 绑定队列到交换机上,并指定绑定路由键为“cy”
        channel.queueBind(queueName, EXCHANGE_NAME, "cy");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 创建一个 DeliverCallback 实例来处理接收到的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [残炎] 收到了 '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        // 开始消费队列中的消息,设置自动确认消息已被消费
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

运行消费者代码,会生成指定名称的队列

在这里插入图片描述

此时,生产者发送消息并指定路由键后,对应的消费者会收到消息,而不属于它 的消息不会收到

在这里插入图片描述

同样的,拷贝一份消费者代码并启动

在这里插入图片描述

生产者每次发送给不同目标发送消息,都会精准无误地转发到指定目标

在这里插入图片描述

其实上面也同样演示了一个问题,那就是发送消息给不存在的路由键目标,也就是还没拷贝第二份消费者(fy)代码时生产者给fy发送的消息,是直接丢弃的。

direct就像是能输入具体发送目标类型的红包脚本,它允许我们自行选择要发送的目标群类型,而不是账号下的所有群,毕竟有些群可能是不相干的人拉你进的,群友与你没有任何瓜葛,你又何必给他们发呢?又或者你只想给自己所有的家族群发(只要你有标记哪些群是家族群)

direct能否给不同队列发送消息?

可以的,官网明确说了,不同消息队列允许绑定相同的路由键,而我们发送消息只关注路由键是否存在,并不在意有几个队列绑定在同一路由键上,所以我们可以将同类型的消息队列绑定在同一路由键上

在这里插入图片描述

Topic交换机

topic交换机与direct交换机类型,也是指定性,只不过它不再是单体指定,而是允许指定多个目标(注意,这里的目标指的是路由键而非具体的消息队列)。

  • 生产者发送消息
/**
 * topic交换机模型:生产者
 */
public class TopicProducer {
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂并设置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 建立连接并创建通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 使用通道声明交换机,类型为topic
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String userInput = scanner.nextLine();
                String[] parts = userInput.split(" ");
                if (parts.length < 1) {
                    continue;
                }
                String message = parts[0];
                String routingKey = parts[1];
                // 发布消息到指定的交换机和路由键
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [生产者] 发送 '" + routingKey + "':'" + message + "'");
            }
        }
    }
}

运行生产者代码,同样不会生成消息队列而是创建类型为topic的交换机

在这里插入图片描述

  • 消费者消费消息
/**
 * topic交换机模型:消费者
 */
public class TopicConsumer {

    // 定义监听的交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明使用的交换机,并指定类型为topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 创建cy队列
        String queueName = "cy_queue";
        // 声明队列,并设置队列未持久化、非排他、非自动删除
        channel.queueDeclare(queueName, true, false, false, null);
        // 绑定队列到交换机上,并指定路由键模式为“#.cy.#”
        channel.queueBind(queueName, EXCHANGE_NAME, "#.cy.#");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [cy组] 收到 '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

同样拷贝一份消费者代码并运行

在这里插入图片描述

与direct对比,除了routing key采用格式不同外,表面上好像并没有太大的区别

在这里插入图片描述

但topic可以以组合的发送而direct并不能,如下,cy组和fy组可以以任意形式组合,并发送对应消息给他们

在这里插入图片描述

这样,我们就可以一次性指定多个具体目标去处理指定消息

值得注意的一点,topic允许一个消息队列绑定多个绑定键,然后只要匹配其中一个即可收到消息

我们再拷贝一份消费者代码,修改如下,这里我们绑定的路由键包含cy与fy

在这里插入图片描述

此时无论我们给cy发,还是fy发,cyfy组都能收到消息

在这里插入图片描述

且需要注意的是,即使我们同时给cy和fy发消息(项目3 cy.fy),cyfy组都只会收到一条消息,不同出现重复接收的情况

在这里插入图片描述

topic就像是比direct更高级的脚本(官方说的,topic是比direct更复杂),direct这个脚本只能指定一个群类型,假设我除了想给所有家族群发,还想给我创建的群发,或者我管理的群,很显然,这需要我重新设置目标类型并再一次启动脚本。

在这里插入图片描述

这自然没有任何问题,无非就是需要操作多次的问题,可事实上,可能有些家族群是由我创建的或者由我管理的,那么就会出现一个问题,脚本会在某些群里发送多次红包,这很显然不符合我给指定目标群发送一个拼手气红包的目的。topic便是能处理这个问题的脚本,我能一次性设置家族群、我是群主、我是管理员三个类型,然后一次性给我Q号下满足这些类型的群发送一个拼手气红包,且及时有些群满足多个条件,也只会发送一个。

项目实战

项目中有很多种方法使用RabbitMQ,如使用官方的客户端,或者使用别人封装好的客户端,因为我的项目使用的 SpringBoot 框架,所以这里直接选择 Spring官方提供的封装好的客户端:Spring Boot RabbitMQ Starter

  • 导入依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.2</version>
</dependency>
  • yml配置
spring:
    rabbitmq:
      # rabbitmq 主机地址
      host: localhost
      # 端口
      port: 5672
      # 登录账号和密码
      username: guest
      password: guest

在项目使用MQ前,需先创建好交换机和队列

  • MqInitMain
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)
 */
public class MqInitMain {
    public static void main(String[] args) {
        try{
            // 创建链接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // 创建连接
            Connection connection = factory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 定义交换机的名称为“code_exchange”
            String EXCHAMGE_NAME = "code_exchange";
            // 声明交换机,并指定类型
            channel.exchangeDeclare(EXCHAMGE_NAME, "direct");

            // 创建队列,随机分配一个队列名称
            String queueName = "code_queue";
            // 声明队列,设置队列持久化、非独占、非自动删除、并传入额外参数为null
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到交换机,指定路由键为“my_routingKey”
            channel.queueBind(queueName, EXCHAMGE_NAME, "my_routingKey");
        }
       catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行后关闭即可,RabbitMQ后台会增加对应交换机和队列

在这里插入图片描述

  • 生产者:MyMessageProducer
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class MyMessageProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param exchange  交换机名称,指定消息要发送到哪个交换机
     * @param routingKey 路由键,指定消息要根据什么规则路由到对应的队列
     * @param message 消息内容,要发生的具体消息
     */
    public void sendMessage(String exchange, String routingKey, Object message) {
        // 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}
  • 消费者:MyMessageConsumer
@Component
@Slf4j
public class MyMessageConsumer {

    /***
     *
     * @param message
     * @param channel
     * @param deliveryTag
     */
    @SneakyThrows
    // 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
    @RabbitListener(queues = {"code_queue"},ackMode = "MANUAL")
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        log.info("接收到消息:{}", message);
        // deliveryTag是一个数字标识,在消息消费者接收到消息后用于向RabbitMQ确认消息的处理状态,告知该消息已经被消费,可以进行确认和从队列中删除。
        channel.basicAck(deliveryTag, false);
    }
}

需要注意,填写的队列名一定得在RabbitMQ服务中存在,否则会直接报错

在这里插入图片描述

所以推荐把队列名、交换机名、路由键给抽出来作为常数管理

public interface BiMqConstant {
    // 交换机、队列、路由键信息
    String BI_EXCHANGE_NAME = "bi_exchange";
    String BI_QUEUE_NAME = "bi_queue";
    String BI_ROUTING_KEY = "bi_routingKey";
}

测试

@SpringBootTest
class MyMessageProducerTest {

    @Resource
    private MyMessageProducer myMessageProducer;

    @Test
    void sendMessage() {
        myMessageProducer.sendMessage("code_exchange", "my_routingKey", "Hello, RabbitMQ!");
    }
}

消费者监听到队列有消息,就会接收并处理

在这里插入图片描述

当我们路由键写错,RabbitMQ没有对应路由键,RabbitMQ会直接丢弃该消息

在这里插入图片描述

如果本项目没有对应消息队列的消费者,那么也不会去消费该消息队列的消息

项目中使用

  • Controller/server层
@Resource
private BiMessageProducer biMessageProducer;

public BaseResponse<BiResponse> genChartByAiAsyncMq(@RequestPart("file") MultipartFile multipartFile,
                                                  GenChartByAiRequest genChartByAiRequest, HttpServletRequest request){
    // TODO 处理代码逻辑
    ......
    // 这里是先将用户信息处理好后,先存入到数据库中,并标准执行状态为等待
        
    // 在返回结果前先提交一个任务
    // 建议处理任务队列满之后,抛异常的情况(因为提交任务报错了,前端会返回异常)
    biMessageProducer.sendMessage(String.valueOf(newChartId));

    // TODO 处理代码逻辑
    ......

    return ResultUtils.success(biResponse);
}
  • BiMessageProducer
/**
 * 发送消息(固定发送目标)
 * @param message 消息内容,要发生的具体消息
 */
public void sendMessage(Object message) {



    // 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)
    rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY, message);
}
  • BiMessageConsumer
@SneakyThrows
// 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
@RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    log.info("接收到消息:{}", message);
    if (StringUtils.isBlank(message)){
        // 如果更多失败,拒绝当前消息,让消息重新进入队列
        channel.basicNack(deliveryTag,false,false);
        throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");
    }

    // TODP 进行业务逻辑处理
    ......
    // 在需要抛出异常处,即业务处理失败时,拒绝消息
    if (!updateResult){
        // 只要不成功,就拒绝消息
        channel.basicNack(deliveryTag,false,false);
        handleChartUpdateError(chart.getId(), "更新失败");
    }

    // 如果业务处理完了,就确认消息
    channel.basicAck(deliveryTag, false);
}

当用户提交一个请求后,会发送一个消息到RabbitMQ上。此时不会理会该消息是否被消费,而是直接返回一个等待结果给用户;当有进程空闲时,消费者会开始监听并处理该消息的业务,如果业务处理异常,则拒绝消息,让消息重回队列中等再次被人接收;否则确认消息已被消费。此时用户可在后台中看到请求已完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2295911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Stylelint 如何处理 CSS 预处理器

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

Word中Ctrl+V粘贴报错问题

Word中CtrlV粘贴时显示“文件未找到&#xff1a;MathPage.WLL”的问题 Word的功能栏中有MathType&#xff0c;但无法使用&#xff0c;显示灰色。 解决方法如下&#xff1a; 首先找到MathType安装目录下MathPage.wll文件以及MathType Commands 2016.dotm文件&#xff0c;分别复…

jmeter逻辑控制器9

1&#xff0c;简单控制器2&#xff0c;录制控制器3&#xff0c;循环控制器4&#xff0c;随机控制器5&#xff0c;随机顺序控制器6&#xff0c;if控制器7&#xff0c;模块控制器8&#xff0c;Include控制器9&#xff0c;事物控制器本文永久更新地址: 1&#xff0c;简单控制器 不…

GitHub Copilot Agent 模式系统提示词

系统提示词 你是一名 AI 编程助手。 当被问及你的名字时&#xff0c;你必须回答“GitHub Copilot”。请严格且完整地遵循用户的要求。 遵守微软内容政策。 避免涉及侵犯版权的内容。如果有人要求你生成有害、仇恨、种族主义、性别歧视、淫秽、暴力或与软件工程完全无关的内容&…

【设计模式】【行为型模式】模板方法模式(Template Method)

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f4eb; 欢迎V&#xff1a; flzjcsg2&#xff0c;我们共同讨论Java深渊的奥秘 &#x1f…

w200基于spring boot的个人博客系统的设计与实现

&#x1f64a;作者简介&#xff1a;多年一线开发工作经验&#xff0c;原创团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文…

docker grafana安装

mkdir /root/grafana-storage chmod 777 -R /root/grafana-storage docker run -d -p 3000:3000 --namedocker-apisix-grafana-1 --network docker-apisix_apisix -v /root/grafana-storage:/var/lib/grafana grafana/grafana:9.1.0 浏览器访问&#xff1a; http://192.…

H5+CSS+JS制作好看的轮播图

先来看效果 点击下方按钮可以做到平滑切换轮播&#xff0c;轮播图片可以根据自定义随心变化。 先来看一下页面代码结构 <div class"container"><div class"lunbo-wrap"><div id"slide"></div><div class"butto…

表单与交互:HTML表单标签全面解析

目录 前言 一.HTML表单的基本结构 基本结构 示例 二.常用表单控件 文本输入框 选择控件 文件上传 按钮 综合案例 三.标签的作用 四.注意事项 前言 HTML&#xff08;超文本标记语言&#xff09;是构建网页的基础&#xff0c;其中表单&#xff08;<form>&…

Python基础-元组tuple的学习

在 Python 中&#xff0c;元组&#xff08;tuple&#xff09;是一种不可变的序列类型&#xff0c;允许存储不同类型的元素。元组非常类似于列表&#xff08;list&#xff09;&#xff0c;但与列表不同的是&#xff0c;元组一旦创建&#xff0c;就不能修改其内容。 1 元组的创建…

Vue与Konva:解锁Canvas绘图的无限可能

前言 在现代Web开发中&#xff0c;动态、交互式的图形界面已成为提升用户体验的关键要素。Vue.js&#xff0c;作为一款轻量级且高效的前端框架&#xff0c;凭借其响应式数据绑定和组件化开发模式&#xff0c;赢得了众多开发者的青睐。而当Vue.js邂逅Konva.js&#xff0c;两者结…

go语言文件和目录

打开和关闭文件 os.Open()函数能够打开一个文件&#xff0c;返回一个*File 和一个 err。操作完成文件对象以后一定要记得关闭文件。 package mainimport ("fmt""os" )func main() {// 只读方式打开当前目录下的 main.go 文件file, err : os.Open(".…

Solidity09 Solidity构造函数和修饰器

文章目录 一、构造函数二、修饰器三、OpenZeppelin的Ownable标准实现四、Remix 演示示例五、代码示例 这一讲&#xff0c;我们将用合约权限控制&#xff08; Ownable&#xff09;的例子介绍 Solidity语言中构造函数&#xff08; constructor&#xff09;和独有的修饰器&…

使用wpa_supplicant和wpa_cli 扫描wifi热点及配网

一&#xff1a;简要说明 交叉编译wpa_supplicant工具后会有wpa_supplicant和wpa_cli两个程序生产&#xff0c;如果知道需要连接的wifi热点及密码的话不需要遍历及查询所有wifi热点的名字及信号强度等信息的话&#xff0c;使用wpa_supplicant即可&#xff0c;否则还需要使用wpa_…

大模型应用与实战:专栏概要与内容目录

文章目录 大模型应用与实战&#x1f4da; 核心内容模块一、大模型推理与部署1.1 推理框架应用实践1.2 框架源码深度解析1.3 高并发部署优化1.4 国产化平台适配 二、Agent框架专题2.1 Langchain系列2.2 Qwen-Agent系列2.3 Dify应用实践2.4 框架对比与迁移 三、微调技术研究3.1 微…

Arbess基础教程-创建流水线

Arbess(谐音阿尔卑斯) 是一款开源免费的 CI/CD 工具&#xff0c;本文将介绍如何使用 Arbess 配置你的第一条流水线&#xff0c;以快速入门上手。 1. 创建流水线 根据不同需求来创建不同的流水线。 1.1 配置基本信息 配置流水线的基本信息&#xff0c;如分组&#xff0c;环境&…

科普书《从一到无穷大》的科普知识推翻百年集论

科普书《从一到无穷大》的科普知识推翻百年集论 黄小宁 “我们给两组无穷大数列中的各个数一一配对&#xff0c;如果最后这两组都一个不剩&#xff0c;这两组无穷大就是相等的&#xff1b;如果有一组还有些数没有配出去&#xff0c;这一组就比另一组大些&#xff0c;或者说强些…

【键盘识别】实例分割

第一步 键盘检测 方案一 canny边缘检测 canny边缘检测检测结果不稳定,容易因为复杂背景或光线变换检测出其他目标。 如图是用canny边缘检测方法标出的检测出的边缘的四个红点。 参考的是这篇文章OpenCV实战之三 | 基于OpenCV实现图像校正_opencv 图像校正-CSDN博客 方案二…

25/2/7 <机器人基础>雅可比矩阵计算 雅可比伪逆

雅可比矩阵计算 雅可比矩阵的定义 假设我们有一个简单的两个关节的平面机器人臂&#xff0c;其末端执行器的位置可以表示为&#xff1a; 其中&#xff1a; L1​ 和 L2 是机器人臂的长度。θ1​ 和 θ2是关节的角度。 计算雅可比矩阵 雅可比矩阵 JJ 的定义是将关节速度与末…

apisix的real-ip插件使用说明

k8s集群入口一般都需要过负载均衡&#xff0c;然后再到apisix。 这时候如果后台业务需要获取客户端ip&#xff0c;可能拿到的是lb或者网关的内网ip。 这里一般要获取真实ip需要做几个处理。 1. 负载均衡上&#xff0c;一般支持配置获取真实ip参数&#xff0c;需要配置上。然…