【RocketMQ】RocketMQ发送不同类型消息

news2025/1/21 4:50:15

🎯 导读:本文介绍了RocketMQ消息队列系统中的几种消息发送模式及其应用场景,包括同步消息、异步消息以及事务消息。同步消息确保了消息的安全性,但牺牲了一定的性能;异步消息提高了响应速度,适用于对响应时间敏感的场景;事务消息则保证了消息与本地事务的一致性,适用于需要预执行业务逻辑以决定消息是否发送的场景。此外,文章还探讨了Topic与Tag的应用策略,以及如何利用自定义Key来方便消息的查询和去重,提供了丰富的代码示例帮助理解。

文章目录

  • RocketMQ发送同步消息*
  • RocketMQ发送异步消息*
    • 异步消息生产者
    • 异步消息消费者
  • RocketMQ发送单向消息
    • 单向消息生产者
    • 单向消息消费者
  • RocketMQ发送延迟消息*
    • 延迟消息生产者
    • 延迟消息消费者
  • RocketMQ发送顺序消息
    • 场景分析
    • 定义消息实体
    • 顺序消息生产者
    • 顺序消息消费者
  • RocketMQ发送批量消息
    • 批量消息生产者
    • 批量消息消费者
  • RocketMQ发送事务消息(不够Seata方便)
    • 事务消息的发送流程
    • 事务消息生产者
    • 事务消息消费者
    • 测试结果
  • RocketMQ发送带标签的消息*(消息过滤)
    • 订阅关系一致
    • 标签消息生产者
    • 标签消息消费者
    • Topic 和 Tag 的应用推荐(官方推荐)
  • 发送消息携带自定义Key
    • 携带Key好处
    • 携带 key 消息生产者
    • 携带 key 消息消费者

使用*标注的为常用的消息类型

RocketMQ发送同步消息*

在这里插入图片描述

方法有返回Result的是同步消息,可以参考快速入门案例的实现

  • 同步消息发送过后会有一个返回值(MQ 服务器接收到消息后返回的一个确认),这种方式非常安全,但是性能没有那么高
  • 在 MQ 集群中,要等到所有的从机都复制了消息以后才会返回(要等很久)
  • 应用场景:重要的消息可以选择这种方式

在这里插入图片描述

RocketMQ发送异步消息*

  • 异步消息用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker响应的场景
  • 发送完以后会有一个异步消息通知告诉生产者消息是否发送成功

异步消息生产者

@Test
public void asyncProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
    producer.send(message, new SendCallback() {
        // 异步回调方法
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }

        @Override
        public void onException(Throwable e) {
            System.err.println("发送失败:" + e.getMessage());
        }
    });
    System.out.println("我先执行");
    // 挂起jvm
    System.in.read();
}

在这里插入图片描述

异步消息消费者

@Test
public void testAsyncConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

RocketMQ发送单向消息

  • 这种方式主要用在不关心发送结果的场景(没有同步或者异步回调,不在乎消息是否发送成功),例如日志信息的发送
  • 这种方式吞吐量很大,但是存在消息丢失的风险

单向消息生产者

@Test
public void testOnewayProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("单向消息").getBytes());
    // 发送单向消息
    producer.sendOneway(msg);
    // 关闭实例
    producer.shutdown();
}

单向消息消费者

消费者和上面一样

RocketMQ发送延迟消息*

  • 消息放入 MQ 后,过一段时间,才会被消费者监听到
  • 在下单业务中,提交一个订单后,发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单,释放库存。类似场景还有 7 天自动收货

延迟消息生产者

RocketMQ 4.x 版本不支持任意时间的延时,只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟

在这里插入图片描述

@Test
public void testDelayProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("ms-consumer-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("延迟消息").getBytes());
    // 给这个消息设定一个延迟级别,每个级别对应一个时间
    // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.setDelayTimeLevel(3);
    // 发送单向消息
    producer.send(msg);
    // 打印时间
    System.out.println(new Date());
    // 关闭实例
    producer.shutdown();
}

