MQ高可用相关设置

news2025/1/16 0:56:51

文章目录

  • 前言
  • MQ如何保证消息不丢失
    • RabbitMQ
    • RocketMQ
    • KafkaMQ
  • MQ如何保证顺序消息
    • RabbitMQ
    • RocketMQ
    • Kafka
  • MQ刷盘机制/集群同步
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 广播消息&集群消息
    • RabbitMQ
    • RocketMQ
  • MQ集群架构
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 消息重试
    • RabbitMQ
    • RockeMq
    • Kafka
  • 死信队列
    • RocketMQ
    • Kafka
  • 消息去重
    • RocketMQ
    • Kafka
  • 事务消息
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 消息积压
    • RabbitMQ
  • 设计MQ思路
  • 博客记录

前言

消息丢失的三种情况

  1. 消息在传入服务过程中丢失
  2. MQ收到消息,暂存内存中,还没消费,自己挂掉了,内存中的数据搞丢
  3. 消费者消费到了这个消息,但还没来得及处理,就挂了,MQ以为消息已经被处理

也就是生产者丢失消息、消息列表丢失消息、消费者丢失消息;

MQ如何保证消息不丢失

RabbitMQ

一、生产者

开启RabbitMQ事务
生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务
channel.txSelect
try {
      // 这里发送消息
} catch (Exception e) {
      channel.txRollback
 
// 这里再次重发这条消息
 
}
 
// 提交事务
channel.txCommit

设置Confirm模式:

同步确认:

//开启发布确认
channel.confirmSelect();
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag){
    System.out.println("消息发送成功");
}

异步确认 :

服务端 :

消息持久化,必须满足以下三个条件,缺一不可。

  • Exchange 设置持久化

  • Queue 设置持久化

  • Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

// 声明队列,并将队列设置为持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello, RabbitMQ!";
// 发送消息时将消息设置为持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .build();
channel.basicPublish("", "myQueue", properties, "Hello, RabbitMQ".getBytes());

设置备份交换机:

Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "myAlternateExchange");
channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT, true, false, arguments);
channel.exchangeDeclare("myAlternateExchange", BuiltinExchangeType.FANOUT, true, false, null);

二 :服务端设置集群镜像模式

  • 单节点模式: 最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

  • 普通模式: 消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

  • 镜像模式: 消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

消费方开启消息确认机制 :

// 开启消息确认机制
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);
        // 手动发送消息确认
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
});

手动确认 :

codechannel.basicConsume("myQueue", false, (consumerTag, delivery) -> {
    // 处理消息
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});

RocketMQ

一、生产者提供SYNC的发送消息方式,等待broker处理结果。

RocketMQ生产者提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应发送结果。

Message msg = new Message("TopicTest",
        "TagA","OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步传递消息,消息会发给集群中的⼀个Broker节点。
SendResult sendResult = producer.send(msg);

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Message msg = new Message("TopicTest","TagA","OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        countDownLatch.countDown();
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        countDownLatch.countDown();
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。

Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
//核⼼:发送消息。没有返回值,发完消息就不管了,不知道有没有发送消息成功
producer.sendOneway(msg);

SendResult定义说明(来自RocketMQ官方)

  • SEND_OK
    消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
  • FLUSH_DISK_TIMEOUT
    消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
  • FLUSH_SLAVE_TIMEOUT
    消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
  • SLAVE_NOT_AVAILABLE
    消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即- SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式(但是效率比较低)。

二、Borker 方面 : 设置成同步刷盘及同步复制;开启集群模式,集群同步;

1)异步刷盘

默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

2)同步刷盘

消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:

flushDiskType=SYNC_FLUSH

同步复制后,消息复制流程如下:

  • slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  • master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  • slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  • master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

三、消费者

RocketMQ消费失败后的消费重试机制

手动提交消息偏移量

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
public enum ConsumeConcurrentlyStatus {
    //业务方消费成功
    CONSUME_SUCCESS,
    //业务方消费失败,之后进行重新尝试消费
    RECONSUME_LATER;
}

RECONSUME_LATER “%RETRY%+ConsumeGroupName”—重试队列的主题

KafkaMQ

解决方案:
1、生产者调用异步回调消息。伪代码如下: producer.send(msg,callback);
2、生产者增加消息确认机制,设置生产者参数:acks = all。partition的leader副本接收到消息,等待所有的follower副本都同步到了消息之后,才认为本次生产者发送消息成功了;
3、生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;
4、定义本地消息日志表,定时任务扫描这个表自动补偿,做好监控告警。
5、后台提供一个补偿消息的工具,可以手工补偿。


生产者设置同步发送 :

 // 异步发送 默认
 kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
 // 同步发送
 RecordMetadata first = kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get(); 

生产者设置发送ack :

// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
        // 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
            kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));

生产者设置重试次数。比如:retries>=3,增加重试次数以保证消息的不丢失;

三、消费者:

通过在Consumer端设置“enable.auto.commit”属性为false后,
在代码中手动调用KafkaConsumer实例的commitSync()方法提交,

