四.RocketMQ的几种消息发送方式应用

news2024/12/24 3:07:48

RocketMQ的几种消息发送方式应用

    • 一:普通消息
      • 1)发送同步消息
      • 2)发送异步消息
      • 3)单向发送消息
      • 4)消费消息-负载均衡模式
      • 5)消费消息-广播模式
    • 二:顺序消息
      • 1.顺序消息指的是:严格按照消息的发送顺序进行消费的消息(FIFO)。
      • 2.为什么需要顺序消息?
      • 3.有序性分类
      • 4.代码示例
    • 三:延时消息
      • 1.延时消息概览及适用场景
      • 2.延时等级
      • 3.代码示例
    • 四:批量消息
      • 1.批量发送消息
        • 1.1 发送限制
        • 1.2 生产者发送的消息大小
      • 2. 批量消费消息
        • 2.1 批量消费配置
        • 2.2 存在的问题
      • 3.代码示例
        • 3.1 定义消息列表分割器
        • 3.2 发送消息:
        • 3.3 消费消息:
        • 3.4 结果:
    • 五:过滤消息
      • 1.Tag过滤
        • 1.1 代码实现
      • 2.SQL过滤
        • 2.1 代码实现
    • 六:事务消息
      • 1.问题引入
      • 2.解决思路
      • 3.注意
      • 4.代码实现
        • 4.1生产者:
        • 4.2 消费者:
        • 4.3事务监听实现:
        • 4.4结果:

导入MQ启动依赖:

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.2.3</version>
</dependency>

yml配置:

rocketmq:
  name-server: 192.168.31.30:9876 # 访问地址
  producer:
    group: rocket-producer # 必须指定group
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

工具类

package com.lmy.config.rocketmq;

import com.lmy.dto.rsp.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.lmy.utils.JsonUtil;

/**
 * @author : lmy
 * @date : 2023/12/9 下午 12:45
 * 生产者
 */
@Slf4j
@Component
public class MQProducerService {

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;

    // 建议正常规模项目统一用一个TOPIC
    private static final String topic = "RLT_TEST_TOPIC";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new
            ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });

    /**
     * 普通发送(这里的参数可以随意定义,可以发送个对象,也可以是字符串等)
     */
    public void send(String topic,String tag,Object obj) {
//        rocketMQTemplate.convertAndSend(topic + ":"+tag, obj);
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        rocketMQTemplate.send(topic + tag, MessageBuilder.withPayload(obj).build()); // 等价于上面一行
    }

    /**
     * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
     * (msgBody也可以是对象,sendResult为返回的发送结果)
     * setHeader: 在消息发送到RocketMQ时,这个键值对会被添加到消息的头部,以便在消息接收端进行识别和处理
     */
    public SendResult sendSyncMsg(String topic,String tag,Object msgBody) {
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        SendResult sendResult = rocketMQTemplate.syncSend(topic+tag, MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS,"myKey").build());
        log.info("【sendMsg】sendResult={}", JsonUtil.objectToJson(sendResult));
        return sendResult;
    }

    /**
     * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
     * (适合对响应时间敏感的业务场景)
     */
    public void sendAsyncMsg(String topic,String tag,String msgBody) {
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        rocketMQTemplate.asyncSend(topic+tag, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 处理消息发送成功逻辑
                log.info("【sendAsyncMsg】sendResult={}", JsonUtil.objectToJson(sendResult));
            }
            @Override
            public void onException(Throwable throwable) {
                // 处理消息发送异常逻辑
                log.info("【sendAsyncMsg】发送失败:sendResult={}",throwable.getMessage());
            }
        });
    }

    /**
     * 发送顺序消息
     * @param topic
     * @param msgBody
     * 该方法的hashkey参数,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。
     */
    public void syncSendOrderly(String topic,String tag,String msgBody,String hashKey) {
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic + tag, MessageBuilder.withPayload(msgBody).build(), hashKey);
        System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                sendResult.getSendStatus(),
                sendResult.getMessageQueue().getQueueId(),
                msgBody));
    }

    /**
     * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
     * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public void sendDelayMsg(String topic,String tag,String msgBody, int delayLevel) {
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        rocketMQTemplate.syncSend(topic+tag, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
    }

    /**
     * 发送同步批量消息
     * @param topic
     * @param tag
     * @param messageList
     */
    public void sendBatchMsg(String topic, String tag, List<String> messageList) {
        List<Message> messages = new ArrayList<>();
        for (String message : messageList) {
            messages.add(new Message(topic,tag, message.getBytes()));
        }
        DefaultMQProducer producer = rocketMQTemplate.getProducer();
        //把大的消息分裂成若干个小的消息
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                SendResult sendResult = producer.send(listItem);
                log.info("【sendMsg】sendResult={}", sendResult.getRawRespBody()+","+sendResult.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
                //处理error
            }
        }

    }

    /**
     * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
     */
    public void sendOneWayMsg(String topic,String tag,String msgBody) {
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        rocketMQTemplate.sendOneWay(topic+tag, MessageBuilder.withPayload(msgBody).build());
    }

    /**
     * 发送带tag的消息,直接在topic后面加上":tag"
     */
    public SendResult sendTagMsg(String topic,String tag,Object msgBody) {
        if (StringUtils.isNotEmpty(tag)) {
            tag = ":"+tag;
        }
        return rocketMQTemplate.syncSend(topic + tag, MessageBuilder.withPayload(msgBody).build());
    }

    /**
     * 发送sql过滤的消息"
     */
    public SendResult sendSqlMsg(String topic, String tags, Object msgBody, Map<String,Object> propMap) {
        SendResult sendResult = new SendResult();
        try {
            DefaultMQProducer producer = rocketMQTemplate.getProducer();
            Message msg = new Message(topic, tags, msgBody.toString().getBytes());
            Set<Map.Entry<String, Object>> entries = propMap.entrySet();
            for (Map.Entry<String, Object> entry : entries) {
                String key = entry.getKey();
                Object value = entry.getValue();
                msg.putUserProperty(key, value + "");
            }
            sendResult = producer.send(msg);
            System.out.println(sendResult);
        }catch (Exception e) {
            e.printStackTrace();
        }
        return sendResult;
    }

    /**
     * 发送事务消息
     * @param topic
     * @param tags
     * @param msgBody
     */
    public TransactionSendResult TransactionMsg(String topic, String tags, Object msgBody) {
        TransactionMQProducer producer = (TransactionMQProducer)rocketMQTemplate.getProducer();
        // 为生产者指定一个线程池
        producer.setExecutorService(executorService);
        // 设置事务监听器
        producer.setTransactionListener(new TransactionListenerImpl());
        // 设置生产者组
        producer.setProducerGroup("Con_Group_9");

        // 生成生产事务id
        String transactionId = UUID.randomUUID().toString().replace("-", "");
        // 构建消息体
        org.springframework.messaging.Message<Object> message = MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();

        // 第三个参数用于指定在执行本地事务时要使用的业务参数
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, message, "业务参数:" + msgBody);

        return transactionSendResult;
    }

}
package com.lmy.controller.rocketmq.consumer;

