RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

news2025/1/13 9:00:20

文章目录

  • 一、RabbitMQ消息可靠性概述
    • 1、引出问题
    • 2、RabbitMQ消息可靠性保证的四个环节
  • 二、保证生产者消息发送到RabbitMQ服务器
    • 1、服务端确认:Transaction模式
      • (1)JavaAPI
      • (2)springbootAPI
    • 2、服务端确认:Confirm模式
      • (1)JavaAPI
      • (2)springbootAPI
  • 三、保证消息从交换机路由到队列
    • 1、消息回发
      • (1)JavaAPI
      • (2)SpringbootAPI
    • 2、消息路由到备份交换机
      • (1)JavaAPI
  • 四、保证消息在队列中可靠存储
    • 1、队列持久化
    • 2、交换机持久化
    • 3、消息持久化
    • 4、搭建集群
  • 五、保证消费者成功消费消息
    • 1、自动ACK
    • 2、手动ACK
  • 六、保证消息一致之消费者回调
    • 1、调用生产者API
    • 2、发送响应消息给生产者
  • 七、保证消息一致性之补偿机制
    • 1、消息结果查证
    • 2、消息重发
    • 3、消息幂等性
    • 4、最终一致性
  • 八、消息的顺序性

一、RabbitMQ消息可靠性概述

1、引出问题

我们先看一串代码,并思考一下为什么要先入库然后发MQ:

public int add(Merchant merchant) {
    int k = merchantMapper.add(merchant);
    System.out.println("aaa : "+merchant.getId());
    JSONObject title = new JSONObject();
    String jsonBody = JSONObject.toJSONString(merchant);
    title.put("type","add");
    title.put("desc","新增商户");
    title.put("content",jsonBody);
    rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());
    return k;
}

如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。

2、RabbitMQ消息可靠性保证的四个环节

在这里插入图片描述
① 消息从生产者发送到Broker
生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接收?如果Broker不给应答,生产者发送的消息也无法知道成功还是失败。

② 消息从Exchange路由到Queue
Exchange交换机维护一个与Queue的绑定列表,它的职责是分发消息。如果交换机处理消息的分发出现问题怎么办?

③ 消息存储在Queue
RabbitMQ的队列有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,消息要一直存储在队列中。队列中的消息有没有丢失的可能?怎么保证消息在队列中稳定的存储呢?

④ 消费者订阅Queue并消费消息
队列是先进先出的,消息投递给消费者是一条一条投递的,如何能保证消费者正确地消费了消息?

二、保证生产者消息发送到RabbitMQ服务器

生产者发送RabbitMQ消息时,如果遇到了网络问题或者Broker的问题(硬盘故障、磁盘写满等),就会导致消息发送失败,生产者无法确定Broker是否正确地接收到消息。

在RabbitMQ中提供了两种生产者确认机制,也就是说生产者发送消息给RabbitMQ时,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。

1、服务端确认:Transaction模式

在事务模式里面,只有收到了服务端的Commit-OK指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干RabbitMQ服务器的性能。所以不建议在生产环境使用。
在这里插入图片描述

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,测试事务模式。发送消息的效率比较低,建议使用Confirm模式
 * 参考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class TransactionProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
        	// 开启事务
            channel.txSelect();
            // 发送消息,发布了4条,但只确认了3条
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit(); // 提交
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            int i =1/0;
            channel.txCommit();
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.txCommit();
            System.out.println("消息发送成功");
        } catch (Exception e) {
            channel.txRollback(); // 回滚
            System.out.println("消息已经回滚");
        }

        channel.close();
        conn.close();
    }
}

(2)springbootAPI

rabbitTemplate.setChannelTransacted(true);

2、服务端确认:Confirm模式

Confirm模式有三种,单个确认、批量确认、异步确认。

在这里插入图片描述

(1)JavaAPI

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 普通确认模式
 */