这里指的是同步阻塞commit消费的偏移量,等待Broker端的返回响应,需要注意Broker端在对commit请求做出响应之前,消费端会处于阻塞状态,从而限制消息的处理性能和整体吞吐量以确保消息能够正常被消费。

如果在消费过程中,消费端突然Crash,这时候消费偏移量没有commit,等正常恢复后依然还会处理刚刚未commit的消息。

生产者acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。

1)ack=0,生产者在成功写入悄息之前不会等待任何来自服务器的响应。

2)ack=1,只要集群的首领副本收到消息,生产者就会收到一个来自服务器的成功响应。

3)ack=all,只有当所有同步副本全部收到消息时,生产者才会收到一个来自 服务器的成功响应。

MQ如何保证顺序消息

严格顺序消费的注意事项

  • 生产者不能异步发送,异步发送在发送失败的情况下,就没办法保证消息顺序。

比如你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再重新发送1遍,这个时候顺序就乱掉了。

  • 应用中应确保业务中添加事务锁,防止并发处理同一对象。

比如修改业务员的手机号,操作员A和操作员B同时修改业务员张三的手机号,如两人的填入手机号相同无影响,如不同,操作员A输入正确,操作员B输入错误,可能造成消费顺序乱掉,手机号修改错误。

  • 对于消费端,不能并行消费,生产者顺序发送,消费端必须顺序消费。

RabbitMQ

消息单个消费者单线程消费。

RocketMQ

RocketMQ 提供了两种顺序消息模式:全局顺序消息和分区顺序消息。

  • 全局顺序消息:适用于性能要求不高的场景,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。全局顺序消息实际上是一种特殊的分区顺序消息,即Topic中只有一个分区,因此全局顺序和分区顺序的实现原理相同。由于分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
  • 分区顺序消息:适用于性能要求高的场景,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。分区顺序消息比全局顺序消息的并发度和性能更高。

发送方使用MessageQueueSelector选择队列 :

Message msg = new Message("OrderTopicTest", "order_"+orderId,
        "KEY" + orderId,("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息队列的选择器
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    //第一个参数:所有的消息,第二个参数:发送的消息,第三个参数:根据什么发送,这里面传的是orderId
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
    //同一个订单id可以放到同一个队列里面去
}, orderId);

消费方使用MessageQueueSelector选择队列 :

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        //自动提交
        context.setAutoCommit(true);
        for(MessageExt msg:msgs){
            System.out.println("收到消息内容 "+new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

Kafka

Kafka是分布式多partition的,它会将一个topic中的消息尽可能均匀的分发到每个partition上。那么问题就来了,这样怎么保证同一个topic消息的顺序呢?

kafka可以通过partitionKey,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。
也就是说生产者在写消息的时候,可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

先后两条消息发送时,前一条消息发送失败,后一条消息发送成功,然后失败的消息重试后发送成功,造成乱序。为了解决重试机制引起的消息乱序为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。
同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。
对于接收的每条消息,如果其序号比Broker维护的序号大一,则Broker会接受它,否则将其丢弃
如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息
如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息
发送失败后会重试,这样可以保证每个消息都被发送到broker

消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。

指定发送partition的分区:

//没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,
//kafkaProducer.send(new ProducerRecord<>("first","a","atguigu " + i), new Callback() {}
kafkaProducer.send(new ProducerRecord("first", 0, "", "atguigu" + i)
        , new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e == null) {
                    System.out.println(" 主题: " +
                            metadata.topic() + "->" + "分区:" + metadata.partition()
                    );
                } else {
                    e.printStackTrace();
                }
            }
        });

MQ刷盘机制/集群同步

RabbitMQ

RocketMQ

同步刷盘、异步刷盘
  RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。

RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:

1)异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,立刻返回发送端发送成功,有单独的线程执行刷盘;写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入。

2)同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回告知发送端。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个

消息存储时,先将消息存储到内存,再根据不同的刷盘策略进行刷盘

同步刷盘:

异步刷盘:

刷盘源码

同步复制、异步复制
  如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

  • 同步复制是等Master和Slave均写成功后才反馈给客户端写成功状态;
  • 异步复制方式是只要Master写成功即可反馈给客户端写成功状态

这两种复制方式各有优劣:

  • 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
  • 在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式,尤其是SYNC_FLUSH方式,由于频繁的触发写磁盘动作,会明显降低性能。

通常情况下,应该把Master和Slave设置成ASYNC_FLUSH的刷盘方式,

主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。

Kafka

Broker针对每个分区会创建一个分区目录,分区目录下面存放的是日志文件(.log)和索引文件(.index)

Kafka的刷盘策略主要有两种:同步刷盘(sync flush)和异步刷盘(async flush)。

