RocketMQ的消息模型
深入理解RocketMQ的消息模型
RocketMQ客户端基本流程
RocketMQ基于Maven提供了客户端的核心依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.5</version>
</dependency>
简单生产者实现:
public class Producer
{
public static void main(String[] args) throws MQClientException, InterruptedException
{
// 初始化一个消息生产者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 指定nameserver地址
producer.setNamesrvAddr("你的公网IP:9876");
// 启动消息生产者服务
producer.start();
for (int i = 0; i < 2; i++)
{
try
{
// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容
Message msg =
new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息,获取发送结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
catch (Exception e)
{
e.printStackTrace();
Thread.sleep(1000);
}
}
// 消息发送完后,停止消息生产者服务。
producer.shutdown();
}
}
注意生产者producer启动后不会马上关闭,而是使用后关闭
简单消费者实现:
public class Consumer
{
public static void main(String[] args) throws InterruptedException, MQClientException
{
// 构建一个消息消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 指定nameserver地址
consumer.setNamesrvAddr("你的公网IP:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致
consumer.subscribe("TopicTest", "*");
// 注册一个消息回调函数,消费到消息后就会触发回调。
consumer.registerMessageListener(new MessageListenerConcurrently()
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{
msgs.forEach(messageExt -> {
try
{
System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
}
catch (UnsupportedEncodingException e)
{
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者服务
consumer.start();
System.out.print("Consumer Started");
}
}
注意消费者启动后没有关闭,而是建立长连接,继续接受后续消息
指定NameServer的方式有两种:(第一种方式的优先级更高)
1、客户端直接指定,实例代码中的setNameSrvAddr("**:9876")
2、通过读取系统环境变量NAMESRV_ADDR指定,读取环境变量可以不写setNameSrvAddr
消息确认机制
RocketMQ要支持互联网金融场景,需要保证消息安全。
消息安全的要求:生产者确保将消息发送到Broker上;消费者确保从Broker上正确获取到消息
消息生产端采用消息确认加多次重试的机制保证消息正常发送到RocketMQ
针对消息发送的不确定性,封装了三种发送消息的方式
-
单向发送
消息生产者只管往Broker发送消息,而全然不关心Broker端有没有成功接收到消息
public class OnewayProducer
{
public static void main(String[] args) throws Exception
{
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("你的公网IP:9876");
producer.start();
Message message = new Message("Order", "tag", "order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
Thread.sleep(50000);
producer.shutdown();
}
}
缺点:sendOneway方法没有返回值,如果发送失败,生产者无法补救
优点:发送消息的效率更高
适用场景:追求消息发送效率,允许消息丢失的业务场景。比如日志
-
同步发送
消息生产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的响应结果
SendResult sendResult = producer.send(msg);
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
SEND_OK表示消息已经成功发送到Broker上。其他几种枚举值,表示消息在Broker端处理失败
注意:如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不会推送给下游的消费者。仅仅只是表示Broker端并没有完全正确的处理这些消息。因此,如果要重新发送消息,最好要带上唯一的系统标识,这样在消费者端,才能自行做幂等判断。
缺点:producer在send发出消息,到Broker返回SendResult的过程中,无法做其他的事情
优点:使用同步发送的机制,可以在消息生产者发送完消息后,对发送失败的消息进行补救。例如重新发送
总结:同步发送的机制能够很大程度上保证消息发送的安全性。但是,这种同步发送机制的发送效率比较低。毕竟,send方法需要消息在生产者和Broker之间传输一个来回后才能结束。如果网速比较慢,同步发送的耗时就会很长。
-
异步发送
生产者在向Broker发送消息时,会同时注册一个回调函数。接下来生产者并不等待Broker的响应。当Broker端有响应数据过来时,自动触发回调函数进行对应的处理。
public class AsyncProducer
{
public static void main(String[] args) throws Exception
{
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("你的公网IP:9876");
producer.start();
Message message = new Message("Order", "tag", "order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));
producer.send(message, new SendCallback()
{
@Override
public void onSuccess(SendResult sendResult)
{
System.out.println("订单消息发送成功");
}
@Override
public void onException(Throwable e)
{
System.out.println("订单消息发送失败,失败原因是:" + e.getMessage());
}
});
Thread.sleep(5000);
producer.shutdown();
}
}
当Broker端返回消息处理成功的响应信息SendResult时,就会调用onSuccess方法。当Broker端处理消息超时或者失败时,就会调用onExcetion方法,生产者就可以在onException方法中进行补救措施。
注意:触发了SendCallback的onException方法同样并不一定就表示消息不会向消费者推送。如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException方法。超时时间默认是3秒,可以通过producer.setSendMsgTimeout方法定制。而造成超时的原因则有很多,消息太大造成网络拥堵、网速太慢、Broker端处理太慢等都可能造成消息处理超时。
另外SendCallback的对应方法被触发之前,生产者不能调用shutdown()方法。如果消息处理完之前,生产者线程就关闭了,生产者的SendCallback对应方法就不会触发。方法触发前调用shutdown会导致反馈接受不到,SendCallback依附于生产者的主线程才能执行
优点:异步发送的机制能够比较好的兼容消息的安全性以及生产者的高吞吐需求,是很多MQ产品都支持的方式
消息消费者端采用状态确认机制保证消费者一定能正常处理对应的消息
public class OrderConsumer
{
public static void main(String[] args)
throws Exception
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RedEnvelope");
// 从上次消费进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅Order主题
consumer.subscribe("Order", "*");
// 注册回调监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("订单服务已启动");
}
}
CONSUME_SUCCESS:消息处理结束
RECONSUME_LATER:消费者没有处理成功,Broker就会过一段时间再发起消息重试。
为了要兼容重试机制的成功率和性能,RocketMQ设计了一套非常完善的消息重试机制,从而尽可能保证消费者能够正常处理用户的订单信息。
1、Broker不可能无限制的向消费失败的消费者推送消息。防止浪费集群的性能,Broker会记录每一个消息的重试次数。如果一个消息经过很多次重试后,消费者依然无法正常处理,那么Broker会将这个消息推入到消费者组对应的死信Topic中。可以人工介入对死信Topic中的消息进行补救,也可以直接彻底删除这些消息。RocketMQ默认的最大重试次数是16次。
2、为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是一个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产生阻塞,让其他正常的消息无法处理。RocketMQ的做法是给每个消费者组自动生成一个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了自己单独的队列,就不会影响到Topic下的其他消息了。
3、RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker只要往消费者组中随意一个实例推送即可。这是消息重试机制能够正常运行的基础。但是,在客户端的具体实现时,MQDefaultMQConsumer并没有强制规定消费者组不能重复。也就是说,你完全可以实现出一些订阅主题和消费逻辑完全不同的消费者服务,共同组成一个消费组。在这种情况下,RocketMQ不会报错,但是消息的处理逻辑就无法保持一致了。这会给业务带来很大的麻烦。这是在实际应用时需要注意的地方。
4、Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。至于消费者组自己的业务执行是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使用同步实现方式,保证在自己业务处理完成之后再向Broker端返回状态。而应该尽量避免异步的方式处理业务逻辑。
小结:推送重试次数和重试消息放在单独Topic是MQ做的;开发时尽量让消费者组里订阅同一个Topic做相同处理逻辑,消费者业务逻辑最好采用同步方式,处理成功后再通知Broker防止漏处理
消费者也可以自行指定起始消费位点
Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset。但是,这里还是会造成一种分裂,消息最终是由Consumer来处理,但是消息却是由Broker推送过来的,也就是说,Consumer无法确定自己将要处理的是哪些消息。
对消息队列也一样。虽然Offset完全由Broker进行维护,但是,RocketMQ也允许Consumer自己指定消费位点。核心代码是在Consumer中设定了一个属性ConsumeFromWhere,表示在Consumer启动时,从哪一条消息开始进行消费。
ConsumerFromWhere并不是直接传入Offset位点,而是可以传入一个ConsumerFromWhere枚举对象
public enum ConsumeFromWhere
{
CONSUME_FROM_LAST_OFFSET, //从上次消费到的地方开始继续消费
CONSUME_FROM_FIRST_OFFSET, //从队列的第一条消息开始重新消费
CONSUME_FROM_TIMESTAMP; //从某一个时间点开始重新消费
}
如果指定了ConsumerFromWhere.CONSUME_FROM_TIMESTAMP,这就表示要从一个具体的时间开始。具体时间点,需要通过Consumer的另一个属性ConsumerTimestamp。
consumer.setConsumerTimestamp("20131201211205");
广播消息
应用场景:
广播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。
集群模式下,一个消息,只会被一个消费者组中的多个消费者实例共同处理一次。
广播模式下,一个消息,则会推送给所有消费者实例处理,不再关心消费者组。
消费者核心代码:
consumer.setMessageModel(MessageModel.BROADCASTING);
实现思路:
默认模式(也就是集群模式)下,Broker端会给每个ConsumerGroup维护一个统一的Offset,这个Offset可以保证一个消息,在同一个ConsumerGroup内只会被消费一次。而广播模式的实现方式,是将Offset转移到消费者端自行保管,这样Broker端只管向所有消费者推送消息,而不用负责维护消费进度。
注意:
1、Broker端不维护消费进度,意味着,如果消费者处理消息失败了,将无法进行消息重试。
2、消费者端维护Offset的作用是可以在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。丢了也不影响服务稳定性。
实际上,Offset的维护数据是放在 ${user.home}/.rocketmq_offset/${clientIp}${instanceName}/${group}/offsets.json 文件下的。
消费者端存储广播消费的本地offsets文件的默认缓存目录是 System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets” ,可以通过定制 rocketmq.client.localOffsetStoreDir 系统属性进行修改。
本地offsets文件在缓存目录中的具体位置与消费者的clientIp 和 instanceName有关。其中instanceName默认是DEFAULT,可以通过定制系统属性rocketmq.client.name进行修改。另外,每个消费者对象也可以单独设定instanceName。
RocketMQ会通过定时任务不断尝试本地Offsets文件的写入,但是,如果本地Offsets文件写入失败,RocketMQ不会进行任何的补救。
参考文章:RocketMQ广播消费本地Offset文件丢失问题探秘
顺序消息机制
应用场景:
每一个订单有从下单、锁库存、支付、下物流等几个业务步骤。每个业务步骤都由一个消息生产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?
生产者核心代码:
for (int i = 0; i < 10; i++)
{
int orderId = i;
for (int j = 0; j <= 5; j++)
{
Message msg = new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
{
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
{
Integer id = (Integer)arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
通过MessageSelector,将orderId相同的消息,都转发到同一个MessageQueue中
消费者核心代码:
consumer.registerMessageListener(new MessageListenerOrderly()
{
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context)
{
context.setAutoCommit(true);
for (MessageExt msg : msgs)
{
System.out.println("收到消息内容 " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
注入一个MessageListenerOrderly实现
实现思路:
1、生产者将一批有顺序要求的消息,放到同一个MesasgeQueue上。
2、消费者一次锁定一个MessageQueue,拿到MessageQueue上所有的消息
注意:
1、理解局部有序与全局有序。大部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留一个MessageQueue。性能显然非常低。
2、生产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于集中导致数据热点竞争。
3、消费者端只能用同步的方式处理消息,不要使用异步处理。更不能自行使用批量处理。
4、消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序。
5、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。
延迟消息
消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
RabbitMQ中只能通过使用死信队列变相实现延迟消息,或者加装一个插件来支持延迟消息。 Kafka则不太好实现延迟消息。
生产者核心代码:
msg.setDelayTimeLevel(3);
只要给消息设定一个延迟级别
RocketMQ给消息定制了18个默认的延迟级别,分别对应18个不同的预设好的延迟时间。
messageDelayLevel:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
实现思路:
延迟消息的难点其实是性能,需要不断进行定时轮询。RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟队列。然后每次只针对这18个队列里的消息进行延迟操作,这样就不用一直扫描所有的消息了
注意:
这样预设延迟时间其实是不太灵活的。5.x版本已经支持预设一个具体的时间戳,按秒的精度进行定时发送
批量消息
应用场景:
生产者要发送的消息比较多时,可以将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量。
生产者核心代码:
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
注意点:
批量消息使用简单,但是要注意RocketMQ做了限制。同一批消息的Topic必须相同,另外,不支持延迟消息。
还有批量消息的大小不要超过1M,如果太大就需要自行分割。