RocketMQ 5.x 版本支持任意时间的延时(使用时间轮算法)

Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());
// 直接设置延迟多少秒
message.setDelayTimeSec(500);
// 设置延迟多少毫秒
// message.setDelayTimeMs(100);
// 发延迟消息
producer.send(message);

延迟消息消费者

延时消息会有一点小误差,不完全准时

/**
 * 发送时间Fri Apr 21 16:19:54 CST 2023
 * 收到消息了Fri Apr 21 16:20:20 CST 2023
 * --------------
 * 发送时间Fri Apr 21 16:21:08 CST 2023
 * 收到消息Fri Apr 21 16:21:18 CST 2023
 *
 * @throws Exception
 */
@Test
public void msConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("orderMsTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("收到消息了" + new Date());
            System.out.println(new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

RocketMQ发送顺序消息

  • 消息有序指的是可以按照消息的发送顺序来消费(FIFO)
  • RocketMQ 可严格保证消息有序,例如下单之后,先发送短信、再发货

【有序类型】

  • 分区有序(可以直接通过MessageListenerOrderly实现)
  • 全局有序(MessageListenerOrderly + 设置Broker队列数量为1)

可能大家会有疑问,mq本身不就是FIFO吗?

答:一个broker中有四个queue,消费默认是并发的,线程之间有竞争关系,不能确保顺序

【顺序消费的原理解析】

  • 默认消息发送时采取Round Robin轮询方式把消息发送到不同的queue(分区队列)
  • 消费消息的时候,从多个queue上拉取消息,消费不能保证全局有序,只能保证分区有序(每个queue的消息都是有序的)

在这里插入图片描述

  • 但如果控制消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,就保证了全局有序(发送和消费参与的队列只有一个)

场景分析

模拟一个订单的发送流程,创建两个订单,发送的消息分别是

  • 订单号1000111 发消息流程 下订单->物流->签收
  • 订单号1000222 发消息流程 下订单->物流->拒收

即实现分区有序

定义消息实体

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {

    private String orderSn;
    private Integer userId;
    private String desc; // 下单 短信 物流
}

顺序消息生产者

private List<MsgModel> msgModels = Arrays.asList(
        new MsgModel("1000111", 1, "下单"),
        new MsgModel("1000111", 1, "短信"),
        new MsgModel("1000111", 1, "物流"),
        new MsgModel("1000222", 2, "下单"),
        new MsgModel("1000222", 2, "短信"),
        new MsgModel("1000222", 2, "物流")
);

@Test
public void orderlyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    // 发送顺序消息  发送时要确保有序 并且要发到同一个队列下面去
    msgModels.forEach(msgModel -> {
        String msgModelString = msgModel.toString();
        Message message = new Message("orderlyTopic", msgModelString.getBytes());
        try {
            producer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // arg就是接收到的订单号 orderSn
                            // 在这里 选择队列
                            int hashCode = arg.toString().hashCode();
                            // 根据订单号hashCode,取模放到队列中,订单号一样,放到的队列一样
                            return mqs.get(hashCode % mqs.size());
                        }
                    },
                    // 传递订单号进去
                    msgModel.getOrderSn());

        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    producer.shutdown();
    System.out.println("发送完成");
}

顺序消息消费者

@Test
public void orderlyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("orderlyTopic", "*");
    // MessageListenerConcurrently 并发模式 多线程的  重试16次
    // MessageListenerOrderly 顺序模式 单线程的 无限重试Integer.Max_Value
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.println("线程id:" + Thread.currentThread().getId());
            System.out.println(new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

下面的结果就是分区有序,同一个订单(同一队列)的消息会按照顺序消费,但是不同订单的消息没有顺序约束

在这里插入图片描述

RocketMQ发送批量消息

RocketMQ可以一次性发送一组消息,这一组消息被投递到同一个队列中,会被当做一条消息进行消费

批量消息生产者

@Test
public void testBatchProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    // 消息实际上还是分开发的,接收的时候还是一条一条接收就行
    List<Message> msgs = Arrays.asList(
            new Message("TopicTest", "我是一组消息的A消息".getBytes()),
            new Message("TopicTest", "我是一组消息的B消息".getBytes()),
            new Message("TopicTest", "我是一组消息的C消息".getBytes())
    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

三个消息放在一个队列中

在这里插入图片描述

批量消息消费者

消费者还是一条一条来消费

@Test
public void testBatchConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

RocketMQ发送事务消息(不够Seata方便)

事务消息的发送流程

它允许发送方在发送消息之前执行某些业务逻辑,并根据这些业务逻辑的结果来决定消息是否应该被发送。

  • 如果业务逻辑执行成功,则消息会被提交并最终被消费者消费;
  • 如果业务逻辑执行失败或不确定,则消息的状态将保持未知,直到通过回查机制确认其状态为止。

在这里插入图片描述

下图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

在这里插入图片描述

事务消息发送及提交

1、发送消息(half消息)

2、服务端响应消息写入结果(如果写入失败,此时half消息对业务不可见,本地事务不执行)

3、根据发送结果执行本地事务

4、根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)

事务补偿

补偿阶段用于解决消息UNKNOW或者Rollback发生超时或者失败的情况

1、对没有Commit/Rollback的事务消息(pending状态的消息),发起一次“回查”

2、Producer收到回查消息,检查回查消息对应的本地事务的状态

3、根据本地事务状态,重新 Commit 或者 Rollback

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态

  • TransactionStatus.CommitTransaction: 提交事务,允许消费者消费此消息
  • TransactionStatus.RollbackTransaction: 回滚事务,该消息将被删除,不允许被消费
  • TransactionStatus.Unknown: 中间状态,需要检查消息队列来确定状态

事务消息生产者

通过在broker.conf文件中设置如下参数

  • transactionCheckInterval:Broker检查事务消息状态的默认间隔时间(单位为毫秒)。默认Broker每1分钟(60000毫秒)会对未确认的事务消息进行一次状态检查
  • transactionTimeOut:事务消息的有效期,即事务消息在未得到确认前能存在的最长时间
  • transactionCheckMax:事务消息的最大检测次数(默认回查15次)。如果在达到最大检测次数后事务消息的状态仍未得到确认,Broker会默认认为事务已失败,并对消息进行回滚
private Random random = new Random();
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

/**
 * 创建订单,扣减库存
 *
 * @throws Exception
 */
@Test
public void createOrderAndDeductStock() throws Exception {
    // 构建消息体
    TransactionMQProducer producer = new TransactionMQProducer("async-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);

    // 设置事务消息的监听器
    producer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println(sdf.format(new Date()) + "-->执行本地事务:" + new String(msg.getBody()) + ";事务参数:" + arg);

            // 获取订单信息
            String orderInfo = new String(msg.getBody());

            // 做一些业务操作...

            // 检查订单状态
            if (isOrderValid(orderInfo)) {
                System.out.println("订单状态没有问题,消息发送给消费者处理");
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                System.out.println("订单状态有问题,等会重新 检查本地事务");
                // 返回 UNKNOW ,后面会调用checkLocalTransaction(msg)方法
                return LocalTransactionState.UNKNOW;
            }
        }

        /**
         * 回查,确认上面的业务是否有结果
         * 触发条件:
         * 1、当上面执行本地事务返回结果 UNKNOW 时,或者回查方法也返回 UNKNOW 时,会触发
         * 2、上面操作超过 20s 没有做出一个结果,也就是超时或者卡住了,也会进行回查
         * @param messageExt
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            System.out.println(sdf.format(new Date()) + "-->检查本地事务:" + new String(messageExt.getBody()));

            // 检查订单状态
            String orderInfo = new String(messageExt.getBody());
            if (isOrderValid(orderInfo)) {
                System.out.println("订单状态没有问题,消息发送给消费者处理");
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                System.out.println("订单状态还是有问题,这个消息不要了");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    });

    producer.start();

    for (int i = 0; i < 3; i++) {
        String orderId = UUID.randomUUID().toString().replace("-", "");
        int amount = random.nextInt(10) + 1;
        // 构建订单信息
        String orderInfo = "订单号:" + orderId + " 数量:" + amount;

        // 尝试保存订单信息
        boolean saveOrderSuccess = saveOrder(orderInfo);

        // 发送事务消息
        Message message = new Message("Affair_Topic", orderInfo.getBytes());
        System.out.println(sdf.format(new Date()) + "-->发送事务消息:" + orderInfo);
        producer.sendMessageInTransaction(message, "hahaha");

    }
    System.in.read();
}


private boolean saveOrder(String orderInfo) {
    // 模拟保存订单信息到数据库
    // 假设保存成功
    return true;
}

private boolean isOrderValid(String orderInfo) {
    // 模拟检查订单状态
    // 假设订单有效
    int random = this.random.nextInt(2);
    return random == 1; // 应该替换为实际的订单状态检查逻辑
}

事务消息消费者

@Test
public void testTransactionConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("Affair_Topic", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(sdf.format(new Date()) + "-->" + Thread.currentThread().getName() + " 执行消费:" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

测试结果

在这里插入图片描述

RocketMQ发送带标签的消息*(消息过滤)

  • RocketMQ提供消息过滤功能,通过tag或者key进行区分。同一个主题对应多个tag。我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,通过过滤来区别对待
  • 场景:订单主题中。有的消息是关于生鲜,需要及时配送;有的消息是关于服装,可以慢一点发货。不同标签的处理逻辑不同

在这里插入图片描述

订阅关系一致

  • 订阅关系:一个消费者组订阅一个 Topic 的某一个 Tag(个人实测,一个标签最好对应一个标签,不然信息会被过滤掉,不被正常消费),这种记录被称为订阅关系。
  • 订阅关系一致:同一个消费者组下所有消费者实例所订阅的 Topic、Tag 必须完全一致。如果订阅关系(消费者组名 - Topic - Tag)不一致,会导致消费消息紊乱,甚至消息丢失。

在这里插入图片描述

标签消息生产者

@Test
public void tagProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
    Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
    producer.send(message);
    producer.send(message2);
    System.out.println("发送成功");
    producer.shutdown();
}

标签消息消费者

/**
 * vip1
 *
 * @throws Exception
 */
@Test
public void tagConsumer1() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 主题,标签(写 * 是所有标签)
    consumer.subscribe("tagTopic", "vip1");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}


/**
 * vip1 || vip2
 *
 * @throws Exception
 */
@Test
public void tagConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("tagTopic", "vip1 || vip2");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

Topic 和 Tag 的应用推荐(官方推荐)

总结:不同的业务应该使用不同的Topic,如果是相同的业务里面有不同的表现形式,我们要使用tag进行区分,可以从以下几个方面进行判断:

1、消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分

2、业务是否相关联:淘宝交易消息、京东物流消息使用不同的 Topic 进行区分;同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分

3、消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分

4、消息量级是否相当:有些业务消息虽然量小但是实时性要求高(如生鲜订单),如果跟某些万亿量级的消息使用同一个 Topic,可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic

总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

发送消息携带自定义Key

在 RocketMQ 中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或key来进行查询

在这里插入图片描述

携带Key好处

  • 方便查阅
  • 方便去重

携带 key 消息生产者

@Test
public void keyProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

携带 key 消息消费者

@Test
public void keyConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("keyTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println("我是vip1的消费者,我正在消费消息" + new String(messageExt.getBody()));
            // 获取key
            System.out.println("我们业务的标识:" + messageExt.getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2175497.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

搬砖6、Python函数和模块的使用

函数和模块的使用 在讲解本章节的内容之前&#xff0c;我们先来研究一道数学题&#xff0c;请说出下面的方程有多少组正整数解。 事实上&#xff0c;上面的问题等同于将8个苹果分成四组每组至少一个苹果有多少种方案。想到这一点问题的答案就呼之欲出了。 可以用Python的程序来…

人工智能开发实战照片智能搜索功能实现

内容提要 项目分析预备知识项目实战 一、项目分析 1、提出问题 随着人民生活水平的提高和手机照相功能的日趋完美&#xff0c;我们不经意中拍摄了很多值得回忆的时刻&#xff0c;一场说走就走的旅行途中也记录下许多令人心动的瞬间&#xff0c;不知不觉之中&#xff0c;我们…

Time-MoE : 时间序列领域的亿级规模混合专家基础模型

Time-MoE : 时间序列领域的亿级规模混合专家基础模型 时间序列预测一直是量化研究和工业应用中的重要课题。随着深度学习技术的发展&#xff0c;大规模预训练模型在自然语言处理和计算机视觉领域取得了显著进展&#xff0c;但在时间序列预测领域&#xff0c;这些模型的规模和运…

【归回预测】归回预测│PSO-ELM与标准ELM多输入预测对比源代码

摘要 本文比较了基于粒子群优化&#xff08;PSO&#xff09;和标准极限学习机&#xff08;ELM&#xff09;算法的电力负荷多输入预测模型。利用真实电力负荷数据集&#xff0c;对两种方法的预测性能进行了全面的评估&#xff0c;使用了均方误差&#xff08;MSE&#xff09;、平…

【文心智能体 | AI大师工坊】如何使用智能体插件,完成一款旅游类智能体的开发,来体验一下我的智能体『​​​​​​​厦门CityWalk』

目录 1.1、智能体运行效果 1.2、创作灵感来源 1.3、如何制作智能体 1.4、可能会遇到的几个问题 1.5、快速调优指南 『厦门CityWalk&#x1f680;』我的优质智能体&#xff1a;https://0nxj3k.smartapps.baidu.com/?_swebfr1&_swebScene3621000000000000 在当今这个全…

青动CRM V3.2.1

全面解决企业销售团队的全流程客户服务难题旨在助力企业销售全流程精细化、数字化管理&#xff0c;全面解决企业销售团队的全流程客户服务难题&#xff0c;帮助企业有效盘活客户资源、量化销售行为&#xff0c;合理配置资源、建立科学销售体系&#xff0c;提升销售业绩。标准授…

【宝藏妙招,轻松拿捏!】如何防止U盘资料被复制?U盘文件防拷贝的五种措施!

小李&#xff1a;“小张&#xff0c;你上次借我的U盘还回来的时候&#xff0c;我总觉得里面的资料好像被人动过了&#xff0c;有没有什么办法可以防止U盘里的资料被复制啊&#xff1f;” 小张&#xff1a;“当然有啦&#xff01;现在数据安全这么重要&#xff0c;防止U盘资料被…

贪心的思想

803.区间合并 给定 n 个区间 [li,ri]&#xff0c;要求合并所有有交集的区间。 注意如果在端点处相交&#xff0c;也算有交集。 输出合并完成后的区间个数。 例如&#xff1a;[1,3] 和 [2,6] 可以合并为一个区间 [1,6]。 输入格式 第一行包含整数 n。 接下来 n 行&#x…

如何通过GSR排名系统迅速提升谷歌排名?

如果你希望在谷歌上迅速提升某个关键词排名&#xff0c;或者某个关键词无论怎么优化都无法上首页&#xff0c;那么GSR关键词排名系统你就可以关注一下&#xff0c;GSR系统可以在短时间内帮助你进一步提升至首页。与传统的SEO方法不同&#xff0c;GSR侧重于外部优化&#xff0c;…

C语言进阶版第13课—字符函数和字符串函数2

文章目录 1. strstr函数的使用和模拟实现1.1 strstr函数的使用1.2 模拟实现strstr函数1.3 strstr函数和strncpy函数、puts函数的混合使用 2. strtok函数的使用**3. strerror函数的使用** 1. strstr函数的使用和模拟实现 1.1 strstr函数的使用 strstr函数是用来通过一个字符串来…

《迁移学习》—— 将 ResNet18 模型迁移到食物分类项目中

文章目录 一、迁移学习的简单介绍1.迁移学习是什么&#xff1f;2.迁移学习的步骤 二、数据集介绍三、代码实现1. 步骤2.所用到方法介绍的文章链接3. 完整代码 一、迁移学习的简单介绍 1.迁移学习是什么&#xff1f; 迁移学习是指利用已经训练好的模型&#xff0c;在新的任务上…

牛顿迭代法求解x 的平方根

牛顿迭代法是一种可以用来快速求解函数零点的方法。 为了叙述方便&#xff0c;我们用 C C C表示待求出平方根的那个整数。显然&#xff0c; C C C的平方根就是函数 f ( x ) x c − C f(x)x^c-C f(x)xc−C 的零点。 牛顿迭代法的本质是借助泰勒级数&#xff0c;从初始值开始快…

【软件测试】最新Linux大全(超详细!超级全!)

目录 前言1. 操作系统是干什么的2. Linux 是什么3. 为什么要学习 Linux4. Linux 发行版本5. Linux 系统特点6. Linux 安装7. Linux 系统启动8. Linux 操作方式9. Shell 与命令10. 命令格式 一、 Linux终端命令格式1. 终端命令格式2. 查阅命令帮助信息 二、 常用Linux命令的基本…

项目计划软件如何助力企业策略规划和执行监控

项目管理软件助力任务、时间和协作管理&#xff0c;如ZohoProjects集成了任务管理、时间跟踪、协作工具等功能&#xff0c;提高性价比&#xff0c;适合不同规模团队。其简化流程、专业度高&#xff0c;成为企业提升效率的重要工具。 一、项目计划软件的由来 项目计划软件的历史…

暴雨受邀出席2024 AI大模型生态算力峰会

9月25日&#xff0c;2024 AI大模型生态暨算力峰会在北京国家会议中心正式开幕&#xff0c;AI行业头部厂家、业界专家及人工智能行业精英齐聚一堂&#xff0c;暴雨华北大区产品总监丁海受邀出席并发表演《用AI奔赴新质生产力》的主题演讲&#xff0c;深度诠释了人工智能如何驱动…

解开BL锁之后如何安装模块及安装注意事项

本文是在解开BL锁的前提下进行的。 解开BL锁请参考:出厂非澎湃OS手机解BL锁 本文 参考&#xff1a; Magisk中文网 Magisk资源分享 ROM基地 我安装了这几个模块&#xff0c;切记先按照救砖模块。 解开BL锁之后&#xff0c;需要将下载系统ROM包提取boot.img。 目前我知道的又…

基于云开发进行快速搭建企业智能名片小程序

如何基于云开发进行快速搭建企业智能名片小程序&#xff1f; 首先&#xff0c;需要注册一个小程序账号&#xff0c;获取AppID。如果还不知道怎么注册的朋友&#xff0c;可以去看我前面写的那篇教程&#xff0c;有比较详细的注册步骤图文教程。 复制AppID&#xff0c;打开开发者…

基于SpringBoot+Vue+MySQL的旅游管理系统

系统展示 用户前台界面 管理员后台界面 系统背景 随着旅游业的蓬勃发展&#xff0c;传统的旅游信息查询与订票方式已难以满足现代游客的多元化需求。为了提升用户体验&#xff0c;提高旅游管理的效率&#xff0c;我们开发了基于SpringBootVueMySQL的旅游管理系统。该系统旨在通…

大模型微调4:Alpaca模型微调、Adalora、Qlora

Alpaca模型微调&#xff1a; 整个pipeline 1. 主流底座&#xff1a;Candidate 中文&#xff1a;YI-34B 英文&#xff1a;LLama&#xff0c;mistral 2. 验证&#xff1a; 我们自己的Instructoin data 通用的Instruction data&#xff08;适合我们场景的&#xff09; 3. 收集…

kubernetes存储入门(kubernetes)

实验环境依旧是三个节点拉取镜像&#xff0c;然后在master节点拉取资源清单&#xff1a; 然后同步会话&#xff0c;导入镜像&#xff1b; 存储入门 ConfigMap volume卷--》volumemount&#xff08;挂载卷&#xff09; Glusterfs NFS ISCSI HostPath ConfigMap Secret E…