Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端服务器端消息收发

news2024/11/24 10:29:49

前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费;

1 为什么要使用Rabbitmq:

RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点:

  • 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和吞吐量。

  • 解耦合:RabbitMQ 的消息队列机制可以将发送者和接收者解耦合,减少了应用程序之间的耦合度。

  • 可靠性高:RabbitMQ 支持事务和持久化,能够确保消息不会丢失。

  • 高吞吐量:RabbitMQ 支持多种吞吐量调优方法,能够处理高并发的消息通讯。

  • 可扩展性:RabbitMQ 支持集群和分布式部署,可以扩展到大规模的消息通讯场景。

RabbitMQ 提供了易用、高效、灵活、可靠的消息传递机制,可以帮助开发者更快地构建系统并实现各种复杂的业务场景。

2 springboot 整合:

2.1 pom 引入依赖:

 <!-- rabbitmq 自动装配 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 提供web访问 默认端口8080 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- loomback 用于生成get set 方法 -->
<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
</dependency>
<!-- 阿里的json 数据转换 -->
<dependency>
   <groupId>com.alibaba.fastjson2</groupId>
   <artifactId>fastjson2</artifactId>
   <version>2.0.31</version>
</dependency>

2.2 连接参数配置:
2.2.1 基础配置:
基础配置后springboot 的自动装载机制会注册一个RabbitTemplate rabbitTemplate 对象用于消息的接收和发送;

############# 基础配置
# mq 服务器的地址
spring.rabbitmq.host=localhost
# mq 服务器的端口
spring.rabbitmq.port=5672
# mq 服务器的连接使用的用户名
spring.rabbitmq.username=admin
# mq 服务器的连接使用的密码
spring.rabbitmq.password=rabbitmq
# mq 服务器的连接使用的虚拟机
spring.rabbitmq.virtual-host=my_vhost

注意: 其中 spring.rabbitmq.virtual-host 为隔离的虚拟机,需要根据自己业务进行配置,如果rabbitmq 有web 端可以在web端创建需要的v_host:
在这里插入图片描述
2.2.2 可扩展的连接参数配置:

############# 连接和管道配置
# When the cache mode is 'CHANNEL', the connection cache size cannot be configured.
# spring.rabbitmq.cache.connection.mode 为connection 生效 ,connection 连接池的大小
#spring.rabbitmq.cache.connection.size=3
# 与broker 连接的 模式 channel 或者 connection 默认channel 
spring.rabbitmq.cache.connection.mode=channel
# 与broker 连接的默认时间,默认为 60000即 60 秒,超时会会中断并抛出异常,单位毫秒
spring.rabbitmq.connection-timeout=1000
# 每个连接中可以建立的channel 数量,默认值25
spring.rabbitmq.cache.channel.size=50
# 如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel
# 默认值为 -1,表示不限制等待时间,即一直等待直到获取到可用的 Channel,单位毫秒
spring.rabbitmq.cache.channel.checkout-timeout=2000
# 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.requested-heartbeat=60
# 客户端总共可以创建总的channel 数量
spring.rabbitmq.requested-channel-max=1024

默认与rabbitmq 的连接为channel,多个channel 公用一个connection , 每个线程都从缓存池中获取channel ,每个线程中持有的channel 是互相隔离的;

2.3 生产者发送消息:
生产者发送消息主要是通过 引入 RabbitTemplate 模版对象来完成;这里按照发送消息发送的场景分别进行介绍:

2.3.1 交换机和队列的绑定:
因为消息最开始是要发送到交换机上的,然后在通过交换机通过routkey 路由键到匹配的队列中;所以我们需要先在项目中使用的
virtual-host 中去分别创建交换机和队列,然后进行绑定;一帮情况下,我们应该向运维去申请自己的虚拟机,交换机,队列,然后通过后,项目中直接使用即可;当然通过代码也完全可以进行交换机和队列的创建和绑定,这里我们通过web 页面来进行处理:

