从零开始 Spring Cloud 9:RabbitMQ

news2025/1/10 21:46:03

从零开始 Spring Cloud 9:RabbitMQ

image-20230714102655393

图源:laiketui.com

RabbitMQ 是一款消息队列中间件,可以用于异步通信。

基础

安装

通过 Docker 安装镜像:

docker pull rabbitmq:3-management

运行

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

两个环境变量的意思是:

  • RABBITMQ_DEFAULT_USER,初始管理员账号
  • RABBITMQ_DEFAULT_PASS,初始管理员密码

映射的两个端口用途分别是:

  • 15672,用于浏览器访问管理页面
  • 5672,用于消息发送和接收

RabbitMQ 支持集群部署,这里演示的只是单体部署。

管理页面

现在访问 Docker 宿主机对应的 RabbitMQ 管理页面,比如 http://192.168.0.88:15672/。输入初始管理员账号密码即可进入。

关于 RabbitMQ 管理页面的功能和基本的概念,可以观看这个视频。

RabbitMQ 的基本架构可以用下图表示:

image-20210717162752376

快速入门

从一个简单示例开始。

创建一个 Maven 项目,JDK 版本选择 JDK 1.8

框架代码搭建

POM 文件中添加如下内容:

    <packaging>pom</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

消息的生产者和消费者用子模块表示,所以父项目下的src目录可以删除。

创建两个子模块 consumerpublisher

在两个子模块中分别创建 Spring Boot 的入口类。

生产者

publisher 子模块中添加测试用例作为用 RabbitMQ 客户端发送消息的示例:

@Log4j2
public class PublisherTests {
    @Test
    @SneakyThrows
    public void testSendMessage() {
        // 设置到 RabbitMQ 的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.88");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("itcast");
        connectionFactory.setPassword("123321");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // 发送消息
        String msg = "Hello World!";
        channel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
        log.info(String.format("message [%s] was sent to RabbitMQ.", msg));
    }
}

这里的大致过程是:

  1. 利用工厂模式创建一个到 RabbitMQ 服务端的连接(Connection)。
  2. 在连接上创建一个通道(Channel)用于发送消息。
  3. 在通道上声明一个队列(Queue)用于保存消息。
  4. 用通道向队列发送消息。

消费者

类似的,在 consumer 子模块中编写一个演示通过 RabbitMQ 接收消息的测试用例:

@Log4j2
public class ConsumerTests {
    @Test
    @SneakyThrows
    public void testConsumeMessage(){
        // 设置到 RabbitMQ 的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.88");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("itcast");
        connectionFactory.setPassword("123321");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建通道
        Channel channel = connection.createChannel();
        // 声明队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false ,false, null);
        // 获取消息
        channel.basicConsume(queueName, true,  new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                log.info(String.format("Received msg [%s] from RabbitMQ.", msg));
            }
        });
        log.info("Already added msg receiver to RabbitMQ client.");
    }
}

除了将发送消息替换为获取消息的 API 意外,其余部分基本一致。

获取消息调用的是Channel.basicConsume方法,需要传入一个Consumer类型的回调。在 Java8 中,回调通常用传递一个匿名类实例来实现。在匿名类内,通过实现handleDelivery方法来完成对从 RabbitMQ 接收到的消息的处理。

需要注意的是,这里的匿名类内部消息处理代码是异步执行的,主程序中只是调用Channel.basicConsume方法向 RabbitMQ 客户端添加了一个一次性的对某个队列的消息接收并处理的逻辑,至于具体的什么时候才能收到该消息并执行处理逻辑,主进程并不关心。

这点可以通过日志看到:

10:26:55.586 [pool-1-thread-4] INFO org.example.simplemq.consumer.ConsumerTests - Received msg [Hello World!] from RabbitMQ.
10:26:55.586 [main] INFO org.example.simplemq.consumer.ConsumerTests - Already added msg receiver to RabbitMQ client.

主进程是main,实际执行消息处理的是进程pool-1-thread-4

上面就是用 RabbitMQ 原生的 API 实现的消息发送和接收的示例,并不是很方便使用。实际上在 Spring 框架中,我们可以使用更方便的 SpringAMQP 完成消息的发送和接收。

SpringAMQP

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种用于消息发送和接收的协议

,该协议与语言和平台无关。

SpringAMQP 是Spring 对 AMQP 实现后提供的一组封装好的 API,利用它可以更方便的消息发送和接收。

