RabbitMQ从入门到入土

news2025/1/12 10:53:24

同步与异步

同步调用

优势:

  • 时效性强,等到结果后就返回

问题:

  • 扩展性差

  • 性能下降

  • 级联失败问题

异步调用

优势:

  • 耦合度低,扩展性强

  • 无需等待,性能好

  • 故障隔离,下游服务故障不影响上游

  • 缓存消息,削峰填谷

问题:

  • 不能立刻获得结果,时效性差

  • 不确定下游业务执行是否成功

  • 业务安全依赖于Broker的可靠性

Broker:(代理,常指一种中间件,用于协调和管理不同组件之间的通信、交互、服务调用)

异步调用基于消息推送的方式,一般包含3个角色。

  • 消息发送者

  • 消息代理:管理、暂存、转发消息

  • 消息接收者

MQ技术选型

MQ:消息队列,就是存放消息的队列,也就是异步调用中的Broker。

RabbitMQ、ActiveMQ、Rocket MQ、Kafka对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

  • 追求可靠性:RabbitMQ、RocketMQ

  • 追求吞吐能力:RocketMQ、Kafka

  • 追求消息低延迟:RabbitMQ、Kafka

个人认为:如果在项目中使用的是RabbitMQ,面试官问你为啥使用这个可以从以下入手:

  • 可用性高

  • 消息可靠性高

  • 虽然单机吞吐量一般,但是消息延迟低。为什么消息延迟低呢?

    • RabbitMQ之所以被认为能够提供较低的消息延迟,主要归因于以下几个因素:

      1. 高性能的底层实现:RabbitMQ是用Erlang语言编写的,Erlang专为高并发、分布式系统设计,提供了轻量级进程和共享无锁数据结构,这使得RabbitMQ在处理大量并发连接和消息时表现得非常高效。

      2. 零拷贝技术:RabbitMQ利用操作系统提供的零拷贝特性,减少数据在内核空间和用户空间之间的复制次数,从而加快消息传输速度,降低延迟。

      3. 多路复用的TCP连接:通过使用Channel(信道)这一概念,RabbitMQ可以在单个TCP连接上复用多个逻辑连接,减少了网络连接的开销,提升了通信效率。

      4. 可配置的消息优先级:RabbitMQ允许为消息设置优先级,这在某些场景下可以帮助紧急或高优先级的消息更快地被消费,减少它们的等待时间。

      5. 社区支持和成熟度:作为一个成熟的开源项目,RabbitMQ拥有活跃的开发者社区和丰富的文档资源,这意味着它经过了广泛的测试和优化,能够提供稳定的低延迟表现。

好,现在开始来介绍我们的RabbitMQ了

RabbitMQ

认识

整体架构和核心概念如下:

  • publisher:消息发布者

  • consumer:消息消费者

  • queue:队列,存储消息

  • exchange:交换机:转发消息

image-20240215142856543

Java客户端使用步骤

我们要知道的是RabbitMQ是根据AMQP协议来实现的:

AMQP:用于应用程序之间传递业务消息的开放标准

Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息

使用之前你要从官网进行下载RabbitMQ这个中间件,并启动它。

下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.3

官方使用地址:RabbitMQ Tutorials | RabbitMQ

  1. 引入Spring-amqp的依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.0</version> <!-- 或者使用最新版本 -->
    </dependency>

    2、配置RabbitMQ服务端信息

    spring:
      rabbitmq:
        host: 127.0.0.1 #ip
        port: 5672      #端口
        username: guest #账号
        password: guest #密码
        virtualHost:    #链接的虚拟主机
        addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
        requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
        publisherConfirms: true  #发布确认机制是否启用
        #确认消息已发送到交换机(Exchange)
        #publisher-confirm-type参数有三个可选值:
        #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
        #CORRELATED:消息从生产者发送到交换机后触发回调方法。
        #NONE(默认):关闭发布确认模式。
        #publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true
        publisherReturns: true #发布返回是否启用
        connectionTimeout: #链接超时。单位ms。0表示无穷大不超时

    3、利用RabbitTemplate发送消息

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    ​
    public class Producer {
    ​
        private static final String QUEUE_NAME = "hello";
    ​
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

    4、利用@RabbitListener注解声明要监听的队列,监听消息

    import com.rabbitmq.client.*;
    ​
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    ​
    public class Consumer {
    ​
        private static final String QUEUE_NAME = "hello";
    ​
        public static void main(String[] argv) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    ​
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    ​
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        }
    }

