文章目录
- 同步发送
- 异步发送
- 单向发送
- 拉模式
- 随机获取一个queue的消息
- 指定一个queue的消息
- 顺序消息
- 广播消息
- 延迟消息
- 批量消息
- 过滤消息
- Tag过滤
- sql过滤
- 事务消息
- RocketMQ常见问题
- RocketMQ如何保证消息不丢失?
- RocketMQ的消息持久化机制
- RocketMQ如何保证消息顺序
- RocketMQ事务消息原理
同步发送
等待消息返回后再继续进行下面的操作
适合可靠性要求较高,数据量小,实时响应
消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class SyncConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Sync","*");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
生产者:
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
for(int i=0;i<2;i++){
Message message = new Message("Sync", "SyncTags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
// 同步发送
SendResult sendResult = producer.send(message);
System.out.println(i+"_消息发送成功"+sendResult);
}
producer.shutdown();
}
}
其中消费者的输出为:
0_消息消费成功_0_SyncProducer
0_消息消费成功_1_SyncProducer
前面都是0说明这种推模式broker是分两次推送的
异步发送
不等待消息返回直接进入后续的流程
消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class AsyncConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Async","*");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
生产者:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/10
*/
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
CountDownLatch countDownLatch = new CountDownLatch(100);
for(int i=0;i<100;i++){
final int index= i;
Message message = new Message("Async", "AsyncTags", (i+"_AsyncProducer").getBytes(StandardCharsets.UTF_8));
// 异步发送,回调方法由broker调用
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(index +"_消息发送成功_"+sendResult);
countDownLatch.countDown();
}
@Override
public void onException(Throwable throwable) {
System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
countDownLatch.countDown();
}
});
}
// 主线程等待异步发送结束
countDownLatch.await();
System.out.println("发送结束");
producer.shutdown();
}
}
如果没有countDownLatch.await(),会出现下面的情况:
生产者启动成功
发送结束
0_消息发送失败_[Ljava.lang.StackTraceElement;@10c89bd0
10_消息发送失败_[Ljava.lang.StackTraceElement;@3d181392
11_消息发送失败_[Ljava.lang.StackTraceElement;@69da09fa
....
就是producer主线程先关闭了,但是异步发送还没结束,深入源码看这个send是用ThreadPoolExecutor异步线程池
其中消费者的输出为:
....
0_消息消费成功_78_AsyncProducer
0_消息消费成功_98_AsyncProducer
0_消息消费成功_99_AsyncProducer
0_消息消费成功_86_AsyncProducer
0_消息消费成功_87_AsyncProducer
0_消息消费成功_69_AsyncProducer
0_消息消费成功_9_AsyncProduce
单向发送
只负责发送,不管消息是否发送成功
适合收集日志,可靠性要求不高的场景
消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class OnewayConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Oneway","*");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
发送者:
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class OnewayProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
for(int i=0;i<2;i++){
Message message = new Message("Oneway", "OnewayTags", (i+"_OnewayProducer").getBytes(StandardCharsets.UTF_8));
// 单向发送
producer.send(message);
System.out.println(i+"_消息发送成功");
}
producer.shutdown();
}
}
消费者输出:
消费者启动成功
0_消息消费成功_0_OnewayProducer
0_消息消费成功_1_OnewayProduce
拉模式
随机获取一个queue的消息
生产者:
随便指定一种发送模式,这里选了异步发送
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/11
*/
public class LitePullProducer {
public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
CountDownLatch countDownLatch = new CountDownLatch(100);
for(int i=0;i<100;i++){
final int index= i;
Message message = new Message("LitePull", "LitePullTags", (i+"_LitePullProducer").getBytes(StandardCharsets.UTF_8));
// 异步发送,回调方法由broker调用
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(index +"_消息发送成功_"+sendResult);
countDownLatch.countDown();
}
@Override
public void onException(Throwable throwable) {
System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
countDownLatch.countDown();
}
});
}
// 主线程等待异步发送结束
countDownLatch.await();
System.out.println("发送结束");
producer.shutdown();
}
}
消费者:
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description 拉模式-随机获取一个queue的消息
* @Date 2023/6/11
*/
public class LitePullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("LitePull","*");
consumer.start();
while(true){
List<MessageExt> messageExts = consumer.poll();
System.out.println("消息接收成功");
messageExts.forEach(n->{
System.out.println("消息消费成功_"+n);
});
}
}
}
消费者输出:
可以看出一次拉出来的消息queueId都是一样的。
消息接收成功
消息接收成功
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=199, queueOffset=0, sysFlag=0, bornTimestamp=1686479335622, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335660, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DB87, commitLogOffset=318343, bodyCRC=44743681, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C40001, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=199, queueOffset=1, sysFlag=0, bornTimestamp=1686479335622, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DEA3, commitLogOffset=319139, bodyCRC=1669641829, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C40007, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=2, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004DF6A, commitLogOffset=319338, bodyCRC=1570600805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5000F, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=3, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335661, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E032, commitLogOffset=319538, bodyCRC=852476292, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50010, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=4, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335663, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E4E1, commitLogOffset=320737, bodyCRC=924559879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5000B, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 53, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=5, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335666, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E801, commitLogOffset=321537, bodyCRC=1481186534, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50017, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 52, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=6, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335666, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004E991, commitLogOffset=321937, bodyCRC=1743184709, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D5001A, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[50, 56, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=7, sysFlag=0, bornTimestamp=1686479335636, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335667, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004EBE9, commitLogOffset=322537, bodyCRC=1446355043, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D4000A, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[49, 50, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=8, sysFlag=0, bornTimestamp=1686479335637, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335667, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004F099, commitLogOffset=323737, bodyCRC=535165864, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D50014, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[50, 49, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=2, storeSize=200, queueOffset=9, sysFlag=0, bornTimestamp=1686479335640, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335672, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004F869, commitLogOffset=325737, bodyCRC=1182517109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26, UNIQ_KEY=7F5E0001F19018B4AAC23776D8D80023, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[51, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=1, storeSize=199, queueOffset=0, sysFlag=0, bornTimestamp=1686479335624, bornHost=/127.0.0.1:53451, storeTimestamp=1686479335655, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000004D86B, commitLogOffset=317547, bodyCRC=461876872, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePull', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25, UNIQ_KEY=7F5E0001F19018B4AAC23776D8C80008, CLUSTER=DefaultCluster, TAGS=LitePullTags}, body=[56, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114], transactionId='null'}]
......
指定一个queue的消息
生产者:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/11
*/
public class LitePullProducerAssign {
public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
CountDownLatch countDownLatch = new CountDownLatch(100);
for(int i=0;i<100;i++){
final int index= i;
Message message = new Message("LitePullAssign", "LitePullAssignTags", (i+"_LitePullProducerAssign").getBytes(StandardCharsets.UTF_8));
// 异步发送,回调方法由broker调用
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(index +"_消息发送成功_"+sendResult);
countDownLatch.countDown();
}
@Override
public void onException(Throwable throwable) {
System.out.println(index +"_消息发送失败_"+throwable.getStackTrace());
countDownLatch.countDown();
}
});
}
// 主线程等待异步发送结束
countDownLatch.await();
System.out.println("发送结束");
producer.shutdown();
}
}
消费者:
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description 拉模式-指定一个queue的消息
* @Date 2023/6/11
*/
public class LitePullConsumerAssign {
public static void main(String[] args) throws MQClientException {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumerAssign");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues("LitePullAssign");
ArrayList<MessageQueue> messageQueues1 = new ArrayList<>(messageQueues);
consumer.assign(messageQueues1);
consumer.seek(messageQueues1.get(0),10);
while(true){
List<MessageExt> messageExts = consumer.poll();
System.out.println("消息接收成功");
messageExts.forEach(n->{
System.out.println("消息消费成功_"+n);
});
}
}
}
在检验的时候,需要生产者先把消息写到broker里面去,consumer才能拉取到,消费者输出如下:
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=57, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F037, commitLogOffset=389175, bodyCRC=1241947329, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E491830029, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 52, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=58, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F111, commitLogOffset=389393, bodyCRC=2065607606, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002A, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 51, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=59, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F2C5, commitLogOffset=389829, bodyCRC=1548273985, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E491830023, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[51, 55, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息接收成功
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=60, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F39F, commitLogOffset=390047, bodyCRC=322785486, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002C, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 54, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
消息消费成功_MessageExt [brokerName=broker-a, queueId=3, storeSize=218, queueOffset=61, sysFlag=0, bornTimestamp=1686486526339, bornHost=/127.0.0.1:54222, storeTimestamp=1686486526371, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000005F479, commitLogOffset=390265, bodyCRC=1881692935, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='LitePullAssign', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=72, UNIQ_KEY=7F5E0001F35618B4AAC237E49183002E, CLUSTER=DefaultCluster, TAGS=LitePullAssignTags}, body=[52, 57, 95, 76, 105, 116, 101, 80, 117, 108, 108, 80, 114, 111, 100, 117, 99, 101, 114, 65, 115, 115, 105, 103, 110], transactionId='null'}]
顺序消息
生产者局部有序地发送到一个queue中,但多个queue之间是全局无序的。
生产者:通过MessageQueueSelector将消息有序地发送到同一个queue中。
package order;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/11
*/
public class OrderProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// i的消息都是按顺序发送的
for(int i=0;i<5;i++){
for(int j=0;j<10;j++){
Message message = new Message("Order","OrderTag",("order_"+i+"_step_"+j).getBytes(StandardCharsets.UTF_8));
// MessageQueue分配
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id=(Integer) arg;
int index= id%mqs.size();
return mqs.get(index); // 返回要发送到的MessageQueue
}
}, i); // 传入的就是arg
System.out.println("消息发送成功_"+sendResult);
}
}
producer.shutdown();
}
}
消费者:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取。
package order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/11
*/
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Order","*");
// 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for(int i=0;i<msgs.size();i++){
System.out.println(i+"_消息消费成功_"+new String(msgs.get(i).getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
消费者输出:
外循环都是为3的时候,内循环按照顺序消费
消费者启动成功
0_消息消费成功_order_3_step_0
0_消息消费成功_order_3_step_1
0_消息消费成功_order_3_step_2
0_消息消费成功_order_3_step_3
0_消息消费成功_order_3_step_4
0_消息消费成功_order_3_step_5
0_消息消费成功_order_3_step_6
0_消息消费成功_order_3_step_7
0_消息消费成功_order_3_step_8
0_消息消费成功_order_3_step_9
如果改成并发消费,输出就乱了
消费者启动成功
0_消息消费成功_order_3_step_7
0_消息消费成功_order_3_step_9
0_消息消费成功_order_3_step_4
0_消息消费成功_order_3_step_8
0_消息消费成功_order_3_step_2
0_消息消费成功_order_3_step_5
0_消息消费成功_order_3_step_0
0_消息消费成功_order_3_step_6
0_消息消费成功_order_3_step_1
0_消息消费成功_order_3_step_3
广播消息
广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费横式。
- MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
- MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
生产者:
选择了同步发送
package broadcast;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class BroadcastProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
for(int i=0;i<2;i++){
Message message = new Message("Broadcast", "BroadcastTags", (i+"_BroadcastProducer").getBytes(StandardCharsets.UTF_8));
// 同步发送
SendResult sendResult = producer.send(message);
System.out.println(i+"_消息发送成功"+sendResult);
}
producer.shutdown();
}
}
消费者:
MessageModel.BROADCASTING:广播消息
package broadcast;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class BroadcastConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Broadcast","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
我这里开启了两个消费者
BroadcastConsumer输出:
消费者启动成功
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_2_BroadcastProducer
0_消息消费成功_1_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
0_消息消费成功_5_BroadcastProducer
0_消息消费成功_6_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_9_BroadcastProducer
BroadcastConsumer2输出:
消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_2_BroadcastProducer
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_1_BroadcastProducer
0_消息消费成功_5_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_6_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_9_BroadcastProducer
MessageModel.CLUSTERING:集群消息
消费者改下:
package broadcast;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class BroadcastConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Broadcast","*");
consumer.setMessageModel(MessageModel.CLUSTERING);
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
我这里开启了两个消费者
BroadcastConsumer输出:
消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
0_消息消费成功_3_BroadcastProducer
0_消息消费成功_7_BroadcastProducer
BroadcastConsumer2输出:
消费者启动成功
0_消息消费成功_0_BroadcastProducer
0_消息消费成功_4_BroadcastProducer
0_消息消费成功_8_BroadcastProducer
每个消费者消费了部分
延迟消息
消息在producer.send以后不会立刻发出去,而是会在producer端延迟一段时间
生产者:
package schedule;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/12
*/
public class ScheduleProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
for(int i=0;i<2;i++){
Message message = new Message("Schedule", "ScheduleTags", (i+"_ScheduleProducer").getBytes(StandardCharsets.UTF_8));
// 延迟发送,单向发送
message.setDelayTimeLevel(2); // 延迟5s
producer.send(message);
System.out.println(i+"_消息发送成功"+ LocalTime.now());
}
producer.shutdown();
}
}
消费者:
package schedule;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.time.LocalTime;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/12
*/
public class ScheduleConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Schedule","*");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+ LocalTime.now());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
假设消费者可以立刻接收到生产者已经发出的消息
生产者输出:
生产者启动成功
0_消息发送成功22:41:44.096
1_消息发送成功22:41:44.102
22:41:44.148 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
22:41:44.151 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
消费者输出:
消费者启动成功
0_消息消费成功_22:41:49.165
0_消息消费成功_22:41:49.165
批量消息
批量消息是指将多条消息合井成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
批量消息的使用限制:
- 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
- 相同的Topic
- 相同的waitStoreMsgOK
- 不能是延迟消息、事务消息等
生产者:
其实就是producer.send一个ArrayList
package batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class BatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
ArrayList<Message> list=new ArrayList<>();
for(int i=0;i<10;i++){
Message message = new Message("Batch", "BatchTags", (i+"_BatchProducer").getBytes(StandardCharsets.UTF_8));
list.add(message);
}
// 同步发送,批量发送
SendResult sendResult=producer.send(list);
System.out.println("消息发送成功_"+sendResult);
producer.shutdown();
}
}
消费者:
package batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/9
*/
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Batch","*");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
消费者输出:
消费者启动成功
0_消息消费成功_0_BatchProducer
0_消息消费成功_2_BatchProducer
0_消息消费成功_1_BatchProducer
0_消息消费成功_3_BatchProducer
0_消息消费成功_4_BatchProducer
0_消息消费成功_5_BatchProducer
0_消息消费成功_6_BatchProducer
0_消息消费成功_7_BatchProducer
0_消息消费成功_8_BatchProducer
0_消息消费成功_9_BatchProducer
过滤消息
- 过滤消息是在broker端进行的,broker服务会比较繁忙,consumer是将过滤条件推给broker端的。
- 只有推模式可以使用SQL过滤
Tag过滤
消费者:
package filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description 过滤消息消费者-Tag方式
* @Date 2023/6/13
*/
public class FilterTagConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅指定Tag进行消息过滤
consumer.subscribe("Filter","TagA || TagC");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
生产者:
package filter;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @Author jiangxuzhao
* @Description 过滤消息生产者-Tag方式
* @Date 2023/6/13
*/
public class FilterTagProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
String[] Tags=new String[]{"TagA","TagB","TagC"};
for(int i=0;i<10;i++){
// 指定tag
Message message = new Message("Filter", Tags[i%Tags.length], (Tags[i%Tags.length]+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
// 同步发送
SendResult sendResult = producer.send(message);
System.out.println(i+"_消息发送成功"+sendResult);
}
producer.shutdown();
}
}
消费者输出:
接收地异常慢
消费者启动成功
0_消息消费成功_TagA_FilterTagProducer
.....
sql过滤
消费者:
package filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description 过滤消息消费者-Sql方式
* @Date 2023/6/13
*/
public class FilterSqlConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
// MessageSelector.bySql过滤消息
consumer.subscribe("Filter", MessageSelector.bySql("TAGS is not null and TAGS in ('TagA', 'TagC')"+
"and jiangxuzhao is not null and jiangxuzhao between 0 and 3"));
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
生产者:
package filter;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
* @Author jiangxuzhao
* @Description 过滤消息生产者-Sql方式
* @Date 2023/6/13
*/
public class FilterSqlProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
System.out.println("生产者启动成功");
String[] Tags=new String[]{"TagA","TagB","TagC"};
for(int i=0;i<10;i++){
// 指定tag
Message message = new Message("Filter", Tags[i%Tags.length], (Tags[i%Tags.length]+"_"+i+"_FilterTagProducer").getBytes(StandardCharsets.UTF_8));
// 在消息中放入过滤属性
message.putUserProperty("jiangxuzhao",String.valueOf(i));
// 同步发送
SendResult sendResult = producer.send(message);
System.out.println(Tags[i%Tags.length]+"_jiangxuzhao:"+i+"_消息发送成功"+sendResult);
}
producer.shutdown();
}
}
消费者输出:还是很慢
消费者启动成功
0_消息消费成功_TagA_0_FilterTagProducer
0_消息消费成功_TagA_3_FilterTagProducer
事务消息
主要和生产者有关系。
事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
事务消息机制的关键是在发送消息时会将消息转为一个half半消息,井存入RocketMQ内部的一个Topic(RMQ_ SYS_ TRANS_ HALF_ TOPIC),这个Topic对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
- 事务消息不支持延迟消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制,如果已经检查某条消息超过N次的话 (N = transactionCheckMax)则 Broker将丢弃此消息,井在默认情況下同时打印错误日志。可以通过重写AbstractTransactionCheckListener类来修改这个行为。
- 事务性消息可能不止一次被检查或消费。
生产者:
package transaction;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/14
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("Transaction");
producer.setNamesrvAddr("127.0.0.1:9876");
// 异步提交事务状态,提升性能
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("thread-jiangxuzhao");
return thread;
}
});
producer.setExecutorService(pool);
// 本地事件监听器,与想要发送的Tag挂钩
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
System.out.println("生产者启动成功");
String[] Tags=new String[]{"TagA","TagB","TagC","TagD","TagE"};
for(int i=0;i<10;i++){
// 指定tag
Message message = new Message("Transaction", Tags[i%Tags.length], (Tags[i%Tags.length]+"_TransactionProducer").getBytes(StandardCharsets.UTF_8));
// 事务消息发送
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.println(i+"_消息发送成功"+transactionSendResult);
}
// 主线程等待异步的线程回查都结束
Thread.sleep(100000);
producer.shutdown();
}
}
消费者:
package transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/14
*/
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
// 推模式,消费者等到Broker把消息推过来
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Transaction","*");
// 并行消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(int i=0;i<list.size();i++){
System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动成功");
}
}
本地事件监听者:
package transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @Author jiangxuzhao
* @Description
* @Date 2023/6/14
*/
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tag = msg.getTags();
if (StringUtils.contains("TagA",tag)) {
return LocalTransactionState.COMMIT_MESSAGE; // 直接提交
}
if (StringUtils.contains("TagB",tag)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}else {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tag = msg.getTags();
if (StringUtils.contains("TagC",tag)) {
return LocalTransactionState.COMMIT_MESSAGE; // 直接提交
}
if (StringUtils.contains("TagD",tag)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}else {
return LocalTransactionState.UNKNOW;
}
}
}
消费者输出:
TagA、TagC的消息是消费成功的
消费者启动成功
0_消息消费成功_TagA_TransactionProducer
0_消息消费成功_TagA_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
0_消息消费成功_TagC_TransactionProducer
RocketMQ常见问题
RocketMQ如何保证消息不丢失?
我们将消息流程分为三大部分,每一部分都有可能会丟失数据。
- 生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
- 存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策路持久化到硬盘中,刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
- 消费阶段:消费失败。比如先提交ack再消费,消费过程中出现异常,该消息就出现了丢失。
解决方案:
- 生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
- 存储阶段:同步刷盘机制;集群模式采用同步复制。
- 消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。
RocketMQ的消息持久化机制
RocketMQ的消息持久化机制是指将消息存储在磁盘上,以确保消息能够可靠地存储和检素。RocketMQ 的消息持久化机制涉及到以下三个角色:Commitlog、ConsumeQueue 和 IndexFile.
- CommitLog:消息真正的存储文件,所有的消息(所有messageQueue的消息)都存在 CommitLog文件中。
RocketMQ默认会将消息数据先存储到内存中的一个缓冲区,每当缓冲区中积累了一定量的消息或者一定时间后,就会将缓冲区中的消息批量写入到磁盘上的CommitLog 文件中(同步刷盘写入cache,继而直接写入磁盘;异步刷盘写入cache,就不管写入磁盘了)。消息在写入 CommitLog 文件后就可以被消费者消费了。
Commitlog文件的大小固定1G,写满之后生成新的文件。
并且采用的是顺序写的方式。
- ConsumeQueue: 消息消费逻辑队列,类似数据库的索引文件。
RocketMQ 中每个主题下的每个消息队列都会对应一个 ConsumeQueue。 ConsumeQueue存储了消息的offset以及该offset对应的消息在CommitLog文件中的位置信息,便于消费者快速定位井消费消息。
每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成:据块的内容包括:commitLogOffset(8byte,消息在commitlog文件中的位置)+msgSize(4byte,消息在文件中占用的长度)+msg TagCode(8byte, 消息的tag的Hash值)。
- IndexFile:消息索引文件,主要存储消息key与offset的对应关系,提升消息检索速度。
如果生产者在发送消息时设置了消息Key,那么RocketMQ会将消息Key值和CommitLog Offset存在IndexFie文件中,这样当消费者需要根据消息Key查询消息时,就可以直接在IndexFile文件中查找对应的CommitLog Offset,然后通过 ConsumeQueue文件快速定位并消费消息。
IndexFile文件大小固定400M,可以保存2000W个索引。
三个角色构成的消息存储结构如下:
RocketMQ如何保证消息顺序
RocketMQ架构本身是无法保证消息有序的,但是提供了相应的API保证消息有序消费。RocketMQ APl利用FIFO先进先出的特性,保证生产者消息有序进入同一队列,消费者在同一队列消费就能达到消息的有序消费。
- 使用MessageQueueSelector编写有序消息生产者
有序消息生产者会按照一定的规则将消息发送到同一个队列中,从而保证同一个队列中的消息是有序的。RocketMQ 并不保证整个主题内所有队列的消息都是按照发送顺序排列的。
- 使用MessageListenerOrderly进行顺序消费与之对应的MessageListenerConcurrently并行消费 (push模式)
MessageListenerOrderly是RocketMQ 专门提供的一种顺序消费的接口,它可以让消费者按照消息发送的顺序,一个一个地处理消息。这个接口支持按照消息的重试次数进行顺序消费、订单ID等作为消息键来实现顺序消费、批量消费等操作。
通过加锁的方式实现(有超时机制),一个队列同时只有一个消费者;井且存在一个定时任务,每隔一段时间就会延长锁的时间,直到整个消息队列全部消费结束。
-
消费端自己保证消息顺序消费(pull模式)
-
消费者井发消费时设置消费线程为1
RocketMQ 的消费者可以开启多个消费线程同时消费同一个队列中的消息,如果要保证消息的顺序,需要将消费线程数设置为1。这样,在同一个队列中,每个消息只会被单个消费者线程消费,从而保证消息的顺序性
RocketMQ事务消息原理
RocketMQ 的事务消息是一种保证消息可靠性的机制。在RocketMQ中,事务消息的实现原理主要是通过两个发送阶段和一个确认阶段来实现的。(个人理解就是分布式事务中的两阶段提交的实现)
- 发送消息的预处理阶段:在发送事务消息之前,RocketMQ 会将消息的状态设置为 "Preparing”,并将消息存储到消息存储库中。
- 执行本地事务:当预处理阶段完成后,消息发送者需要执行本地事务,并返回执行结果 (commit 或 rollback)
- 消息的二次确认阶段:根据本地事务的执行结果,如果是commit,则 RocketMQ 将消息的状态设置为 “Committing”:否则将消息的状态设置为 “Rollback”
- 完成事务:最后在消息的消费者消费该消息时,RocketMQ 会根据消息的状态来决定是否提交该消息。如果消息的状态是“Commiting”,则直接提交该消息;否则
忽略该消息。
需要注意的是,如果在消息发送的过程中出现异常或者网络故降等问题,RocketMQ 会触发消息回查机制。在回查过程中,RocketMQ 会调用消息发送方提供的回查接口来确认事务的提交状态,从而解决消息投递的不确定性。