2.12日学习打卡----初学RocketMQ(三)

news2025/1/23 2:05:20

2.12日学习打卡

目录:

  • 2.12日学习打卡
  • 一. RocketMQ高级特性(续)
    • 消息重试
    • 延迟消息
    • 消息查询
  • 二.RocketMQ应用实战
    • 生产端发送同步消息
    • 发送异步消息
    • 单向发送消息
    • 顺序发送消息
    • 消费顺序消息
    • 全局顺序消息
    • 延迟消息
    • 事务消息
    • 消息查询

一. RocketMQ高级特性(续)

消息重试

生产端重试

例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

消费端重试

同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。

  • 顺序消息重试
    对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ
    会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用
    会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必
    保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的
    发生

  • 无序消息重试
    对于无序消息(普通、定时、延时、事务消息),当消费者消费
    消息失败时,可以通过设置返回状态达到消息重试的结果。

    • 最大重试次数
      消息消费失败后,可被消息队列RocketMQ重复投递的最大
      次数。
      TCP协议无序消息重试时间间隔:
      在这里插入图片描述
    • 消费失败后重新配置方式
    • 需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
      • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
      • 返回 Null
      • 抛出异常

    延迟消息

    Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
    消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
    定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并且会根据
    delayTimeLevel存入特定的queue,queueId = delayTimeLevel –1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

//
org/apache/rocketmq/store/config/MessageStore
Config.java
private String messageDelayLevel = "1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h";

代码测试
生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class DelayMessageProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer=new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        Message message=null;
        for(int i=0; i<20;i++){
            message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());
           //设置延迟时间0-17 0表示2秒 大于18都是2小时
            message.setDelayTimeLevel(i);
            producer.send(message);
        }
       producer.shutdown();

    }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;



public class DelayMessageConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");

        consumer.setNamesrvAddr("192.168.66.100:9876");
        System.out.println("==========================================");
        //设置消息重试次数
        consumer.setMaxReconsumeTimes(5);
        //设置可以批量处理
        consumer.setConsumeMessageBatchMaxSize(1);
        //订阅主题
        consumer.subscribe("tp_demo_3","*");
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(System.currentTimeMillis()/1000);
            for(MessageExt message:list){
                System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
       consumer.start();

    }
}

运行结果
在这里插入图片描述

消息查询

在这里插入图片描述

在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。

//返回结果
SendResult [
 sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
 messageQueue=MessageQueue [topic=TopicA,
brokerName=broker-a, queueId=0],
 queueOffset=0]
  • 按MessageId查询消息
    Message Id 是消息发送后,在Broker端生成的,其包含了
    Broker的地址、偏移信息,并且会把Message Id作为结果的一
    部分返回。Message Id中属于精确匹配,代表唯一一条消息,
    查询效率更高。

  • 按照Message Key查询消息
    消息的key是开发人员在发送消息之前自行指定的,通常把具有
    业务含义,区分度高的字段作为消息的key,如用户id,订单id
    等。

  • 按照Unique Key查询消息
    除了开发人员指定的消息key,生产者在发送发送消息之前,会
    自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一
    代表一条消息

消息在消息队列RocketMQ中存储的时间默认为3天(不建议修
改),即只能查询从消息发送时间算起3天内的消息,三种查询方式
的特点和对比如下表所述:
在这里插入图片描述

二.RocketMQ应用实战

在这里插入图片描述

生产端发送同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求
在这里插入图片描述

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        //实例化消息生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

运行结果

SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...

Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。

SendStatus:发送的标识。成功,失败等

Queue:相当于是Topic的分区;用于并行发送和接收消息

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。在这里插入图片描述
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;

public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        // 启动Producer实例
        producer.start();
        //消息失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // 根据消息数量实例化倒计时计算器
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        //创建消息
        for(int i=0;i<messageCount;i++){
            final int index=i;
           Message msg=new Message("topic_demo","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
           
        }
        // 等待5s
        countDownLatch.await(5, TimeUnit.SECONDS);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送
在这里插入图片描述
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别

package com.jjy.produce;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.66.100:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);


        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}


消息发送时的权衡