import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * @author : lmy
 * @date : 2023/12/9 下午 12:51
 * 消费者
 *
 * consumerGroup:是消费者的逻辑分组,用于使同一个 consumerGroup 下的消费者消费同一类消息。
 * consumerGroup 的作用主要有以下 2 点:
 *  1. 实现负载均衡。同一个 consumerGroup 下的消费者会均匀地消费同一类消息,不会重复消费。
 *  2. 实现消息重试。当某个消费者挂掉时,其它消费者会继续消费该消费者未消费的消息。
 *
 * topic:是消息的逻辑分类,用于将消息分发到不同的消费者
 * topic 的作用主要有以下 2 点:
 * 1. 实现消息分发。当消息发送到 RocketMQ 服务器时,会根据 topic 将消息分发到不同的消费者。
 * 2. 实现消息过滤。消费者可以通过指定 topic 来过滤消息。
 *
 * selectorType:是用于指定消息选择器的类型的属性。
 * selectorType 的作用:
 * 根据消息的属性或标签进行消息过滤,以便只有符合特定条件的消息才会被消费者消费。
 * MessageSelectorType.TAG:表示消息选择器的类型是基于标签进行过滤。只有标签匹配的消息才会被消费者消费。
 * MessageSelectorType.SQL92:它可以使用SQL92语法作为过滤规则表达式
 *
 * selectorExpression:是用于指定消息选择器的表达式的属性
 * selectorExpression 的作用:
 * 根据消息的属性或标签进行消息过滤,以便只有符合特定条件的消息才会被消费者消费。
 * 在使用@RocketMQMessageListener 注解标注消费者类时,通过 selectorExpression 属性指定消息选择器的表达式
 * 例:selectorExpression = "tag1 || tag2"
 *
 * consumeMode:用于指定消费者消费模式的属性
 * consumeMode的作用:
 * ConsumeMode.ORDERLY:有序消费。在这种模式下,消费者会按照消息发送的顺序来消费消息。
 * ConsumeMode.CONCURRENTLY:并发消费。在这种模式下,消费者可以并发地消费消息
 *
 * messageModel:用于指定消息模型的属性
 * messageModel的作用:
 * MessageModel.CLUSTERING:负载均衡模式。在这种模式下,消息会被发送给其中一个订阅该主题的消费者。
 * MessageModel.BROADCASTING:广播模式。在这种模式下,消息会被发送给所有订阅该主题的消费者。
 *
 * consumeThreadNumber:用于指定消费者线程数的属性
 * RocketMQ中consumeThreadNumber的默认值是1,表示使用单线程消费消息。如果消息量很大,可以通过增加 consumeThreadNumber 来提高消费性能。
 *
 * maxReconsumeTimes:用于指定消息最大重试次数的属性
 * RocketMQ中maxReconsumeTimes 的默认值是 3,表示消息最多可以重试 3 次。如果消息在重试 3 次后仍然没有被消费,那么该消息将被丢弃。
 * 注意:maxReconsumeTimes 的设置会影响消费者的消费性能。如果消息量很大,建议减少 maxReconsumeTimes 的值,以提高消费性能。但是,如果消息量不大,减少 maxReconsumeTimes 的值可能会导致消息丢失。
 *
 * consumeTimeout:用于指定消息消费超时时间的属性
 * RocketMQ中consumeTimeout 的默认值是 10000 毫秒,表示消息消费超时时间为 10 秒。如果消息在 10 秒内没有被消费,那么该消息将被重新投递。
 * 注意:consumeTimeout 的设置会影响消费者的消费性能。如果消息量很大,建议减少 consumeTimeout 的值,以提高消费性能。但是,如果消息量不大,减少 consumeTimeout 的值可能会导致消息丢失。
 *
 * replyTimeout:用于指定消息回复超时时间的属性。
 * RocketMQ中replyTimeout 的默认值是 3000 毫秒,表示消息回复超时时间为 3 秒。如果消息在 3 秒内没有收到回复,那么该消息将被重新投递。
 * 注意:replyTimeout 的设置会影响消费者的消费性能。如果消息量很大,建议减少 replyTimeout 的值,以提高消费性能。但是,如果消息量不大,减少 replyTimeout 的值可能会导致消息丢失。
 *
 * enableMsgTrace:用于指定是否开启消息跟踪的属性
 * enableMsgTrace的默认值是 false,表示不开启消息跟踪。如果开启消息跟踪,那么消费者可以通过消息 ID 查询消息的消费状态。
 * 注意:enableMsgTrace 的设置会影响消费者的消费性能。如果开启消息跟踪,那么消费者需要额外消耗资源来记录消息的消费状态。如果消息量很大,建议关闭消息跟踪。
 *
 * tlsEnable:用于指定是否开启 TLS 加密的属性
 * tlsEnable 的默认值是 false,表示不开启 TLS 加密。如果开启 TLS 加密,那么消费者和生产者之间的通信将使用 TLS 协议进行加密。
 * 注意:tlsEnable 的设置会影响消费者的消费性能。如果开启 TLS 加密,那么消费者需要额外消耗资源来进行 TLS 加密和解密。如果消息量很大,建议关闭 TLS 加密。
 *
 * namespace:用于指定消息的命名空间的属性
 * namespace 的默认值是 default,表示使用默认的命名空间。如果需要使用自定义的命名空间,可以通过 namespace 属性指定。
 * 注意:namespace 的设置会影响消息的消费路由。如果 namespace 设置不正确,那么消息可能会被错误地消费。
 *
 * delayLevelWhenNextConsume:用于指定消息下次消费的延迟级别的属性
 * delayLevelWhenNextConsume 的默认值是 0,表示消息下次消费的延迟级别为 0 级。如果需要设置消息下次消费的延迟级别,可以通过 delayLevelWhenNextConsume 属性指定
 * 注意:delayLevelWhenNextConsume 的设置会影响消息的消费时间。如果 delayLevelWhenNextConsume 设置过高,那么消息可能会被延迟很长时间才能被消费。
 *
 * suspendCurrentQueueTimeMillis:用于指定暂停当前队列的毫秒数的属性
 * suspendCurrentQueueTimeMillis 的默认值是 -1,表示不暂停当前队列。如果需要暂停当前队列,可以通过 suspendCurrentQueueTimeMillis 属性指定暂停的时间。
 * 注意:suspendCurrentQueueTimeMillis 的设置会影响消息的消费时间。如果 suspendCurrentQueueTimeMillis 设置过高,那么消息可能会被延迟很长时间才能被消费。
 *
 * awaitTerminationMillisWhenShutdown:用于指定消费者在关闭时等待终止的时间的属性。
 * awaitTerminationMillisWhenShutdown 的默认值是 0,表示消费者在关闭时不等待终止。如果需要消费者在关闭时等待一段时间后终止,可以通过 awaitTerminationMillisWhenShutdown 属性指定等待的时间。
 * 注意:awaitTerminationMillisWhenShutdown 的设置会影响消费者的关闭时间。如果设置的等待时间过长,消费者可能需要等待较长时间才能完全关闭。
 *
 * instanceName:用于指定消费者实例名称的属性
 * instanceName 的作用是为了在一个进程中创建多个消费者实例,每个实例可以独立运行和管理。
 * 这对于需要同时消费多个主题或者在不同的消费者组中使用相同的消费者类来处理消息时非常有用。
 *
 */