public class NormalConfirmProducer {

    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Normal Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 开启发送方确认模式
        channel.confirmSelect();

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // 普通Confirm,发送一条,确认一条
        if (channel.waitForConfirms()) {
            System.out.println("消息发送成功" );
        }else{
            System.out.println("消息发送失败");
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生产者,测试批量Confirm模式
 */
public class BatchConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ ,Batch Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        try {
            // 开启confirm模式
            channel.confirmSelect();
            for (int i = 0; i < 5; i++) {
                // 发送消息
                // String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            }
            // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
            // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
            // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送完毕,批量确认成功");
        } catch (Exception e) {
            // 发生异常,可能需要对所有消息进行重发
            e.printStackTrace();
        }

        channel.close();
        conn.close();
    }
}

import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * 消息生产者,测试异步Confirm模式
 *  参考文章:https://www.cnblogs.com/vipstone/p/9350075.html
 */
public class AsyncConfirmProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, Async Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 用来维护未确认消息的deliveryTag
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
        // 异步监听确认和未确认的消息
        // 如果要重复运行,先停掉之前的生产者,清空队列
        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Broker未确认消息,标识:" + deliveryTag);
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
                // 这里添加重发的方法
            }
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
                System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    // 只移除一个元素
                    confirmSet.remove(deliveryTag);
                }
                System.out.println("未确认的消息:"+confirmSet);
            }
        });

        // 开启发送方确认模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // 发送消息
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            confirmSet.add(nextSeqNo);
        }
        System.out.println("所有消息:"+confirmSet);

        // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK
        //channel.close();
        //conn.close();
    }
}

(2)springbootAPI

Confirm模式是在Channel上开启的,RabbitTemplate对Channel进行了封装。

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("--------收到服务端异步确认--------");
        System.out.println("received ack: "+ack);
        System.out.println("cause: "+cause);
        System.out.println("correlationId: "+correlationData.getId());
        if (!ack) {
            System.out.println("发送消息失败:" + cause);
            throw new RuntimeException("发送异常:" + cause);
            // 做一些回滚、重试、记录处理
        } else {
			System.out.println("消息确认成功");
		}
    }
});

三、保证消息从交换机路由到队列

如果routingkey错误,或者队列不存在(但是生产环境基本不会出现这么低级的错误),就会导致消息从交换机无法路由到队列。

我们有两种方式处理无法路由的消息,一种是服务端回发给生产者,一种是让交换机路由到另一个备份的交换机。

1、消息回发

(1)JavaAPI

channel.addReturnListener(new ReturnListener() {
	   public void handleReturn(int replyCode,
	                            String replyText,
	                            String exchange,
	                            String routingKey,
	                            AMQP.BasicProperties properties,
	                            byte[] body)
	           throws IOException {
	       System.out.println("=========监听器收到了无法路由,被返回的消息============");
	       System.out.println("replyText:"+replyText);
	       System.out.println("exchange:"+exchange);
	       System.out.println("routingKey:"+routingKey);
	       System.out.println("message:"+new String(body));
	   }
});

(2)SpringbootAPI

Springboot消息回发是使用mandatory参数和ReturnListener(在SPringAMQP中是ReturnCallback)

// 为RabbitTemplate设置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            System.out.println("--------收到无法路由回发的消息--------");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
            System.out.println("properties:" + message.getMessageProperties());
            System.out.println("body:" + new String(message.getBody(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
});

2、消息路由到备份交换机

在创建交换机时,从属性中指定备份交换机。

(1)JavaAPI

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
        contentEncoding("UTF-8").build();

// 备份交换机
channel.exchangeDeclare("ALTERNATE_EXCHANGE","topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE","ALTERNATE_EXCHANGE","#");

// 在声明交换机的时候指定备份交换机
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);


// 发送到了默认的交换机上,由于没有任何队列使用这个关键字跟交换机绑定,所以会被退回
// 第三个参数是设置的mandatory,如果mandatory是false,消息也会被直接丢弃
channel.basicPublish("TEST_EXCHANGE","error.routingKey",true, properties,"只为更好的你".getBytes());

