RocketMQ编程使用和应用场景

news2024/11/22 9:31:34

RocketMQ消息模型

一、RocketMQ客户端基本使用

引入RocketMQ依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>4.9.5</version>
</dependency>ncy>

RocketMQ权限控制相关的核心依赖 

<dependency>
    <groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-acl</artifactId>
	<version>4.9.5</version>
</dependency>

一个最为简单的消息生产者代码如下:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //初始化一个消息生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 指定nameserver地址
        producer.setNamesrvAddr("192.168.64.133:9876");
        // 启动消息生产者服务
        producer.start();
        for (int i = 0; i < 2; i++) {
            try {
                // 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容
                Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送消息,获取发送结果
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        //消息发送完后,停止消息生产者服务。
        producer.shutdown();
    }
}

​ 一个简单的消息消费者代码如下:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //构建一个消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        //指定nameserver地址
        consumer.setNamesrvAddr("192.168.64.133:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅一个感兴趣的话题,这个话题需要与消息的topic一致
        consumer.subscribe("TopicTest", "*");
        // 注册一个消息回调函数,消费到消息后就会触发回调。
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(messageExt -> {
                    try {
                        System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {}
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者服务
        consumer.start();
        System.out.print("Consumer Started");
    }
}

  • 消息生产者的固定步骤

1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址
3.启动producer。 这个步骤比较容易忘记。可以认为这是消息生产者与服务端建立连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer,释放资源。

  • 消息消费者的固定步骤

1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会一直挂起,持续处理消息。

​ 其中,最为关键的就是NameServer。从示例中可以看到,RocketMQ的客户端只需要指定NameServer地址,而不需要指定具体的Broker地址。

​ 指定NameServer的方式有两种。可以在客户端直接指定,例如 consumer.setNameSrvAddr("127.0.0.1:9876")。然后,也可以通过读取系统环境变量NAMESRV_ADDR指定。其中第一种方式的优先级更高。

二、消息确定机制

1、消息生产端采用消息确认加多次重试的机制保证消息正常发送到RocketMQ

三种不同的消息发送方式

1)、单向发送

sendOneway()方法没有返回值,虽然效率高,容易丢失数据

public class OnewayProducer {
    public static void main(String[] args)throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.start();
        Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));
        producer.sendOneway(message);
        Thread.sleep(50000);
        producer.shutdown();
    }
}
2、同步发送

生产者发送消息给broker,直到broker给予反馈才能继续其它的工作,否则一直等待broker反馈 

SendResult sendResult = producer.send(msg);

在SendResult中有一个SendStatus属性,这个SendStatus是一个枚举类型,其中包含了Broker端的各种情况。

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

 SEND_OK表示消息已经成功发送到Broker上,其他几种枚举值,都是表示消息在Broker端处理失败了,Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不会推送给下游消费者。仅仅只是表示broker端并没有完全正确的处理这些消息。因此,如果要重新发送消息,最好要带上唯一的系统标识,这样在消费者端才能自行做幂等判断。也就是为了不让消息重复消费,这种同步的方式如果网络问题而导致阻塞很长时间,显然不合适的

3、异步发送

异步发送机制下,生产者在向Broker发送消息时,会同时注册一个回调函数。异步发送机制下,生产者在向Broker发送消息时,会同时注册一个回调函数

producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });

在SendCallback接口中有两个方法,onSuccess和onException。当Broker端返回消息处理成功的响应信息SendResult时,就会调用onSuccess方法。当Broker端处理消息超时或者失败时,就会调用onExcetion方法,生产者就可以在onException方法中进行补救措施。

此时同样有几个问题需要注意。一是与同步发送机制类似,触发了SendCallback的onException方法同样并不一定就表示消息不会向消费者推送。如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException方法。超时时间默认是3秒,可以通过producer.setSendMsgTimeout方法定制。而造成超时的原因则有很多,消息太大造成网络拥堵、网速太慢、Broker端处理太慢等都可能造成消息处理超时。 

二是在SendCallback的对应方法被触发之前,生产者不能调用shutdown()方法。如果消息处理完之前,生产者线程就关闭了,生产者的SendCallback对应方法就不会触发。这是因为使用异步发送机制后,生产者虽然不用阻塞下来等待Broker端响应,但是SendCallback还是需要附属于生产者的主线程才能执行。如果Broker端还没有返回SendResult,而生产者主线程已经停止了,那么SendCallback的执行线程也就会随主线程一起停止,对应的方法自然也就无法执行了。