同步异步刷盘的区别在于,消息存储在内存(memory)中以后,是否会等待执行完刷盘动作再返回,即是否会等待将消息中的消息写入磁盘中。kafka可以通过配置flush.messageflush.ms来设置刷盘策略,如果flush.message设置为5,表示每5条消息进行一次刷盘,如果flush.message设置为1,表示每一条消息都进行一次刷盘。如果flush.ms设置为1000,表示每过1000ms进行一次刷盘,如果flush.ms设置为5000,表示每过5000ms进行一次刷盘。

  • 同步刷盘:每条消息被写入磁盘前,必须等待操作系统完成该消息的磁盘写入操作。这种方式可以确保数据不丢失,但由于每次消息都要等待磁盘I/O完成,因此会影响性能。在Kafka中,默认使用的是异步刷盘策略,因为它结合了多副本和基于日志的存储机制,通过复制和重放来保障数据的高可用性。异步刷盘的目的是为了提高吞吐量和适应高性能应用场景。不过,这种方法增加了数据丢失的风险,尤其是在系统发生故障的情况下。
  • 异步刷盘:这是一种更轻量级的刷盘方式,它允许消息先被写入内存中的页缓存,然后在空闲时由操作系统异步地刷入磁盘。这样可以减少对性能的影响,尤其是当处理大量消息时。然而,由于没有立即将数据刷入磁盘,所以存在一定的数据丢失风险。Kafka提供了配置项log.flush.interval.messageslog.flush.interval.ms来控制何时触发强制的刷盘操作。如果没有设置这些参数,那么Kafka会根据log.flush.scheduler.interval.ms(默认值为3000毫秒)的时间间隔进行检查,以确定是否需要刷新所有日志到磁盘。需要注意的是,尽管可以通过这些参数来实现一定程度的控制,但是官方并不推荐依赖它们来强制刷盘,而是强调通过副本机制来保证数据的一致性和可靠性。

广播消息&集群消息

RabbitMQ

RocketMQ

RocketMQ主要提供了两种消费模式:集群消费以及广播消费。我们只需要在定义消费者的时候通过setMessageModel(MessageModel.XXX)方法就可以指定是集群还是广播式消费,默认是集群消费模式,即每个Consumer Group中的Consumer均摊所有的消息。

public class MQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 创建DefaultMQProducer类并设定生产者名称
        DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
        // 设置NameServer地址,如果是集群的话,使用分号;分隔开
        mqProducer.setNamesrvAddr("10.0.91.71:9876");
        // 消息最大长度 默认4M
        mqProducer.setMaxMessageSize(4096);
        // 发送消息超时时间,默认3000
        mqProducer.setSendMsgTimeout(3000);
        // 发送消息失败重试次数,默认2
        mqProducer.setRetryTimesWhenSendAsyncFailed(2);
        // 启动消息生产者
        mqProducer.start();
        // 循环十次,发送十条消息
        for (int i = 1; i <= 10; i++) {
            String msg = "hello, 这是第" + i + "条同步消息";
            // 创建消息,并指定Topic(主题),Tag(标签)和消息内容
            Message message = new Message("CLUSTERING_TOPIC", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送同步消息到一个Broker,可以通过sendResult返回消息是否成功送达
            SendResult sendResult = mqProducer.send(message);
            System.out.println(sendResult);
        }
        // 如果不再发送消息,关闭Producer实例
        mqProducer.shutdown();
    }
}
public class MQConsumerB {
    public static void main(String[] args) throws MQClientException {
        // 创建DefaultMQPushConsumer类并设定消费者名称
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
        // 设置NameServer地址,如果是集群的话,使用分号;分隔开
        mqPushConsumer.setNamesrvAddr("10.0.91.71:9876");
        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
        // 如果不是第一次启动,那么按照上次消费的位置继续消费
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 设置消费模型,集群还是广播,默认为集群
        mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 消费者最小线程量
        mqPushConsumer.setConsumeThreadMin(5);
        // 消费者最大线程量
        mqPushConsumer.setConsumeThreadMax(10);
        // 设置一次消费消息的条数,默认是1
        mqPushConsumer.setConsumeMessageBatchMaxSize(1);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
        mqPushConsumer.subscribe("CLUSTERING_TOPIC", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            // 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接收数据
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = msgList.get(0);
                String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                System.out.println("消费者接收到消息: " + messageExt.toString() + "---消息内容为:" + body);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        mqPushConsumer.start();
    }
}
setMessageModel(MessageModel.CLUSTERING);//设置集群消息
setMessageModel(MessageModel.BROADCASTING); //设置广播消息

1、在Rocket集群消费模式下,(订阅)同一个主题(Topic)下的消息,对于不同的消费者组是一种“广播形式”,即每个消费者组的都会消费消息。

2、在Rocket集群消费模式下,(订阅)同一个主题(Topic)下的消息,对于相同的消费者组的消费者而言是一种集群模式,即同一个消费者组内的所有消费者均分消息并消费。

案例博客

MQ集群架构

RabbitMQ

普通集群
镜像集群

RocketMQ

单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

多Master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息(非同步刷盘的情况下)

多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机
    如果是想不存在消息丢失的情况,那么在多Master的情况下要配置消息同步刷盘,而在 多Master多Slave模式-同步双写 的情况下配置同步刷盘。

RocketMQ有一个集群模式叫做:Dleger模式。当主节点失活时,能够自动重新触发选举。

DLedger集群节点状态leader、candidate、follower