依赖

SpringAMQP 底层可以使用多种 MQ 实现,我们这里使用的是 RabbitMQ,所以需要添加对应的 spring-boot-starter 依赖:

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这里在父项目中添加,这样子模块就无需再次添加。

配置文件

给子模块 consumerpublisher 添加配置文件application.yml

spring:
  rabbitmq:
    host: 192.168.0.88 # RabbitMQ 服务端 ip
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

消息发送

在子模块 publisher 中添加一个用 SpringAMQP 发送消息的测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
@Log4j2
public class SpringAMQPTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage(){
        String msg = "Hello World, again!";
        String queueName = "simple.queue";
        rabbitTemplate.convertAndSend(queueName, msg);
        log.info(String.format("Already sent message [%s] to RabbitMQ.", msg));
    }
}

spring-boot-starter-amqp的自动装配会注入一个模板类RabbitTemplate的 Spring Bean,所以这里我们只需要添加对RabbitTemplate的依赖注入,并直接利用这个模板类实例发送消息即可。

这里使用了已经存在的队列simple.queue

相比原生的 RabbitMQ API,SpringAMQP 更简洁,不需要显式创建连接和通道,SpringAMQP 会帮我们创建,我们只需要利用RabbitTemplate向某个队列发送消息即可。

消息接收

同样可以用RabbitTemplate完成消息接收,但通常我们不需要那么做,因为有更好的方式:

在子模块 consumer 中添加一个 bean 定义:

@Component
@Log4j2
public class RabbitMQListeners {
    @RabbitListener(queues = "simple.queue")
    void handleMessage(String msg){
        log.info(String.format("Received message [%s] from RabbitMQ.", msg));
    }
}

这里的写法和监听 Spring 事件很类似,实际上它们的功能也是类似的,只不过 RabbitMQ 用于不同服务之间的通信,而Spring 事件用于 Spring 应用内部的通信。而前者是异步调用,后者一般是同步调用。

@RabbitListenerqueues属性定义了要监听的队列。监听处理方法包含一个表示消息的参数,该参数的类型与发送消息时候传入的类型一致,类型转换由 SpringAMQP 帮我们完成。

工作队列

上边介绍的是最简单的队列应用场景,即一个队列对应一个生产者和消费者。这种模式被称作简单队列(Simple Queue)或基本队列(Basic Queue)。实际上更常见的是一个队列对应多个消费者:

image-20210717164238910

这样做的目的是——如果生产者产生消息的速度要比单个消费者消费消息的速度快,那我们必须添加多个消费者来消费消息,这样才能避免消息在队列中的堆积,以及堆满队列后导致的消息抛弃。

这样的模式被称作工作队列(Work Queue),即多个消费者协同工作,共同处理一个队列中的消息。

下面用一个具体示例说明其实现方式。

示例

在这个示例中,我们创建一个生产者,每秒产生50条消息,两个消费者,一个每秒可以消费50条消息,另一个消费者每秒可以消费10条消息。理论上这样就不会导致消息积压。

先在子模块 publisher 中创建生产者:

@RunWith(SpringRunner.class)
@SpringBootTest
public class WorkQueueTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    @SneakyThrows
    public void testSendMessages() {
        //1秒内发送50条消息
        String queueName = "simple.queue";
        for (int i = 0; i < 50; i++) {
            String msg = String.format("work queue test[%d]", i + 1);
            rabbitTemplate.convertAndSend(queueName, msg);
            Thread.sleep(20);
        }
    }
}

再在子模块 consumer 中创建消费者:

@Component
@Log4j2
public class RabbitMQListeners {
    private static final String QUEUE_NAME = "simple.queue";

    @SneakyThrows
    @RabbitListener(queues = RabbitMQListeners.QUEUE_NAME)
    void consumer1(String msg) {
        //消费者1,每秒消耗50条消息
        log.info(String.format("consumer1 received message: %s", msg));
        Thread.sleep(20);
    }

    @SneakyThrows
    @RabbitListener(queues = RabbitMQListeners.QUEUE_NAME)
    void consumer2(String msg){
        //消费者2,每秒消耗10条消息
        log.info(String.format("consumer2 received message: %s", msg));
        Thread.sleep(100);
    }
}

将之前的消费者代码删除或注释掉,因为它们消费同一个队列。

先启动子模块 consumer 让两个消费者保持对队列的监听状态,然后再运行 publisher 下我们新添加的测试用例。