发送方式发送TPS发送结果反馈可靠性使用场景
同步发送可靠邮件、短信、推送
异步发送可靠视频转码
单向发送最快可能丢失日志收集

顺序发送消息

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

public class OrderProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        // 获取指定主题的MQ列表
        final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");


        Message message = null;
        MessageQueue messageQueue = null;
        for (int i = 0; i < 100; i++) {
            // 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
            // 发送到同一个MQ
            messageQueue = messageQueues.get(i % 8);
            //创建message对象
            //发送创建订单消息
            message = new Message("tp_demo_11", ("hello rocketmq order create - " + i).getBytes());
            producer.send(message, messageQueue);
            //发送付款订单消息
            message = new Message("tp_demo_11", ("hello rocketmq order pay - " + i).getBytes());
            producer.send(message, messageQueue);
            //发送订单送货消息
            message = new Message("tp_demo_11", ("hello rocketmq order delivery - " + i).getBytes());
            producer.send(message, messageQueue);
        }


        producer.shutdown();
    }
}

消费顺序消息

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
        consumer.setNamesrvAddr("192.168.66.100:9876");
        //订阅主题
        consumer.subscribe("tp_demo_11", "*");

       //最小消费线程数
        consumer.setConsumeThreadMin(1);
        //最大消费线程数
        consumer.setConsumeThreadMax(1);
        //一次拉取的消息数量
        consumer.setPullBatchSize(1);
        //一次消费的消息数量
        consumer.setConsumeMessageBatchMaxSize(1);


        // 使用有序消息监听器
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeOrderlyContext context) {


                for (MessageExt msg : msgs) {
                    System.out.println(
                            msg.getTopic() + "\t" +
                                    msg.getQueueId() + "\t" +
                                    new String(msg.getBody())
                    );
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

全局顺序消息

生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

public class GlobalOrderProducer {
   public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {

       DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
       producer.setNamesrvAddr("192.168.66.100:9876");
       producer.start();
       Message message = null;
       for(int i=0;i<100;i++){
           message=new Message("tp_demo_11",("全局有序消息...."+i).getBytes());
           producer.send(message, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                   return list.get((Integer)0);
               }
           },1);

       }
       producer.shutdown();
   }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class GlobalConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
        consumer.setNamesrvAddr("192.168.66.100:9876");
        //订阅主题
        consumer.subscribe("tp_demo_11", "*");

        //最小消费线程数
        consumer.setConsumeThreadMin(1);
        //最大消费线程数
        consumer.setConsumeThreadMax(1);
        //一次拉取的消息数量
        consumer.setPullBatchSize(1);
        //一次消费的消息数量
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消费线程=" + Thread.currentThread().getName() +
                            ", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

延迟消息

生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class DelayMessageProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer=new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        Message message=null;
        for(int i=0; i<20;i++){
            message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());
           // 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
            message.setDelayTimeLevel(i);
            producer.send(message);
        }
       producer.shutdown();

    }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageExt;



public class DelayMessageConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");

        consumer.setNamesrvAddr("192.168.66.100:9876");
        System.out.println("==========================================");
        //设置消息重试次数
        consumer.setMaxReconsumeTimes(5);
        //设置可以批量处理
        consumer.setConsumeMessageBatchMaxSize(1);
        //订阅主题
        consumer.subscribe("tp_demo_3","*");
        consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(System.currentTimeMillis()/1000);
            for(MessageExt message:list){
                System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
       consumer.start();

    }
}

事务消息

生产者

package com.jjy.produce;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class  TransactionProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionListener listener=new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
                System.out.println("执行本地事务......");
               // return LocalTransactionState.COMMIT_MESSAGE;
                //使用下面事务回滚 消费端无法接收到消息了
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                // 该方法用于获取本地事务执行的状态。
                System.out.println("检查本地事务的状态:" + messageExt);
                 return LocalTransactionState.COMMIT_MESSAGE;
            }
        };
        //创建事务消息生产者
        TransactionMQProducer producer=new TransactionMQProducer("producer_grp_01");
       //设置事务监听器
        producer.setTransactionListener(listener);
        producer.setNamesrvAddr("192.168.66.100:9876");
        producer.start();
        Message message=null;
        message=new Message("tp_demo_11","hello translation message".getBytes());
        producer.sendMessageInTransaction(message,"{\" name\":\"zhansan\"}");


    }
}

