目录
1. 简述消息队列
2. 常见的的消息队列中间件
2.1 Kafka概念及结构
基本概念名词解释
为什么要设置多个副本?
2.2 Kafka的工作流程
消息生产流程
消息存储流程
消息消费流程
2.3 面试必问题
如何保证消息不丢失?
消费者如何保证消息不重复?
Kafka的事务特性?
Kafka集群一台机器宕机?
2.4 中间件对比
JMQ和Kafka的不同?
京东为什么不使用Kafka而是使用JMQ?
3. Zookeeper选主过程
3.1 集群启动时选举过程
3.2 实例节点宕机恢复选举过程
1. 简述消息队列
消息队列一般称为MQ,底层的数据结构也就是先进先出的队列。通过MQ发送消息可以用于实现系统的异步和解耦,但MQ的本质作用是用于通讯。在传统的消息中间件设计里,有两种消息模型:
- 队列模型:一组消费者和生产者通过一个队列关联起来,队列中消息有序,但有且仅有一个消费者会收到消息。一旦某条消息被一个消费者消费成功后,这个消息就会在队列中被删除。
- 订阅者-消费者模型:一组消费者和一组生产者通过一个主题topic关联起来,发布者将消息发布到某个主题中。一个消费者订阅主题,该消息就被发送给这个订阅者,多个订阅者订阅该主题,消息就会被发送给每一个。
上面从消息队列直接描述到了消息中间件,通过消息中间件可以在多个微服务之间进行可靠的异步通信,从而降低微服务的耦合度,提升服务性能,提高系统的可用性和更灵活的系统集成。
简单罗列使用消息中间件的好处:
- 降低应用耦合:使用消息中间件可大大减小微服务之间的调用和依赖
- 改善应用性能:服务器将消息发送给中间件就可以继续执行逻辑,不依赖消息消费结果
- 提升应用的可扩展性:一个topic可以有多个订阅者,新增消费者对于生产者来说无感知无作用,所以在复杂的业务系统如电商系统中,更方便系统的扩展
- 更灵活的系统集成:应用间除了接口集成外还可以通过消息中间件集成,使系统可以跨平台,同时还降低了对网络协议处理的复杂性。
2. 常见的的消息队列中间件
个人工作经验原因使用的消息队列中间件都是公司内部自研消息中间件,市场上常见的中间件并无使用经验只有简单的了解原理,由于自研中间件是参考Kafka去做的设计,因此会先主要去梳理一下Kafka的架构设计,然后再以对比自研中间件(对外开放部分的内容)。
2.1 Kafka概念及结构
找到一个画的很清晰的结构图如下(图来源于网络,自己复习整理懒得画图了)
基本概念名词解释
- producer:消息的生产者
- consumer:消息的消费者
- kafka cluster:
- Broker:是kafka的实例,每个服务器上有一个或者多个Broker。假设一台机器一个Broker实例,每个Broker都有一个不重复的编号
- Topic:消息的主题,可以理解为消息的分类。kafka的数据就保存在topic中。在每个Broker上都可以创建多个Topic。
- Partition:Topic的分区。每个Topic可以有多个分区。分区的作用是做负载,提高kafka的吞吐量。同一个Topic的每个分区中的数据是不重复的,partition的表现形式是一个一个文件夹。每个partition只能被一个consumer消费。
- Replication:每一个分区有多个副本,当Leader故障时,会选择一个follower成为leader。kafka的最大默认副本数是10,且副本的数量不能 > broker的数量。每个partition的副本和leader绝对不在同一台机器上。
- offset(偏移量) :分区中的每条消息都有唯一的编号,用来唯一标识这一条消息(message)
- Leader、Follower(副本) :每个分区都可以设置自己对应的副本(replication-factor参数),有一个主副本(leader)、多个从副本(follower)
- ISR(副本集):动态集合,保存正在同步的副本集,是与leader同步的副本。如果某个副本不能正常同步数据或落后的数据比较多,会从副本集中把节点中剔除,当追赶上来了在重新加入。kafka默认的follower副本能够落后leader副本的最长时间间隔是10S
为什么要设置多个副本?
单一职责:leader负责和生产消费者交互,follower负责副本拷贝。副本是为了保证消息存储安全性,当其中一个leader挂掉,则会从follower中选举出新的leader,提高了容灾能力,但是副本也会占用存储空间。
2.2 Kafka的工作流程
Kafka的工作流程可以拆分为三部分,分别是生产者生产消息( 生产者将消息发送到Kafka集群,并选择目标分区)、消息中间件接收消息(Broker将消息持久化到磁盘,并通过副本机制保证数据的高可用性和容错)、消费者订阅消费消息(消费者从Kafka集群拉取消息,并处理消息。消费者定期提交消费进度,以确保消息的准确处理和故障恢复)。
消息生产流程
- 创建生产者实例: 生产者首先需要创建一个KafkaProducer实例,并配置必要的参数,如Kafka Broker的地址、序列化器(Serializer)等。
- 构建消息: 生产者构建消息记录(ProducerRecord),包括目标主题、消息键(Key)和消息值(Value)。
- 发送消息: 生产者调用send()方法将消息发送到Kafka集群。生产者可以选择同步或异步发送消息。
- 选择分区: Kafka根据消息键和分区策略(如轮询或哈希)选择目标分区。如果消息键为空,Kafka会使用轮询策略将消息均匀分配到各个分区。
- 消息序列化: 生产者将消息键和消息值序列化为字节数组,以便在网络上传输和存储。
- 消息发送: 生产者将序列化后的消息发送到目标分区的Leader Broker。
- 消息确认: Leader Broker接收到消息后,将其写入本地日志文件,并向生产者发送确认(ACK)
消息存储流程
消息存储是Kafka工作流程的第二步,Broker负责将消息持久化到磁盘,并提供高可用性和容错性。以下是消息存储流程的详细步骤:
- 接收消息: Leader Broker接收到生产者发送的消息后,将其写入本地日志文件。
- 副本同步: Leader Broker将消息同步到从副本(Follower)Broker。从副本将消息写入本地日志文件,并向Leader发送确认。
- 消息提交: 当Leader Broker收到足够数量的从副本确认后,将消息标记为已提交(Committed)。已提交的消息对消费者可见。
- 日志管理: Kafka定期清理过期的日志段(Log Segment),以释放磁盘空间。Kafka还支持日志压缩(Log Compaction),用于保留每个键的最新消息。
消息消费流程
消息消费是Kafka工作流程的第三步,消费者(Consumer)负责从Kafka的主题中订阅并消费消息。以下是消息消费流程的详细步骤:
- 创建消费者实例: 消费者首先需要创建一个KafkaConsumer实例,并配置必要的参数,如Kafka Broker的地址、反序列化器(Deserializer)等。
- 订阅主题: 消费者调用subscribe()方法订阅一个或多个主题。消费者可以选择单独消费消息,也可以组成消费组(Consumer Group)共同消费消息。
- 拉取消息: 消费者调用poll()方法从Kafka集群拉取消息。消费者可以根据需要设置拉取间隔和拉取数量。
- 消息反序列化: 消费者将消息键和消息值反序列化为原始数据类型,以便进行处理。
- 处理消息: 消费者处理拉取到的消息,执行必要的业务逻辑。
- 提交偏移量: 消费者定期提交消费进度(偏移量),以确保消息的准确处理和故障恢复。消费者可以选择自动提交或手动提交偏移量。
2.3 面试必问题
如何保证消息不丢失?
保证消息不丢失也从三个角度去考虑,首先在生产者发送消息到中间件的过程中保证消息不丢失,是基于ACK应答机制,在生产者向队列写入数据时可以配置三种策略的参数,分别是0、1、all,用于保证中间件收到了消息。
- 0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功,安全性最低但是效率最高;
- 1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功;
- all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
消息成功到达消息队列中间件后要考虑中间件对消息的保存,这块有两个方面的机制,一个是消息持久化过程,另一个是容错机制。确保即使在节点失败的情况下,也能保证消息的可靠传递。
其一关于消息的存储:Kafka通过将消息写入磁盘上的分区日志文件来实现持久化存储。每个消息都会被追加到日志文件的末尾,确保消息在写入后不会被修改,从而保证了消息的持久性。
其二关于容错机制:Kafka 可以配置多个副本来保护数据,一个副本作为主副本(Leader),其余的作为从副本(Follower)。只有主副本才能被客户端使用,如果主副本失效,则会自动进行副本之间的选举,保证服务的可用性。(此处选举过程参考zookeeper的选主流程,放在下文单独说)
然后就到了去如何保证消费者消费消息过程中没有消息丢失的问题,消费者消费消息有两种模式:pull和push,分别是指消费者主动去拉取(pull)消息或服务端推送(push)消息给消费者。当然啊,Kafka是采取的pull模式,毕竟pull模式更灵活,能实现削峰填谷的目的。那么在pull模式下Kafka怎么保证消息不丢失的呢?
Kafka通过Offset来跟踪消费者已经读取的数据位置。消费者需要定期提交Offset到Kafka,以便在出现故障时可以从正确的位置恢复读取。同时,Kafka也提供了重置Offset的机制,以便在必要时重新消费之前的数据。这种机制确保了即使在消费者崩溃或断开连接后,也能从正确的位置继续消费消息,从而避免了消息的丢失(另外在消费者端编码也要注意,不要生吞异常或者在一些场景下进行消息丢弃,最好在消费者的系统里进行日志数据的记录,再不济也要进行异常数据的记录,防止消息的丢失,要不然代码漏洞天王老子来了中间件也搞不定啊)。
消费者如何保证消息不重复?
参考上述流程一样的回答思路,首先在生产者生产消息发送到消息队列的过程中,采用了事务,就是为了减少IO次数提升发送消息的效率的时候,可能会批量发送消息到中间件broker,在这个过程中采用事务保证了要么全部成功要么全部失败,避免了部分成功部分失败的时候,再次重试又进行重复的发送。其次正常的业务场景根据业务数据可能会指定某个字段作为消息的key,这样不论是在生产过程还是消费过程中根据key就可以过滤掉重复数据。
然后是在消费过程中,通过offset标记消费位置,一个消费者组记录一个位置状态,这样避免消息被重复消费。
Kafka的事务特性?
Kafka为了支持事务特性,引入一个新的组件:Transaction Coordinator。主要负责分配pid,记录事务状态等操作。
下面时Kafka开启一个事务到提交一个事务的流程图:
- 原子写:Kafka的事务特性本质上是支持了Kafka跨分区和Topic的原子写操作。在同一个事务中的消息要么同时写入成功,要么同时写入失败。我们知道,Kafka中的Offset信息存储在一个名为_consumed_offsets的Topic中,因此read-process-write模式,除了向目标Topic写入消息,还会向_consumed_offsets中写入已经消费的Offsets数据。因此read-process-write本质上就是跨分区和Topic的原子写操作。Kafka的事务特性就是要确保跨分区的多个写操作的原子性。
- 拒绝僵尸实例:在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)。在Kafka中,两个相同的producer同时处理消息并生产出重复的消息(read-process-write模式),这样就严重违反了Exactly Once Processing的语义。这就是僵尸实例问题。Kafka事务特性通过transaction-id属性来解决僵尸实例问题。所有具有相同transaction-id的Producer都会被分配相同的pid,同时每一个Producer还会被分配一个递增的epoch。Kafka收到事务提交请求时,如果检查当前事务提交者的epoch不是最新的,那么就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。
- 读事务消息:为了保证事务特性,Consumer如果设置了isolation.level = read_committed,那么它只会读取已经提交了的消息。在Producer成功提交事务后,Kafka会将所有该事务中的消息的Transaction Marker从uncommitted标记为committed状态,从而所有的Consumer都能够消费。
Kafka集群一台机器宕机?
当Kafka集群中的一台机器宕机时,集群的行为取决于这台机器的角色(Leader或Follower)以及宕机的严重性。
- 如果宕机的是Follower,那么在宕机期间,该Partition的复制将会暂停,直到Follower恢复或者重新选举一个新的Leader。
- 如果宕机的是Leader,那么所有该Partition的读写操作都会受到影响,直到一个新的Leader被选举并同步了数据。
解决方案:
- 监控: 使用Kafka提供的监控工具,如JMX或Kafka Manager,来及时发现机器故障。
- 自动故障转移: Kafka的Producer和Consumer客户端都配置了自动故障转移机制,可以在Zookeeper中注册Watcher监控Broker状态。
- 手动干预: 如果需要,可以手动干预,比如重启宕机的Broker或者将其上的Partition重新分配到其他Broker。
- 操作示例:重启宕机的Broker: 如果Broker修复后重新启动,它会尝试加入回Kafka集群。 手动分配Partition: 如果Broker无法恢复,可以手动将该Broker上的Partition移到其他Broker。
2.4 中间件对比
JMQ和Kafka的不同?
JMQ 在服务端存储设计上与 KAFKA 有一些相似的地方,借鉴了文件按照偏移位置管理、顺序追加等特点。不过 JMQ 的存储和消费模型有自己的特点:
- 消息存放:JMQ 每个存储系统只有一个分段存储的日志文件,不同类的消息按照服务端接收的顺序存放在日志文件中,通过索引程序按照不同的消息(主题)分类名异步创建各自的索引,方便消费端获取消息时快速定位该客户端所关心的(主题)分类消息。每个(主题)分类的索引划分了多个分区,同一(主题)分类的消息分配在多组服务器上的分区数是相同的。每个索引分区都是以链表按照时间序存放消息引用信息。
- 消费 :JMQ 也采用客户端主动拉取的方式,但是客户端不需要协调自己应该从哪个服务器上获取消息,服务端会控制好每个索引分区里对应的消息在同一时刻只会被一个客户端线程取走,直到客户端反馈消费成功或者消费异常,消费异常会被重试程序转移到重试服务中。如果客户端长时间没有反馈信息,达到了超时时间,那么锁定的消息可以被其他的线程拉取走。 由于服务端储存了每个消费者消费的位置,因此服务器可以随时把已经消费的消息移除走。
京东为什么不使用Kafka而是使用JMQ?
京东选择使用JMQ而不是Kafka,主要是因为JMQ能够更好地满足在线系统的消息队列需求,并且是京东自主研发的,能够更好地适应京东的业务需求和系统架构。
- 首先,JMQ作为京东自主研发的消息中间件,主要应用于在线系统的消息队列。这意味着JMQ的设计和优化都是基于京东的业务场景和系统需求进行的,能够更好地适应京东的特定环境和业务逻辑。相比之下,虽然Kafka是一个广泛使用的开源消息中间件,但它可能并不完全适合京东的在线系统需求。
- 其次,JMQ的使用场景包括变更缺货商品的库存信息时需要更新下单系统中的库存数、通知搜索系统修改商品索引、通知网页缓存系统刷新等。这些场景要求消息系统能够主动查询不确定状态消息,作为多个资源的事务协调器使用,确保系统之间状态的一致性。通过事务消息,JMQ能够很好地解决这类场景中的问题,避免因网络不可用等原因出现系统之间状态不一致的情况。
- 此外,JMQ在架构和功能上也有其独特之处。例如,JMQ采用了事务消息机制,当更新任何一个服务出现故障时,能够抛出异常,确保事务消息不会被提交或回滚,从而保持系统状态的一致性。同时,JMQ还支持流批一体计算,通过统一的逻辑模型来兼容实时和离线两端的元数据,实现流批统一计算,进一步提高了计算效率和业务逻辑的处理能力。
综上所述,JMQ能够更好地满足京东在线系统的消息队列需求,通过事务消息机制和流批一体计算等功能,提供更高的系统可用性和业务处理效率。同时,作为京东自主研发的消息中间件,JMQ能够更好地适应京东的业务需求和系统架构。
3. Zookeeper选主过程
Leader 选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举。
- 服务器初始化启动。
- 服务器运行期间无法和Leader保持连接。
在此之前,先铺垫一下ZK集群中主机的状态:LOOKING、FOLLOWING、OBSERVING、LEADING
选举流程图大致如下:
3.1 集群启动时选举过程
ZooKeeper集群在第一次启动时的选主流程主要涉及以下几个步骤:
1、初始化阶段:当ZooKeeper集群中的一台服务器启动时,它无法单独进行Leader选举。但是,当第二台服务器启动时,两台机器可以相互通信,每台机器都试图找到Leader,从而进入Leader选举过程。
2、投票过程:
- 每个Server发出一个投票,包含所推举的服务器的myid(每个ZooKeeper节点都有一个唯一的myid文件)和ZXID(ZooKeeper的事务ID)。
- 初始情况下,比如Server1和Server2都会将自己作为Leader服务器来进行投票,Server1的投票为(1, 0),Server2的投票为(2, 0)。
3、接受和处理投票:
- 集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自处于LOOKING状态的服务器(LOOKING状态表示服务器处于竞选状态)。
- 处理投票时,服务器根据PK规则进行比较:优先检查ZXID,ZXID较大的服务器优先作为Leader;如果ZXID相同,则比较myid,myid较大的服务器作为Leader服务器。
4、确定Leader:通过上述步骤,最终会有一台服务器被选为Leader,其他服务器将作为Follower与该Leader进行通信。 这个过程确保了ZooKeeper集群在启动时能够选择出一个Leader,从而保证集群的正常运行和服务的一致性。
3.2 实例节点宕机恢复选举过程
在Zookeeper中,如果一个实例(follower)节点宕机,并在宕机期间发生了领导者的选举,当宕机的节点恢复后,恢复过程如下:
- 宕机的节点恢复后,它会尝试再次连接到集群中的其他节点。
- 如果恢复的节点成功连接到集群中的一个有效节点(follower或leader),它会将自己的状态同步到该节点。
- 如果恢复的节点是领导者失效产生的投票中的一个,该节点将参与新一轮的领导者选举。
- 如果恢复的节点是一个跟随者,它将继续从当前的领导者接收数据更新,并在 Leader 宕机时参与新一轮的投票。
如果需要了解更多Zookeeper内部细节,建议查看Zookeeper的客户端API文档,这里说的比较简陋。