可以看到诸如下边的日志打印:

2023-08-02 17:41:22.015  ...  : consumer1 received message: work queue test[2]
2023-08-02 17:41:22.076  ...  : consumer1 received message: work queue test[4]
2023-08-02 17:41:22.136  ...  : consumer1 received message: work queue test[6]
2023-08-02 17:41:22.198  ...  : consumer1 received message: work queue test[8]
...
2023-08-02 17:41:23.244  ...  : consumer1 received message: work queue test[42]
2023-08-02 17:41:23.307  ...  : consumer1 received message: work queue test[44]
2023-08-02 17:41:23.367  ...  : consumer1 received message: work queue test[46]
2023-08-02 17:41:23.429  ...  : consumer1 received message: work queue test[48]
2023-08-02 17:41:23.491  ...  : consumer1 received message: work queue test[50]

以及:

2023-08-02 17:41:22.014  ...  : consumer2 received message: work queue test[1]
2023-08-02 17:41:22.120  ...  : consumer2 received message: work queue test[3]
2023-08-02 17:41:22.228  ...  : consumer2 received message: work queue test[5]
2023-08-02 17:41:22.335  ...  : consumer2 received message: work queue test[7]
...
2023-08-02 17:41:24.272  ...  : consumer2 received message: work queue test[43]
2023-08-02 17:41:24.380  ...  : consumer2 received message: work queue test[45]
2023-08-02 17:41:24.489  ...  : consumer2 received message: work queue test[47]
2023-08-02 17:41:24.595  ...  : consumer2 received message: work queue test[49]

结果并不像我们设想的那样——消费者1因为处理速度快,多处理消息,而消费者2少处理消息,两者一起在1秒内将消息消耗掉。

实际情况是——消费者1和消费者2平分了消息(即使它们处理速度不同),消费者1消费偶数消息,消费者2消费奇数消息。最终的效果就是消费者1的确在1秒内消费掉了25条消息,但消费者2消费掉25条消息花了2秒多。

预读取

之所以出现这样的情况,是因为 RabbitMQ 存在一种预读取(Prefetch)机制,一旦有消息产生,无论两个消费者进程是不是已经完成对上一条消息的处理,它们都会依次从队列中抓取消息到本地保存起来,当上一条消息处理完后,消费者进程将直接从本地缓存的消息中拿取并处理消息。

我们可以通过配置文件修改这个策略,以实现我们对工作队列的预期效果:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

prefetch的默认值是无限。

修改子模块 consumer 的配置文件,并添加上述配置。

重启子模块 consumer 并执行 publisher 中工作队列的测试用例。

现在可以看到两个消费者都在1秒左右完成了对消息的处理,并且处理速度更快的消费者1处理了绝大多数消息,消费者2只处理了少数消息。

发布-订阅

之前介绍的两种 RabbitMQ 的使用模式中,同一个消息一旦被某个消费者接收,就会被从队列中移除。换句话说就是同一个消息只能被消费1次。如果我们需要某个消息被多个消费者接收,即所谓的发布(Publish)-订阅(subscribe)模型,就需要用其它的方式实现:

image-20210717165309625

这里的 exchange 是交换机,它的用途是将生产者发出的消息通过一定的规则路由到所绑定的队列中。交换机不保存消息,只路由消息,如果路由失败,消息将被丢弃。

FanoutExchange

交换机有多种类型,我们这里实际上要用到的是 FanoutExchange,这种交换机可以将一个消息同时路由到所有绑定的队列。

Spring 对交换机的抽象层次:

image-20210717165552676

下面用实际示例说明如何实现。

在示例中会创建一个消息发布者,两个消息接收者,它们同时会收到发布者发布的“同一个”消息。

首先我们需要在 consumer 子模块中创建发布-订阅模型,具体方式是添加一些 Spring Bean:

  • 1个 FanoutExchange类型的交换机用于路由消息。
  • 2个队列(Queue)用于接收交换机路由后的消息。
  • 2个绑定关系(Binding)用于表示路由到交换机的绑定。

在子模块 consumer 中添加一个配置类WebConfig