WorkQueue

  • 一个队列绑定多个消费者,加快消息处理速度。

  • 同一个消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息

Java声明队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建

  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

  • Binding:用于声明队列和交换机绑 定关系,可以用工厂类BindingBuilder构建

在consumer中创建一个类,声明队列和交换机:

1、通过配置实现

package cn.itcast.mq.config;
​
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
​
    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
​
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
​
    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
​
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
​
2、通过注解实现

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1", durable="true"),
        exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws Exception{
    System.out.println("消费者1收到了 direct.queue1的消息:【" + msg + "】");
}

交换机

消息一般都是通过exchange来发送消息的,而不是直接发送到队列中的。

常用交换机的类型有以下三种:

  • Fanout(广播):将消息发送给所有跟该交换机绑定了的queue(就是每个人都能收到)

  • Direct(定向):消息根据规则路由到指定的queue

  • Topic(话题):根据类别来进行发送消息(类似Direct)

Fouout交换机

这里定义了一个生产者发送消息:

    @Test
    public void testSendFanout(){
        String exchangeName = "test.fanout";
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName,null,msg);
    }

下面是两个消费者

package cn.itcast.mq.listener;
​
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.time.LocalTime;
​
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanout1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
​
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanout2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
​
}
​

这里是接收到的消息,每个都接收到了。

消费者2........接收到消息:【hello, everyone!】18:50:53.628336200
消费者1接收到消息:【hello, everyone!】18:50:53.628336200

定向交换机

Direct Exchange会将收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey

  • 发布者发送消息时,指定消息的RoutingKey

  • Exchange将消息路由到BindingKey与消息RoutingKey一直的队列

添加配置类,将交换机和队列进行绑定:

@Configuration
@EnableRabbit
public class RabbitConfig {
​
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("test.direct");
    }
​
    @Bean
    Queue redQueue() {
        return new Queue("redQueue", false); // false 表示队列不是持久化的
    }
​
    @Bean
    Binding bindingRed(DirectExchange directExchange, Queue redQueue) {
        return BindingBuilder.bind(redQueue()).to(directExchange).with("red"); // 绑定键为"red"
    }
}

发送端:

  @Test
    public void testSendFanout(){
        String exchangeName = "test.direct";
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName,"red",msg);
    }   //这个red是以及绑定了的BindingKey

消费端:

@Service
public class FanoutReceiver {
​
    @RabbitListener(queues = "redQueue")
    public void receiveMessage(String message) {
        System.out.println("Received from redQueue: " + message);
    }
}

话题交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分隔。例如:

China.news (代表中国新闻这个列表)

Queue与Exchange指定BindingKey时可以使用通配符:

  • #: 代指0个或多个单词

  • *:代指一个单词

配置类代码:

@Configuration
@EnableRabbit
public class RabbitConfig {
​
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("test.topic");
    }
​
    @Bean
    Queue chinaNewsQueue() {
        return new Queue("china.news.queue", false);
    }
​
    @Bean
    Queue chinaSportsQueue() {
        return new Queue("china.sports.queue", false);
    }
​
    @Bean
    Binding bindingChinaNews(TopicExchange topicExchange, Queue chinaNewsQueue) {
        return BindingBuilder.bind(chinaNewsQueue()).to(topicExchange).with("China.news"); 
    }