2、消息消费者端采用状态确认机制保证消费者一定能正常处理对应的消息

Broker等待消费者返回消息处理状态

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

 这个返回值是一个枚举值,有两个选项 CONSUME_SUCCESS和RECONSUME_LATER。如果消费者返回CONSUME_SUCCESS,那么消息自然就处理结束了。但是如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。

​ 为了要兼容重试机制的成功率和性能,RocketMQ设计了一套非常完善的消息重试机制,从而尽可能保证消费者能够正常处理用户的订单信息。

​ 1、Broker不可能无限制的向消费失败的消费者推送消息。如果消费者一直没有恢复,Broker显然不可能一直无限制的推送,这会浪费集群很多的性能。所以,Broker会记录每一个消息的重试次数。如果一个消息经过很多次重试后,消费者依然无法正常处理,那么Broker会将这个消息推入到消费者组对应的死信Topic中。死信Topic相当于windows当中的垃圾桶。你可以人工介入对死信Topic中的消息进行补救,也可以直接彻底删除这些消息。RocketMQ默认的最大重试次数是16次。

​ 2、为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是一个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产生阻塞,让其他正常的消息无法处理。RocketMQ的做法是给每个消费者组自动生成一个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了自己单独的队列,就不会影响到Topic下的其他消息了。

​ 3、RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker只要往消费者组中随意一个实例推送即可。这是消息重试机制能够正常运行的基础。但是,在客户端的具体实现时,MQDefaultMQConsumer并没有强制规定消费者组不能重复。也就是说,你完全可以实现出一些订阅主题和消费逻辑完全不同的消费者服务,共同组成一个消费组。在这种情况下,RocketMQ不会报错,但是消息的处理逻辑就无法保持一致了。这会给业务带来很大的麻烦。这是在实际应用时需要注意的地方。

​ 4、Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。至于消费者组自己的业务执行是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使用同步实现方式,保证在自己业务处理完成之后再向Broker端返回状态。而应该尽量避免异步的方式处理业务逻辑。

3、消费者也可以自行指定起始消费位点

Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset。

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET, //从队列的第一条消息开始重新消费
    CONSUME_FROM_FIRST_OFFSET, //从上次消费到的地方开始继续消费
    CONSUME_FROM_TIMESTAMP; //从某一个时间点开始重新消费
}

另外,如果指定了ConsumerFromWhere.CONSUME_FROM_TIMESTAMP,这就表示要从一个具体的时间开始。具体时间点,需要通过Consumer的另一个属性ConsumerTimestamp。这个属性可以传入一个表示时间的字符串。

consumer.setConsumerTimestamp("20131223171201");

三、广播机制

广播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。集群模式下,一个消息,只会被一个消费者组中的多个消费者实例 共同 处理一次。广播模式下,一个消息,则会推送给所有消费者实例处理,不再关心消费者组。

示例代码:

​ 消费者核心代码

consumer.setMessageModel(MessageModel.BROADCASTING);

启动多个消费者,广播模式下,这些消费者都会消费一次消息。

实现思路:

​ 默认模式(也就是集群模式)下,Broker端会给每个ConsumerGroup维护一个统一的Offset,这个Offset可以保证一个消息,在同一个ConsumerGroup内只会被消费一次。而广播模式的实现方式,是将Offset转移到消费者端自行保管,这样Broker端只管向所有消费者推送消息,而不用负责维护消费进度。

注意点:

1、Broker端不维护消费进度,意味着,如果消费者处理消息失败了,将无法进行消息重试。

2、消费者端维护Offset的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。丢了也不影响服务稳定性。

比如生产者发送了1~10号消息。消费者当消费到第6个时宕机了。当他重启时,Broker端已经把第10个消息都推送完成了。如果消费者端维护好了自己的Offset,那么他就可以在服务重启时,重新向Broker申请6号到10号的消息。但是,如果消费者端的Offset丢失了,消费者服务依然可以正常运行,但是6到10号消息就无法再申请了。后续这个消费者就只能获取10号以后的消息。

