前言
是时候总结下消息队列相关知识点啦!我搓搓搓搓
本文包括消息队列基础概念介绍,常见解决方案包括消息可靠性、消息有序、消息堆积、重复消费、事务消息
参考资料:
Kafka常见问题总结 | JavaGuide
RocketMQ常见问题总结 | JavaGuide
【原创】消息队列的消费语义和投递语义 - 孤独烟 - 博客园
Kafka事务是怎么实现的?Kafka事务消息原理详解(文末送书)-CSDN博客
Kafka/RocketMQ事务消息对比 - 简书
1. 什么是消息队列
消息队列可以看作是存放消息的容器,常用于分布式系统中,在消息的生产者和消费者之间引入一个缓冲区
它的作用主要由三点概括,分别是
- 解耦:生产者和消费者之间没有直接的调用关系,新增或者修改模块对其它模块的影响较小
- 异步:生产者无需等待消息消费完成就即刻返回,减少用户请求的响应时间
- 削峰:在高并发场景,消息队列可以缓存消息,平滑高峰流量,防止系统过载
2. 消息队列有什么常见应用场景
日志处理:将日志发送到消息队列,由日志处理系统进行消费,进行实时分析和监控
电商系统:订单的创建、支付、发货等步骤可以由消息队列进行异步处理和解耦
任务调度:任务调度系统可以将任务发布在消息队列中,由多个不同的节点并行处理
数据同步:消息队列可以将变更的数据同步到不同的存储系统中
3. 常见的消息队列如何选型
常见的消息队列有几种:RabbitMQ、Kafka、RocketMQ
从延时来看,RabbitMQ是基于Erlang开发的,延时最低,能达到微秒级别,其它都是毫秒级别
从吞吐量来看,Kafka和RocketMQ吞吐量最高,达十万、百万级,RabbitMQ只有万级
从定制开发难度来看,RocketMQ是由Java开发的,对于大型公司可以有人手进行定制化开发,而能对RabbitMQ进行定制化开发的较少,但是RabbitMQ社区活跃,能够解决开发上的bug
从业务场景来看,RocketMQ基本上经受住了大型企业场景的考验比如双十一,Kafka适用于大数据场景实时分析和日志采集等业务
总结,如果是数据量没有那么大,可以选择功能完备、社区活跃的RabbitMQ,如果有大数据量的金融互联网场景,对可靠性和吞吐量要求都很高,推荐选择RocketMQ,如果是日志采集和大数据实时分析的场景,则推荐使用Kafka
4. 消息队列有什么模型
JMS是一种Java消息服务的API规范,它定义了两种消息模型:队列模型和发布/订阅模型
队列模型又叫点对点模型,指的是生产者往队列里发送消息,一个消息只能被一个消费者消费,一个队列的多个消费者之间是竞争关系
发布/订阅模型又叫主题模型,指生产者往Topic发送消息,所有订阅了该Topic的消费者都能消费到
Kafka和RocketMQ就是基于发布/订阅模型实现的
AMQP是另一种消息服务的协议,它定义了几种消息模型,通过引入exchange完成对消息的路由,支持direct exchange、fanout exchange、topic exchange
RabbitMQ就是基于AMQP协议实现的
5. 如何保证消息不丢失
消息不丢失需要从生产消息、存储消息、消费消息三个方面去保证
5.1 生产消息
在生产者侧,需要处理好生产消息的异常处理,如果写入失败需要有重试、告警等机制
5.2 存储消息
在存储消息上,Broker需要在刷盘之后再给生产者应答
在RocketMQ中,可以配置同步刷盘,主从架构下配置同步复制(也叫同步双写)
在Kafka中,默认acks=1,表示消息被leader副本接受后就会返回成功,设置acks=all,那么在所有的ISR(In-Sync Replicas)接受后才算成功,相关的其它参数有:
replication.factor:分区的副本数,可设置为>=3
min.insync.replicas:消息发送成功需要至少写入的副本数,设置为>1
unclean.leader.election.enable:leader副本故障后的选举机制,false表示不从非ISR节点中选leader,设置为false
5.3 消费消息
消费者在真正完成业务逻辑时再返回成功给Broker
在RocketMQ中,完成消费逻辑后再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,否则返回ConsumeConcurrentlyStatus.RECONSUME_LATER
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
// 指定NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 处理消息的业务逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
} catch (Exception e) {
// 处理异常,这里可以根据业务需求进行重试或其他处理
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,消息会重新投递
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在Kafka中,需要设置enable.auto.commit=false,然后在代码中手动提交offset
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
String topic = "test-topic";
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息的业务逻辑
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 记录需要提交的偏移量
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
offsetsToCommit.put(partition, offsetAndMetadata);
} catch (Exception e) {
// 处理异常,这里可以根据业务需求进行重试或其他处理
e.printStackTrace();
}
}
// 手动同步提交偏移量
if (!offsetsToCommit.isEmpty()) {
consumer.commitSync(offsetsToCommit);
}
}
} finally {
// 关闭消费者
consumer.close();
}
}
}
6. 如何解决重复消费
解决重复消费比较粗鲁的办法就是保证一条消息只能被消费一次,但是如果消费者挂了未提交offset,那么为了保证消息可靠性,消息就会被重复消费
所以重点是要让消费者的处理具有幂等性,即多次处理同一条消息得到的结果是一样的
具体做法有引入全局唯一的ID,对已经消费过的消息进行去重
7. 如何保证消息的有序性
简单粗暴的方法是:单一生产者和单一消费者,即消息只由单个生产者发往单个队列,再由单个消费者消费,存在性能瓶颈
在支持分区键(Partition Key)的系统比如Kafka和RocketMQ中,可以在发送消息的时候指定key
在Kafka中在发送消息时指定key,如果需要严格顺序的话,可以指定max.in.flight.requests.per.connection=1,该参数表示在得到响应前可以发送的消息数,参数值越大吞吐量越大,设置为1会降低吞吐量
在RocketMQ中可以通过继承MessageQueueSelector实现
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RocketMQPartitionKeyProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("partition_key_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String topic = "partition_key_topic";
// 模拟不同的 Partition Key
String[] partitionKeys = {"key1", "key2", "key1", "key2"};
for (int i = 0; i < partitionKeys.length; i++) {
String partitionKey = partitionKeys[i];
Message msg = new Message(topic, "TAG", ("Message " + i).getBytes());
try {
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String key = (String) arg;
int index = Math.abs(key.hashCode()) % mqs.size();
return mqs.get(index);
}
}, partitionKey);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
8. 推拉模式有什么区别,怎么选择
推模式和拉模式的选择一般出现在Broker和消费者之间,而生产者和Broker之间一般使用推模式而不是Broker去拉取消息,否则大量的生产者还需要去维护消息可靠性
推模式的好处是Broker在接收到消息后能够迅速地推送给消费者进行消费,适用于对实时性要求比较高的场景,坏处是消费者消费能力不够强时容易过载,需要Broker去维护消费者的状态去调整推送速率,对Broker要求比较高
拉模式的好处是对Broker没什么要求,可以稳定地控制消息消费的速率,防止消费者过载,并且适合批量地拉取消息,坏处是消息容易延迟,或者是长时间轮询没有消息可消费
RocketMQ和Kafka都使用了拉模式,它们都利用长轮询对拉模式做了优化,也就是在消费者拉取的时候把请求给hold住,然后等待消息到来再把消息发送出去
9. 如何解决消息堆积
消息堆积问题有两个原因,一个是生产太快,一个是消费太慢
生产太快的问题可以对生产端做限流和降级,以及只保留关键消息,丢弃非关键消息,或者延迟处理
消费太慢的问题需要去定位bug,提升消费者的消费能力,包括优化消费的逻辑,增加消费者的线程数量,对消费者进行垂直扩容(增加单个消费者的CPU/内存),或者水平扩容(增加消费者副本数)
10. Kafka的事务消息是什么,怎么实现
Kafka的事务消息保证一系列消息要么完全发送成功,要么完全发送失败,即一系列消息操作的原子性
实现的方式是,Kafka使用事务协调器负责事务的启动、提交和终止,为确保精确一次(exactly once)的投递语义,会为每个Producer维护一个pid,为<pid, topic, partition>维护一个递增的seq从而保证消息按顺序被正确接收,在消费者侧,需要设置read_committed=true保证只消费已提交的消息
具体实践如下
生产者配置acks=all确认消息成功发送、transaction_id设置事务id、enable.idempotence=true确保投递精确一次
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaTransactionalProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
producer.send(record);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
e.printStackTrace();
} finally {
producer.close();
}
}
}
消费者配置read_committed=true确保只消费已提交的消息,auto.offset.reset=earliest确保启动时从最早的消息开始消费
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaTransactionalConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
System.out.printf("Received message: key = %s, value = %s%n", record.key(), record.value());
}
}
}
}
11. RocketMQ的事务消息是什么
RocketMQ的事务消息和Kafka的不同,Kafka实现的是多条消息的原子性,RocketMQ实现的是分布式事务,也就是消息存储和本地事务处在同一个事务
RocketMQ是利用事务消息+事务回查机制实现分布式事务的
- 生产者发送“半消息”到MQ
- MQ响应
- 生产者执行本地事务
- 生产者根据本地事务执行状态向MQ发送commit/rollback,若是commit,半消息变为正式消息,消费者可见,若是rollback,则丢弃该半消息
- 如果因为网络抖动等原因,MQ没收到第四步,则通知生产者事务回查
- 生产者查询事务状态
- 生产者根据回查的事务状态,通知MQ进行commit/rollback