​
    @Bean
    Binding bindingChinaSports(TopicExchange topicExchange, Queue chinaSportsQueue) {
        return BindingBuilder.bind(chinaSportsQueue()).to(topicExchange).with("China.sports.*"); // 匹配以"China.sports."开头的所有Routing Key
    }
}

生产者代码:

@Test
public void testSendTopic() {
    String exchangeName = "test.topic";
    String routingKey = "China.news"; // 使用符合Topic规则的Routing Key
    String msg = "Breaking news from China!";
    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}

消费者:

@Service
public class TopicReceiver {
​
    @RabbitListener(queues = "china.news.queue")
    public void receiveNews(String message) {
        System.out.println("Received news from 'China.news': " + message);
    }
​
    @RabbitListener(queues = "china.sports.queue")
    public void receiveSports(String message) {
        System.out.println("Received sports update from 'China.sports.*': " + message);
    }
}

消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

所以,我们就希望让消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

1、引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

2、配置Bean

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

可靠性

发送者的可靠性

生产者重连

有时候由于网络波动,出现连接MQ失败情况。

我们可以通过配置开启连接失败后的重连机制:

spring:
    rabbitmq:
        connection-timeout: 1s #设置MQ的连接超时时间
            template:
                retry:
                    enabled: true #开启超时重试机制
                        inital-interval: 1000ms #失败后的初始等待时间
                        multiplier: 1 #失败后下次等待时长的倍数,下次等待时长 = initial-interval * multipler
                        max-attempts: 3 #最大重试次数
                        

注意:

  • 当网络不稳定的时候,利用重试机制可以有效的提高消息发送成功率,但是SpringAMQP的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是呗阻塞的,会影响性能。

  • 如果对于业务性能有要求的时候,建议禁用重试机制。如果一定要用,请进行适当的配置。

生产者确认

生产者确认的作用:为了让生产者知道消息是否被消费成功。

RabbitMQ的Publisher Confirm 和Publisher Return两种确认机制:

  • ConfirmCallback:当RabbitMQ成功处理消息时调用

  • ReturnCallback:当消息无法路由到任何队列时调用,例如由于交换器找不到匹配的队列,此时会返回消息及原因。

开启确认机制后,以下两种情况会返回消息被接收的ACK:

  • 消息被投递到匹配的队列

  • 持久化消息写入磁盘

注意这一种情况:

当消息被投递到MQ后,但是路由失败(没有匹配规则的队列),一样会返回ACK

实现生产者确认
  1. 配置实现:

    spring:
        rabbitmq:
            publisher-confirm-type: correlated
            publisher-returns: true
            
    #publisher-confirm-type有3中模式:
    #- none:关闭confirm机制
    #- simple:同步阻塞等待MQ的回执消息
    #- correlated:MQ异步回调方式返回回执消息

  2. import com.rabbitmq.client.*;
    ​
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    ​
    public class RabbitMQProducerConfirmAndReturn {
    ​
        private static final String QUEUE_NAME = "my_queue";
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String ROUTING_KEY = "my_routing_key";
    ​
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost"); // 根据实际情况设置RabbitMQ服务器地址
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    ​
            // 开启发布确认
            channel.confirmSelect();
    ​
            AtomicInteger outstandingConfirms = new AtomicInteger(0);
    ​
            // 添加ConfirmCallback
            channel.addConfirmListener((deliveryTag, multiple) -> {
                System.out.println("Confirmed delivery for message with tag: " + deliveryTag);
                outstandingConfirms.decrementAndGet();
            }, (deliveryTag, multiple) -> {
                System.out.println("Negative acknowledgement received for message with tag: " + deliveryTag);
                // 这里可以根据需要处理未确认消息的逻辑
            });
    ​
            // 添加ReturnCallback
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                System.out.println("Returned message: " + new String(body));
                Map<String, Object> headers = properties.getHeaders();
                if (headers != null && headers.containsKey("messageId")) {
                    String messageId = (String) headers.get("messageId");
                    System.out.println("Message with ID " + messageId + " was not routed.");
                    // 这里可以添加消息未路由的重试逻辑
                }
            });
    ​
            try {
                String customMessageId = "msg-id-123";
                sendMessageWithId(channel, EXCHANGE_NAME, ROUTING_KEY, "Hello, World!".getBytes(), customMessageId);
            } finally {
                channel.close();
                connection.close();
            }
        }
    ​
        private static void sendMessageWithId(Channel channel, String exchange, String routingKey, byte[] messageBody, String messageId) throws IOException {
            Map<String, Object> headers = new HashMap<>();
            headers.put("messageId", messageId); // 自定义消息ID作为header
    ​
            BasicProperties props = new BasicProperties.Builder()
                    .deliveryMode(2) // 持久化消息
                    .headers(headers)
                    .build();
    ​
            channel.basicPublish(exchange, routingKey, props, messageBody);
            outstandingConfirms.incrementAndGet(); // 记录待确认的消息数
            System.out.println("Published message with ID " + messageId);
        }
    }