@Slf4j
@Component
public class MQConsumerService {

    /**
     *
     * topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
     * selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
     * messageModel可设置消费者模式
     *   1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
     *   2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
     */
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_One")
    public class ConsumerSend implements RocketMQListener<Object> {
        // 监听到消息就会执行此方法
        @Override
        public void onMessage(Object obj) {
            log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));
        }
    }

    // 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
    // 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
//    @Service
//    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
//    public class ConsumerSend2 implements RocketMQListener<String> {
//        @Override
//        public void onMessage(String str) {
//            log.info("监听到消息:str={}", str);
//        }
//    }

    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2",messageModel = MessageModel.CLUSTERING, consumerGroup = "Con_Group_Three")
    public class Consumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }

    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag3",messageModel = MessageModel.CLUSTERING,consumeMode= ConsumeMode.ORDERLY, consumerGroup = "Con_Group_Four")
    public class orderConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到顺序消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }

    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag4",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_Five")
    public class delayMsgConsumer implements RocketMQListener<Object> {
        // 监听到消息就会执行此方法
        @Override
        public void onMessage(Object obj) {
            log.info("监听到延迟消息:Object={},消费时间={}", JsonUtil.objectToJson(obj),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        }
    }

    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5", consumerGroup = "Con_Group_Sex")
    public class batchConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到批量消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }

    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5 || tag6", consumerGroup = "Con_Group_7")
    public class tagConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }

    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC",selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "Con_Group_8")
    public class SqlConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到sql过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }

    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag8 || tag9 || tag10", consumerGroup = "Con_Group_9")
    public class TransactionConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到事务消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }

}