  • leader:接受客户端请求,本地写入日志数据,并将数据复制给follower;定期发送心跳数据给follower维护leader状态

  • candidate:master故障后节点的中间状态,只有处于candidate状态的节点才会发送投票选举请求,master选举完成后,节点状态为leader或者follower

  • follower:负责同步leader的日志数据;接受leader心跳数据,重置倒计时器保持follower状态,并将心跳响应返回给leader

集群架构原博客

集群搭建

DLedger集群

Kafka

消息重试

重试带来的副作用
不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大

  • 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息。
  • 较多的重试次数也会增大服务端的处理压力。

RabbitMQ

消费者默认是自动提交,如果消费时出现了RuntimException,会导致消息直接重新入队,再次投递(进入队首),进入死循环,继而导致后面的消息被阻塞。

消息阻塞带来的后果是:后边的消息无法被消费;RabbitMQ服务端继续接收消息,占内存和磁盘越来越多。

重试机制有2种情况

  • 消息是自动确认时,如果抛出了异常导致多次重试都失败,消息被自动确认,消息就丢失了
  • 消息是手动确认时,如果抛出了异常导致多次重试都失败,消息没被确认,也无法nack,就一直是unacked状态,导致消息积压。

消息重试了5次,之后会抛出ListenerExecutionFailedException的异常。后面附带着Retry Policy Exhausted,提示我们重试次数已经用尽了。

消息重试次数用尽后,消息就会被抛弃。

重试机制

RockeMq

Producer端重试:
消息发送时,默认情况下会进行2次重试。如果重试次数达到上限,消息将不会被再次发送。

重试配置 :

producer.setRetryTimesWhenSendFailed(x)同步,参数默认值是2
producer.setRetryTimesWhenSendAsyncFailed()异步,参数默认值是2

Consumer端重试:

  • 当 Consumer 端遇到异常时,消息通常会重复重试16次,重试的时间间隔包括10秒、30秒、1分钟、2分钟、3分钟等。
  • 如果 Consumer 端没有返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 ConsumeConcurrentlyStatus.RECONSUME_LATER,且消息没有消费成功,MQ 会无限制地发送给消费端,直到达到最大重试次数。
  • 在集群模式下,如果消费业务逻辑代码返回 Action.ReconsumerLater、NULL 或抛出异常,消息最多会重试16次。如果重试16次后消息仍然失败,则会被丢弃。
  • 消息队列 RocketMQ 默认允许每条消息最多重试16次,每次重试的间隔时间根据配置的间隔而变化。如果消息在16次重试后仍然失败,则不再投递该消息。理论上,如果消息持续失败,最长可能需要4小时46分钟内完成这16次重试。
Properties properties = new Properties();
//  配置对应 Group ID的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
Consumer consumer =ONSFactory.createConsumer(properties);

(1)重试队列:如果Consumer端因为各种类型异常导致本次消费失败,为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。
考虑到异常恢复需要一些时间,RocketMQ会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中(具体细节后面会详细阐述)。

(2)死信队列:由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup"的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理;

重试队列&死信队列

Consumer 消费失败,这里有 3 种情况:

  • 返回 RECONSUME_LATER

  • 返回 null

  • 抛出异常

public class MessageListenerImpl implements MessageListener {
	@Override
	public Action consume(Message message, ConsumeContext context) {
		//处理消息
		doConsumeMessage(message);
		//方式1:返回 Action.ReconsumeLater,消息将重试
		return Action.ReconsumeLater;
		//方式2:返回 null,消息将重试
		return null;
		//方式3:直接抛出异常, 消息将重试
		throw new RuntimeException("Consumer Message exceotion");
	}
}

如果希望消费失败后不重试,可以直接返回Action.CommitMessage。

一条消息无论重试多少次,这些重试消息的Message ID都不会改变。
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

官方文档

重试配置

Kafka

死信队列

RocketMQ

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。(commitLog文件的过期时间)

死信队列:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
  • 死信队列是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup

Kafka

消息去重

  • 生产时消息重复
    由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ已经接收到了消息。这时候生产者就会重新发送一遍这条消息。
    生产者中如果消息未被确认,或确认失败,我们可以使用定时任务+(redis/db)来进行消息重试

