RocketMQ的消息类型
文章目录
- RocketMQ的消息类型
- 一、顺序消息
- 二、广播消息
- 应用场景:
- 示例代码:
- 实现思路:
- 注意点:
- 三、延时消息
- 应用场景:
- 核心方法:
- 四、批量消息
- 应用场景:
- 示例代码:
- 注意点:
- 五、过滤消息
- 应用场景:
- 示例代码:
- 简单过滤:
- SQL过滤:
- 实现思路:
- 六、事务消息
- 应用场景:
- 注意点:
一、顺序消息
顺序消息指生产者局部有序发送一个queue,但是多个queue之间时全局无序的。
-
顺序消息生产者样例:通过MessageQueueSelector将消息有序发送到同一个queue中。
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import java.nio.charset.StandardCharsets; import java.util.List; /** * 顺序消息生产者 * @author * @date 2024年10月23日 11:00 */ public class OrderProducer { public static void main(String[] args) throws MQBrokerException, RemotingException, InterruptedException, MQClientException ,MQBrokerException, RemotingException, InterruptedException { //创建一个生产者 DefaultMQProducer producer = new DefaultMQProducer("orderProducer"); producer.setNamesrvAddr("localhost:9876"); try { producer.start(); } catch (MQClientException e) { throw new RuntimeException(e); } //一个外循环对应十条内循环消息,以便观察消息的有序性 for (int i = 0; i < 5; i++) { for (int j = 0; j < 10; j++) { Message message = new Message("TopicOrder","TagA",("order_"+ i +"_step_"+ j).getBytes(StandardCharsets.UTF_8)); SendResult send = producer. send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; int index = id % list.size(); return list.get(index); } }, i); System.out.println("消息发送成功_"+ send); } } producer.shutdown(); } }
-
顺序消息消费者样例:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 顺序消息消费者 * @author * @date 2024年10月23日 11:00 */ public class OrderConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicOrder", "*"); // consumer.setMessageListener(new MessageListenerOrderly() { // @Override // public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { // for (int i = 0; i < list.size(); i++) { // System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody())); // } // return ConsumeOrderlyStatus.SUCCESS; // } // }); consumer.setMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { for (int i = 0; i < list.size(); i++) { System.out.println(i+"_消息消费成功_"+ new String(list.get(i).getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer. start(); System.out.println("consumer started.%n"); } }
二、广播消息
应用场景:
广播模式和集群模式是RocketMQ的消费端处理消息最基本的两种模式,集群模式下,一个消息,只会被一个消费者组中的多个消费者实例共同处理一次。广播模式下,一个消息,则会推送所有消费者实例处理,不再关心消费者组。
示例代码:
consumer.setMessageModel(MessageModel.BROADCASTING);
实现思路:
默认模式也就是集群模式下,Broker端会给每个ConsumerGroup委会一个同意的Offset,这样,打给你Consumer来拉取消息是,就可以通过Offset保证一个消息,在同一个ConsumerGroup内只会被消费一次,而广播模式的本质,是将Offset转移到Consumer端自行保管,包括Offset的记录以及更新,全都放在客户端。这样Broker推送消息是,就不再管ConsumerGroup,只要Consumer来拉取消息,就返回对应的消息。
注意点:
-
Broker端不维护消息消费进度,意味着,如果消费者处理消息失败了,将无法进行消息重试。
-
Consumer端维护Offset的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。如果Offset丢了,Consumer依然可以拉取消息。
比如生产者发送了1~10号消息。消费者当前消费到第6个时宕机了。当他重启时,Broker端已经把第十个消息都推送完成了。如果消费者维护好了自己的Offset,那么他就可以在服务重启时,重新向Broker申请6号~10号的消息。但是,如果消费者端的Offset丢失了,消费者服务依然可以正常运行,但是6~10号消息就无法再申请。后续这个消息者就只能获取10号以后的消息。
三、延时消息
应用场景:
延时消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
对比RabbitMQ和Kafka。RabbitMQ中只能通过死信队列变相实现延迟消息,或者加装一个插件来支持延迟消息。Kafka则不太好实现延迟消息。
核心方法:
//指定固定的延迟级别
Message message = new Message(TOPIC,("Hello scheduled message"+i).getBytes(StandardsCharsets.UTF_8));
message.setDelayTimelevel(3);//10秒之后发送
//指定消息发送时间
Message message = new Message(TOPIC,("Hello scheduled message"+i).getBytes(StandardsCharsets.UTF_8));
message.setDeliverTimeMs(System.currentTimeMillis()+10_000L);//10秒之后的时间点
四、批量消息
应用场景:
生产者要发送的消息比较多时,可以将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量。
示例代码:
List<Message> message = new ArraayList<>(MESSAGE_COUNT);
for(int i = 0;i<MESSAGE_COUNT;i++){
message.add(new Message(TOPIC,TAG,("Hello world"+i).getBytes(StandardsCharsets.UTF_8)))
}
ListSplitter splitter = new ListSplitter(message);
while(splitter.hasNext()){
List<Message> listItem = splitter.next();
SendResult sendResult = producer.send(listItem);
System.oout.printf("%s",sendResult);
}
注意点:
批量消息的使用非常简单,但是要注意RocketMQ做了限制。同一批消息的Topic必须相同,另外,不支持延迟消息。还有;;idling消息的大小不要超过1M,如果太大就需要自行分割。
五、过滤消息
应用场景:
同一个Topic下有多种不同的消息,消费者只希望关注某一类消息。
例如,某系统中给仓储系统分配一个Topic,在Topic下,会传递过来入库,出库等不同的消息,仓储系统的不同业务消费者就需要过滤出自己感兴趣的消息,进行不同的业务操作。
示例代码:
简单过滤:
生产者端需要在发送消息时,增加Tag属性。比如我们上面举例当中的入库、出库。核心代码:
String[] tags = new String[] {"TagA","TagB","TagC"};
for(int i = 0;i< 15;i++){
Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n",sendResult);
}
消费者端就可以通过这个Tag属性订阅自己感兴趣的内容,核心代码:
consumer.subscribe("TagFilterTest","TagA");
这样,后续Consumer就只会出处理TagA的消息
SQL过滤:
通过Tag属性,只能进行简单的消息匹配。如果要进行更复杂的消息过滤,比如数字比较,模糊匹配等,就需要使用SQL过滤方式可以通过Tag属性以及用户自定义的属性一起,以标准SQL的方式进行消息过滤。
生产者端就在发送消息时,除了Tag属性外,还可以增加自定义属性。核心代码:
String[] tags = new String[] {"TagA","TagB","TagC"};
for(int i = 0; i < 15; i++){
Message msg = new Message("SqlFilterTest",tag[i%tags.length],("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAILT_CHARSET));
msg.putUserProperty("a",String.valueOf(i));
SendResult sendResult = producer.sen(msg);
System.out.printf("%s%n",sendResult);
}
消费者端在进行过滤时,可以指定一个标准的SQL语句,定制复杂的过滤规则。核心代码:
consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAG in('TagA','TagB'))"+"and(a is not and a between 0 and 3)"));
实现思路:
实际上,Tags和用户自定义的属性,都是随着消息一起传递的,所以,消费者端是可以拿到Tags和自定义属性的。比如“
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessag(List<MessageExt> msgd,
ConsumeConcurrentlyContext context{
for(MessageExt msg:msgs){
System.out.println(msg.getTags());
System.out.println(msg.getProperTties());
}
System.out.println("%s Receive New Message: %s %n",Thread.currenThread().getName(),msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
})
})
这样,剩下的就是在Consumer中对消息进行过滤了。Broker会在往Consumer推送消息时,在Broker端进行消息过滤。是Consumer感兴趣的消息,就往Consumer推送。
六、事务消息
应用场景:
事务消息时RocketMQ非常特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据一致性。
以电商为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景,非常适合使用RocketMQ的解藕功能来进行串联。
考虑到事务的安全性,即要保证相关联的这几个业务一定是同时成功或者同时失败的。如果要将四个服务一起作为一个分布式事务来控制,可以做到,但是会非常麻烦。而使用RocketMQ在中间串联了之后,事情可以得到一定程度的简化。由于RocketMQ与消费者端有失败重试机制,所以,只要消息成功发送到RocketMQ了,那么可以认为Branch2.1,Branch2.2,Branch2.3这几个分支步骤,是可以保证最终的数据一致性的。这样,一个复杂的分布式事务问题,就变成了MinBranch1和Branch2两个步骤的分布式事务问题。
在此基础上,RocketMQ提出了事务消息机制,采用两阶段提交的思路,保证Main Branch1和Branch2之间的事务一致性。
注意点:
- 半消息是对消费者不可见的一种消息。实际上,RocketMQ的做法是将消息转到了一个系统Topic,RMQ_SYS_TRANS_HALF_TOPIC.
- 事务消息中,本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。
- 在具体执行时,可以对事务流程进行适当的调整。