消息发送者步骤:

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题TopicTag和消息体
5.发送消息
6.关闭生产者producer

消息消费者步骤:

1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题TopicTag
4.设置回调函数,处理消息
5.启动消费者consumer

一:普通消息

1)发送同步消息

同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
在这里插入图片描述

@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {
    String msg = "同步消息体";
    for (int i = 0; i < 100; i++) {
        SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);
        System.out.printf("%s%n", sendResult);
    }
}

2)发送异步消息

异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
在这里插入图片描述

@ApiOperation("发送异步消息")
@GetMapping("/sendAsyncMsg")
public void sendAsyncMsg() {
    String msg = "异步消息体";
    for (int i = 0; i < 100; i++) {
        mqProducerService.sendAsyncMsg("RLT_TEST_TOPIC","tag2",msg + i);
    }
}

3)单向发送消息

单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
在这里插入图片描述

@ApiOperation("发送单向消息")
@GetMapping("/sendOneWayMsg")
public void sendOneWayMsg() {
   String msg = "单向消息体";
   for (int i = 0; i < 100; i++) {
       mqProducerService.sendOneWayMsg("RLT_TEST_TOPIC","tag2",msg + i);
   }
}

4)消费消息-负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

生产消息:

@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {
    String msg = "同步消息体";
    for (int i = 0; i < 100; i++) {
        SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);
        System.out.printf("%s%n", sendResult);
    }
}

消费消息:

package com.lmy.config.rocketmq;

import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * @author : lmy
 * @date : 2023/12/9 下午 12:51
 * 消费者
 */
@Slf4j
@Component
public class MQConsumerService {

    /**
     * topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
     * selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
     * messageModel可设置消费者模式
     *   1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
     *   2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
     */
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_One")
    public class ConsumerSend implements RocketMQListener<Object> {
        // 监听到消息就会执行此方法
        @Override
        public void onMessage(Object obj) {
            log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));
        }
    }

    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING, consumerGroup = "Con_Group_One")
    public class Consumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }
}

响应结果:
在这里插入图片描述

5)消费消息-广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

生产消息:

@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {
    String msg = "同步消息体";
    for (int i = 0; i < 100; i++) {
        SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);
        System.out.printf("%s%n", sendResult);
    }
}

消费消息:

package com.lmy.config.rocketmq;

import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * @author : lmy
 * @date : 2023/12/9 下午 12:51
 * 消费者
 */
@Slf4j
@Component
public class MQConsumerService {

    /**
     * topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
     * selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
     * messageModel可设置消费者模式
     *   1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
     *   2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
     */
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.BROADCASTING,consumerGroup = "Con_Group_One")
    public class ConsumerSend implements RocketMQListener<Object> {
        // 监听到消息就会执行此方法
        @Override
        public void onMessage(Object obj) {
            log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));
        }
    }

    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
    @Service
    @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.BROADCASTING, consumerGroup = "Con_Group_Three")
    public class Consumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());
        }
    }
}

响应结果:
在这里插入图片描述

二:顺序消息

1.顺序消息指的是:严格按照消息的发送顺序进行消费的消息(FIFO)。

默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

2.为什么需要顺序消息?

例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:
在这里插入图片描述
这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

在这里插入图片描述
基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。

3.有序性分类

根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序全局有序
1)全局有序
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。