2.3.1.1交换机的创建:
在这里插入图片描述

  • Virtual host : 对应隔离的虚拟机,所以需要选择项目中 通过spring.rabbitmq.virtual-host 参数连接的虚拟机;

  • Name: 虚拟机的名称,见名知意即可;

  • Type: 虚拟机的类型:比较常用的有直连 direct; 主题topic,广播fanout;
    在这里插入图片描述
    这里对交换机的类型进行简单的介绍:

  • 直连direct的交换机,交换机直接与队列完成绑定,通过发送消息是携带的Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中:

  • Direct Exchange
    Direct Exchange 是最简单的交换机类型,交换机直接与队列完成绑定,它根据消息携带的 Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中。 Direct Exchange 可以理解为一张路由表,交换机通过 Routing Key 在路由表中查找匹配队列,将消息从生产者处发送到匹配队列。

  • Topic Exchange
    Topic Exchange 根据 Routing Key 的匹配规则将消息路由到对应的队列中。Topic Exchange 支持两种匹配规则:* 代表通配符,表示可以匹配一个单词,# 代表通配符,表示可以匹配多个单词。例如,Routing Key 为 com.XXX.# 的消息会被路由到匹配 com.XXX. 开头的所有队列中,Routing Key 为 # ,会匹配到所有的消息;列如 user.* 匹配 user. 后跟一个单词的消息,可以匹配到user.a 但是匹配不到user.a.b 。

  • Fanout Exchange
    Fanout Exchange 会将消息路由到所有绑定到它上面的队列中。Fanout Exchange 的路由方式与路由表无关,会忽略 Routing Key,与 Direct Exchange 和 Topic Exchange 相比,它具有更高的传输效率和更低的消耗。

  • Headers Exchange
    Headers Exchange 根据消息头中的键值对匹配规则将消息路由到对应的队列中。Headers Exchange 的匹配规则相对较复杂,需要在绑定时指定键值对的匹配方式。

  • Durability : 交换机是否持久化到磁盘的属性值设置

  • 如果将 Durability 属性设置为 durable ,表示交换器会被持久化到磁盘上,即使 RabbitMQ 服务器在交换机定义被创建之后终止,交换机定义仍然能够在服务器重新启动时得到恢复,从而保证交换机在重启后仍然存在。

  • 如果将 Durability 属性设置为 transient ,表示交换器不会被持久化到磁盘上,如果 RabbitMQ 服务器重启,则该交换器定义将会丢失。

  • Auto delete 用于指定该交换机是否自动删除。当一个交换机关联的所有队列都被删除时,如果交换机的 Auto Delete 属性为 true,则该交换机也会被自动删除

  • Internal 是否为内部交换机:
    内部交换机的 internal 属性设置为 true,使其只能被通过 AMQP 协议连接到相同 Virtual Host 的客户端使用,不能被直连类型的 Exchange 或 Headers 类型的 Exchange 所使用。
    内部交换机只能用于消费者和生产者在同一个 RabbitMQ 实例中的场景,而不能用于服务器和客户端之间传递消息。
    内部交换机主要用于应用程序之间传递消息,而不是用于服务器和客户端之间传递消息。

  • Arguments:交换机的额外属性,比较常用的属性如alternate-exchange:指定备用交换机。如果一条消息无法被路由到任何队列中,那么它将被发送到备用交换机中;

一般我们创建交换机时只需要选择Virtual host:,填入交换机的名称,选择交互机的类型这3项,其它都默认即可:
在这里插入图片描述

2.3.1.2 队列的创建:
在这里插入图片描述- type 队列的类型:
在 RabbitMQ 中,队列的 type 参数共有三种,分别是 classic、quorum 和 stream。它们的区别可以简单概括如下:
classic 队列:
最早的、经典的队列类型,支持多个消费者竞争消费消息,但是在节点宕机时可能会出现消息丢失的情况。适用于简单的消息处理场景。

quorum 队列:
支持高可用性、多个消费者竞争消费的队列类型。它通过复制机制保证消息的可靠性,可以在节点宕机时自动进行故障转移,避免消息丢失。适用于需要高可用特性的分布式环境中使用,但相对来说,quorum 队列性能较 classic 队列有所下降。