消费者

package com.jjy.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionMsgConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");
        consumer.setNamesrvAddr("192.168.66.100:9876");
        consumer.subscribe("tp_demo_11", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}


消息查询

package com.itbaizhan.consumer;


public class QueryingMessageDemo {
  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    //创建消费者对象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
    //设置nameserver地址
    consumer.setNamesrvAddr("192.168.66.100:9876");
    //设置消息监听器
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    //根据messageId查询消息
    MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
    System.out.println(message);
    System.out.println(message.getMsgId());
    consumer.shutdown();
   }
}

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1452983.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Packet Tracer - Configuring ASA Basic Settings and Firewall Using CLI

Packet Tracer - 使用CLI配置ASA基本设置和防火墙 IP地址表 目标 验证连接并探索ASA设备使用CLI配置ASA的基本设置和接口安全级别使用CLI配置路由、地址转换和检查策略配置DHCP、AAA和SSH服务配置DMZ区域、静态NAT和访问控制列表&#xff08;ACL&#xff09; 场景 您的公司…

grafana配置钉钉告警模版(一)

1、配置钉钉告警模版 创建钉钉告警模版&#xff0c;然后在创建钉钉告警时调用模版。 定义发送内容具体代码 my_text_alert_list 是模版名称后面再配置钉钉告警时需要调用。 {{/* 定义消息体片段 */}} {{ define "my_text_alert_list" }}{{ range . }}告警名称&…

redis为什么使用跳跃表而不是树

Redis中支持五种数据类型中有序集合Sorted Set的底层数据结构使用的跳跃表&#xff0c;为何不使用其他的如平衡二叉树、b树等数据结构呢&#xff1f; 1&#xff0c;redis的设计目标、性能需求&#xff1a; redis是高性能的非关系型&#xff08;NoSQL&#xff09;内存键值数据…

在Postgresql 下安装QGIS

安装QGIS的前提是需要 在windows下安装Postgres&#xff0c;具体可以参考文章&#xff1a; Windows 安装和连接使用 PgSql数据库 安装GIS的具体步骤如下&#xff1a; 一.打开 Application Stack Builder 二.选择默认端口和安装目标 三.选择【Spatial Extensions】 四.选择安装…

链式结构实现队列

链式结构实现队列 1.队列1.1队列的概念及结构1.2队列的实现 2. 队列的各种函数实现3. 队列的全部代码实现 1.队列 1.1队列的概念及结构 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先进先出 FIFO(Fi…

【大厂AI课学习笔记】【2.1 人工智能项目开发规划与目标】(5)数据管理

今天学习了数据管理&#xff0c;以及数据管理和数据治理的区别和联系。 数据管理&#xff1a;利用计算机硬件和软件技术对数据进行有效的收集、存储、处理和应用的过程其目的在于充分有效地发挥数据的作用。 实现数据有效管理的关键是数据组织。 数据管理和数据治理的区别&am…

无人驾驶控制算法LQR和MPC的仿真实现

1. LQR控制器 1.1 问题陈述 考虑一个质量为 m m m 的滑块在光滑的一维地面上运动。初始时&#xff0c;滑块的位置和速度均为 0 0 0。我们的目标是设计一个控制器&#xff0c;基于传感器测得的滑块位置 x x x&#xff0c;为滑块提供外力 u u u&#xff0c;使其能够跟随参考…

每日一题——LeetCode1455.检查单词是否为句中其他单词的前缀

方法一 js函数slice() 将字符串按空格符分割为单词数组&#xff0c;记searchWord的长度为n&#xff0c;分割每个单词的前n位看是否和searchWord匹配 var isPrefixOfWord function(sentence, searchWord) {let res sentence.split(" ")for(i 0 ; i < res.lengt…

七天入门大模型 :大模型LLM 训练理论和实战最强总结!

本文对于想入门大模型、面试大模型岗位、大模型实具有很强的指导意义。喜欢记得收藏、关注、点赞 文章目录 技术交流群用通俗易懂方式讲解系列总览介绍预训练范式如何确定自己的模型需要做什么训练&#xff1f;模型推理的一般过程PyTorch 框架设备PyTorch基本训练代码范例Trans…

【复现】cellinx摄像设备 未授权漏洞_50

目录 一.概述 二 .漏洞影响 三.漏洞复现 1. 漏洞一&#xff1a; 四.修复建议&#xff1a; 五. 搜索语法&#xff1a; 六.免责声明 一.概述 cellinx是一家韩国的摄像设备 二 .漏洞影响 通过未授权访问可以创建用户进入后台&#xff0c;可能造成系统功能破坏。 三.漏洞复…

CCF编程能力等级认证GESP—C++8级—20231209

CCF编程能力等级认证GESP—C8级—20231209 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)奖品分配大量的工作沟通 答案及解析单选题判断题编程题1编程题2…