​ 实际上,Offset的维护数据是放在 ${user.home}/.rocketmq_offset/${clientIp}${instanceName}/${group}/offsets.json 文件下的。

​ 消费者端存储广播消费的本地offsets文件的默认缓存目录是 System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets” ,可以通过定制 rocketmq.client.localOffsetStoreDir 系统属性进行修改。

本地offsets文件在缓存目录中的具体位置与消费者的clientIp 和 instanceName有关。其中instanceName默认是DEFAULT,可以通过定制系统属性 rocketmq.client.name 进行修改。另外,每个消费者对象也可以单独设定instanceName。

RocketMQ会通过定时任务不断尝试本地Offsets文件的写入,但是,如果本地Offsets文件写入失败,RocketMQ不会进行任何的补救。

四、顺序消费机制

应用场景:

​ 每一个订单有从下单、锁库存、支付、下物流等几个业务步骤。每个业务步骤都由一个消息生产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?

示例代码:

​ 生产者核心代码:

for (int i = 0; i < 10; i++) {
                int orderId = i;
                for(int j = 0 ; j <= 5 ; j ++){
                    Message msg =
                            new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,
                                    ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
                    System.out.printf("%s%n", sendResult);
                }
            }

通过MessageSelector,将orderId相同的消息,都转发到同一个MessageQueue中。

​ 消费者核心代码:

consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for(MessageExt msg:msgs){
                    System.out.println("收到消息内容 "+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

​ 注入一个MessageListenerOrderly实现。

实现思路:

​ 基础思路:只有放到一起的一批消息,才有可能保持消息的顺序。

1、生产者只有将一批有顺序要求的消息,放到同一个MesasgeQueue上,Broker才有可能保持这一批消息的顺序。

​ 2、消费者只有一次锁定一个MessageQueue,拿到MessageQueue上所有的消息,

注意点:

​ 1、理解局部有序与全局有序。大部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留一个MessageQueue。性能显然非常低。

​ 2、生产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于几种导致数据热点竞争。

​ 2、消费者端只能用同步的方式处理消息,不要使用异步处理。更不能自行使用批量处理。

​ 3、消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序。

​ 4、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。

五、延迟消息

应用场景:

​ 延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

虽然不太起眼,但是这是RocketMQ非常有特色的一个功能。对比下RabbitMQ和Kafka。RabbitMQ中只能通过使用死信队列变相实现延迟消息,或者加装一个插件来支持延迟消息。 Kafka则不太好实现延迟消息。

示例代码:

​ 生产者端核心代码:

msg.setDelayTimeLevel(3);

 只要给消息设定一个延迟级别就行了,无比简单。

​ RocketMQ给消息定制了18个默认的延迟级别,分别对应18个不同的预设好的延迟时间。

image.png

实现思路:

​ 延迟消息的难点其实是性能,需要不断进行定时轮训。全部扫描所有消息是不可能的,RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟队列。然后每次只针对这18个队列里的消息进行延迟操作,这样就不用一直扫描所有的消息了。

注意点:

​ 这样预设延迟时间其实是不太灵活的。5.x版本已经支持预设一个具体的时间戳,按秒的精度进行定时发送。

​ 但是可以看到,这18个延迟级别虽然无法调整,但是每个延迟级别对应的延迟时间其实是可以调整的。只需要修改截图中的参数就行。不过通常不建议这么做。

六、批量消息

应用场景:

​ 生产者要发送的消息比较多时,可以将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量。

示例代码:

​ 生产者核心代码:

List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));

producer.send(messages);

注意点:

​ 批量消息的使用非常简单,但是要注意RocketMQ做了限制。同一批消息的Topic必须相同,另外,不支持延迟消息。

​ 还有批量消息的大小不要超过1M,如果太大就需要自行分割。

七、过滤消息

应用场景:

​ 同一个Topic下有多种不同的消息,消费者只希望关注某一类消息。

​ 例如,某系统中给仓储系统分配一个Topic,在Topic下,会传递过来入库、出库等不同的消息,仓储系统的不同业务消费者就需要过滤出自己感兴趣的消息,进行不同的业务操作。

示例代码1:简单过滤

​ 生产者端需要在发送消息时,增加Tag属性。比如我们上面举例当中的入库、出库。核心代码:

String[] tags = new String[] {"TagA", "TagB", "TagC"};

for (int i = 0; i < 15; i++) {
    Message msg = new Message("TagFilterTest",
        tags[i % tags.length],
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
}

消费者端就可以通过这个Tag属性订阅自己感兴趣的内容。核心代码:

consumer.subscribe("TagFilterTest", "TagA");

这样,后续Consumer就只会出处理TagA的消息。

示例代码2:SQL过滤

​ 通过Tag属性,只能进行简单的消息匹配。如果要进行更复杂的消息过滤,比如数字比较,模糊匹配等,就需要使用SQL过滤方式。SQL过滤方式可以通过Tag属性以及用户自定义的属性一起,以标准SQL的方式进行消息过滤。

​ 生产者端在发送消息时,出了Tag属性外,还可以增加自定义属性。核心代码:

String[] tags = new String[] {"TagA", "TagB", "TagC"};

for (int i = 0; i < 15; i++) {
    Message msg = new Message("SqlFilterTest",
        tags[i % tags.length],
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
    msg.putUserProperty("a", String.valueOf(i));

    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
}

消费者端在进行过滤时,可以指定一个标准的SQL语句,定制复杂的过滤规则。核心代码:

consumer.subscribe("SqlFilterTest",
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));

实现思路:

​ 实际上,Tags和用户自定义的属性,都是随着消息一起传递的,所以,消费者端是可以拿到消息的Tags和自定义属性的。比如:

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getTags());
                    System.out.println(msg.getProperties());
                }
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

这样,剩下的就是在Consumer中对消息进行过滤了。Broker会在往Consumer推送消息时,在Broker端进行消息过滤。是Consumer感兴趣的消息,就往Consumer推送。

​ Tag属性的处理比较简单,就是直接匹配。而SQL语句的处理会比较麻烦一点。RocketMQ也是通过ANLTR引擎来解析SQL语句,然后再进行消息过滤的。

ANLTR是一个开源的SQL语句解析框架。很多开源产品都在使用ANLTR来解析SQL语句。比如ShardingSphere,Flink等。

注意点:

​ 1、使用Tag过滤时,如果希望匹配多个Tag,可以使用两个竖线(||)连接多个Tag值。另外,也可以使用星号(*)匹配所有。

​ 2、使用SQL顾虑时,SQL语句是按照SQL92标准来执行的。SQL语句中支持一些常见的基本操作:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

​ 2、消息过滤,其实在Broker端和在Consumer端都可以做。Consumer端也可以自行获取用户属性,不感兴趣的消息,直接返回不成功的状态,跳过该消息就行了。但是RocketMQ会在Broker端完成过滤条件的判断,只将Consumer感兴趣的消息推送给Consumer。这样的好处是减少了不必要的网络IO,但是缺点是加大了服务端的压力。不过在RocketMQ的良好设计下,更建议使用消息过滤机制。

​ 3、Consumer不感兴趣的消息并不表示直接丢弃。通常是需要在同一个消费者组,定制另外的消费者实例,消费那些剩下的消息。但是,如果一直没有另外的Consumer,那么,Broker端还是会推进Offset。

八、事务消息

应用场景:

​ 事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据一致性。

​ 以电商为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景,非常适合使用RocketMQ的解耦功能来进行串联。

​ 考虑到事务的安全性,即要保证相关联的这几个业务一定是同时成功或者同时失败的。如果要将四个服务一起作为一个分布式事务来控制,可以做到,但是会非常麻烦。而使用RocketMQ在中间串联了之后,事情可以得到一定程度的简化。由于RocketMQ与消费者端有失败重试机制,所以,只要消息成功发送到RocketMQ了,那么可以认为Branch2.1,Branch2.2,Branch2.3这几个分支步骤,是可以保证最终的数据一致性的。这样,一个复杂的分布式事务问题,就变成了MinBranch1和Branch2两个步骤的分布式事务问题。

​ 然后,在此基础上,RocketMQ提出了事务消息机制,采用两阶段提交的思路,保证Main Branch1和Branch2之间的事务一致性。

具体的实现思路是这样的:

image.png

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