stream 队列:
支持无限缓存的消息流队列,可以通过队列中的缓存来处理各种等待中的问题。传统队列中当消息进入队列时,它就被立即写入了内存中,并等待处理。这样做的问题是,当生产者不断地发送消息时,很容易将内存撑满。 stream 队列则允许队列的缓存区域随着时间和队列大小的增长而扩展,使得待处理的消息可以在缓存区域中有所体现。适用于需要处理海量时间序列数据的场景。

需要注意的是,stream 队列是从 RabbitMQ 3.8 开始引入的新类型,目前还不是很成熟,可能在稳定性和性能方面还需要更多的优化和改进。因此,在选择队列类型时,需要结合具体的业务情况和系统限制,选择采用 classic、quorum 还是 stream 队列,以达到最优的性能和可用性。

  • Name 队列的名称;
  • Durability 队列是否持久化,参数意义同交换机;
  • Auto delete:
    在 RabbitMQ 中,队列的 auto-delete 参数用于控制队列的自动删除行为。如果将 auto-delete 参数设置为 true,则在最后一个消费者断开连接时,队列会自动被删除。
  • Arguments 队列参数的额外选择;

通常创建队列时只需要选择Virtual host,填入队列的名称,其它项默认即可:
在这里插入图片描述

2.3.1.3 交换机和队列的绑定:完成交换机和队列关系的绑定
在这里插入图片描述
2.3.2 发送消息:
2.3.2.2 生产者参数的配置:

########## 生产者配置
spring.rabbitmq.template.exchange=my_exchange
# 启用消息投递结果确认
spring.rabbitmq.publisher-returns=true
# 启用强制消息投递,即生产者发送消息成功或者失败,需要返回确认消息
spring.rabbitmq.template.mandatory=true
# 消息发布者确认模式
spring.rabbitmq.publisher-confirm-type=correlated

# 发送重试是否可用
spring.rabbitmq.template.retry.enabled= true
# 最大重试次数,默认值为 3
spring.rabbitmq.template.retry.max-attempts=3
# 第一次和第二次尝试发布或传递消息之间的间隔,默认值为 1000 毫秒
spring.rabbitmq.template.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.template.retry.multiplier=1 
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.template.retry.max-interval= 1000

2.3.2.3 使用RabbitTemplate 模版发送单条消息,发送多条消息,发送延迟消息,使用自定义的RabbitTemplate 发送事务消息:
1) 定义一个类来封装我们要发送的消息结构:

package com.example.rabbitmqdemo.rabbitmq.msgDto;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class MsgDto implements Serializable {
    // 消息类型
    private String msgType;
    // 消息体
    private Object body;
}

2) 对RabbitTemplate 模版对象配置消息确认:
如果消息投递失败,我们需要对此类消息进行记录,方便后续进行数据补偿;

package com.example.rabbitmqdemo.rabbitmq.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component("rabbitMqCustomerConfig")
public class BatchConfig {
    @Value("${env:prod}")
    private String env;
    @Autowired
    SimpleRabbitListenerContainerFactory containerFactory;
    @Autowired
    RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void simpleListenerBatchInit() {
        log.info("设置批量-----");
        containerFactory.setBatchListener(true);
        if ("prod".equals(env)) {
            // 依照不同的环境进行开启
            containerFactory.setAutoStartup(true);
        }


        // 设置 ConfirmCallback 回调函数 确认消息是否成功发送到 Exchang
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                if (null == correlationData) {
                    // 延迟消息 correlationData 为null
                    return;
                }
                log.debug("Message sent successfully:{} ", correlationData.getId());

            } else {
                if (null == correlationData && null == cause) {
                    // 延迟消息 correlationData 为null
                    return;
                }
                log.error("Message sent failed: {}", correlationData.getId() + ", cause: " + cause);
            }
        });
        // ReturnCallback  处理的是未路由的消息返回的情况
        rabbitTemplate.setReturnCallback((oneMessage, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            if (routingKey.indexOf("delay") != -1) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            log.debug("Message returned: {}", new String(oneMessage.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
        });

    }


}