MQ的可靠性

数据持久化

默认情况,Rabbit会将收到的消息存在内存中。

RabbitMQ实现数据持久化(存放在磁盘中)有3个方面:

  • 交换机持久化

    • 声明的时候将 durable属性配置为true

  • 队列持久化

    • 声明的时候将 durable属性配置为true

  • 消息持久化

    • 设置消息属性中的deliveryMode2来标记消息为持久化

注意事项:

  • 持久化消息并不保证零丢失,因为它们在内存中排队等待写入磁盘时仍有可能因系统崩溃而丢失。

  • 持久化会增加消息发布的延迟,因为消息必须等待被写入磁盘。

  • 队列和交换器的持久化并不会自动持久化其中的消息,消息的持久化需要单独设置。

  • 如果之前声明的队列或交换器是非持久化的,需要先删除原有队列或交换器,然后重新声明为持久化版本,否则会遇到错误。

  • 为了确保消息不丢失,除了持久化之外,还需要考虑消费者确认(Ack)机制,以及可能的死信队列和重试策略。

Lazy Queue

惰性队列特征:

  1. 接收到消息直接存入磁盘,非内存(内存中只保留最近的消息,默认2048条)

  2. 消费者要消费消息才会从磁盘读取消息并加载到内存中

  3. 支持数百万条的消息存储

在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

实现方式
  1. Java代码声明中实现

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder
                .durable("lazy.queue")  //持久化
                .lazy()                 //开启lazy模式
                .build();
    }

  2. 消费端实现:

    @RabbitListener(queuesToDeclare = @Queue(
                name = "lazy.queue",
                arguments = @Argument(name = "x-queue-mode", value = "lazy")))//lazy开启了消息持久化
    public void listenLazyQueue(String msg){
        log.info("接收到lazy.queue的消息:{}",msg);
    }

消费者的确认机制

  • 为了确定消费者是否成功处理消息而提出的消费者确认机制。

  • 当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息

    • nack:消息处理失败,RabbitMQ需要再次投递消息

    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(消费者无法处理接收到的消息时;消费者在处理消息的过程中发生了异常)

实现:

SpringAMQP已经实现了消息确认功能,运行通过配置文件选择ACK处理方式,有3种方式:

  • none:不处理。即消息投递后,直接返回ack,不安全,不建议使用

  • manual :手动模式。需要自己在业务种调用api,发送ack或者reject,存在业务入侵,但是更灵活

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强,当业务正常执行,返回ack,出现异常

    • 业务异常,nack

    • 消息处理或校验异常,reject

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1
                    acknowledge-mode: manual

建议一般采用manual实现。

失败重试机制

  • 问题:消费者出现异常后,消息会不断的重新入队,发送给消费者,这样无限循环,导致mq消息处理飙升。

  • 解决方案:利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