在创建Topic时指定Queue的数量。有三种指定方式:

  • 在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
  • 在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
  • 使用mqadmin命令手动创建Topic时指定Queue数量

在这里插入图片描述
2)分区有序
如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。

如何实现Queue的选择:
一般我们给出唯一且不重复的key(例如订单号),让key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。以下是源码:
在这里插入图片描述

取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。

但是不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的
在这里插入图片描述

4.代码示例

/**
 * 订单的步骤
 */
 private static class OrderStep {
     private long orderId;
     private String desc;

     public long getOrderId() {
         return orderId;
     }

     public void setOrderId(long orderId) {
         this.orderId = orderId;
     }

     public String getDesc() {
         return desc;
     }

     public void setDesc(String desc) {
         this.desc = desc;
     }

     @Override
     public String toString() {
         return "OrderStep{" +
                 "orderId=" + orderId +
                 ", desc='" + desc + '\'' +
                 '}';
     }
 }
/**
 * 生成模拟订单数据
 */
private List<OrderStep> buildOrders() {
    List<OrderStep> orderList = new ArrayList<OrderStep>();

    OrderStep orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("未支付");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("未支付");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("未支付");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("已支付");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("已支付");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("已支付");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("发货中");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("发货中");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("发货中");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("发货失败");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("发货成功");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("发货成功");
    orderList.add(orderDemo);

    return orderList;
}

生产消息:

@ApiOperation("发送顺序消息")
@GetMapping("/syncSendOrderly")
public void syncSendOrderly() {
    String msg = "顺序消息体";
    for (int i = 0; i < 12; i++) {
        // 订单列表
        List<OrderStep> orderList = new RocketMQController().buildOrders();
        mqProducerService.syncSendOrderly("RLT_TEST_TOPIC","tag3",msg +orderList.get(i).toString() ,orderList.get(i).getOrderId()+"");
    }
}

消息消费:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag3",messageModel = MessageModel.CLUSTERING,consumeMode= ConsumeMode.ORDERLY, consumerGroup = "Con_Group_Four")
public class orderConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("监听到顺序消息:msg={},Key:{}", msg,messageExt.getKeys());
    }
}

响应结果:
在这里插入图片描述

三:延时消息

1.延时消息概览及适用场景

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,提交一个订单时可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

2.延时等级

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。
延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中:

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m
1h 2h 1d

3.代码示例

延迟消息生产者:

@ApiOperation("发送延时消息")
@GetMapping("/sendDelayMsg")
public void sendDelayMsg() {
    String msg = "延时消息体发出时间";
    for (int i = 0; i < 12; i++) {
        String msgBody = msg + i + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        mqProducerService.sendDelayMsg("RLT_TEST_TOPIC","tag4",msgBody ,3);
    }
}

延迟消息消费者:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag4",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_Five")
public class delayMsgConsumer implements RocketMQListener<Object> {
    // 监听到消息就会执行此方法
    @Override
    public void onMessage(Object obj) {
        log.info("监听到延迟消息:Object={},消费时间={}", JsonUtil.objectToJson(obj),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}

结果:
在这里插入图片描述

四:批量消息

生产者批量发送消息能显著提高传递小消息的性能。

1.批量发送消息

1.1 发送限制
  • 批量发送的消息必须具有相同的Topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息
  • 默认情况下,一批发送的消息总大小不能超过4MB字节,若想调整大小方法如下
    1)将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
    2)在Producer端与Broker端修改属性
    Producer端需要在发送之前设置Producer的maxMessageSize属性
    Broker端需要修改其加载的配置文件中的maxMessageSize属性
1.2 生产者发送的消息大小

生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性key-value。
在这里插入图片描述

2. 批量消费消息

2.1 批量消费配置

Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。但是,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定。
在这里插入图片描述

2.2 存在的问题
  • pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
  • consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。

3.代码示例

3.1 定义消息列表分割器
package com.lmy.utils.rocketMq;

import org.apache.rocketmq.common.message.Message;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author : lmy
 * @date : 2023/12/24 上午 11:58
 * 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
 * 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
 * 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
 * 其直接将这条消息构成一个子列表返回。并没有再进行分割
 */