3) 因为发送事务需要关闭消息的确认,所以这里重新定义一个RabbitTemplate 模版用来发送事务消息:

package com.example.rabbitmqdemo.rabbitmq.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TxRabbitTemplate {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.port}")
    private String port;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean(value = "txRabbitTemplat")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

    private ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setChannelCacheSize(10);
        // 关闭消息的ack 确认
        connectionFactory.setPublisherConfirms(false);
        connectionFactory.setPublisherReturns(false);

        return connectionFactory;
    }
}

4)使用自动装配的RabbitTemplate 模版来进行 消息发送 :

package com.example.rabbitmqdemo.rabbitmq.producer;

import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;

@Slf4j
@Component
public class MessageProducer {
    // 这里可以指定一个默认发送使用的交换机
    @Value("${amqp-binding.exchange-name:my_exchange}")
    private String exchangeName;


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    @Qualifier("txRabbitTemplat")
    private RabbitTemplate txRabbitTemplate;


    /**
     * 指定的routKey 发送信息
     *
     * @param message
     */
    public void sendMessage(String routKey, Object message) {
        this.sendMessage(exchangeName, routKey, JSONObject.toJSONString(message));
    }

    /**
     * 通过交换机,路由key 发送消息
     *
     * @param exchangeName
     * @param routKey
     * @param message
     */
    public void sendMessage(String exchangeName, String routKey, Object message) {
        // 设置消息的唯一标识符
        long deliveryTag = System.currentTimeMillis();
        rabbitTemplate.convertAndSend(exchangeName, routKey, message, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + deliveryTag));
            return messagePostProcessor;
        }, new CorrelationData(UUID.randomUUID().toString()));

    }


    /**
     * 指定的routKey 发送批量信息
     *
     * @param messages
     */
    public void sendMessageBatch(String routKey, MsgDto messages) {
        this.sendMessageBatch(exchangeName, routKey, JSONObject.toJSONString(messages));
    }

    /**
     * 通过交换机,路由key 发送批量信息
     *
     * @param exchangeName
     * @param routKey
     * @param messages
     */
    public void sendMessageBatch(String exchangeName, String routKey, Object messages) {
        rabbitTemplate.convertSendAndReceive(exchangeName, routKey, messages, messagePostProcessor -> {
            messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + 1));
            return messagePostProcessor;
        }, new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     * 指定的routKey 发送信息
     *
     * @param message
     */
    public void sendDelayMessage(String routKey, Object message, long delayTime) {
        this.sendDelayMessage(exchangeName, routKey, message, delayTime);
    }

    /**
     * 指定的routKey 发送延迟信息
     *
     * @param message
     */
    public void sendDelayMessage(String exchangeName, String routKey, Object message, long delayTime) {
        log.debug("producer send delay message:{}", message);
        rabbitTemplate.convertAndSend(exchangeName, routKey, message, header -> {
            header.getMessageProperties().setHeader("x-delay", delayTime);
            return header;
        });
    }

    /**
     * 指定的routKey 发送事务信息
     *
     * @param message
     */
    @SneakyThrows
    public void sendTxMessage(String exchangeName, String routKey, Object message) {
        log.debug("producer send delay message:{}", message);
        String messageStr = JSONObject.toJSONString(message);
        // method 1:
//        sendTransactedMsgByNewChannel(exchangeName,routKey,message);
        // method2:
        sendTransactedMsgByNTemplate(exchangeName, routKey, messageStr);


    }

    private void sendTransactedMsgByNTemplate(String exchangeName, String routKey, String message) {
        txRabbitTemplate.execute(channel -> {
            try {
                String messageId = UUID.randomUUID().toString() + "_messageId";
                String correlationId = UUID.randomUUID().toString() + "_correId";

                // 创建 BasicProperties 对象并设置属性
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .messageId(messageId)
                        .correlationId(correlationId)
                        .build();
                channel.txSelect(); // 开启事务

                channel.basicPublish(exchangeName, routKey, properties, message.getBytes(Charset.forName("UTF-8"))); // 发送消息
//                "124".substring(7);
                channel.txCommit(); // 提交事务
            } catch (Exception e) {
                channel.txRollback(); // 回滚事务
            }
            return true;
        });
    }

    @SneakyThrows
    private void sendTransactedMsgByNewChannel(String exchangeName, String routKey, String message) {
        // 获取新的channel 对象
        Channel channel = txRabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
        // 开启事务
        channel.txSelect();
        try {
            // 消息格式化
            channel.basicPublish(exchangeName, routKey, null, message.getBytes(Charset.forName("UTF-8")));
            // 消息提交
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            throw e;
        }
    }


}