spring:
    rabbitmq:
        listener:
            simple:
                prefetch: 1
                    acknowledge-mode: auto
                retry:
                    enable: true
                    initial-interval: 1000ms #初始失败等待时长
                    multiplier: 1
                    max-attempts: 3 #最大重试次数
                    st
                    
                    atuless: true #true无状态,false有状态。如果业务中包含事务,这里改成false

如果重试次数耗尽的时候,如果消息依然失败,还有兜底的策略,可以实现MessageRecoverer接口实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认这种方式)

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽,将失败消息发送给指定交换机(专门用来处理失败消息的,死信交换机)

RepublishMessageRecoverer的示例:

  1. 定义接收失败消息的交换机、队列,并绑定关系。此处实现略。

  2. 定义RepublishMessageRecoverer

    @ConditionalOnProperty(prefix = "spring.rabbitmq.retry",name = "enable", havingValue = "true")
    //可以在配置类上加入这个代码,只有上述条件满足的时候,配置才实现
    ​
    ​
    //这里还有配置交换机、队列、以及进行绑定
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, " error.direct", "error");
    }

业务幂等性

幂等:一个数学概念。f(x) = f( f(x) )。意思就是:同一个业务,执行一次或多次,对业务的影响是一致的。(比如说删除和查询)

唯一消息id

给每一消息都设置一个唯一id,利用唯一id区分是否重复消息。

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者

  2. 消费者接收到消息后处理自己的业务,业务完成将消息id保存到数据库

  3. 如果下次收到相同消息,去数据库中判断是否存储,存在则重复消息放弃处理

@Bean
public MessageConverter messageConverter(){
    //1、定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    //2、配置启动自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否重复消息
    jjmc.setCreateMessageIds(true);
}

这个消息id是保存在header中的

@RabbitListener(queues = "yourQueueName")
public void listen(Message message) {
    MessageProperties messageProperties = message.getMessageProperties();
    String messageId = messageProperties.getMessageId(); // 获取消息ID
    if (messageId != null) {
        System.out.println("Received message with ID: " + messageId);
        // 进行幂等性检查或其他基于ID的处理逻辑
    }
    // 其他消息处理逻辑...
}

业务判断

结合实际业务逻辑做判断。

比如说一个订单业务,我们要防止订单状态修改后不再继续被修改,就可以对订单业务进行判断:

  • 如果订单是未支付状态,才变成已支付状态

  • 如果订单是已支付状态,状态不变

延迟消息

生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定消息后才收到消息。

死信交换机

死信:当队列中的消息满足下列情况之一后,就成为了死信。(dead letter)

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。

  • 要投递的队列消息堆积满了,最早的消息可能成为死信。

在队列的dead-letter-exchange属性中指定一个A交换机,则该队列中的死信会投递到A交换机中,该A交换机就是死信交换机Dead Letter Exchange,简称DLX)。

image-20240611130025919

插件实现

官方提供了一个插件实现延迟消息,网址如下:Community Plugins | RabbitMQ

下载rabbitmq_delayed_message_exchage

1、声明延迟交换机

1)方式一

 @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct")
                .delayed()  //设置delay的属性为true
                .durable(true)
                .build();
    }

2)方式二

  
  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed = true 开启延迟交换机
            key = "delay"
    ))
    public void listenToDelayMessage(String msg){
        log.info("接收到delay.queue的延迟消息:{}",msg);
    }

2、测试:

@Test
    void testSendDelayMessage(){
        rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息延迟时间
                message.getMessageProperties().setDelay(1000);
                return message;
            }
        });
        log.info("消息发送成功!");
    }

消息堆积

为什么会出现消息堆积这种问题呢?

RabbitMQ 消息堆积问题通常发生在生产者发送消息的速度远大于消费者处理消息的速度时,这可能导致队列中消息的累积,直至达到存储上限,进而影响系统性能甚至导致消息丢失。

解决方案