public class ListSplitter implements Iterator<List<Message>> {
    // 指定极限值为4M
    private int SIZE_LIMIT = 1024 * 1024 * 4;
    // 存放所有要发送的消息
    private final List<Message> messages;
    // 要进行批量发送消息的小集合起始索引
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        // 判断当前开始遍历的消息索引要小于消息总数
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        // 记录当前要发送的这一小批次消息列表的大小
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            // 获取当前遍历的消息
            Message message = messages.get(nextIndex);
            // 统计当前遍历的message的大小
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的开销20字节
            // 判断当前消息本身是否大于4M
            if (tmpSize > SIZE_LIMIT) {
                //单个消息超过了最大的限制
                //忽略,否则会阻塞分裂的进程
                if (nextIndex - currIndex == 0) {
                    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        // 获取当前messages列表的子集合[currIndex, nextIndex)
        List<Message> subList = messages.subList(currIndex, nextIndex);
        // 下次遍历的开始索引
        currIndex = nextIndex;
        return subList;
    }

}

3.2 发送消息:
@ApiOperation("发送批量消息")
@GetMapping("/sendBatchMsg")
public void sendBatchMsg() {
    String msg = "批量消息体发出时间";
    List<String> list = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        String msgBody = msg + i + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        list.add(msgBody);
    }
    mqProducerService.sendBatchMsg("RLT_TEST_TOPIC","tag5",list);
}
3.3 消费消息:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5", consumerGroup = "Con_Group_Sex")
public class batchConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("监听到批量消息:msg={},Key:{}", msg,messageExt.getKeys());
    }
}
3.4 结果:

可见,100条信息一次批量发送
在这里插入图片描述

五:过滤消息

消费者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。

1.Tag过滤

以一个标签标识信息进行过滤

1.1 代码实现

生产者:

@ApiOperation("发送过滤消息")
@GetMapping("/sendTagMsg")
public void sendTagMsg() {
    String msg = "过滤消息体";
    for (int i = 0; i < 100; i++) {
        mqProducerService.sendTagMsg("RLT_TEST_TOPIC","tag6",msg + i);
    }
}

消费者:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5 || tag6", consumerGroup = "Con_Group_7")
public class tagConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("监听到过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
    }
}

结果:
在这里插入图片描述

2.SQL过滤

相对于tag(一个消息只能有一个标签)不能应对复杂的场景,可以使用SQL表达式过滤消息。
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。但是,只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符:
支持的常量类型:

  • 数值:比如:123,3.1415
  • 字符:必须用单引号包裹起来,比如:‘abc’
  • 布尔:TRUE 或 FALSE
  • NULL:特殊的常量,表示空

支持的运算符有:

  • 数值比较:>,>=,<,<=,BETWEEN,=
  • 字符比较:=,<>,IN
  • 逻辑运算 :AND,OR,NOT
  • NULL判断:IS NULL 或者 IS NOT NULL

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

#默认情况下Broker没有开启消息的SQL过滤功能,以下设置开启
enablePropertyFilter=true

然后重启生效即可!
在这里插入图片描述

2.1 代码实现

定义SQL过滤生产者:

/**
 * 发送sql过滤的消息"
 */
public SendResult sendSqlMsg(String topic, String tags, Object msgBody, Map<String,Object> propMap) {
    SendResult sendResult = new SendResult();
    try {
        DefaultMQProducer producer = rocketMQTemplate.getProducer();
        Message msg = new Message(topic, tags, msgBody.toString().getBytes());
        Set<Map.Entry<String, Object>> entries = propMap.entrySet();
        for (Map.Entry<String, Object> entry : entries) {
            String key = entry.getKey();
            Object value = entry.getValue();
            msg.putUserProperty(key, value + "");
        }
        sendResult = producer.send(msg);
        System.out.println(sendResult);
    }catch (Exception e) {
        e.printStackTrace();
    }
    return sendResult;
}
@ApiOperation("发送sql过滤消息")
@GetMapping("/sendSqlMsg")
public void sendSqlMsg() {
    String msg = "sql过滤消息体";
    for (int i = 0; i < 100; i++) {
        Map<String, Object> pop = new HashMap<>();
        pop.put("age",i);
        mqProducerService.sendSqlMsg("RLT_TEST_TOPIC","tag7",msg + i,pop);
    }
}

消费者:

@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC",selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "Con_Group_8")
public class SqlConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("监听到sql过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
    }
}

结果:
在这里插入图片描述

六:事务消息

1.问题引入

需求场景:工行用户A向建行用户B转账1万元
可以使用同步消息来处理该需求场景:
在这里插入图片描述
存在的问题:
若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。

2.解决思路

让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。
在这里插入图片描述
预扣款执行结果存在三种可能性:

// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}

消息回查,即重新查询本地事务的执行状态。

关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

  • transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
  • transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
  • transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。

rocketMQ中使用的分布式事务解决方案是XA处理模式
XA模式中有三个重要组件:TC、TM、RM。
TC:事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。(RocketMQ中Broker充当着TC。)
TM:事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。(RocketMQ中事务消息的Producer充当着TM)
RM:资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚(RocketMQ中事务消息的Producer及Broker均是RM)

3.注意

  • 事务消息不支持延时消息和批量消息。
  • 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

4.代码实现

4.1生产者:
@ApiOperation("发送事务消息")
@GetMapping("/sendTransactionMsg")
public void TransactionMsg() {
    for (int i = 8; i < 11; i++) {
        mqProducerService.TransactionMsg("RLT_TEST_TOPIC", "tag" + i, "事务消息" + i);
    }

}
/**
 * 发送事务消息
 * @param topic
 * @param tags
 * @param msgBody
 */