5)测试代码:

package com.example.rabbitmqdemo.rabbitmq.controller;

import com.example.rabbitmqdemo.rabbitmq.enums.RabbitRoutKeyEnum;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.example.rabbitmqdemo.rabbitmq.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
public class TestSendMsgController {
    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/sendMsg")
    public boolean sendMsg(@RequestParam String content,@RequestParam String routKey) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            msgs.add(content+"_"+i);

        }
        msgs.stream().forEach(e->{
            MsgDto msgDto = new MsgDto("user",e);
            messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);
        });

        return true;
    }
    @GetMapping("/sendBatchMsg")
    public boolean sendBatchMsg(@RequestParam String content,@RequestParam String routKey) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            msgs.add(content+"_"+i);
        }
        MsgDto msgDto = new MsgDto("test",msgs);
        messageProducer.sendMessageBatch(RabbitRoutKeyEnum.业务_多条消息.getRoutKey(), msgDto);

        return true;
    }

    @GetMapping("/sendDelayMsg")
    public boolean sendDelayMsg(@RequestParam String content,@RequestParam long delayTime) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            msgs.add(content+"_"+i);
        }
        msgs.stream().forEach(e->{
            messageProducer.sendDelayMessage("my_delay_exchange",RabbitRoutKeyEnum.业务_延迟.getRoutKey(),e,delayTime);
        });

        return true;
    }

    @GetMapping("/sendTxMsg")
    public boolean sendTxMsg(@RequestParam String content) {
        List<Object> msgs = new ArrayList<>(10);
        for (int i = 0; i < 2; i++) {
            msgs.add(content+"_"+i);
        }
        msgs.stream().forEach(e->{
            MsgDto msgDto = new MsgDto("tx",e);
            messageProducer.sendTxMessage("my_tx_exchange",RabbitRoutKeyEnum.业务_事务.getRoutKey(),msgDto);
//            messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);
        });


        return true;
    }
}


这里分别测试了单条消息,多条消息,延迟消息,事务消息的发送,将其封装为MsgDto对象,在发送时将其转为json 字符串;基本上满足了大部分的业务场景;需要注意的是rabbitmq 中所谓批量发送的消息实际上会被消息压缩为1条消息进行发送,到达队列是也是1条消息;

6 )routKey 的枚举类:

package com.example.rabbitmqdemo.rabbitmq.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum RabbitRoutKeyEnum {

    业务_单条消息("my_routKey"),
    业务_多条消息("my_batch_routKey"),
    业务_1("my_one_routKey"),
    业务_延迟("my_delay_routKey"),

    业务_事务("my_tx_routKey"),

    ;

    private String routKey;


}


至此我们已基本完成生产端消息的发送以及发送结果的监听处理;需要注意的是对于延迟消息,返回的确认消息correlationData 是一个null 值,所以这里对其消息的确认进行了一次特殊的判断;

3 消费者接收消息:

3.1 消费者参数的配置:

########## 消费者配置
# 是否自动启动消息的监听
spring.rabbitmq.listener.simple.auto-startup=false
# 消费消息确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 批量预取条数 默认值250
spring.rabbitmq.listener.simple.prefetch=50
# 开启批量消费
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消费的条数
spring.rabbitmq.listener.simple.batch-size=2
# 并发消费最小线程数
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费最大线程数
spring.rabbitmq.listener.simple.max-concurrency=1