要生成解决方案的前提,我们首先要先确定对应的消息堆积产生场景,对症下药。

  • 消费者处理消息速度太慢

    • 增加消费者数量

    • 优化消费者性能,优化代码,增加资源

    • 消息预取限制(prefetch在配置文件中的配置),以避免一次处理过多消息导致处理缓慢

  • 队列容量太小

    • 增加队列容量

  • 网络故障,导致消息可能丢失,导致消息在队列中堆积

    • 监控 + 告警,确保网络故障发送时能快速发现并解决问题

    • 持久化 + 高可用:确保消息和队列持久化以避免消息丢失,并使用镜像队列提高可用性

  • 消费者故障

    • 使用死信队列:将无法处理的添加到死信队列中,避免阻塞主队列

    • 容错机制:消费者自动重复和错误处理逻辑

  • 队列配置不当

    • 优化队列配置:检测并优化消息确认模式,队列长度限制和其他相关配置

  • 消息太大了,处理时间较长

    • 消息分片:将大型消息分割成小的消息片段,加速处理

  • 业务逻辑复杂或耗时

    • 优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间

  • 消息产生速度快于消费者速度

    • 限流

    • 负载均衡:消费者间公平分配,避免个别消费者加载

  • 其他配置优化

    • 设置消息优先级,确保优先级高的先处理

    • 配置文件描述符的限制,内存使用限制

补充

消息队列的路由模型

消息队列的路由模型指的是消息从生产者到消费者的传递路径和方式。常见的消息队列路由模型包括:

  • 点对点模型(Point-to-Point):消息被发送到一个队列中,只有一个消费者可以接收并处理该消息。

  • 发布-订阅模型(Publish-Subscribe):消息被发送到一个主题(或交换机)中,多个消费者可以订阅该主题,并且每个消费者都可以收到消息并独立处理。

  • 路由模型(Routing):消息根据特定的路由规则被发送到不同的队列中,消费者根据队列接收并处理消息。

消息队列的应用经验(使用场景)

  • 异步通信:在系统内部或者不同系统之间进行异步通信,提高系统的响应速度和吞吐量。

  • 任务调度和削峰填谷:通过消息队列进行任务调度,将请求分散到不同的时间段或者不同的处理节点,避免系统在高峰时期负载过重。

  • 分布式事务:在分布式系统中使用消息队列进行事件的发布和订阅,保证系统的一致性和可靠性。

  • 日志收集和数据分析:通过消息队列将日志数据发送到消息队列中进行集中收集和处理,方便进行数据分析和监控。

消息消费顺序性问题

很多时候,我们的MQ使用并不需要保证顺序消费,比如订单超时等。

但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如在库存更新场景中,减少库存和增加库存的通知必须按照接收顺序处理,以防止库存数量错误或超卖现象。

那这个时候我们该怎么实现我们消息消费顺序性呢?

目前的方案是:

  • 单一消费者:一个队列绑定一个消费者,采用单活模式实现顺序消费

  • 分区策略:将不同的消息进行分区(Direct),然后不同的区绑定不同的消息队列,可以加大并发

  • 排序:顺序消息ID+手动排序

这里是一个单活模式的例子:

