当物理文件删除了 队列中的下标的消息也被删除了 但是即使物理删除了 队列中的偏移量还是会持续上升
每天凌晨4点 定时清理
在 RocketMQ 中,消息的物理删除是通过定期清理 CommitLog 文件来实现的。CommitLog 文件中存储的是所有主题和队列的消息,一旦这些文件中的数据超过了文件保留时间(fileReservedTime
)或者文件大小限制,老旧的文件将会被删除。对于 ConsumeQueue 文件,它们的清理依赖于 CommitLog。如果 CommitLog 中的消息被删除了,那么对应的 ConsumeQueue 文件中的条目也会失效,并在随后的清理中被删除
nameserver每次不想写可以配置成环境变量
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
</dependencies>
ACL访问控制列表
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>5.3.0</version> </dependency>
基础演示
package com.example;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
/**
* @author hrui
* @date 2024/8/2 18:07
*/
public class MQProducer {
public static void main(String[] args) {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
//设置nameserver地址
producer.setNamesrvAddr("10.8.0.1:9876");
try {
//启动生产者
producer.start();
//发送消息
User user = new User("hrui",18,"北京");
Message message = new Message("mq_topic_test",serialize(user));//生产环境最好把自动创建Topic关闭
SendResult sendResult = producer.send(message);
//[sendStatus=SEND_OK, msgId=24098A282821613099CCCEAC6E72517F7C0818B4AAC20921F6960000, offsetMsgId=AC15217300002A9F00000000000B186D, messageQueue=MessageQueue [topic=mq_topic_test, brokerName=broker-a, queueId=2], queueOffset=0]
System.out.println("消息已发送,返回结果:"+sendResult);
}catch (Exception e){
e.printStackTrace();
}finally {
producer.shutdown();
}
}
public static byte[] serialize(Object object) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
}
package com.example;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
/**
* @author hrui
* @date 2024/8/2 18:30
*/
public class MQConsumer {
public static void main(String[] args) {
//两种消费模式
//DefaultMQPushConsumer:采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
//DefaultMQPullConsumer:消费者明确主动拉取消息,控制权完全在消费者手中,适合需要严格控制消息拉取节奏的场景。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq_consumer_group_test");
//设置namesrv地址
consumer.setNamesrvAddr("10.8.0.1:9876");
try {
//订阅topic
consumer.subscribe("mq_topic_test", "*");
//注册监听器,Broker推送消息触发
//1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override//consumeConcurrentlyContext 是个消费上下文对象
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//不同线程
System.out.println(Thread.currentThread().getName());
System.out.println("list.size="+list.size());
for (MessageExt messageExt : list) {
//将messageExt里的User 反序列化
User user = deserialize(messageExt.getBody());
System.out.println("消费者接收到消息: " + user);
//获取当前消息队列
System.out.println("当前消息队列: " + consumeConcurrentlyContext.getMessageQueue());//MessageQueue [topic=mq_topic_test, brokerName=broker-a, queueId=0]
//获取下一次消费时的延迟级别
int delayLevel = consumeConcurrentlyContext.getDelayLevelWhenNextConsume();
System.out.println("消费时的延迟级别: " + delayLevel);
//设置下一次消费时的延迟级别
//consumeConcurrentlyContext.setDelayLevelWhenNextConsume(2);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}catch (Exception e){
e.printStackTrace();
}
}
// 反序列化方法
public static User deserialize(byte[] data) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
return (User) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}
上面示例,如果有多个消费者,消息只会被其中一个消费者消费一次
Topic,Broker,messageQueue,queueOffset,tags大概说明
单机环境下
topic是个逻辑概念,可以在配置环境conf/broker.conf中配置 autoCreateTopicEnable=true # 允许自动创建 Topic
在单机环境下,一个Topic主题创建后,默认会有4个队列(messageQueue),每条消息都有唯一标识,避免重复发送
当生产者发送一条消息到MQ时候,需要选择一个Topic,随机(或者说MQ有自己的策略),放到4个队列中的其中一个
而偏移量(queueOffset)其实就是该消息在该队列中的下标
tags:消息tag用于消息过滤
主从或者多个Broker下
注意注意注意::::主从关系的broker 一般只有主Broker和消费者关联,从Broker只做数据备份
多个Broker,当Topic创建之后,多个Broker可能有相同的Topic主题
但是生产者发送的消息还是会根据MQ的策略选择其中一个Broker的该主题下一般4个队列选择一个队列存放
DefaultMQProducer
默认的消息生产者实现类,当自动创建主题开启,可以创建主题,设置队列数量(不指定,默认一个Broker 4个队列)
消息长度验证
消息发送之前,确保生产者发送的消息符合相应规范,具体规范要求是,主题名称(Topic),消息体(body)不能为空,消息长度>0并且小于4M(1024*1024*4)
死信队列的工作机制
在 RocketMQ 中,当消费者消费消息时,出现以下情况之一时,消息会被投递到死信队列:
-
消息被重复消费多次仍然失败:RocketMQ 中默认的最大重试次数是 16 次。如果消息在多次重试后仍然消费失败,则会被放入死信队列。
-
消费者抛出异常且没有成功消费消息:如果消费者在处理消息时抛出异常,并且消费未成功,则该消息会被重新放回队列并尝试重新消费。经过多次尝试仍然失败,最终会进入死信队列。
1.简单消息(普通消息)
可靠同步消息:
生产者向RocketMQ执行发送API时,同步等待,直到MQ服务器返回发送结果.
package com.example.simple;
import com.example.User;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
/**
* @author hrui
* @date 2024/8/2 20:45
*/
public class SyncMQProducer {
public static void main(String[] args) {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
//设置nameserver地址
producer.setNamesrvAddr("10.8.0.1:9876");
try {
//启动生产者
producer.start();
//发送消息
User user = new User("hrui", 18, "北京");
Message message = new Message("mq_topic_test", serialize(user));
//同步发送消息,等待返回结果
SendResult sendResult = producer.send(message);
System.out.println("消息已发送, 返回结果: " + sendResult);
//根据返回结果进行后续处理
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
public static byte[] serialize(Object object) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
}
可靠异步消息
生产者在发送消息后,不等待Broker的返回结果,而是继续执行后续的业务逻辑。当消息发送成功或失败时,Broker会通过回调函数在一个新的线程中通知生产者结果。这种方式可以提高系统的吞吐量和响应速度。
同步发送和异步发送都是用send方法
异步的send方法多了一个参数 SEndCallback回调
package com.example.simple;
import com.example.User;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author hrui
* @date 2024/8/2 20:53
*/
public class AsyncMQProducer {
public static void main(String[] args) {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
//设置nameserver地址
producer.setNamesrvAddr("10.8.0.1:9876");
try {
//启动生产者
producer.start();
//发送消息
User user = new User("hrui", 18, "北京");
Message message = new Message("mq_topic_test", serialize(user));
//使用 CountDownLatch 确保异步发送完成
CountDownLatch countDownLatch = new CountDownLatch(1);
//异步发送消息,设置回调函数处理返回结果
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//发送成功的回调
System.out.println("消息发送成功:"+sendResult);
countDownLatch.countDown(); // 减少计数器
}
@Override
public void onException(Throwable e) {
//发送失败的回调
System.out.println("消息发送失败:"+e.getMessage());
e.printStackTrace();
countDownLatch.countDown(); // 减少计数器
}
});
//等待消息发送完成(设置超时时间为 5 秒)
boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
if (!await) {
System.out.println("消息发送超时");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭生产者
producer.shutdown();//这里不要关闭,你这里直接关闭了,MQ回调是异步的,等回调完再关
}
}
public static byte[] serialize(Object object) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
}
单向消息
生产者只负责发送消息,不关心发送结果,也不需要等待服务器的响应。这种方式非常适合需要极高吞吐量的场景,例如日志采集。你只管发,成功失败无关紧要
简单发送sendOneway
package com.example.simple;
import com.example.User;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
/**
* @author hrui
* @date 2024/8/2 21:23
*/
public class OnewayMQProducer {
public static void main(String[] args) {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test");
//设置nameserver地址
producer.setNamesrvAddr("10.8.0.1:9876");
try {
//启动生产者
producer.start();
System.out.println("生产者启动成功");
//发送消息
User user = new User("hrui", 18, "北京");
Message message = new Message("mq_topic_test", serialize(user));
//单向发送消息
producer.sendOneway(message);
System.out.println("消息已发送");
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭生产者
producer.shutdown();
}
}
public static byte[] serialize(Object object) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
}
以上示例消费者都可以用
package com.example;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
/**
* @author hrui
* @date 2024/8/2 18:30
*/
public class MQConsumer {
public static void main(String[] args) {
//两种消费模式
//DefaultMQPushConsumer:采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
//DefaultMQPullConsumer:消费者明确主动拉取消息,控制权完全在消费者手中,适合需要严格控制消息拉取节奏的场景。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq_consumer_group_test");
//设置namesrv地址
consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
try {
//订阅topic
consumer.subscribe("mq_topic_test", "*");
//注册监听器,Broker推送消息触发
//1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override//consumeConcurrentlyContext 是个消费上下文对象
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(Thread.currentThread().getName());
System.out.println("list.size="+list.size());
for (MessageExt messageExt : list) {
//将messageExt里的User 反序列化
User user = deserialize(messageExt.getBody());
System.out.println("消费者接收到消息: " + user);
//获取当前消息队列
System.out.println("当前消息队列: " + consumeConcurrentlyContext.getMessageQueue());//MessageQueue [topic=mq_topic_test, brokerName=broker-a, queueId=0]
//获取下一次消费时的延迟级别
int delayLevel = consumeConcurrentlyContext.getDelayLevelWhenNextConsume();
System.out.println("消费时的延迟级别: " + delayLevel);
//设置下一次消费时的延迟级别
//consumeConcurrentlyContext.setDelayLevelWhenNextConsume(2);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}catch (Exception e){
e.printStackTrace();
}
}
// 反序列化方法
public static User deserialize(byte[] data) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
return (User) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}
2.顺序消息
顺序消息指消费者消费某个topic的某个队列中的消息是顺序的.
要保证顺序消息两个前提
生产者:确保将相同顺序要求的消息发送到同一个Broker的同一个队列
消费者:确保只消费特定的队列内的消息
不然无法保证顺序消息
最简单的实现方式,全局就一个Broker,Broker内就一个队列
修改conf/broker.conf
# Broker 配置文件 broker.conf
defaultTopicQueueNums=1
但是这样设置会导致其他Topic也只有一个队列,事实上我们在代码里也可以指定,或者在dashboard页面中创建Topic时候指定用哪个Broker中有几个队列
能否这么做,我用不同的Topic代表不同的消息,而在创建顺序消息的Topic时候,我指定就一个Broker有并且这个broker里的队列只有一个
因自己这边本身就是单机部署的MQ
主题 对应放置的消息
并且把顺序消息放到一个broker的一个队列中
下面代码用来创建Topic
package com.example.topic;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
/**
* @author hrui
* @date 2024/8/3 15:17
*/
public class CreateTopics {
public static void main(String[] args) {
// 创建带有ACL凭证的RPCHook
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
// 创建DefaultMQAdminExt实例并设置RPCHook
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(aclHook);
adminExt.setNamesrvAddr("xxx.xxx.xxx:9876");
//adminExt.setInstanceName("admin-instance");
try {
adminExt.start();
// 创建普通消息主题
createTopic(adminExt, "normalTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});
// 创建顺序消息主题,只在broker-a上创建,且只有一个队列
createTopic(adminExt, "sequentialTopic", 1, 1, new String[]{"10.8.0.1:10911"});
// 创建延时消息主题
createTopic(adminExt, "delayedTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});
// 创建广播消息主题
createTopic(adminExt, "broadcastTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});
// 创建事务消息主题
createTopic(adminExt, "transactionTopic", 4, 4, new String[]{"xxx.xxx.xxx:10911"});
} catch (Exception e) {
e.printStackTrace();
} finally {
adminExt.shutdown();
}
}
public static void createTopic(DefaultMQAdminExt adminExt, String topicName, int writeQueueNums, int readQueueNums, String[] brokers) throws Exception {
TopicConfig topicConfig = new TopicConfig(topicName);
topicConfig.setWriteQueueNums(writeQueueNums);
topicConfig.setReadQueueNums(readQueueNums);
topicConfig.setPerm(6);
for (String broker : brokers) {
adminExt.createAndUpdateTopicConfig(broker, topicConfig);
}
}
}
顺序消费,指定存入一个Broker的队列 生产者 因为上面创建sequentialTopic主题时候,里面就放了一个队列
package com.example.order;
import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.List;
public class OrderMQProducer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("mq_producer_group_test", getAclRPCHook());
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
try {
producer.start();
// 发送多条消息
for (int i = 0; i < 100; i++) {
User user = new User("hrui-" + i, 18 + i, "北京");
Message message = new Message("sequentialTopic", serialize(user));
// 选择具体某个队列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
// 假设只在 broker-a 上创建了一个队列,那么直接返回该队列
for (MessageQueue mq : list) {
if ("broker-a".equals(mq.getBrokerName()) && mq.getQueueId() == 0) {
return mq;
}
}
// 如果没有找到指定的队列,则返回默认队列
return list.get(0);
}
}, null);
System.out.println("消息发送结果:" + sendResult);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
public static byte[] serialize(Object object) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
// 访问权限
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
}
}
消费者订阅sequentialTopic 并且里面只有一个队列
package com.example.order;
import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
/**
* 要保证顺序消费
* 消费者侧:确保每个消费者实例只消费特定的队列。
*/
public class OrderMQConsumer {
public static void main(String[] args) {
// 创建消费者并配置ACL
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group", getAclRPCHook());
consumer.setNamesrvAddr("xxxx.xxx.xxx:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
// 订阅顺序消息主题
consumer.subscribe("sequentialTopic", "*");
// 注册顺序消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费消息: " + deserialize(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("顺序消息消费者启动成功");
} catch (MQClientException e) {
e.printStackTrace();
}
}
// 反序列化方法
public static User deserialize(byte[] data) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
return (User) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
// 访问权限
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
}
}
3.广播消息
是向所有订阅了该主题的订阅者发送消息.订阅同一个topic的多个消费者,能全量收到生产者发送
的所有消息
生产者可以随意发送消息
广播模式主要在消费者端设置下广播模式的类型
MessageModel.BROADCASTING:广播消息.一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组.
MessageModel.CLUSTERING:集群消息.每条消息只会被同一个消费者组中的一个实例消费.
关于
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
这个设置
重新启动只会消费新的数据,不会消费老数据
package com.example.broadcast;
import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
/**
* 广播消息
* @author hrui
* @date 2024/8/3 17:17
*/
public class BroadcastProducer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("broadcast-producer-group",getAclRPCHook());
//设置nameserver地址
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒 如果循环发送消息 可能报超时 设置下
try {
producer.start();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setName("hrui_"+i);
user.setAge(18);
user.setAddress("北京");
Message message = new Message("broadcastTopic", "tagA", serialize(user));
//发送消息
SendResult sendResult = producer.send(message);
//根据返回结果进行后续处理
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
producer.shutdown();
}
}
public static byte[] serialize(Object object) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(object);
return byteArrayOutputStream.toByteArray();
}
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
}
}
package com.example.broadcast;
import com.example.User;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
/**
* 广播消息消费者
* 该类实现了一个广播消息的消费者,能够从RocketMQ集群中接收并处理消息。
* 使用了RocketMQ的ACL进行权限控制。
*
* @author hrui
* @date 2024/8/3 17:17
*/
public class BroadcastConsumer {
public static void main(String[] args) {
// 创建消费者实例,并指定消费者组名和ACL权限验证
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast-consumer-group", getAclRPCHook());
// 设置NameServer地址
consumer.setNamesrvAddr("xxxx.xxx.xxx:9876");
//RocketMQ设计的目标是让所有的消费者接收到它们启动后发送到主题的所有消息,而不是从队列的历史开始位置重播所有消息
// 设置从队列的最开始位置消费 如果不设置默认consumeFromWhere:CONSUME_FROM_LAST_OFFSET
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
ConsumeFromWhere consumeFromWhere = consumer.getConsumeFromWhere();
System.out.println("consumeFromWhere:"+consumeFromWhere);
// 设置消息模式为广播模式
consumer.setMessageModel(MessageModel.CLUSTERING);
try {
// 订阅主题并指定Tag,*表示全部Tag
consumer.subscribe("broadcastTopic", "*");
// 注册消息监听器,处理接收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 反序列化消息体
User user = deserialize(msg.getBody());
System.out.println("收到消息:"+user);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Broadcast Consumer started.");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 这里没有需要在finally中关闭的资源,但如果有资源需要关闭,可以在这里处理
}
}
/**
* 反序列化方法,将字节数组转换为User对象
*
* @param data 字节数组
* @return User对象
*/
public static User deserialize(byte[] data) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
return (User) objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
/**
* 获取ACL权限验证的RPCHook
*
* @return RPCHook
*/
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
}
}
4.延迟消息
延迟消息与普通消息的不同之处在于,他们要等到指定的时间之后才会被传递
可以在conf/broker.conf中配置
延迟消息大概逻辑是 系统为延迟任务创建了topic 当然 你只需要关注于自己的topic就行了
分了18个等级对应18个队列
package com.example.schedule;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author hrui
* @date 2024/8/3 19:34
*/
public class ScheduledProducer {
public static void main(String[] args) throws Exception {
// 创建生产者,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducerGroup",getAclRPCHook());
// 设置NameServer地址
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒 可能报超时 设置下
// 启动生产者
producer.start();
int delayLevel = 3; // 延迟级别,对应某个具体的延迟时间(如1分钟)
// 创建消息实例,指定topic,tag和消息体
Message message = new Message("delayedTopic", "TagA", "OrderID188", "Hello scheduled message".getBytes());
// 设置延迟级别
message.setDelayTimeLevel(delayLevel);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("发送结果: " + sendResult);
// 关闭生产者
producer.shutdown();
}
// 访问权限
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
}
}
package com.example.schedule;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author hrui
* @date 2024/8/3 19:34
*/
public class ScheduleConsumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumerGroup",getAclRPCHook());
// 设置NameServer地址
consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
// 订阅topic
consumer.subscribe("delayedTopic", "*");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String keys = msg.getKeys();
System.out.println("Receive message[msgId=" + msg.getMsgId() + "] " +
"(body: " + new String(msg.getBody()) + ", keys: " + keys + ") " +
"at time: " + System.currentTimeMillis());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("延迟任务消费者启动");
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxx"));
}
}
5.批量消息
批量消息为了减少IO
批量消息(Batch Message)是一种将多条消息组合成一个单一的消息发送的方式,以提高消息的发送效率。通过这种方式,可以减少网络调用的次数,从而提高吞吐量。
消息不要超过4M 超过就分批次
package com.example.bach;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.ArrayList;
import java.util.List;
/**
* @author hrui
* @date 2024/8/3 21:56
*/
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group",getAclRPCHook());
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
producer.setSendMsgTimeout(20000); // 设置发送超时时间为10秒 如果循环发送消息 可能报超时 设置下
producer.start();
// 创建批量消息
String topic = "BatchTopic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello World 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello World 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello World 2".getBytes()));
// 发送批量消息
SendResult sendResult = producer.send(messages);
//根据返回结果进行后续处理
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
// 关闭生产者
producer.shutdown();
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
}
}
package com.example.bach;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
/**
* @author hrui
* @date 2024/8/3 22:13
*/
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者实例,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumerGroup", getAclRPCHook());
// 设置NameServer地址
consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
// 订阅topic
consumer.subscribe("BatchTopic", "*");
// 注册消息监听器,处理接收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("当前线程:"+Thread.currentThread().getName()+",消息集合的大小:"+msgs.size());//不同线程推
for (MessageExt msg : msgs) {
System.out.println(Thread.currentThread().getName() + " 收到新消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("批量消费者已启动.");
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
}
}
当超过4M了怎么办
package com.example.batch;
import com.example.bach.ListSplitter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
/**
* @author hrui
* @date 2024/8/3 22:21
*/
public class BatchProducer4M {
public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
// 创建一个生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup", getAclRPCHook());
// 设置NameServer地址
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
producer.setSendMsgTimeout(50000); // 设置发送超时时间为50秒
// 启动生产者
producer.start();
// 创建消息列表
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("BatchTopic", "TagA", "OrderID188", body);
messages.add(msg);
}
// 使用ListSplitter将大消息分割成小消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
SendResult sendResult = producer.send(listItem);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
} catch (Exception e) {
e.printStackTrace();
// 处理发送失败的情况
}
}
// 关闭生产者
producer.shutdown();
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxxx"));
}
}
package com.example.bach;
import org.apache.rocketmq.common.message.Message;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* 将所有传入的消息按照 <SIZE_LIMIT> 的要求进行分块
*/
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4; // 4MB
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
/**
* 获取从currIndex开始第一个符合<=SIZE_LIMIT的元素
* @return currIndex
*/
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while (tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
/**
* 计算消息的大小,包括主题,属性和日志开销
* @param message
* @return 消息大小(字节)
*/
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 180; // 增加日志的开销20字节
return tmpSize;
}
@Override
public void remove() {
throw new UnsupportedOperationException("不支持删除操作");
}
}
6.过滤消息
Tag标记主要就是用来过滤的,如果将Topic看出1级目录 那么Tag可以看出2级
RocketMQ 提供了两种方式:基于标签(Tag)的过滤和基于 SQL92 的过滤
基于标签(Tag)的过滤
基于标签的过滤是 RocketMQ 默认支持的过滤方式。生产者发送消息时指定一个标签,消费者订阅消息时也指定相应的标签,从而只接收指定标签的消息。
按 Tag 过滤:
consumer.subscribe("TopicTest", "TagA || TagB");
package com.example.filter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author hrui
* @date 2024/8/3 23:07
*/
public class TagProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagProducerGroup", getAclRPCHook());
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒 如果循环发送消息 可能报超时 设置下
producer.start();
//发送带有TagA标签的消息
Message messageA = new Message("TagTopic", "TagA", "Hello TagA".getBytes());
SendResult sendResultA = producer.send(messageA);
System.out.println("Send Result A: " + sendResultA);
//发送带有TagB标签的消息
Message messageB = new Message("TagTopic", "TagB", "Hello TagB".getBytes());
SendResult sendResultB = producer.send(messageB);
System.out.println("Send Result B: " + sendResultB);
producer.shutdown();
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
}
}
package com.example.filter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author hrui
* @date 2024/8/3 23:07
*/
public class TagConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagConsumerGroup", getAclRPCHook());
consumer.setNamesrvAddr("xxxx.xxx.xxx:9876");
//只订阅带有TagA标签的消息
consumer.subscribe("TagTopic", "TagA");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("Received Message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Tag Consumer Started.");
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
}
}
基于 SQL92 的过滤
基于 SQL92 的过滤需要在 Broker 上配置 enablePropertyFilter=true
,并且生产者发送消息时添加自定义属性,消费者通过 SQL92 表达式进行过滤。
SQL92 过滤 需要用到 MessageSelector
按 SQL92 表达式过滤:
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5"));
package com.example.filter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author hrui
* @date 2024/8/3 23:11
*/
public class SQLProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SQLProducerGroup", getAclRPCHook());
producer.setNamesrvAddr("xxx.xxx.xxx:9876");
producer.setSendMsgTimeout(10000); // 设置发送超时时间为10秒 如果循环发送消息 可能报超时 设置下
producer.start();
//发送带有自定义属性的消息
Message message = new Message("SQLTopic", "TagA", "Hello SQL".getBytes());
message.putUserProperty("age", "20");
SendResult sendResult = producer.send(message);
System.out.println("Send Result: " + sendResult);
Message message2 = new Message("SQLTopic", "TagB", "Hello SQL".getBytes());
message.putUserProperty("age", "10");
SendResult sendResult2= producer.send(message2);
System.out.println("Send Result: " + sendResult2);
producer.shutdown();
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxx"));
}
}
package com.example.filter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class SQLConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SQLConsumerGroup", getAclRPCHook());
consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
// 通过SQL92表达式订阅消息
consumer.subscribe("SQLTopic", MessageSelector.bySql("age > 18"));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println("接收到消息:");
for (MessageExt msg : msgs) {
System.out.println("Received Message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("SQL Consumer Started.");
}
public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
}
}
7.事务消息
RocketMQ的事务是一个两阶段提交消息实现,确保分布式系统的最终一致性
事务消息的三种状态
1.TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息
2.TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费
3.TransactionStatus.Unknow:中间状态,表示需要MQ回查才能确定状态
RocketMQ事务消息主要在生产者处理.
-
发送事务消息:
- 事务消息的生产者先发送一条半消息(half message)到Broker。这条消息会被标记为未决状态(pending status)。
-
Broker接收半消息并回复:
- Broker接收到半消息后,会持久化这条消息,并回复生产者确认接收成功。这时消息还不会被消费者消费。
-
执行本地事务:
- 生产者接收到Broker的确认回复后,开始执行本地事务操作。
-
提交本地事务状态:
- 本地事务执行完毕后,生产者会根据事务执行的结果提交事务状态(commit或rollback)给Broker。
-
Broker处理事务状态:
- 如果生产者提交的是commit状态,Broker会将这条消息的状态更新为可消费状态,消费者可以消费这条消息。
- 如果生产者提交的是rollback状态,Broker会删除这条消息,消费者不会看到这条消息。
-
事务状态回查:
- 如果在步骤4中,生产者提交事务状态失败或超时,Broker会定期回查生产者的事务状态(通过TransactionListener),确认本地事务的最终状态。根据回查结果,Broker会决定提交还是回滚消息。
从图中的流程可以看出,这一流程图正确反映了RocketMQ事务消息的处理步骤:
- 发送事务消息(半消息)。
- Broker接收并确认半消息。
- 生产者执行本地事务。
- 生产者提交本地事务状态给Broker。
- Broker根据事务状态更新消息状态。
- 若事务状态提交失败,Broker进行回查。(如果第4不生产者提交事务状态失败,超时,或者Unknow状态时候,开启回查)
package com.example.transaction;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
/**
* @author hrui
* @date 2024/8/4 0:31
*/
public class TransactionProducer {
public static void main(String[] args) throws Exception {
//创建一个事务性消息生产者,并指定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup", getAclRPCHook());
producer.setNamesrvAddr("xxxx.xxx.xxxx:9876");
producer.setSendMsgTimeout(20000); //设置发送超时时间为10秒 可能报超时 设置下
//设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务,消息:" + new String(msg.getBody()));
// 执行本地事务
boolean localTransactionSuccess = executeLocalTransactionLogic();
return localTransactionSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("回查本地事务,消息:" + new String(msg.getBody()));
// 根据本地事务执行结果返回COMMIT_MESSAGE或ROLLBACK_MESSAGE
boolean localTransactionSuccess = checkLocalTransactionResult();
return localTransactionSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 启动生产者
producer.start();
// 发送事务消息
Message message = new Message("TransactionTopic", "TagA", "Hello Transaction Message".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.println("发送结果:" + sendResult);
// 保持生产者运行以接收事务回查
Thread.sleep(100000);
// 关闭生产者
producer.shutdown();
}
private static boolean executeLocalTransactionLogic() {
// 执行本地事务逻辑,如数据库操作
// 返回true表示本地事务成功,返回false表示失败
System.out.println("本地事务执行成功");
return true;
}
private static boolean checkLocalTransactionResult() {
// 检查本地事务执行结果
// 返回true表示本地事务成功,返回false表示失败
return true;
}
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxx", "xxxxxx"));
}
}
package com.example.transaction;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
/**
* @author hrui
* @date 2024/8/4 0:30
*/
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
//创建消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumerGroup", getAclRPCHook());
consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
//订阅主题
consumer.subscribe("TransactionTopic", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("接收到消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("消费者启动完成。");
}
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("xxxxxx", "xxxxxx"));
}
}