### 消费失败 重试参数
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重试次数,默认值为 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重试的时间间隔,默认值为 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息监听器是否启用无状态(stateless)重试 默认true
spring.rabbitmq.listener.simple.retry.stateless=false
# 控制当消息消费失败后,RabbitMQ 是否需要将消息重新入队。该参数的默认值为 true,即消息将被重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true

以上参数,配置了消费端消费消息后的ack 机制为手动提交,并且设定了 批量预取条数 和每次批量消费的条数,以及消费失败的重试机制配置;

3.2 消费消息:
消费者监听某个或者几个队列,然后通过channel 获取要消费的消息:

package com.example.rabbitmqdemo.rabbitmq.consumer;

import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

@Slf4j
@Component
public class MessageConsumer {

    /**
     * 逐条/批量 消费
     *
     * @param messages
     */
//    @RabbitListener(queues = "my_queue_one")
    public void receiveMessage(List<Message> messages, Channel channel) throws IOException {
        log.debug("逐条消费消息:{}", messages);
        for (Message message : messages) {
            try {
//                // 处理消息
                log.debug("Received message: {}", message);
                String jsonMessage = new String(message.getBody(), "UTF-8");
                MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);
                // 数据处理

                // 手动发送 ack 消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception ex) {
                // 发生异常,手动发送 nack 消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            }
        }

    }


    /**
     * 逐条消费--延时消息
     *
     * @param messages
     */
    @RabbitListener(queues = "my_deay_queue")
    public void receiveDelayMessage(List<Message> messages, Channel channel) throws IOException {
        for (Message message : messages) {
            try {
                // 处理消息
                log.debug("Received delay message: {}", message);
                String jsonMessage = new String(message.getBody(), "UTF-8");
                MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);

                // 手动发送 ack 消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception ex) {
                // 发生异常,手动发送 nack 消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            }
        }

    }

    /**
     * 逐条消费--事务消息
     *
     * @param messages
     */
    @RabbitListener(queues = "my_tx_queue")
    public void receiveTxMessage(List<Message> messages, Channel channel) throws IOException {
        for (Message message : messages) {
            try {
                // 处理消息
                log.debug("Received delay message: {}", message);
                String jsonMessage = new String(message.getBody(), "UTF-8");
                MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);

                // 手动发送 ack 消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception ex) {
                // 发生异常,手动发送 nack 消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            }
        }

    }

}

这里我们接收到消息后,然后通过"UTF-8"编码(生产者默认按照UTF-8 对数据编码后进行发送)将字节数据转换为字符串,然后通过阿里的json jar 完成java 对象的转换,进行业务处理,最后手动提交消息;

4 总结:

  • Rabbitmq 对于消息的发送依赖于交换机,通过routKey 绑定不同的queue 完成消息的路由工作;
  • Rabbitmq 发送消息可以为其配置ack确认机制,以及发送失败重试机制参数可以配合完成消息的发送;
  • Rabbitmq 发送消息可以进行批量发送,但是本质上会被合并到一条消息进行发送;
  • Rabbitmq 对于消息的消费,依赖于构建channel 管道 ,绑定queue 完成消息的消费;
  • Rabbitmq 消费消息,可以进行手动的ack 确认,并且可以设置消费重试参数,应便于消费失败的场景;

5 扩展:

5.1 rabbitmq 发送事务消息为什么要关闭 消息的确认回调?

在RabbitMQ中,如果发送事务消息,并且开启了确认模式,那么需要特别注意关闭消息的确认回调,以避免一些潜在的问题。
在RabbitMQ中,开启事务模式后,生产者发送消息时,RabbitMQ会将消息缓存在生产者端。在事务提交之前,不会直接将消息发送到队列。如果在事务未提交的情况下,RabbitMQ服务器异常中断或者连接被关闭,那么消息将会丢失。为了避免这种情况的发生,可以采用事务提交确认和确认模式,在确认之后才将消息发送到队列中。