/**
     * 创建一个 单活模式的队列
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        HashMap<String, Object> args = new HashMap<>();
        // x-single-active-consumer 单活模式 队列
        // 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,
        // 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1813610.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索乡村振兴新模式:发挥科技创新在乡村振兴中的引领作用,构建智慧农业体系,助力美丽乡村建设

随着科技的不断进步&#xff0c;乡村振兴工作正迎来前所未有的发展机遇。科技创新作为推动社会发展的重要力量&#xff0c;在乡村振兴中发挥着越来越重要的引领作用。本文旨在探讨如何发挥科技创新在乡村振兴中的引领作用&#xff0c;通过构建智慧农业体系&#xff0c;助力美丽…

汉语翻译藏语的软件,有3款宝藏软件!

在数字化飞速发展的今天&#xff0c;语言不再是沟通的障碍。对于想要学习藏语或需要与藏区人民交流的朋友们来说&#xff0c;一款优质的汉语翻译藏语的软件无疑是一大福音。那么&#xff0c;市面上究竟有哪些值得推荐的汉语翻译藏语的软件呢&#xff1f;接下来&#xff0c;就让…

PostgreSQL 快速入门与实战

1、概述 前面2篇博客给大家详细的介绍了PostgreSQL的安装和配置&#xff0c;本篇文章就带着大家一起学习一下PostgreSQL的用法&#xff0c;主要内容包括 基本的数据库操作、用户管理、数据备份、SCHEMA(模式)以及和MySQL的区别。 2、数据库基本操作 PostgreSQL是严格遵守SQL规…

C# Winform内嵌窗体(在主窗体上显示子窗体)

在开发Winform项目中&#xff0c;经常会要切换不同的窗体。通常程序都有一个主窗体&#xff0c;在切换窗体时往往需要关闭其他子窗体&#xff0c;这个实例就来介绍MDI主窗体内嵌子窗体的实现方法。 MDI主窗体要设置一个比较重要的属性&#xff0c;IsMdiContainertrue。子窗体的…

boost asio异步服务器(3)增加发送队列实现全双工通信

增加发送节点 构造发送节点&#xff0c;管理发送数据。发送节点的类如下。 这个发送节点用于保证发送和接收数据的有效性。 增加发送队列 前边实现的是一个简单的echo服务器&#xff0c;也就是服务器将收到的内容发送给对应的客户端。但是在实际的服务器设计中&#xff0c;服务…

苹果WWDC 2024 带来的 AI 风暴:从生产力工具到个人助理,AI 将如何融入我们的生活?

2024年6月5日&#xff0c;苹果WWDC 2024全球开发者大会如约而至&#xff0c;带来了众多令人兴奋的新功能和新产品。其中&#xff0c;AI 技术的全面融入无疑是最引人注目的亮点。从 iOS、iPadOS 到 macOS&#xff0c;再到 Siri 和开发者工具&#xff0c;苹果正在将 AI 融入到其生…

数字孪生技术推动希腊水务系统的技术进步

OpenFlows 提供的数字孪生技术将科扎尼供水渗漏的响应时间缩短了 50% 引领希腊供水管理改革 新冠疫情之后&#xff0c;希腊制定国家经济复苏计划&#xff0c;旨在推动能源改革、数字化和现代化&#xff0c;作为计划的一部分&#xff0c;希腊正试图实现可持续的给排水管理&…

Qt | openSSL将TCP数据进行不对称(RSA)加密传输-windows平台实操(可行)

01、windows平台工具准备 QtQt5.14.2openSSL下载(选择适合自己的版本即可)https://slproweb.com/products/Win32OpenSSL.htmlTCP调试助手调试助手02、简介 首先简单介绍一下openssl。接着描述如何在windo

龙芯+RT-Thread+LVGL实战笔记(36)——密码锁完善

【写在前面】不知不觉中,又临近学期末了。这个学期,因为一些特殊原因,一直没怎么更新本教程,而且不得已上调了本教程的价格,在此笔者深表歉意。另一方面,自己带的学生发挥不佳,很遗憾未能闯进国赛,为此笔者也郁闷了相当长一段时间。事已至此,也只能慢慢释然,来年再战…

AI网络爬虫:批量爬取AI导航网站Futurepedia数据

Futurepedia致力于使AI技术对各行各业的专业人士更加可理解和实用&#xff0c;提供全面的AI网站和工具目录、易于遵循的指南、每周新闻通讯和信息丰富的YouTube频道&#xff0c;简化AI在专业实践中的整合。如何把Futurepedia上的全部AI网站数据爬取下来呢&#xff1f; 网站一页…

[大模型]LLaMA3-8B-Instruct langchain 接入

环境准备 在 Autodl 平台中租赁一个 3090 等 24G 显存的显卡机器&#xff0c;如下图所示镜像选择 PyTorch-->2.1.0-->3.10(ubuntu22.04)-->12.1 接下来打开刚刚租用服务器的 JupyterLab&#xff0c;并且打开其中的终端开始环境配置、模型下载和运行演示。 pip 换…

一款优秀的下载和共享工具

一、简介 1、它以舒适和快速的方式下载Internet文件&#xff0c;同时支持断点续传和嗅探视频音频的功能。 它具有站点抓取、批量下载队列和计划任务下载等功能&#xff0c;可以接管所有浏览器的下载任务&#xff0c;包括Edge&#xff0c;Firefox和Chrome等主流浏览器。 对于用…

MAC认证

简介 MAC认证是一种基于接口和MAC地址对用户的网络访问权限进行控制的认证方法&#xff0c;它不需要用户安装任何客户端软件。设备在启动了MAC认证的接口上首次检测到用户的MAC地址以后&#xff0c;即启动对该用户的认证操作。认证过程中&#xff0c;不需要用户手动输入用户名…

基于构件开发模型-系统架构师(八)

1、把应用程序中应用最频繁的那部分核心程序作为评价计算机性能的标准程序&#xff0c;称为&#xff08;&#xff09;程序。 A仿真测试 B核心测试 C基准测试 D标准测试 解析&#xff1a; 系统测试最核心的部分内容&#xff0c;基准测试。 2、运用信息技术进行知识的挖掘和…

修改注册表默认端口号;telnet端口号失败、不通、没反应;访问另一机器端口不通

背景&#xff1a;在多集群项目中&#xff0c;发现访问其他机器不通。遂使用telnet命令试试&#xff0c;确实端口不通。也查看了防火墙策略等&#xff0c;最后尝试了修改注册表默认端口号。这样端口可通了。但并未实际解决问题&#xff0c;在实际项目中需要确认一下你实际项目中…

使用 FormCreate 快速创建仿真页面

在现代前端开发中&#xff0c;快速创建和迭代仿真页面是提高开发效率和用户体验的关键。FormCreate 是一个强大的工具&#xff0c;它通过 JSON 生成具有动态渲染、数据收集、验证和提交功能的表单组件&#xff0c;支持多种 UI 框架。本文将介绍如何使用 FormCreate 快速创建一个…

C++初阶学习第六弹——探索STL奥秘(一)——标准库中的string类

前言&#xff1a; 在前面&#xff0c;我们学习了C的类与对象&#xff0c;认识到了C与C语言的一些不同&#xff0c;今天&#xff0c;我们将进入C的 关键部分——STL&#xff0c;学习完这部分之后&#xff0c;我们就可以清楚的认识到C相比于C语言的快捷与便利 目录 一、为什么有s…

Docker笔记-Debian容器内搭建ssh服务

登陆容器之后修改密码&#xff1a; passwd 密码设置完成后安装openssh-server apt-get install openssh-server 修改端口号为50022并添加配置 vim /etc/ssh/sshd_config 修改成 Port 50022 PasswordAuthentication yes PermitRootLogin yes 启动 rootlinux:~# /etc/in…

C#完整服务器

控件&#xff1a;三个按钮&#xff0c;输入框&#xff0c;文件框(richTextBox) 打开服务器按钮方法 Socket socket;// 服务器对象Dictionary<string,Socket> dic new Dictionary<string,Socket>();// 存储客户端对象// 打开服务器private void button1_Click(obje…

优思学院|用ChatGPT快速完成数据分析图表【柏累托图法】

数据分析是很多行业的人不可少的一部分&#xff0c;尤其是质量工程师更是日常的工作。然而&#xff0c;随着科技的进步&#xff0c;人工智能&#xff08;AI&#xff09;将逐渐承担起数据计算的工作&#xff0c;这意味着未来的质量工程师需要具备的不仅仅是计算能力&#xff0c;…