生产者
生产者生产的过程: producer会在接入nameserver时,获取所有topic和队列的信息,然后在每次发送时,根据负载均衡在topic中选择发送的队列。
生产者的消息是发送给具体的queue,而消费者消费是从具体的queue消费
在 RocketMQ 的集群模式下,生产者发送的一条消息只会发送到一个特定的队列(Queue),但是一个queue可能有多个生产者,但是一定只有一个consumer(集群模式)
producer发送消息是按照topic发送的,通过负载均衡选择queue,然后发送到queue对应的broker
对于生产者来说,RocketMQ 中的队列(Queue)是通过 Broker、Topic 和 Queue 共同来区分的
负载均衡
- 轮询 (Round Robin):轮询选择队列,依次发送到不同的队列。
- 随机 (Random):随机选择一个队列进行发送。
- 自定义策略:可以实现 MessageQueueSelector 接口,定义自己的负载均衡策略。例如根据消息的某个属性(如订单ID)选择队列,确保有序性。
生产者组
创建时需要填写生产组的名称,生产者组是指同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
Tag
Topic 是一级分类,而 Tag 可以理解为是二级分类。
Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。
什么时候该用 Topic,什么时候该用 Tag?
可以从以下几个方面进行判断:
消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
Tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
Keys
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。
消息的key是消息的一个用户自定义属性,主要用于消息的检索和查询。它是一个字符串,用户可以在发送消息时设置这个key。
Apache RocketMQ 每个消息可以在业务层面的设置唯一标识码 keys 字段,方便将来定位消息丢失问题。 Broker 端会为每个消息创建索引(哈希索引),应用可以通过 topic、key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);
队列
为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。
一般来说一条消息,如果没有重复发送(比如因为服务端没有响应而进行重试),则只会存在在 Topic 的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,这个称为最大位点 MaxOffset;队列的起始位置对应的位置叫做起始位点 MinOffset。队列可以提升消息发送和消费的并发度。
消息
在不同的场景中,需要使用不同的消息进行发送。比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略,此时就需要用到延迟消息;电商场景中,业务上要求同一订单的消息保持严格顺序,此时就要用到顺序消息。在日志处理场景中,可以接受的比较大的发送延迟,但对吞吐量的要求很高,希望每秒能处理百万条日志,此时可以使用批量消息。在银行扣款的场景中,要保持上游的扣款操作和下游的短信通知保持一致,此时就要使用事务消息.
Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
同步消息
异步模式:返回结果会调用回调函数
/ 异步发送消息, 发送结果通过callback返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
countDownLatch.countDown();
}
});
单向模式:单向模式调用sendOneway,不会对返回结果有任何等待和处理。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
普通消息
顺序消息
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。即顺序消息使用的前提是单个生产者,串行发送消息。
ShardingKey(分片键)用来消息的负载均衡,rocketmq没有具体的类来实现,但是在producer发送消息时会使用这个概念
当消息的key用于消息路由和负载均衡时,它可以充当Sharding Key的角色。
在 RocketMQ 中,Sharding Key 是用于在消息发送时保证消息有序性的机制。使用 Sharding Key,可以确保具有相同 Sharding Key 的消息会被发送到同一个队列中,从而保证这些消息的顺序性。
使用shardingkey的例子:
// 发送消息,并使用自定义的MessageQueueSelector来选择队列
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg; // 传递的订单ID
int index = id % mqs.size(); // 选择队列的索引
return mqs.get(index); // 返回选择的队列
}
}, orderId); // 将orderId作为arg传递给MessageQueueSelector
延时消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
批量消息
这里调用非常简单,将消息打包成 Collection msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
事务消息
消费者
消费者组订阅一个或多个主题,组内的消费者实例根据订阅的主题和过滤条件来消费消息。
消费者组订阅一个或多个Topic时,实际上是订阅了这些Topic下的所有队列。
集群模式(Clustering Mode)和广播模式(Broadcasting Mode)是针对消费者而言的,和生产者无关
在集群消费模式下,一个消息队列通常会被分配给一个消费者实例进行消费,确保消息的有序性和避免重复消费。
在广播消费模式下,每个消息队列中的消息会被所有订阅该 Topic 的消费者实例消费一次,每个消息被所有消费者实例处理一次。
消费者消费的过程:消费者实例启动后会向NameServer注册,获取该Topic的路由信息,包括所有Broker的地址和Queue信息,RocketMQ通过负载均衡算法将Topic下的Queue均衡分配给消费者组内的各个消费者实例,消费者实例根据分配到的Queue,从Broker中主动拉取消息进行消费。
默认的分配策略是平均分配,这也是最常见的策略。平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。
在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。
Rocketmq的消费者的消费模式:push,pull和长轮询
push
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息重试
若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
consumer.setMaxReconsumeTimes(10);//最大重试次数
consumer.setSuspendCurrentQueueTimeMillis(5000);//重试间隔
死信队列
当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。