2.12日学习打卡
目录:
- 2.12日学习打卡
- 一. RocketMQ高级特性(续)
- 消息重试
- 延迟消息
- 消息查询
- 二.RocketMQ应用实战
- 生产端发送同步消息
- 发送异步消息
- 单向发送消息
- 顺序发送消息
- 消费顺序消息
- 全局顺序消息
- 延迟消息
- 事务消息
- 消息查询
一. RocketMQ高级特性(续)
消息重试
生产端重试
例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。
// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);
消费端重试
同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。
-
顺序消息重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ
会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用
会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必
保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的
发生 -
无序消息重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费
消息失败时,可以通过设置返回状态达到消息重试的结果。- 最大重试次数
消息消费失败后,可被消息队列RocketMQ重复投递的最大
次数。
TCP协议无序消息重试时间间隔:
- 消费失败后重新配置方式
- 需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
- 返回 Null
- 抛出异常
延迟消息
Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并且会根据
delayTimeLevel存入特定的queue,queueId = delayTimeLevel –1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。 - 最大重试次数
//
org/apache/rocketmq/store/config/MessageStore
Config.java
private String messageDelayLevel = "1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h";
代码测试
生产者
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DelayMessageProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer=new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("192.168.66.100:9876");
producer.start();
Message message=null;
for(int i=0; i<20;i++){
message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());
//设置延迟时间0-17 0表示2秒 大于18都是2小时
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
}
消费者
package com.jjy.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class DelayMessageConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");
consumer.setNamesrvAddr("192.168.66.100:9876");
System.out.println("==========================================");
//设置消息重试次数
consumer.setMaxReconsumeTimes(5);
//设置可以批量处理
consumer.setConsumeMessageBatchMaxSize(1);
//订阅主题
consumer.subscribe("tp_demo_3","*");
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
System.out.println(System.currentTimeMillis()/1000);
for(MessageExt message:list){
System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
运行结果
消息查询
在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。
//返回结果
SendResult [
sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA,
brokerName=broker-a, queueId=0],
queueOffset=0]
-
按MessageId查询消息
Message Id 是消息发送后,在Broker端生成的,其包含了
Broker的地址、偏移信息,并且会把Message Id作为结果的一
部分返回。Message Id中属于精确匹配,代表唯一一条消息,
查询效率更高。 -
按照Message Key查询消息
消息的key是开发人员在发送消息之前自行指定的,通常把具有
业务含义,区分度高的字段作为消息的key,如用户id,订单id
等。 -
按照Unique Key查询消息
除了开发人员指定的消息key,生产者在发送发送消息之前,会
自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一
代表一条消息
消息在消息队列RocketMQ中存储的时间默认为3天(不建议修
改),即只能查询从消息发送时间算起3天内的消息,三种查询方式
的特点和对比如下表所述:
二.RocketMQ应用实战
生产端发送同步消息
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
//实例化消息生产者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.66.100:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
运行结果
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...
Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。
SendStatus:发送的标识。成功,失败等
Queue:相当于是Topic的分区;用于并行发送和接收消息
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.66.100:9876");
// 启动Producer实例
producer.start();
//消息失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
//创建消息
for(int i=0;i<messageCount;i++){
final int index=i;
Message msg=new Message("topic_demo","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别
package com.jjy.produce;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.66.100:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消息发送时的权衡
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 | 使用场景 |
---|---|---|---|---|
同步发送 | 快 | 有 | 可靠 | 邮件、短信、推送 |
异步发送 | 快 | 有 | 可靠 | 视频转码 |
单向发送 | 最快 | 无 | 可能丢失 | 日志收集 |
顺序发送消息
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.66.100:9876");
producer.start();
// 获取指定主题的MQ列表
final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");
Message message = null;
MessageQueue messageQueue = null;
for (int i = 0; i < 100; i++) {
// 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
// 发送到同一个MQ
messageQueue = messageQueues.get(i % 8);
//创建message对象
//发送创建订单消息
message = new Message("tp_demo_11", ("hello rocketmq order create - " + i).getBytes());
producer.send(message, messageQueue);
//发送付款订单消息
message = new Message("tp_demo_11", ("hello rocketmq order pay - " + i).getBytes());
producer.send(message, messageQueue);
//发送订单送货消息
message = new Message("tp_demo_11", ("hello rocketmq order delivery - " + i).getBytes());
producer.send(message, messageQueue);
}
producer.shutdown();
}
}
消费顺序消息
package com.jjy.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.66.100:9876");
//订阅主题
consumer.subscribe("tp_demo_11", "*");
//最小消费线程数
consumer.setConsumeThreadMin(1);
//最大消费线程数
consumer.setConsumeThreadMax(1);
//一次拉取的消息数量
consumer.setPullBatchSize(1);
//一次消费的消息数量
consumer.setConsumeMessageBatchMaxSize(1);
// 使用有序消息监听器
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(
msg.getTopic() + "\t" +
msg.getQueueId() + "\t" +
new String(msg.getBody())
);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
全局顺序消息
生产者
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.66.100:9876");
producer.start();
Message message = null;
for(int i=0;i<100;i++){
message=new Message("tp_demo_11",("全局有序消息...."+i).getBytes());
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
return list.get((Integer)0);
}
},1);
}
producer.shutdown();
}
}
消费者
package com.jjy.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class GlobalConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.66.100:9876");
//订阅主题
consumer.subscribe("tp_demo_11", "*");
//最小消费线程数
consumer.setConsumeThreadMin(1);
//最大消费线程数
consumer.setConsumeThreadMax(1);
//一次拉取的消息数量
consumer.setPullBatchSize(1);
//一次消费的消息数量
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : list) {
System.out.println("消费线程=" + Thread.currentThread().getName() +
", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
延迟消息
生产者
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class DelayMessageProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer=new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("192.168.66.100:9876");
producer.start();
Message message=null;
for(int i=0; i<20;i++){
message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());
// 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
}
消费者
package com.jjy.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class DelayMessageConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");
consumer.setNamesrvAddr("192.168.66.100:9876");
System.out.println("==========================================");
//设置消息重试次数
consumer.setMaxReconsumeTimes(5);
//设置可以批量处理
consumer.setConsumeMessageBatchMaxSize(1);
//订阅主题
consumer.subscribe("tp_demo_3","*");
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
System.out.println(System.currentTimeMillis()/1000);
for(MessageExt message:list){
System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
事务消息
生产者
package com.jjy.produce;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener=new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
System.out.println("执行本地事务......");
// return LocalTransactionState.COMMIT_MESSAGE;
//使用下面事务回滚 消费端无法接收到消息了
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + messageExt);
return LocalTransactionState.COMMIT_MESSAGE;
}
};
//创建事务消息生产者
TransactionMQProducer producer=new TransactionMQProducer("producer_grp_01");
//设置事务监听器
producer.setTransactionListener(listener);
producer.setNamesrvAddr("192.168.66.100:9876");
producer.start();
Message message=null;
message=new Message("tp_demo_11","hello translation message".getBytes());
producer.sendMessageInTransaction(message,"{\" name\":\"zhansan\"}");
}
}
消费者
package com.jjy.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionMsgConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");
consumer.setNamesrvAddr("192.168.66.100:9876");
consumer.subscribe("tp_demo_11", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
消息查询
package com.itbaizhan.consumer;
public class QueryingMessageDemo {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
//设置nameserver地址
consumer.setNamesrvAddr("192.168.66.100:9876");
//设置消息监听器
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//根据messageId查询消息
MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
System.out.println(message);
System.out.println(message.getMsgId());
consumer.shutdown();
}
}
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!