@Configuration
public class WebConfig {
    public static final String QUEUE1 = "queue.q1";
    public static final String QUEUE2 = "queue.q2";
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("exchange.fanout");
    }

    @Bean
    public Queue queue1(){
        return new Queue(QUEUE1);
    }

    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2);
    }

    @Bean
    Binding binding1(Queue queue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }

    @Bean
    Binding binding2(Queue queue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

FanoutExchange的构造器中传入的是交换机名称,Queue的构造器中传入的是队列名称。通过BindingBuilder.bind(...).to(...)可以创建队列和交换机的绑定关系。

将所有这些 bean 注册到 IOC 容器后,就可以在 RabbitMQ 中实现我们上边描绘的发布-订阅模型。

运行子模块 consumer 后,可以在 RabbitMQ 的管理页面查看创建好的交换机:

image-20230802193516986

可以点击交换机名称进入详细页面,会显示绑定到交换机的队列:

image-20230802193633502

消息发布者的代码与之前的略有不同:

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublishDescribeTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testPublishMessage(){
        String msg = "Hello, everyone!";
        String exchangeName = "exchange.fanout";
        rabbitTemplate.convertAndSend(exchangeName, "", msg);
    }
}

这里的convertAndSend传递了3个参数:

  • 交换机名称
  • 路由key
  • 消息

执行测试用例后就能看到两个消息消费者分别从两个队列中获取了消息。

DirectExchange

DirectExchange也是一种交换机,与FanoutExchange不同的是,与它绑定的队列需要添加一个bindingKey,发送给它的消息需要指定一个routingKey,它只会将消息路由给 bindingKeyroutingKey 相匹配的队列。

在子模块 consumer 中添加两个消息消费者:

@Component
@Log4j2
public class RabbitMQListeners {
    // ...
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue1"),
            key = {"red", "yellow"},
            exchange = @Exchange(value = "exchange.direct", type = ExchangeTypes.DIRECT)
    ))
    void directConsumer1(String msg) {
        log.info(String.format("Consumer1 has received message[%s] from queue[%s]", msg, "direct.queue1"));
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("direct.queue2"),
            key = {"red", "blue"},
            exchange = @Exchange(value = "exchange.direct", type = ExchangeTypes.DIRECT)
    ))
    void directConsumer2(String msg) {
        log.info(String.format("Consumer2 has received message[%s] from queue[%s]", msg, "direct.queue2"));
    }
}

与之前有所不同的是,这里不再通过配置类添加交换机、队列以及绑定关系。而是通过@RabbitListener注解来添加。具体是在@RabbitListenerbindings属性中用@QueueBinding定义绑定关系,通常需要定义其3个属性:

  • value,队列
  • key,前面提到的 bindingKey,用于和消息发送时传递的routingKey匹配。
  • exchange,交换机。表示交换机的注解@Exchange需要定义2个属性:
    • value,交换机名称
    • type,交换机类型

重新运行子模块 consumer 后可以在管理页面看到一个新的交换机 exchange.direct,在其详情页可以看到:

image-20230802202627428

这里列举的就是绑定到其上的队列以及对应的 bindingKey