public TransactionSendResult TransactionMsg(String topic, String tags, Object msgBody) {
    TransactionMQProducer producer = (TransactionMQProducer)rocketMQTemplate.getProducer();
    // 为生产者指定一个线程池
    producer.setExecutorService(executorService);
    // 设置事务监听器
    producer.setTransactionListener(new TransactionListenerImpl());
    // 设置生产者组
    producer.setProducerGroup("Con_Group_9");

    // 生成生产事务id
    String transactionId = UUID.randomUUID().toString().replace("-", "");
    // 构建消息体
    org.springframework.messaging.Message<Object> message = MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();

    // 第三个参数用于指定在执行本地事务时要使用的业务参数
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, message, "业务参数:" + msgBody);

    return transactionSendResult;
}
4.2 消费者:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag8 || tag9 || tag10", consumerGroup = "Con_Group_9")
public class TransactionConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("监听到事务消息:msg={},Key:{}", msg,messageExt.getKeys());
    }
}
4.3事务监听实现:
package com.lmy.config.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @author : liu ming yong
 * @date : 2024/4/21 下午 6:30
 * @description : 事务监听器
 */
@Slf4j
public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 回调操作方法,消息预提交成功就会触发该方法的执行,用于完成本地事务
        log.info("预提交消息成功:" + msg+";业务参数:"+arg);
        // 假设接收到tag8的消息就表示操作成功,tag9的消息表示操作失败,tag10表示操作结果不清楚,需要执行消息回查
        if (StringUtils.equals("tag8", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("tag9", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("tag10", msg.getTags())) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;

    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 消息回查方法
        // 引发消息回查的原因最常见的有两个:1)回调操作返回UNKNWON  2)TC没有接收到TM的最终全局事务确认指令
        log.info("执行消息回查" + msg.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

4.4结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1613373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第 394 场 LeetCode 周赛题解

A 统计特殊字母的数量 I 哈希&#xff1a;遍历然后枚举 class Solution {public:int numberOfSpecialChars(string word) {unordered_map<char, int> m;for (auto ch : word)m[ch] 1;int res 0;for (char ch a; ch < z; ch)if (m.count(ch) && m.count(A …

TPM RNG是什么?

TPM是什么&#xff1f; TPM&#xff08;可信平台模块&#xff09;用于提高电脑的安全性。 BitLocker 硬盘加密、Windows Hello 等服务都使用它来安全地创建和存储加密密钥&#xff0c;并确认设备上的操作系统和固件是正确的&#xff0c;没有被篡改。 虽然 TPM 2.0 标准允许英特…

Qt实现XYModem协议(五)

1 概述 XMODEM协议是一种使用拨号调制解调器的个人计算机通信中广泛使用的异步文件运输协议。这种协议以128字节块的形式传输数据&#xff0c;并且每个块都使用一个校验和过程来进行错误检测。使用循环冗余校验的与XMODEM相应的一种协议称为XMODEM-CRC。还有一种是XMODEM-1K&am…

电磁仿真--S参数测试中的参考阻抗

目录 1. 背景介绍 2. 参考阻抗 2.1 简单二端口网络 2.2 离散端口模型 3. 阻抗归一化的指定值 4. 总结 1. 背景介绍 当我们使用网络分析仪来测量S参数&#xff0c;或借助示波器来检测高速信号时&#xff0c;选择仪器系统预设的参考阻抗变得异常简便&#xff0c;通常这个值…

Android14 - WindowManagerService之客户端Activity布局

Android14 - WindowManagerService之客户端Activity布局 一、主要角色 WMS作为一个服务端&#xff0c;有多种客户端与其交互的场景。我们以常见的Activity为例&#xff1a; Activity&#xff1a;在ActivityThread构建一个Activity后&#xff0c;会调用其attach方法&#xff0c;…

FPGA Quartus IP核 打开使用

两种Quartus版本下的IP核&#xff0c;从使用者的角度来看仅仅是配置界面不同&#xff0c;在参数设置和使用方法上基本一致。本文以“MegaWizard Plug-In Manager”中的FIR Compiler IP核使用为例。 Quartus的FIR IP核属于收费IP&#xff0c;如果是个人学习使用需要对IP核单独破…

OpenStack 常见模块详解

目录 一、OpenStack 架构 二、控制台 Dashboard 三、身份认证服务 Keystone 1&#xff09;用户&#xff08;user&#xff09; 2&#xff09;项目&#xff08;project&#xff09; 3&#xff09;角色&#xff08;role&#xff09; 4&#xff09;服务&#xff08;serv…

Linux内核驱动开发-字符设备驱动框架

1前置条件 &#xff08;1&#xff09;【linux】内核编译结束 &#xff08;2&#xff09;【linux】目录配置跳转文件&#xff1a;补充&#xff1a;配置的跳转文件只能在【linux】目录下使用&#xff0c;子目录无法使用2驱动框架 2.1编写驱动程序 #include <linux/init.h&g…

ConcurrentHashMap 源码分析(二)

一、序言 本文和大家探讨一下 ConcurrentHashMap#get() 方法的源码。 二、源码概览 public V get(Object key) {// 定义变量Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 计算键的哈希值int h spread(key.hashCode());// 检查哈希表是否为空&#xff…

前端三大件速成 02 CSS(1)CSS是什么、CSS的四种引入方式、CSS的选择器和优先级、继承

文章目录 一、CSS是什么二、CSS的四种引入方式1、行内式2、嵌入式3、链接式&#xff08;推荐&#xff09;4、导入式 三、CSS的选择器1、基本选择器2、组合选择器3、属性选择器4、伪类 四、选择器的优先级1、选择器的权值2、附加说明 五、继承 一、CSS是什么 CSS为层叠样式表&a…

伪分布Hadoop下安装Hive

一、下载并安装Mysql &#xff08;1&#xff09;下载mysql安装包&#xff08;mysql-8.0.26-1.el7.x86_64.rpm-bundle.tar&#xff09; 下载官网&#xff1a;MySQL :: Download MySQL Community Server (Archived Versions)https://downloads.mysql.com/archives/community/ &…

在centos系统中使用boost库

打开MobaXterm软件 下载 boost_1_85_0.tar.gz tar -zxvf boost_1_85_0.tar.gz解压缩成boost_1_85_0文件夹 双击arrayDemo.cpp 在里面可以编写代码 arrayDemo.cpp #include <boost/timer/timer.hpp> #include <boost/array.hpp> #include <cmath> #inc…

Linux 系统systemd(pid=1)占用80端口导致web程序无法启动

背景 linux系统内安装了例如nginx的网站程序&#xff0c;但是无法正常启动&#xff0c;netstat 查看下 80端口被 systemd 占用。 处理方法 注意务必组好快照备份后再操作。 做好备份后将/usr/lib/systemd/system 内http相关的配置文件重命名后重启主机恢复正常。 重启后正常 sy…

51、图论-岛屿数量

思路&#xff1a; 该问题要求在一个由 1&#xff08;表示陆地&#xff09;和 0&#xff08;表示水&#xff09;组成的二维网格中&#xff0c;计算岛屿的数量。岛屿被水包围&#xff0c;并且通过水平或垂直连接相邻的陆地可以形成。这个问题的核心是识别并计数网格中相连的陆地…

网盘——文件操作之界面设计

关于网盘实现部分&#xff0c;文件操作包含三个部分&#xff1a;界面设计、文件夹操作、常规文件操作。本文主要讲解界面设计&#xff0c;后续文章后讲解后两部分。 1、界面设计 最终的界面如下 1.1、创建类&#xff0c;并添加头文件 #include <QListWidget> #include…

spring boot3单模块项目工程搭建-上(个人开发模板)

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《spring boot实战》 &#x1f30a;山高路远&#xff0c;行路漫漫&#xff0c;终有归途 目录 写在前面 上文衔接 常规目录创建 common目录 exception.handle目录 result.handle目录 controller目录 service…

探索NDWI:归一化水体指数的意义与应用

随着遥感技术的不断发展&#xff0c;NDWI&#xff08;Normalized Difference Water Index&#xff0c;归一化水体指数&#xff09;作为一种重要的植被指数&#xff0c;被广泛应用于水资源管理、湿地监测和环境保护等领域。本文将介绍NDWI的意义、计算方法以及在不同领域的应用。…

什么是0-day漏洞,怎么防护0-day漏洞攻击

随着信息技术的快速发展&#xff0c;网络安全问题日益凸显&#xff0c;其中0day漏洞攻击作为一种高级威胁手段&#xff0c;给企业和个人用户带来了极大的风险。下面德迅云安全就对0day漏洞攻击进行简单讲解下&#xff0c;并分享相应的一些安全措施&#xff0c;以期提高网络安全…

设计模式-创建型-抽象工厂模式-Abstract Factory

UML类图 工厂接口类 public interface ProductFactory {Phone phoneProduct();//生产手机Router routerProduct();//生产路由器 } 小米工厂实现类 public class XiaomiFactoryImpl implements ProductFactory {Overridepublic Phone phoneProduct() {return new XiaomiPhone…

【Interconnection Networks 互连网络】Flattened Butterfly 扁平蝶形拓扑

Flattened Butterfly 扁平蝶形拓扑 1. 传统蝶形网络 Butterfly Topology2. 扁平蝶形拓扑 Flattened Butterfly3.On-Chip Flattened Butterfly 扁平蝶形拓扑应用于片上网络 Flattened Butterfly 扁平蝶形拓扑 扁平蝶形拓扑是一种经济高效的拓扑&#xff0c;适用于高基数路由器…