然而,在发送事务消息时,开启确认模式后,需要关闭消息的确认回调。这是因为在事务提交之前,消息并没有发送到队列中,确认回调将在消息发送到队列后才触发。而在事务模式下,消息已经被缓存到生产者端,没有被发送到队列中,所以确认回调不应该被触发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/644662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】C++前言

Yan-英杰的主页 悟已往之不谏 知来者之可追 C程序员&#xff0c;2024届电子信息研究生 目录 1.什么是C 2.C的发展史 3.C的重要性 a.使用广泛 b.C/C的应用 1.操作系统以及大型系统软件开发 2.服务器端开发 3.游戏开发 4.嵌入式和物联网领域 5.数字图像处理 6.人工智…

AVL树的解析

我们在之前的学习里面已经发现了&#xff0c;搜索二叉树是有一些问题的。它可能会存在单边树的问题&#xff0c;如果你插入的值是有序的话&#xff0c;就会导致这个问题。 那我们肯定是要来解决一下的&#xff0c;如何解决呢&#xff1f; 》一种解决方案是AVL树&#xff0c;还有…

【云原生 | 54】Docker三剑客之Docker Compose应用案例二:大数据Spark集群

&#x1f341;博主简介&#xff1a; &#x1f3c5;云计算领域优质创作者 &#x1f3c5;2022年CSDN新星计划python赛道第一名 &#x1f3c5;2022年CSDN原力计划优质作者 &#x1f3c5;阿里云ACE认证高级工程师 &#x1f3c5;阿里云开发者社区专…

天下苦 Spring 久矣,Solon v2.3.3 发布

Solon 是什么框架&#xff1f; 一个&#xff0c;Java 新的生态型应用开发框架。它从零开始构建&#xff0c;有自己的标准规范与开放生态&#xff08;全球第二级别的生态&#xff09;。与其他框架相比&#xff0c;它解决了两个重要的痛点&#xff1a;启动慢&#xff0c;费资源。…

HarmonyOS学习路之开发篇—Java UI框架(PositionLayoutAdaptiveBoxLayout)

PositionLayout 在PositionLayout中&#xff0c;子组件通过指定准确的x/y坐标值在屏幕上显示。(0, 0)为左上角&#xff1b;当向下或向右移动时&#xff0c;坐标值变大&#xff1b;允许组件之间互相重叠。 PositionLayout示意图 布局方式 PositionLayout以坐标的形式控制组件的…

基于Hexo和Butterfly创建个人技术博客,(4) 使用通用的Markdown语法编写博客文章

Hexo官司网查看 这里 hexo的博文建议是用markdown语法来写&#xff0c;原因markdown简单通用&#xff0c;比如很多博客平台都会提供md编辑器&#xff0c;这样如果我们想把同一篇文章发到多个博客平台上(事实上很多人也是这样做的)&#xff0c;md应该是最好的编写方法了&#xf…

目标检测数据集---交通信号数据集

✨✨✨✨✨✨目标检测数据集✨✨✨✨✨✨ 本专栏提供各种场景的数据集,主要聚焦:工业缺陷检测数据集、小目标数据集、遥感数据集、红外小目标数据集,该专栏的数据集会在多个专栏进行验证,在多个数据集进行验证mAP涨点明显,尤其是小目标、遮挡物精度提升明显的数据集会在该…

js控制台 console.log 输出美化,及其他操作