实现时的重点是使用RocketMQ提供的TransactionMQProducer事务生产者,在TransactionMQProducer中注入一个TransactionListener事务监听器来执行本地事务,以及后续对本地事务的检查。

注意点:

​ 1、半消息是对消费者不可见的一种消息。实际上,RocketMQ的做法是将消息转到了一个系统Topic,RMQ_SYS_TRANS_HALF_TOPIC。

​ 2、事务消息中,本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。

​ 3、其实,了解了事务消息的机制后,在具体执行时,可以对事务流程进行适当的调整

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1021353.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哈希及哈希表的实现

目录 一、哈希的引入 二、概念 三、哈希冲突 四、哈希函数 常见的哈希函数 1、直接定址法 2、除留余数法 五、哈希冲突的解决 1、闭散列 2、开散列 一、哈希的引入 顺序结构以及平衡树中&#xff0c;元素关键码与其存储位置之间没有对应的关系&#xff0c;因此在查找…

【Java 基础篇】Java标准输入流详解:读取用户输入的完整指南

Java是一门流行的编程语言&#xff0c;常用于开发各种类型的应用程序&#xff0c;包括控制台应用、桌面应用、Web应用等。在这些应用中&#xff0c;与用户进行交互是一项重要的任务。本文将重点介绍Java标准输入流&#xff0c;它是Java程序中用于从用户获取输入的关键组成部分。…

【unity小技巧】Unity 存储存档保存——PlayerPrefs、JsonUtility和MySQL数据库的使用

文章目录 前言PlayerPrefs一、基本介绍二、Demo三、优缺点 JsonUtility一、基本使用二、Demo三、优缺点 Mysql&#xff08;扩展&#xff09;完结 前言 游戏存档不言而喻&#xff0c;是游戏设计中的重要元素&#xff0c;可以提高游戏的可玩性&#xff0c;为玩家提供更多的自由和…

【JavaScript】video标签配置及相关事件:

文章目录 一、标签配置&#xff1a;二、事件&#xff1a;三、案例&#xff1a; 一、标签配置&#xff1a; 标签名描述src要播放的路径地址autoplay是否自动播放&#xff0c;默认值是false,&#xff08;Boolean&#xff09;loop是否循环播放&#xff0c;默认值是false,&#xf…

Hbase工作原理

Hbase&#xff1a;HBase 底层原理详解&#xff08;深度好文&#xff0c;建议收藏&#xff09; - 腾讯云开发者社区-腾讯云 Hbase架构图 同一个列族如果有多个store&#xff0c;那么这些store在不同的region Hbase写流程&#xff08;读比写慢&#xff09; MemStore Flush Hbas…

arm day2(9.15)数据操作指令,跳转指令,特殊功能寄存器指令,+XMind

作业 1.求最大公约数&#xff1a; .text .global _start _start:mov r0,#0x9mov r1,#0x15bl Loop Loop:cmp r0,r1 比较r0寄存器和r1寄存器的中的值beq stop 当两数相同时,退出程序subhi r0,r0,r1 r0>r1 r0 r0 - r1subcc r1,r1,r0 r0<r1 r1 r1 - r0mov pc,lr 恢复现…

自动驾驶行业观察之2023上海车展-----整体发展趋势

1.行业趋势 新能源势不可挡。 本次车展上首发了150多款新车&#xff0c;约有100款是新能源车;跨国车企全面电动化&#xff0c;但日韩系布局相对缓慢&#xff1b; 2.自主品牌 品牌持续向上 本届车展自主品牌开始疯狂向高端内卷&#xff0c;高端化态度坚决 &#xff08;包括仰…

Modbus RTU(Remote Terminal Unit)与RS-485协议介绍(主站设备(Master)、从站设备(Slave))

文章目录 Modbus RTU与RS-485协议介绍一、引言二、Modbus RTU 协议介绍2.1 Modbus RTU 协议简介2.2 Modbus RTU 协议帧结构主站设备、从站设备与从站设备地址2.3 Modbus RTU 协议举例 三、RS-485 协议介绍3.1 RS-485 协议简介3.2 RS-485 物理连接方式3.3 RS-485 与 Modbus RTU …

代码片段的理解

1.后面的error直接走的是失败的回调 例如:权限不足,可以理解为服务器的一种形式 2.前面走的是成功的回调 但是也可能不对,例如在传过去的参数,在数据库查询不了这个值,传递过来的值不一样&#xff0c;这样它也是走的成功回调。