  • 消费时消息重复
    消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
    由于重复消息是由于网络原因造成的,因此不可避免重复消息。但是我们需要保证消息的幂等性。

需要从生产者和消费端同时保证
生产者生产消息判断是否已经发送过。
消费端保证消息幂等性,即多次处理相同消息的效果与处理一次相同。

RabbitMQ 中消息重复消费的问题可以通过以下几种方式解决:

使用消息去重:在生产者发送消息时,为每条消息生成一个唯一标识符,并将其存储到数据库或缓存中。消费者在接收到消息后,先查询该标识符是否存在,如果存在则说明该消息已被处理过,直接跳过;否则进行业务处理,并将该标识符存储到数据库或缓存中。

使用幂等性操作:即使同一条消息被消费多次,也不会对业务造成影响。例如,在更新操作时使用乐观锁或悲观锁机制来避免并发更新问题。

使用 TTL(Time To Live)特性:设置每条消息的生命周期,超过指定时间后自动删除。如果消费者在该时间内未能成功处理该消息,则可以认为该消息已经丢失。

使用 RabbitMQ 提供的 ACK 机制:当消费者成功处理一条消息时,发送 ACK 确认信号给 RabbitMQ 服务器。服务器收到 ACK 后才会将该条消息从队列中删除。如果消费者处理失败,则发送 NACK 信号给 RabbitMQ 服务器,并设置重新入队(requeue)参数为 true,在下次重新投递时再次尝试消费。

在生产者端设置 IDempotent(幂等)属性:确保相同 ID 的请求只执行一次。这样就可以避免重复发送消息,从而避免了消息的重复消费。

RocketMQ

生产者生产时候设置一个唯一keyid

public void testRepeatProducer() throws Exception {
	// 创建默认的生产者
	DefaultMQProducer producer = new DefaultMQProducer("test-group");
	// 设置nameServer地址
	producer.setNamesrvAddr("localhost:9876");
	// 启动实例
	producer.start();
	// 我们可以使用自定义key当做唯一标识
	String keyId = UUID.randomUUID().toString();
	System.out.println(keyId);
	Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
	SendResult send = producer.send(msg);
	System.out.println(send);
	// 关闭实例
	producer.shutdown();
}
/**
 * 在boot项目中可以使用@Bean在整个容器中放置一个单利对象
 */
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m数组长度
 
@Test
public void testRepeatConsumer() throws Exception {
 
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
	consumer.setMessageModel(MessageModel.BROADCASTING);
	consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
	consumer.subscribe("repeatTestTopic", "*");
	// 注册一个消费监听 MessageListenerConcurrently是并发消费
	consumer.registerMessageListener(new MessageListenerConcurrently() {
	    @Override
	    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
	                                                    ConsumeConcurrentlyContext context) {
	        // 拿到消息的key
	        MessageExt messageExt = msgs.get(0);
	        String keys = messageExt.getKeys();
	        // 判断是否存在布隆过滤器中
	        if (bloomFilter.contains(keys)) {
	            // 直接返回了 不往下处理业务
	            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	        }
	        // 这个处理业务,然后放入过滤器中
	        // do sth...
	        bloomFilter.add(keys);
	        System.out.println("keys:" + keys);
	        System.out.println(new String(messageExt.getBody()));
	        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	    }
	});
	consumer.start();
	System.in.read();
}

Kafka

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,Kafka为了实现幂等性,底层设计架构中引入了ProducerID和SequenceNumbe。

当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

缺点:Kafka 的 Exactly Once 幂等性只能保证单次会话内的精准一次性,不能解决跨会话和跨分区的问题;

事务消息

RabbitMQ

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback();
txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了。
如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

// 开启事务
try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

     channel.txSelect();
     try {
         // 发送消息
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());
         System.out.println("消息发送成功");

         // 提交事务
         channel.txCommit();
     } catch (IOException e) {
         // 回滚事务
         channel.txRollback();
         System.out.println("消息发送失败,事务回滚");
     }
 } catch (IOException | TimeoutException e) {
     e.printStackTrace();
 }

RocketMQ

RocketMQ支持事务消息,整体流程如下图:

在这里插入图片描述

1、Producer向broker发送半消息。
2、Producer端收到响应,消息发送成功,此时消息是半消息,标记为“不可投递”状态,Consumer消费不了。
3、Producer端执行本地事务。
4、正常情况本地事务执行完成,Producer向Broker发送Commit/Rollback,如果是Commit,Broker端将半消息标记为正常消息,Consumer可以消费,如果是Rollback,Broker丢弃此消息。
5、异常情况,Broker端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到Producer端查询半消息的执行情况。
6、Producer端查询本地事务的状态。
7、根据事务的状态提交commit/rollback到broker端。(5,6,7是消息回查)

事务的三种状态:

  • TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

  • TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

  • TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

/**
 * 事务消息生产者
 */