GIS利用不舒适指数绘制地区的生物气候舒适度图

生物气候舒适度定义了最适宜的气候条件,在这种条件下,人们感到健康和充满活力。生物气候舒适度地图对城市规划研究特别有用。温度、相对湿度和风速等要素对评估生物气候舒适度非常重要。[1] 人们已经得出了许多不同的指数来确定生物气候舒适度。在本博文中,我们将使用广泛使…

基于SringBoot+Vue的大学生社团管理系统

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 学如逆水行舟&#xff0c;不进则退。学习如赶路&#xff0c;不能慢一步。 目录 一、项目简介 1.1 研究背景 1.…

英文论文(sci)解读复现【NO.21】一种基于空间坐标的轻量级目标检测器无人机航空图像的自注意

此前出了目标检测算法改进专栏&#xff0c;但是对于应用于什么场景&#xff0c;需要什么改进方法对应与自己的应用场景有效果&#xff0c;并且多少改进点能发什么水平的文章&#xff0c;为解决大家的困惑&#xff0c;此系列文章旨在给大家解读发表高水平学术期刊中的 SCI论文&a…

leetcode hot100不同路径Ⅱ

本题和之前做的不同路径类似&#xff0c;区别是本题中加入了障碍&#xff0c;遇到障碍之后需要避开&#xff08;注意&#xff0c;这里依旧是只能向下向右移动&#xff09;&#xff0c;那么也就是说&#xff0c;有障碍的点是到达不了的&#xff0c;并且 &#xff0c;我在初始化的…

Java基于微信小程序的医院挂号小程序,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Atmel ATSHA204应用总结

1 ACES软件安装 Atmel Crypto Evaluation Studio (ACES) https://www.microchip.com/DevelopmentTools/ProductDetails/PartNO/Atmel%20Crypto%20%20Studio%20(ACES) 2 基本概念 ACES CE&#xff1a;Atmel Crypto Evalution Studio Configuration Environment&#xff08;基于加…

Intelij Terminal中文乱码解决

第一&#xff1a; &#xff08;重启Intelij生效&#xff09; -Dfile.encodingUTF-8 第二&#xff1a; &#xff08;重启Intelij生效&#xff09; 如果还不行&#xff0c;第三&#xff1a; 测试结果很ok&#xff1a;

红队打靶练习:IMF: 1

目录 信息收集 1、arp 2、nmap 3、nikto 目录探测 gobuster dirsearch WEB 信息收集 get flag1 get flag2 get flag3 SQL注入 漏洞探测 脱库 get flag4 文件上传 反弹shell 提权 get flag5 get flag6 信息收集 1、arp ┌──(root㉿ru)-[~/kali] └─# a…

使用MinIO S3存储桶备份Weaviate

Weaviate 是一个开创性的开源向量数据库&#xff0c;旨在通过利用机器学习模型来增强语义搜索。与依赖关键字匹配的传统搜索引擎不同&#xff0c;Weaviate 采用语义相似性原则。这种创新方法将各种形式的数据&#xff08;文本、图像等&#xff09;转换为矢量表示形式&#xff0…