四、保证消息在队列中可靠存储

如果消息一直没有被消费者消费,消息会一直存储在队列中,并且队列中的数据存储在数据库中。

如果RabbitMQ服务或者硬件发生了故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘中。

1、队列持久化

声明队列时可以设置几个重要的参数:

// 声明队列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// springboot中
@Bean("reliableQueue")
public Queue queue() {
	// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    return new Queue("RELIABLE_QUEUE", true, false, false, new HashMap<>());
}

参数解析:
durable:没有持久化的队列,保存在内存中,服务重启后队列和消息都会丢失。设置为true可以保证消息持久化
exclusive:排他性队列的特点是:只对首次声明它的连接(Connection)可见;会在其连接断开的时候自动删除。
autoDelete:没有消费者连接的时候,自动删除。

2、交换机持久化

声明交换机时可以设置这几个重要的参数,和队列类似:

// 声明交换机
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);

// Springboot中
@Bean("directExchange")
public DirectExchange exchange() {
	// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
    return new DirectExchange("RELIABLE_EXCHANGE", true, false, new HashMap<>());
}

3、消息持久化

在生产者发送消息时,可以指定消息的配置:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)   // 2代表持久化
        .contentEncoding("UTF-8")  // 编码
        .expiration("10000")  // TTL,过期时间
        .headers(headers) // 自定义属性
        .priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用
        .messageId(String.valueOf(UUID.randomUUID()))
        .build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

// springboot中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("一条正常的消息".getBytes(), messageProperties);

4、搭建集群

如果只有一个RabbitMQ节点,即使交换机、队列、消息做了持久化,如果服务崩溃或者硬件故障,RabbitMQ的服务一样是不可以的。

所以为了提高MQ服务的可用性,保证消息的传输,我们需要有多个RabbitMQ的节点。

RabbitMQ集群搭建与高可用实现

五、保证消费者成功消费消息

如果消费者收到消息后没来得及处理或者发生了一场,就会导致消费失败。RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端。

如果消费者拿到消息没有ACK会怎么样?
没有收到ACK的消息,消费者断开连接之后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。

1、自动ACK

默认就是自动ACK。消费者收到消息的时候就会自动发送ACK,而不是方法执行成功的时候发送ACK。这种情况RabbitMQ只会关心消费者是否接收到了消息,对消息的处理结果是不关心的,所以通常来说我们会选择手动ACK。

// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");
        System.out.println("consumerTag : " + consumerTag );
        System.out.println("deliveryTag : " + envelope.getDeliveryTag() );
    }
};
// 开始获取消息 autoAck参数为true代表自动ACK
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);

同样的,在Springboot中如果没有特殊配置,默认的就是自动ACK。

2、手动ACK

// 创建消费者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("Received message : '" + msg + "'");

        if (msg.contains("拒收")){
            // 拒绝消息
            // requeue:是否重新入队列,true:是,会再发给其他消费者;false:直接丢弃,相当于告诉队列可以直接删除掉
            // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
            // basicReject(long deliveryTag, boolean requeue)
            channel.basicReject(envelope.getDeliveryTag(), false);
        } else if (msg.contains("异常")){
            // 批量拒绝
            // requeue:是否重新入队列
            // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
            // basicNack(long deliveryTag, boolean multiple, boolean requeue)
            channel.basicNack(envelope.getDeliveryTag(), true, false);
        } else {
            // 手工应答
            // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
            // basicAck(long deliveryTag, boolean multiple)
            channel.basicAck(envelope.getDeliveryTag(), true);
        }
    }
};

// 开始获取消息,注意这里开启了手工应答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

在Springboot中,可以这样配置应答方式:

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

或者在SimpleMessageListenerContainer或者SimpleRabbitListenerContainerFactory中这样设置:

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

消费者中获取到Channel对象,就可以进行应答了:

@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.queue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {
    @RabbitHandler
    public void process(String msgContent,Channel channel, Message message) throws IOException {
        System.out.println("Second Queue received msg : " + msgContent );
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

注意这个枚举值:
NONE:自动ACK
MANUAL:手动ACK
AUTO:如果方法未抛出异常,则发送ACK。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队。如果抛出异常是AmqpRejectAndDontRequeueException则发送nack并不会重新入队。

注意!拒绝消息时如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者处理,如果只有一个消费者时,这种方式可能会出现无限循环重复消费的情况。

六、保证消息一致之消费者回调

消费者成功消费了消息之后,如何告知生产者该消息已经被成功消费了?

虽然说使用MQ的目的之一是解耦,但是某些一致性要求很高的场景,比如说金融业务,还是很有必要通知生产者的。

1、调用生产者API

例如,提单系统给其他系统发送了保险消息后,其他系统必须在处理完消息之后调用提单系统提供的API,来修改提单系统中这笔数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

但是这种方式又从解耦的状态变成了耦合状态,还是需要根据实际情况来判断是否要采用这种方式。

2、发送响应消息给生产者

例如商业银行与人民银行二代支付系统(使用IBM MQ),无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。

整个通信的流程设计非常复杂,但是对于金融场景下的消息可靠性保证,是很有用的。
在这里插入图片描述

七、保证消息一致性之补偿机制

如果消费者消费了消息之后,一直迟迟没有通知给生产者,我们如何知道消费者已经成功消费了消息?
就像是银行A给银行B发起了一笔转账,银行B一直没有给银行A回调,此时我们如何确定银行B确定是已经处理了该消息?

此时生产者与消费者之间应该约定一个超时时间,对于超出这个时间没有得到响应的消息,才确定为消费失败,比如说5分钟。

1、消息结果查证

假如说生产者5分钟内没有收到消费者消费成功的回调,生产者可以主动发起一次结果查证,通过业务要素或者唯一流水号,查证该业务在消费者是否正常消费。

2、消息重发

假如说消息一直没有结果,就需要考虑消息重发了。

可以启动一个定时任务,比如30秒跑一次,查询业务表业务状态是中间状态的记录,查询出来,构建MQ消息,重新发送。也可以单独设计一张消息表,把本系统所有发送出去的消息全部异步登记,状态是未回复的消息,进行重发(这种方式会对数据库性能造成一定损耗)。

重发机制也不可能一直重复发,如果消费者确实是有bug或者其他问题,如果一直重复发送会导致死循环了。我们可以设置一个衰减机制,第一次间隔一分钟,第二次间隔2分钟,最终发送三次,三次过后如果还没有收到回复,就需要将消息设置为特殊状态,进行人工干预。

3、消息幂等性

消息重发就意味着要处理消息的幂等。

什么是服务的幂等?为什么要实现幂等?
接口的幂等性——详细谈谈接口的幂等即解决方案

4、最终一致性

在一些一致性很强的接口调用中,比如说转账操作,通常会在一天的业务结束之后,第二天营业之前,生产者和消费者之间会进行一次消息对账。生成一个对账文件,两者分别解析对方的文件进行对账,如果确实有消息不一致的情况,会通过短信或者邮件的方式通知业务人员进行手动处理,要么把钱退回,要么把钱补上。

八、消息的顺序性

在RabbitMQ中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用队列)。

除非负载的场景,不要用多个消费者消费消息,可以保证消息的顺序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/693762.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Matlab】根据伯德图计算pid参数方法原理

在学习鲁棒控制的过程中&#xff0c;有一些步骤需要根据一些性能参数来计算pid参数&#xff0c;因此记录一下根据伯德图的性能来计算pid参数的原理。 系统开环响应的几个关键参数 在使用开环响应初调控制器参数时&#xff0c;主要就是调整几个需要注意的关键参数&#xff0c;…

nbcio-vue中formdesigner的组件显示不正常的处理

今天看演示系统的formdesigner组件显示不正常&#xff0c;也不知道是什么时候开始的事情&#xff0c; 如下&#xff1a; 对组件的操作倒是正常&#xff0c;但看本地是正常的&#xff0c;如下&#xff1a; 开始也不知道是什么原因&#xff0c;看代码也是一样的&#xff0c;应该…

11 MFC 制作记事本

文章目录 界面制作制作菜单设置编译框随着窗口的变化而变化OnSize打开文件文件另存为设置字体颜色修改字体文件的查找与替换查找与替换对话框显示&#xff08;非模态对话框&#xff09;对话框消息与对话框处理函数 全部代码 界面制作 制作菜单 选择Menu 点击新建 将内容写入&qu…

Nightingle夜莺Docker版SNMP监控

起因 对夜莺很感兴趣&#xff0c;想使用一下。我看官方提供了v6版本的docker-compose。而且我之前有使用过promtheus和grafana&#xff0c;虽然很好但是总觉得还是得二开。总有一天有人去搞一个不错的玩意儿出来。官方文档地址 安装与配置 直接运行docker版本的demo&#xf…

mysql内部结构和InnoDB底层原理

一、mysql内部结构 mysql总体上分为客户端、Server层、引擎层&#xff0c;具体如下图&#xff1a; 1、连接器 一般客户端通过jdbc、navicat等工具发送请求连接到mysql服务端&#xff0c;完成TCP三次握手后&#xff0c;连接器就开始认证身份&#xff0c;如果身份认证成功&…

数据结构-串、数组和广义表

数据结构之串、数组和广义表 串的定义一、串的顺序存储结构1.1、串的链式存储结构1.2、串的模式匹配算法1.2.1、Brute-Force简称为BF算法1.2.2、KMP算法 数组的定义2.1、数组的顺序存储结构2.2、数组的特点&#xff1a;结构固定-----维数和维界不变2.3、特殊矩阵的压缩存储 广义…

密码学—Kasiski测试法Python程序

Kasiski Kasiski是辅助破解Vigenere的前提工作&#xff0c;Kasiski是猜测加密者使用Vigenere密码体系的密钥的长度&#xff0c;Kasiski只是猜测长度而已&#xff0c;所以说是辅助破解Vigenere 若密文中出现两个相同的密文段(密文段的长度m>2)&#xff0c;则它们对应的明文&…

leetcode第66题:加一

题目 这是一道简单的小题&#xff0c;自己却也没写出来。。。逆序遍历数组digits&#xff0c;用carry标记当前元素是否需要进位&#xff08;0不要&#xff0c;1要&#xff09;。 若carry1&#xff0c;则当前元素要么置0&#xff0c;要么自加1。自加1之后&#xff0c;再也不需要…

【深入了解Spring Cloud Alibaba Nacos:服务注册和配置中心】—— 每天一点小知识

&#x1f4a7; 深入了解 S p r i n g C l o u d A l i b a b a N a c o s &#xff1a;服务注册和配置中心 \color{#FF1493}{深入了解Spring Cloud Alibaba Nacos&#xff1a;服务注册和配置中心} 深入了解SpringCloudAlibabaNacos&#xff1a;服务注册和配置中心&#x1f4a7;…

深入浅出解析LoRA完整核心基础知识 | 【算法兵器谱】

Rocky Ding 公众号&#xff1a;WeThinkIn 写在前面 【算法兵器谱】栏目专注分享AI行业中的前沿/经典/必备的模型&论文&#xff0c;并对具备划时代意义的模型&论文进行全方位系统的解析&#xff0c;比如Rocky之前出品的爆款文章Make YOLO Great Again系列。也欢迎大家提…

让Ai帮我们画个粽子,它会画成什么样呢?

让Ai帮我们画个粽子&#xff0c;它会画成什么样呢&#xff1f; 本文目录&#xff1a; 一、Ai绘图技术的现状 二、看看Ai理解的粽子是怎样的 2.1、基础粽子 2.2、生成不同风格的粽子 2.2.1、真实风格的粽子 2.2.2、插图风格的粽子 2.2.3、3D风格的粽子 2.2.4、卡通风格…

Mysql锁机制介绍

Mysql锁机制 锁是计算机协调多个进程或线程并发访问某一资源的机制。 在数据库中&#xff0c;除传统的计算资源(如CPU、RAM、I/O等)的争用以外&#xff0c;数据也是一种供许多用户共享的资源。如何保证数据并发访问的一致性、有效性是所有数据库必须解决的一个问题&#xff0…

vue-cli笔记

vue的生命周期&#xff1a; 借鉴react 钩子函数&#xff1a; change() 挂载完毕&#xff0c;vue完成模板解析&#xff0c;并把初始的真实的dom元素放入到页面后执行 beforeCreate() {// 数据代理和数据监测创建之前console.log(beforeCreate) }, created() {console.l…

深度:全面解析数据智能的金融“炼金术”!

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 金融以其财富效应&#xff0c;成为最新科技的试金石。一项新技术出来后&#xff0c;人们首先闪过的念头就是“能不能用它赚钱”。例如&#xff0c;ChatGPT带火了大模型&#xff0c;人们也开始将目标聚焦到大模型在金融领域的…

【实战】 JWT、用户认证与异步请求(下) —— React17+React Hook+TS4 最佳实践,仿 Jira 企业级项目(五)

文章目录 一、项目起航&#xff1a;项目初始化与配置二、React 与 Hook 应用&#xff1a;实现项目列表三、TS 应用&#xff1a;JS神助攻 - 强类型四、JWT、用户认证与异步请求1~56.用useAuth切换登录与非登录状态7.用fetch抽象通用HTTP请求方法&#xff0c;增强通用性8.用useHt…

AR宇航员互动体验软件:虚拟与现实叠加增强体验感

随着科技的不断发展&#xff0c;人们对太空探索的兴趣和热情也越来越高涨。为了满足人们对太空探索的渴望&#xff0c;广州华锐互动研发了宇航员AR模拟体验软件&#xff0c;这种软件可以让用户身临其境地体验太空探索的过程&#xff0c;提供一种全新的、令人兴奋的太空探索新体…

css基础知识十一:CSS3新增了哪些新特性?

一、是什么 css&#xff0c;即层叠样式表&#xff08;Cascading Style Sheets&#xff09;的简称&#xff0c;是一种标记语言&#xff0c;由浏览器解释执行用来使页面变得更为美观 css3是css的最新标准&#xff0c;是向后兼容的&#xff0c;CSS1/2的特性在CSS3 里都是可以使用…

图解CNN中的卷积(卷积运算、池化、Padding、多通道的卷积)

文章目录 卷积操作池化Padding对多通道&#xff08;channels&#xff09;图片的卷积套上激活函数是什么样的参考&#xff1a; 卷积层是深度学习神经网络中经常使用的一种层。它通过卷积运算来提取输入的特征&#xff0c;常用于图像、语音等信号处理任务中。 卷积层有以下几个参…

rocketmq-spring-boot-starter支持SpringBoot 1.x(spring-context 4.x)版本

1 问题说明 由于历史原因&#xff0c;项目使用的是SpringBoot1.x版本&#xff0c;而且由于种种原因&#xff0c;不能升级。在项目开发迭代过程中&#xff0c;决定使用RocketMQ作为消息中间件&#xff0c;因为是SpringBoot项目&#xff0c;理所应当的引入了rocketmq-spring-boo…

简单聊聊数字孪生与GIS融合的必要性

随着科技的不断发展和应用的不断深入&#xff0c;数字孪生和GIS在各自领域中展现出巨大的潜力。然而&#xff0c;更引人注目的是&#xff0c;数字孪生和GIS的融合将为许多行业带来全新的机遇和变革。在本文中&#xff0c;我们将探讨数字孪生和GIS融合的必要性&#xff0c;以及它…