1.格式美化 console.log(%c红色%c蓝色%c绿色, color: red;, color: blue;, color: green;) console.log(%c一段文字\n换行一下\n%c SmileSay %c 版本&#xff1a;1.0.0 ,color: #3eaf7c; font-size: 16px;line-height:30px;,background: #35495e; padding: 4px; border-radius…

数仓数据质量保障方法

一、有赞数据链路 1、数据链路介绍 首先介绍有赞的数据总体架构图&#xff1a; 自顶向下可以大致划分为应用服务层、数据网关层、应用存储层、数据仓库&#xff0c;并且作业开发、元数据管理等平台为数据计算、任务调度以及数据查询提供了基础能力。 以上对整体架构做了初步…

射频电路layout总结

射频电路板设计由于在理论上还有很多不确定性&#xff0c;因此常被形容为一种“黑色艺术”&#xff0c;但这个观点只有部分正确&#xff0c;RF电路板设计也有许多可以遵循的准则和不应该被忽视的法则。在实际设计时&#xff0c;真正实用的技巧是当这些准则和法则因各种设计约束…

OpenCV(图像处理)-基于Oython-滤波器(低通、高通滤波器的使用方法)

1.概念介绍2. 图像卷积filter2D() 3. 低通滤波器3.1 方盒滤波和均值滤波boxFilter()blur() 3.2 高斯滤波&#xff08;高斯噪音&#xff09;3.3 中值滤波&#xff08;胡椒噪音&#xff09;3.4 双边滤波 4. 高通滤波器4.1Sobel&#xff08;索贝尔&#xff09;&#xff08;高斯&am…

软考A计划-系统架构师-知识点汇总-下篇

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff…

​Agile与Scrum的比较

作者| Deepali chadokar Agile和Scrum是软件开发中经常使用的两个相关概念。Agile是一个概括性的术语&#xff0c;包含了一组软件开发的价值观和原则&#xff0c;而Scrum是Agile方法中的一个特定框架。 Agile强调协作、灵活性和适应性&#xff0c;以及应对变化的能力。此外&…

Vue中 echarts响应式页面变化resize()

前言 Vue项目中开发数据大屏&#xff0c;使用echarts图表根据不同尺寸的屏幕进行适配 BUG&#xff1a;当页面进行缩放时图表大小没有变化 使用到的方法&#xff1a; resize() 调用echarts中内置的resize函数进行自适应缩放&#xff0c;然后添加监控&#xff0c;页面销毁时删掉…

Zabbix“专家坐诊”第195期问答汇总

问题一 Q&#xff1a;麻烦请教一下zabbix服务器总是上报这几个告警&#xff0c;需要处理嘛&#xff1f;怎么处理&#xff1f; A&#xff1a;同步历史数据进程负载过高的话会影响到server的性能&#xff0c;建议增加服务器硬件配置。 Q&#xff1a;是需要增加哪方面的配置&…

ISO21434 威胁分析和风险评估方法(十二)

目录 一、概述 二、目标 三、资产识别 3.1 输入 3.1.1 先决条件 3.1.2 进一步支持信息 3.2 要求和建议 3.3 输出 四、威胁场景识别 4.1 输入 4.1.1 先决条件 4.1.2 进一步支持信息 4.2 要求和建议 4.3 输出 五、影响等级 5.1 输入 5.1.1 先决条件 5.1.2 进一…

制造业如何进行数字化转型?这个解决方案能帮你!

制造业如何有效实现数字化&#xff1f;制造业企业数字化的趋势已成必然&#xff0c;那么&#xff0c;如何进行制造业企业的数字建设成为各传统制造业企业的探索方向。 于是&#xff0c;我们团队在调研了数百家企业之后&#xff0c;形成了这套制造业数字化从0到1&#xff0c;从…

一文让你用上Xxl-Job 顺带了解cron表达式

文章目录 1.定时任务框架-xxljob1.1 Xxljob介绍1&#xff09;xxljob概述2&#xff09;XXL-JOB特性3) 整体架构4&#xff09;入门资料准备 1.2 xxljob快速入门1&#xff09;导入xxljob工程2&#xff09;配置数据库1.初始化SQL脚本2.配置数据库环境3.业务处配置任务注册中心 3&am…

【色度学】光学基础

1. 光的本质 &#xff08;1&#xff09;波长不同的可见光&#xff0c;引起人眼的颜色感觉不同。 &#xff08;2&#xff09;人们观察到的颜色是物体和特有色光相结合的结果&#xff0c;而不是物体产生颜色的结果。 2. 光度量 【ISP】光的能量与颜色&#xff08;1&#xff0…

【学术探讨】万能密码原理剖析

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;对网络安全感兴趣的小伙伴可以关注专栏《网络安全入门到精通》 【万能密码】&#xff0c;顾名思义&#xff0c;就是…