生产者代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublishDescribeTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // ...
    @Test
    public void testDirectExchangeMessageSend(){
        doDirectExchangeMsgSend("red");
        doDirectExchangeMsgSend("blue");
        doDirectExchangeMsgSend("yellow");
    }

    private void doDirectExchangeMsgSend(String routingKey) {
        String exchangeName = "exchange.direct";
        String msg = String.format("Hello, %s!", routingKey);
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

这里依次发送3条消息,分别使用3个 routingKey

运行测试用例后 consumer 的输出:

...  : Consumer2 has received message[Hello, red!] from queue[direct.queue2]
...  : Consumer1 has received message[Hello, red!] from queue[direct.queue1]
...  : Consumer1 has received message[Hello, yellow!] from queue[direct.queue1]
...  : Consumer2 has received message[Hello, blue!] from queue[direct.queue2]

这符合用 routingKeybindingKey 匹配后的结果。

TopicExchange

TopicExchangeDirectExchange类似,区别在于其 routingKey 由多个.分隔的单词组成,并且可以在 bindingKey 中使用通配符。路由的时候交换机会按照通配符的匹配结果进行路由。

可以使用两种通配符:

  • *,匹配一个单词。
  • #,匹配一个或多个单词。

举个例子,如果 routingKeyhistory.china.archeology,与其匹配的 bindingKey 可以是:

  • history,china.*
  • history.#
  • #.archeology
  • *.china.#
  • 这里用history.china.archeology表示一个代表“中国古代史”的图书类别。
  • TopicExchange可以看作是一种提供了将消息按主题(Topic)进行订阅功能的交换机。

使用 TopicExchange 时与DirectExchange 没有太大区别,只需要修改交换机类别及使用通配符即可。

下面用实际示例说明:

在子模块 consumer 中添加“主题”消息的消费者:

@Component
@Log4j2
public class RabbitMQListeners {
	// ...
     /**
     * 消费中国历史书籍
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(value = "exchange.topic", type = ExchangeTypes.TOPIC),
            key = "history.china.#"
    ))
    void topicMessageConsumer1(String msg){
        log.info(String.format("Consumer1 received message[%s].", msg));
    }

    /**
     * 消费所有的历史书籍
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(value = "exchange.topic", type = ExchangeTypes.TOPIC),
            key = "history.#"
    ))
    void topicMessageConsumer2(String msg){
        log.info(String.format("Consumer2 received message[%s].", msg));
    }

    /**
     * 消费所有的古代史书籍
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue3"),
            exchange = @Exchange(value = "exchange.topic", type = ExchangeTypes.TOPIC),
            key = "#.archeology"
    ))
    void topicMessageConsumer3(String msg){
        log.info(String.format("Consumer3 received message[%s].", msg));
    }
}

添加好消费者、交换机、队列及绑定关系后,重启子模块 consumer,可以在管理页面看到交换机、队列及 bindingKey,这里不再演示。

在子模块 publisher 中添加发送主题消息的测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublishDescribeTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // ...
    @Test
    public void testSendTopicMessages() {
        rabbitTemplate.convertAndSend("exchange.topic",
                "history.china.archeology",
                "《明朝那些事》");
        rabbitTemplate.convertAndSend("exchange.topic",
                "history.japan.archeology",
                "《日本战国史》");
        rabbitTemplate.convertAndSend("exchange.topic",
                "history.world.current",
                "《二战全景解读》");
    }
}

控制台日志:

2023-08-03 11:14:06.509  ...  : Consumer1 received message[《明朝那些事》].
2023-08-03 11:14:06.511  ...  : Consumer2 received message[《明朝那些事》].
2023-08-03 11:14:06.516  ...  : Consumer2 received message[《日本战国史》].
2023-08-03 11:14:06.519  ...  : Consumer2 received message[《二战全景解读》].
2023-08-03 11:14:06.509  ...  : Consumer3 received message[《明朝那些事》].
2023-08-03 11:14:06.512  ...  : Consumer3 received message[《日本战国史》].

为了方便解读,我调整了一下日志的输出顺序。

消息转换器

RabbitMQ 客户端实际发送和接收的消息都是字节,但我们可以传递 StringObject 类型的消息,这是因为 SpringAMQP 底层会使用一个消息转换器(MessageConverter),它可以将 StringObject 类型的消息转换为字节。

可以通过以下方式进行验证:

在子模块consumer的配置类中添加一个用于接收Object类型消息的队列:

@Configuration
public class WebConfig {
    // ...
    @Bean
    public Queue objectQueue(){
        return new Queue("queue.object");
    }
}

重启子模块以创建队列。

在子模块 publisher 中添加发送消息的测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class PublishDescribeTests {
    // ...
    @Test
    public void testSendObjectMessage(){
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "icexmoon");
        msg.put("age", 28);
        msg.put("phone", "123456");
        rabbitTemplate.convertAndSend("queue.object", msg);
    }
}

运行测试用例以发送消息。

此时查看 RabbitMQ 管理页面中的队列里的消息,可以看到:

image-20230803114649747

content_typeapplication/x-java-serialized-object

也就是说默认情况下通过 SpringAMQP 发送的Object类型的消息,是用 Java 对象序列化的方式进行编码的。自然的,接收方也会用 Java 对象反序列化进行解码。

这样的方式存在 Java 对象序列化本身的一些缺陷:

  • 编码/解码效率较低
  • 编码后的字符串比较长,传输效率低,占用存储空间
  • 存在注入攻击安全隐患

SpringAMQP 通过一个MessageConverter类型的 bean 实现消息的编码/解码,我们可以通过添加自定义的MessageConverter类型的 bean 来改变默认行为。

用 JSON 方式传递消息

开始示例前通过管理页面清空目标队列queue.object中的消息(Purge Messages 按钮),否则接收方会解码出错。

这里用 Jackson 实现的 json 来取代默认的消息编解码方式:

首先在父工程引入 Jackson 相关依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

在子模块 publisher 中都添加一个自定义的MessageConverter类型的 bean:

@SpringBootApplication
public class PublisherApplication {
	// ...
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

再次执行测试用例以发送消息。

再查看管理页面,可以看到消息格式已经变为了 application/json,并且可以在 Payload 中看到 json 格式的消息内容:

image-20230803120424984

作为接收方,子模块 consumer 同样需要添加 bean 定义以修改默认的消息解码方式:

@SpringBootApplication
public class ConsumerApplication {
	// ...
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

添加一个消费者以接收消息:

@Component
@Log4j2
public class RabbitMQListeners {
	// ...
    @RabbitListener(queues = "queue.object")
    public void objectMessageConsumer(Map<String, String> msg) {
        log.info(String.format("Received message:%s", msg));
    }
}

重启子模块,就可以看到日志:

Received message:{phone=123456, name=icexmoon, age=28}

The End,谢谢阅读。

本文的完整示例代码可以从这里获取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/834686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序:点击按钮实现数据加载(带模糊查询)

效果图 代码 wxml: <!-- 搜索框--> <form action"" bindsubmit"search_all_productiond"><view class"search_position"><view class"search"><view class"search_left">工单号:</view…

力扣 416. 分割等和子集

题目来源&#xff1a;https://leetcode.cn/problems/partition-equal-subset-sum/description/ C题解&#xff08;思路来源代码随想录&#xff09; &#xff1a; 背包问题有多种背包方式&#xff0c;常见的有&#xff1a;01背包、完全背包、多重背包、分组背包和混合背包等等。…

实验笔记之——Android项目的适配

android有一个很烦人的点就是版本之间差距较大&#xff0c;且不兼容&#xff0c;导致不同版本之间代码兼容很容易出问题&#xff0c;一个常见的例子就是几年前自己开发的app&#xff0c;几年后再用竟然配置不了。。。为此&#xff0c;写下本博客记录一下配置旧项目的过程。 …

IDA+Frida分析CTF样本和Frid源码和objection模块

文章目录 一些资料IDA调试命令IDA调试安卓的10个技巧objection基本使用 Wallbreaker1frida源码阅读之frida-java 第一个实例EasyJNI第二个实例objection资料 art_trace2.pyart_trace2.js IDAFrida分析CTF样本和Frid源码和objection模块 一些资料 IDA调试命令 adb devices adb…

Redis 如何解决缓存雪崩、缓存击穿、缓存穿透难题

前言 Redis 作为一门热门的缓存技术&#xff0c;引入了缓存层&#xff0c;就会有缓存异常的三个问题&#xff0c;分别是缓存击穿、缓存穿透、缓存雪崩。我们用本篇文章来讲解下如何解决&#xff01; 缓存击穿 缓存击穿: 指的是缓存中的某个热点数据过期了&#xff0c;但是此…

我们一起聊聊Docker And Dockerfile

目录 一、前言 二、了解Dockerfile 三、Dockerfile 指令 四、多阶段构建 五、Dockerfile 高级用法 六、小结 一、前言 对于开发人员来说&#xff0c;会Docker而不知道Dockerfile等于不会Docker&#xff0c;上一篇文章带大家学习了Docker的基本使用方法&#xff1a;《一文…

Mybatis where 1=1 会导致索引失效?

背景 这几天在网上百度看到有说法 where 11 会导致索引失效 实践 1.直接where 条件 这是我自己本地建立的表&#xff0c;索引也看到了&#xff0c;是这个index_shopname 2.where 11 and 条件 这个是加了11的&#xff0c;可以看到也是走了索引的 3.直接select * from whe…

测试平台——用户模块开发

这里写目录标题 一、创建子应用二、用户注册设计1、用户注册模型类设计a、Django认证系统提供了用户模型类User&#xff0c;为什么还要定义User模型类?b、AbstractUserc、自定义用户模型类的字段有d、User模型类编写好了就可以了吗? 2、用户注册序列化器类设计a、注意b、单字…

基于分级安全的OpenHarmony架构设计

本文转载自 OpenHarmony TSC 官方微信公众号《峰会回顾第1期 | 基于分级安全的OpenHarmony架构设计》 演讲嘉宾 | 付天福 回顾整理 | 廖 涛 排版校对 | 李萍萍 嘉宾简介 付天福&#xff0c;OpenHarmony技术指导委员会安全及机密计算TSG负责人&#xff0c;华为公司科学家委员会…

机器人“瓦力”近在咫尺?谷歌最新的RT-2 AI模型简介

“首创”的机器人 AI 模型能够识别垃圾并执行复杂的动作。 上周五&#xff0c;谷歌 DeepMind 宣布了机器人变形器 2&#xff08;RT-2&#xff09;&#xff0c;这是一种“首次推出”的视觉-语言-行动&#xff08;VLA&#xff09;模型&#xff0c;利用从互联网上抓取的数据&…

理解 CSS 中的 Containing Block

前言 在开始本文之前先来看一个例子&#xff0c;下面一段简单的 html 代码&#xff0c;布局很简单&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"w…

clickhouse调研报告2

由Distributed表发送分片数据 clickhouse分区目录合并 clickhouse副本协同流程 clickhouse索引查询逻辑 clickhouse一级索引生成逻辑(两主键) clickhouse的data目录下包含如下目录: [root@brfs-stress-01 201403_10_10_0]# ll /data01/clickhouse/data total 4 drwxr-x---…

【Linux操作系统】网络配置详解:从原理到实践(详细通俗讲明DNS)

导语&#xff1a;网络配置是Linux系统中的一项重要任务&#xff0c;合理的网络配置可以保证计算机与其他设备的正常通信。本文将详细介绍Linux网络配置的原理和实践&#xff0c;包括网络配置原理、查看网络IP和网关、测试网络连通性、网络环境配置、设置主机名和hosts映射以及主…

获取全部的地区并生成表格

思路 写文章的时间2023-8-4&#xff0c;大部分网页设置的区域都是先是省&#xff0c;然后通过省获取对应的市&#xff0c;再通过市获取对应的区&#xff0c;以此类推。所以模拟的请求也是按照这个逻辑&#xff0c;先获取所有的省&#xff0c;再获取所有的市&#xff0c;最后获取…

【2023华数杯全国大学生数学建模竞赛】C题 母亲身心健康对婴儿成长的影响第一、二问

第一问部分截图 第二问部分截图 参考文献 理论和可直接运行代码获取参见&#xff1a;理论和可直接运行代码获取参见&#xff1a;理论和可直接运行代码获取参见&#xff1a;理论和可直接运行代码获取参见&#xff1a; 有人看的话更新后续问题思路。

【项目经验】产研流程(超级详细的步骤)

一、产研流程简述 项目立项-——定需求——Sprint需求宣讲会——技术方案——技术方案评审会——开发及单元测试——测试用例评审会——提测——测试——Sprint评审会——发版——Sprint复盘会 二、产研流程详情 以下部分根据Sprint里程碑节点进行循环&#xff08;sprint周期…

Java8实战-总结12

Java8实战-总结12 Lambda表达式Lambda 和方法引用实战第1步&#xff1a;传递代码第2步&#xff1a;使用匿名类第3步&#xff1a;使用Lambda表达式第4步&#xff1a;使用方法引用 复合Lambda表达式的有用方法比较器复合逆序比较器链 函数复合 Lambda表达式 Lambda 和方法引用实…

【C++】从无到有了解并掌握C++面向对象编程的三大特性——封装、继承、多态

前置知识&#xff1a;类和对象 参考书籍&#xff1a;《C Primer 第五版》 目录 什么是面向过程&#xff1f;什么是面向对象&#xff1f; 一、封装 1、封装的含义以及如何实现封装 1.1 访问限定符&#xff08;访问说明符&#xff09; 1.2 什么是封装&#xff1f; 2、封装的优点…

2023年华数杯选题人数发布!!

该选题人数&#xff0c;主要基于根据各个平台开赛后12小时各项数据统计&#xff0c;进行评估&#xff08;方法见注释&#xff09;&#xff0c;最终得出2023年华数杯选选题人数&#xff0c;大致为 题号选题人数A120B159C420 注释&#xff1a;选题人数来源&#xff1a;源自各个平…

Java字符串常量池以及new String(“abc“)到底创建了几个对象?各种字符串到底相不相等?

new String(“abc”)到底创建了几个对象&#xff1f; 字符串常量池 是 JVM 为了提升性能和减少内存消耗针对字符串&#xff08;String 类&#xff09;专门开辟的一块区域&#xff0c;主要目的是为了避免字符串的重复创建。 1.如果字符串常量池中不存在“abc”的引用&#xff…