public class TransactionMessageProducer {
    /**
     * 事务消息监听实现
     */
    private final static TransactionListener transactionListenerImpl = new TransactionListener() {

        /**
         * 在发送消息成功时执行本地事务
         * @param msg
         * @param arg producer.sendMessageInTransaction的第二个参数
         * @return 返回事务状态
         * LocalTransactionState.COMMIT_MESSAGE:提交事务,提交后broker才允许消费者使用
         * LocalTransactionState.RollbackTransaction:回滚事务,回滚后消息将被删除,并且不允许别消费
         * LocalTransactionState.Unknown:中间状态,表示MQ需要核对,以确定状态
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // TODO 开启本地事务(实际就是我们的jdbc操作)

            // TODO 执行业务代码(插入订单数据库表)
            // int i = orderDatabaseService.insert(....)
            // TODO 提交或回滚本地事务(如果用spring事务注解,这些都不需要我们手工去操作)

            // 模拟一个处理结果
            int index = 8;
            /**
             * 模拟返回事务状态
             */
            switch (index) {
                case 3:
                    System.out.printf("本地事务回滚,回滚消息,id:%s%n", msg.getKeys());
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 5:
                case 8:
                    return LocalTransactionState.UNKNOW;
                default:
                    System.out.println("事务提交,消息正常处理");
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }

        /**
         * Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),
         * 由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback
         * @param msg
         * @return 返回事务状态
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 根据业务,正确处理: 订单场景,只要数据库有了这条记录,消息应该被commit
            String transactionId = msg.getTransactionId();
            String key = msg.getKeys();
            System.out.printf("回查事务状态 key:%-5s msgId:%-10s transactionId:%-10s %n", key, msg.getMsgId(), transactionId);

            if ("id_5".equals(key)) { // 刚刚测试的10条消息, 把id_5这条消息提交,其他的全部回滚。
                System.out.printf("回查到本地事务已提交,提交消息,id:%s%n", msg.getKeys());
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                System.out.printf("未查到本地事务状态,回滚消息,id:%s%n", msg.getKeys());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    };

    public static void main(String[] args) throws MQClientException, IOException {
        // 1. 创建事务生产者对象
        // 和普通消息生产者有所区别,这里使用的是TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr("192.168.100.242:9876");

        // 3. 设置事务监听器
        producer.setTransactionListener(transactionListenerImpl);

        // 4. 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            String content = "Hello transaction message " + i;
            Message message = new Message("TopicTest", "TagA", "id_" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 5. 发送消息(发送一条新订单生成的通知)
            SendResult result = producer.sendMessageInTransaction(message, i);

            System.out.printf("发送结果:%s%n", result);
        }

        System.in.read();
        // 6. 停止生产者
        producer.shutdown();
    }
}

rocketmq相关

Kafka

在使用Kafka事务前,需要开启幂等特性,将 enable.idempotence 设置为 true

Kafka 0.11.0 版本开始引入了事务性功能。实现事务性消息的过程涉及到生产者(Producer)和消费者(Consumer)两个方面:

一、生产者事务: 生产者可以通过事务将一批消息发送到 Kafka,并保证这批消息要么全部发送成功,要么全部发送失败,实现消息的原子性。

1)在使用事务之前,生产者需要先初始化一个事务,即调用 initTransactions() 方法。这样会将生产者切换到事务模式。

2)然后,生产者开始事务,将待发送的消息放入事务中,而不是直接发送到 Kafka。

3)在事务中,可以将多条消息添加到一个或多个主题的不同分区中。在事务中,如果发送消息成功,则会将消息暂存于事务缓冲区,不会立即发送到 Kafka。

4)在所有消息都添加到事务中后,可以调用 commitTransaction() 方法提交事务。如果所有消息提交成功,则整个事务提交成功,所有消息会被一起发送到 Kafka。

5)如果在事务过程中出现错误或者某条消息发送失败,可以调用 abortTransaction() 方法回滚事务,之前添加到事务中的消息不会发送到 Kafka。

Properties properties = new Properties();
properties.put(org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();

try {
     // 处理业务逻辑
     ProducerRecord<String, String> record1 = new ProducerRecord<String, String>(topic, "msg1");
     producer.send(record1);
     ProducerRecord<String, String> record2 = new ProducerRecord<String, String>(topic, "msg2");
     producer.send(record2);
     ProducerRecord<String, String> record3 = new ProducerRecord<String, String>(topic, "msg3");
     producer.send(record3);
     // 处理其他业务逻辑
     // 提交事务
     producer.commitTransaction();
} catch (ProducerFencedException e) {
	 // 中止事务,类似于事务回滚
     producer.abortTransaction();
}
producer.close();

事务手动提交
在一个事务中如果需要手动提交消息,需要先将 enable.auto.commit 参数设置为 false,然后调用 sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) 方法进行手动提交,该方式特别适用于 消费-转换-生产模式的状况

producer.initTransactions();
     while (true){
         org.apache.kafka.clients.consumer.ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
         if (!records.isEmpty()){
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
             producer.beginTransaction();
             try {
                 for (TopicPartition partition: records.partitions()){
                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                     for (ConsumerRecord<String, String> record : partitionRecords) {
                         ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());
                         producer.send(producerRecord);
                     }
                     long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                     offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
                 }
                 // 手动提交事务
                 producer.sendOffsetsToTransaction(offsets, "groupId");
                 producer.commitTransaction();
             }catch (ProducerFencedException e){
                 // log the exception
                 producer.abortTransaction();
             }
         }
    }

二、消费者事务: 消费者可以通过事务来保证消息的读取和消息处理的原子性。

1)消费者可以将消息的偏移量(Offset)保存在事务中,并在处理完消息后将偏移量提交到事务中。

2)当事务成功提交后,偏移量会被记录到消费者组中,表示这批消息已经被成功消费。

3)如果事务失败或回滚,偏移量不会被提交,下次消费者启动时会从上次提交的偏移量处继续消费。

通过使用事务,生产者和消费者都可以实现对消息的原子性处理,保证消息的可靠传输和处理。这对于一些需要强一致性的应用场景非常重要,例如金融交易系统或者订单处理系统等。但需要注意,使用事务会增加一定的系统开销,因此在实现事务时需要权衡性能和可靠性。

为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator 负责实施的。

broker节点有一个专门管理事务的内部主题 __transaction_state,TransactionCoodinator 会将事务状态持久化到该主题中。

事务消息流程:

  1. 查找 TransactionCoordinator:生产者会先向某个broker发送 FindCoordinator 请求,找到 TransactionCoordinator 所在的 broker节点.
  2. 获取PID:生产者会向 TransactionCoordinator 申请获取 PID,TransactionCoordinator 收到请求后,会把 transactionalId 和对应的 PID 以消息的形式保存到主题 __transaction_state 中,保证 <transaction_Id,PID>的对应关系被持久化,即使宕机该对应关系也不会丢失
  3. 开启事务:调用 beginTransaction()后,生产者本地会标记开启了一个新事务
  4. 发送消息:生产者向用户主题发送消息,过程跟普通消息相同,但第一次发送请求前会先发送请求给TransactionCoordinator 将 transactionalId 和 TopicPartition 的对应关系存储在 __transaction_state 中
  5. 提交或中止事务:Kafka除了普通消息,还有专门的控制消息(ControlBatch)来标志一个事务的结束,控制消息有两种类型,分别用来表征事务的提交和中止
    该阶段本质就是一个两阶段提交过程:
    1. 将 PREPARE_COMMIT 或 PREPARE_ABORT 消息写入主题 __transaction_state
    2. 将COMMIT 或 ABORT 信息写入用户所使用的普通主题和 __consumer_offsets
    3. 将 COMPLETE_COMMIT 或 COMPLETE_COMMIT_ABORT 消息写入主题 __transaction_state

学习博客

消息积压

  • 消息积压处理办法:临时紧急扩容。
  • 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
  • MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
  • mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

RabbitMQ

设计MQ思路

博客记录

mq中常见问题

rocketmq 相关问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1506981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux网络套接字之TCP网络程序

(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨&#xff01;你好这里是ky233的主页&#xff1a;这里是ky233的主页&#xff0c;欢迎光临~https://blog.csdn.net/ky233?typeblog 点个关注不迷路⌯▾⌯ 目录 一、接口介绍 1.socket 2.listen 3.accept…

conda 设置国内源 windows+linux

默认的conda源连接不好&#xff0c;时好时坏&#xff0c;而且速度很慢&#xff0c;可以使用国内的源 如果没有安装conda&#xff0c;可以参考&#xff1a; miniconda安装&#xff1a;链接 anaconda安装winlinux&#xff1a;链接 windows使用命令提示符&#xff0c;linux使用…

后端八股笔记------Redis

Redis八股 上两种都有可能导致脏数据 所以使用两次删除缓存的技术&#xff0c;延时是因为数据库有主从问题需要更新&#xff0c;无法达到完全的强一致性&#xff0c;只能达到控制一致性。 一般放入缓存中的数据都是读多写少的数据 业务逻辑代码&#x1f447; 写锁&#x1f4…

Linux网络基础2之https

(&#xff61;&#xff65;∀&#xff65;)&#xff89;&#xff9e;嗨&#xff01;你好这里是ky233的主页&#xff1a;这里是ky233的主页&#xff0c;欢迎光临~https://blog.csdn.net/ky233?typeblog 点个关注不迷路⌯▾⌯ http是明文的可以通过一些的工具获取到正文层&#…

【Spring Boot系列】快速上手 Spring Boot

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【数学建模】传染病模型笔记

传染病的基本数学模型&#xff0c;研究传染病的传播速度、空间范围、传播途径、动力学机理等问题&#xff0c;以指导对传染病的有效地预防和控制。常见的传染病模型按照传染病类型分为 SI、SIR、SIRS、SEIR 模型等&#xff0c;按照传播机理又分为基于常微分方程、偏微分方程、网…

《计算机网络》考研:2024/3/7 2.1.4 奈氏准则和香农定理

2024/3/7 (作者转行去干LLMs了&#xff0c;但是又想搞定考研&#xff0c;忙不过来了就全截图了呜呜呜。。。 生活真不容易。) 2.1.4 奈氏准则与香农定理

出现“error: failed to push some refs to ‘https://github.com/****.git‘”,如何解决问题

一、出错情况&#xff1a; 今天继续推送整理的知识点的时候&#xff0c;出现了一个报错。“error: failed to push some refs to https://github.com/.git”&#xff0c;百思不得其解&#xff0c;之前推送的时候都可以轻松推送成功&#xff0c;如今却说本地库与远程库不一致。…

STM32电源及时钟介绍

一、STM32最小系统 二、电源电路 2.1供电电压VDD&#xff0c;VSS F103VET6 的引角图 在 F103VET6 的引角图中可找到 49\50 角&#xff0c; 74\75 角&#xff0c; 99\100 角&#xff0c; 27\28角&#xff0c;10 \11角一共 5 对的VDD&#xff0c;VSS&#xff0c;也就是给我们芯片…

体系班第十三节

1判断完全二叉树递归做法 有四种情况&#xff1a;1 左树完全&#xff0c;右数满&#xff0c;且左高为右高加一 2左满 &#xff0c;右满&#xff0c;左高为右高加一 3左满&#xff0c;右完全&#xff0c;左右高相等 4左右均满且高相等 #include<iostream> #include&l…

外边距折叠的原因和解决

参考文章 什么时候出现外边距塌陷 外边距塌陷&#xff0c;也叫外边距折叠&#xff0c;在普通文档流中&#xff0c;在垂直方向上的2个或多个相邻的块级元素&#xff08;父子或者兄弟&#xff09;外边距合并成一个外边距的现象&#xff0c;不过只有上下外边距才会有塌陷&#x…

Guiding Large Language Models viaDirectional Stimulus Prompting

1. 通过定向刺激提示指导大语言模型 论文地址&#xff1a;[2302.11520] Guiding Large Language Models via Directional Stimulus Prompting (arxiv.org) 源码地址&#xff1a;GitHub - Leezekun/Directional-Stimulus-Prompting: [NeurIPS 2023] Codebase for the paper: &qu…

[mmucache]-ARMV8-aarch64的虚拟内存(mmutlbcache)介绍-概念扫盲

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 思考: 1、cache的entry里都是有什么&#xff1f; 2、TLB的entry里都是有什么? 3、MMU操作…

保持长期高效的七个法则(一)7 Rules for Staying Productive Long-Term(1)

Easily the best habit I’ve ever started was to use a productivity system.The idea is simple:organizing all the stuff you need to do (and how you’re going to do it) prevents a lot of internal struggle to get things done. 无疑&#xff0c;我曾经建立过的最好…

简单了解TCP/IP四层模型

什么是计算机网络&#xff1f; 计算机网络我们可以理解为一个巨大的城市地图&#xff0c;我们想从A地前往B地&#xff0c;其中要走的路、要避开的问题都交给计算机网络解决&#xff0c;直到我们可以正常的到达目的地&#xff0c;那么我们会把其中的过程抽象成一个网络模型&…

分布式执行引擎ray入门--(3)Ray Train

Ray Train中包含4个部分 Training function: 包含训练模型逻辑的函数 Worker: 用来跑训练的 Scaling configuration: 配置 Trainer: 协调以上三个部分 Ray TrainPyTorch 这一块比较建议直接去官网看diff&#xff0c;官网色块标注的比较清晰&#xff0c;非常直观。 impor…

APP2:android studio如何使用lombok

一、前言 不知道从哪个版本开始&#xff0c;android studio便无法在plugins中下载lombok了&#xff0c;有人说是内置了&#xff0c;好像有这么回事儿。我主要面临如下两个问题&#xff1a; 使用内置lombok&#xff0c;可以自动生成setter、setter、toString等。但是&#xff0…

政安晨:【深度学习处理实践】(五)—— 初识RNN-循环神经网络

RNN&#xff08;循环神经网络&#xff09;是一种在深度学习中常用的神经网络结构&#xff0c;用于处理序列数据。与传统的前馈神经网络不同&#xff0c;RNN通过引入循环连接在网络中保留了历史信息。 RNN中的每个神经元都有一个隐藏状态&#xff0c;它会根据当前输入和前一个时…

【QT+QGIS跨平台编译】之七十:【QGIS_Analysis跨平台编译】—【qgsrastercalcparser.cpp生成】

文章目录 一、Bison二、生成来源三、构建过程一、Bison GNU Bison 是一个通用的解析器生成器,它可以将注释的无上下文语法转换为使用 LALR (1) 解析表的确定性 LR 或广义 LR (GLR) 解析器。Bison 还可以生成 IELR (1) 或规范 LR (1) 解析表。一旦您熟练使用 Bison,您可以使用…

free pascal 调用 C#程序读 Freeplane.mm文件,生成测试用例.csv文件

C# 请参阅&#xff1a;C# 用 System.Xml 读 Freeplane.mm文件&#xff0c;生成测试用例.csv文件 Freeplane 是一款基于 Java 的开源软件&#xff0c;继承 Freemind 的思维导图工具软件&#xff0c;它扩展了知识管理功能&#xff0c;在 Freemind 上增加了一些额外的功能&#x…