提升前端开发效率:基于vue的van-radio-group组件封装指南

前言 vant 作为一款流行的 ui 框架&#xff0c;其中&#xff0c;van-radio-group 组件是一个常用的单选框组件&#xff0c;但有时我们需要根据项目需求进行定制化封装。本文将介绍如何基于 vue 框架封装 van-radio-group 组件&#xff0c;让我们一起来探索吧&#xff01; 封装文…

Python网络编程:构建网络应用与通信

&#x1f482; 个人网站:【工具大全】【游戏大全】【神级源码资源网】&#x1f91f; 前端学习课程&#xff1a;&#x1f449;【28个案例趣学前端】【400个JS面试题】&#x1f485; 寻找学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】 Python是一门强大的编程…

node查询七牛云上的文件信息

const qiniu require(qiniu) const {getQiNiuKey, } require(./tools)//#region 七牛云 const { accessKey, secretKey } getQiNiuKey() const mac new qiniu.auth.digest.Mac(accessKey, secretKey) let config new qiniu.conf.Config() // 空间对应的机房 config.zone …

目标跟踪方向开源数据集资源汇总

Temple Color 128 数据集下载链接&#xff1a;http://suo.nz/2dKEEL 本数据集包含一大组 128 种颜色序列&#xff0c;带有基本事实和挑战因素注释&#xff08;例如&#xff0c;遮挡&#xff09; NfS高帧率视频数据集 数据集下载链接&#xff1a;http://suo.nz/34o8df 第一个…

旺店通·企业奇门与金蝶云星空对接集成订单查询连通销售订单新增(旺店通销售-金蝶销售订单-小红书)

旺店通企业奇门与金蝶云星空对接集成订单查询连通销售订单新增(旺店通销售-金蝶销售订单-小红书) 接通系统&#xff1a;旺店通企业奇门 慧策最先以旺店通ERP切入商家核心管理痛点——订单管理&#xff0c;之后围绕电商经营管理中的核心管理诉求&#xff0c;先后布局流量获取、会…

react-route的路由

React-Router是一个基于React的强大路由库&#xff0c;它可以帮助我们在React应用中实现页面之间的跳转和路由管理。本文将详细介绍React-Router的路由功能、常用功能模块、路由传参和路由嵌套&#xff0c;并提供相关代码和解释。 路由功能 React-Router通过管理URL和组件的映…

Vue3 ~

变动 实例 const app new Vue({}) Vue.use() Vue.mixin() Vue.component() Vue.directive()const app Vue.createApp({}) app.use() app.mixin() app.component() app.directive()createApp 代替 new Vue 允许多个根标签 createStore 代替 Vue.use(Vuex) createRouter 代替…

保障网络安全:IP代理识别API的作用与应用

引言 随着互联网的不断发展&#xff0c;网络安全问题已经变得愈发重要。在网络上&#xff0c;恶意用户可以利用IP代理隐藏其真实身份&#xff0c;从而发动各种网络攻击或欺诈行为。为了保障网络安全&#xff0c;IP代理识别API成为了一种不可或缺的工具&#xff0c;本文将深入探…

计算机竞赛 深度学习 opencv python 公式识别(图像识别 机器视觉)

文章目录 0 前言1 课题说明2 效果展示3 具体实现4 关键代码实现5 算法综合效果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习的数学公式识别算法实现 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学…

Apollo配置更新通知

文章目录 启用方式hook编写服务部署本地部署容器化部署构建镜像 使用 ⚡️: 应领导要求想要把 Apollo 配置变更信息更新到企业微信群中&#xff0c;线上出现异常可根据变更时间&#xff0c;快速反应是否是配置变更导致异常 启用方式 &#x1f31b;: 前提有一个可正常使用的Apo…

微服务保护-Sentinel

初识Sentinel 雪崩问题及解决方案 雪崩问题 微服务中&#xff0c;服务间调用关系错综复杂&#xff0c;一个微服务往往依赖于多个其它微服务。 如图&#xff0c;如果服务提供者I发生了故障&#xff0c;当前的应用的部分业务因为依赖于服务I&#xff0c;因此也会被阻塞。此时&a…