消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。
一、消息队列
1.什么是消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到MQ 中而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
2.消息队列的特征
(1)存储
与依赖于使用套接字的基本TCP和UDP 协议的传统请求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止。
(2)异步
与请求和响应系统不同,消息队列通过缓冲消息可以在应用程序中公开一定程度的异步性,允许源进程发送消息并在队列中累积消息,而目标进程则可以挑选消息进行处理。 这样,应用程序就可以在某些故障情况下运行,例如连接断断续续或源进程或目标进程故障。
路由:消息队列还可以提供路由功能,其中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信模式。
3.为什么需要消息队列
(1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2)冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
(3)扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
(4)灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
(6)顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
(7)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
(8)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
二、kafka基础与入门
1.kafka基本概念
Kafka 是一种高吞吐量的分布式发布/订阅消息系统,这是官方对 kafka 的定义,这样说起来,可能不太好理解,这里简单举个例子:现在是个大数据时代,各种商业、社交、搜索、浏览都会产生大量的数据。那么如何快速收集这些数据,如何实时的分析这些数据,是一个必须要解决的问题。同时,这也形成了一个业务需求模型,即生产者生产(produce)各种数据,消费者(consume)消费(分析、处理)这些数据。那么面对这些需求,如何高效、稳定的完成数据的生产和消费呢?这就需要在生产者与消费者之间,建立一个通信的桥梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间如何传递消息。
Kafka 是 Apache 组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 平台的数据分析、低时延的实时系统、storm/spark 流式处理引擎等。kafka现在已被多家大型公司作为多种类型的数据管道和消息系统使用。
2:kafka角色术语
kafka 的一些核心概念和角色
Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)
Topic:每条发布到 Kafka 集群的消息都有一个分类,这个类别被称为Topic(主题)
Producer:指消息的生产者,负责发布消息到 kafka broker。
Consumer:指消息的消费者,从 kafka broker 拉取数据,并消费这些已发布的消息。
Partition:Partition 是物理上的概念,每个Topic 包含一个或多个Partition,每个partition 都是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)
Consumer Group:消费者组,可以给每个Consumer 指定消费组,若不指定消费者组,则属
于默认的 group。
Message:消息,通信的基本单位,每个producer 可以向一个 topic 发布一些消息。
3.kafka拓扑结构
一个典型的 Kafka 集群包含若干 Producer,若干 broker、若干 Consumer Group,以及一个
Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull模式从 broker 订阅并消费消息。典型架构如下图所示:
从图中可以看出,典型的消息系统有生产者(Producer),存储系统(broker)和消费者(Consumer)组成,Kafka作为分布式的消息系统支持多个生产者和多个消费者,生产者可以将消息分布到集群中不同节点的不同 Partition 上,消费者也可以消费集群中多个节点上的多个Partition。在写消息时允许多个生产者写到同一个 Partition 中,但是读消息时一个 Partition只允许被一个消费组中的一个消费者所消费,而一个消费者可以消费多个Partition。也就是说同一个消费组下消费者对 Partition 是互斥的,而不同消费组之间是共享的
kafka 支持消息持久化存储,持久化数据保存在 kafka 的日志文件中,在生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在 broker 中进行存储,为了减少磁盘写入的次数,broker 会将消息暂时缓存起来,当消息的个数或尺寸、大小达到一定阀值时,再统一写到磁盘上,这样不但提高了 kafka 的执行效率,也减少了磁盘IO调用次数。
kafka 中每条消息写到 partition 中,是顺序写入磁盘的,这个很重要,因为在机械盘中如果是随机写入的话,效率将是很低的,但是如果是顺序写入,那么效率 却是非常高,这种顺序写入磁盘机制是 kafka 高吞吐率的一个很重要的保证。
4.Topic和Partition
Kafka 中的 topic(主题)是以 partition 的形式存放的,每一个 topic 都可以设置它的partition 数量,Partition 的数量决定了组成 topic 的 log 的数量。推荐 partition 的数量一定要大于同时运行的 consumer 的数量。另外,建议 partition 的数量要小于等于集群 broker 的数量,这样消息数据就可以均匀的分布在各个broker中
那么,Topic 为什么要设置多个Partition 呢,这是因为 kafka 是基于文件存储的,通过配置多个 partition 可以将消息内容分散存储到多个 broker 上,这样可以避免文件尺寸达到单机磁盘的上限。同时,将一个 topic切分成任意多个 partitions,可以保证消息存储、消息消费的效率,因为越多的 partitions 可以容纳更多的 consumer,可有效提升 Kafka 的吞吐率。因此,将Topic 切分成多个 partitions 的好处是可以将大量的消息分成多批数据同时写到不同节点上,将写请求分担负载到各个集群节点。
在存储结构上,每个 partition 在物理上对应一个文件夹,该文件夹下存储这个 partition的所有消息和索引文件。partiton 命名规则为 topic 名称+序号,第一个 partiton 序号从0开始,序号最大值为 partitions 数量减 1。
在每个 partition(文件夹)中有多个大小相等的 segment(段)数据文件,每个 segment 的大小是相同的,但是每条消息的大小可能不相同,因此 segment<br/>数据文件中消息数量不一定相等。segment 数据文件有两个部分组成,分别为index file 和 data file,此两个文件是一一对应,成对出现,后缀”.index“和“.log”分别表示为 segment 索引文件和数据文件。
5.Producer生产机制
Producer 是消息和数据的生产者,它发送消息到 broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置的合理,所有消息都可以均匀分布到不同的 Partition 里,这样就实现了数据的负载均衡。如果一个 Topic 对应一个文件,那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈,而有了 Partition 后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。
6.Consumer消费机制
Kafka 发布消息通常有两种模式:队列模式(queuing)和发布/订阅模式(publish-subscribe)。在队列模式下,只有一个消费组,而这个消费组有多个消费者,一条消息只能被这个消费组中的一个消费者所消费;而在发布/订阅模式下,可有多个消费组,每个消费组只有一个消费者,同一条消息可被多个消费组消费。
Kafka 中的 Producer 和 consumer 采用的是 push、pu11 的模式,即 producer 向 broker 进行push 消息,comsumer 从 bork 进行 pu11 消息,push 和 pu11 对于消息的生产和消费是异步进行的。pu11 模式的一个好处是 consumer 可自主控制消费消息的速率,同时 consumer 还可以自己控制消费消息的方式是批量的从 broker 拉取数据还是逐条消费数据。
三、zookeeper概念介绍
ZooKeeper 是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和Slave 误以为出现两个activemaster,最终使得整个集群处于混乱状态。
ZooKeeper 是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。
1.zookeeper的工作原理
(1)master 启动
在分布式系统中引入 Zookeeper 以后,就可以配置多个主节点,这里以配置两个主节点为例假定它们是主节点A和主节点 B,当两个主节点都启动后,它们都会向 ZooKeeper 中注册节点信息。我们假设主节点A锁注册的节点信息是master00001,主节点B注册的节点信息是master00002 ,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法为例,编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节点A 将会获得锁成为主节点,然后 主节点B 将被阻塞成为一个备用节点。这样,通过这种方式 Zookeeper 就完成了对两个 Master进程的调度。完成了主、备节点的分配和协作。
(2)master 故障
如果主节点 A 发生了故障,这时候它在 ZooKeeper 所注册的节点信息会被自动删除,而ZooKeeper 会自动感知节点的变化,发现 主节点A 故障后,会再次发出选举,这时候 主节点B将在选举中获胜,替代主节点A 成为新的主节点,这样就完成了主、被节点的重新选举。
(3)master 恢复
如果主节点恢复了,它会再次向 ZooKeeper 注册自身的节点信息,只不过这时候它注册的节,信息将会变成 master00003 ,而不是原来的信息。ZooKeeper 会感知节点的变化再次发动选举这时候 主节点B在选举中会再次获胜继续担任 主节点,主节点A 会担任备用节点。zookeeper 就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步的。
2.zookeeper集群架构
zookeeper 集群主要角色有 server 和 client,其中 server 又分为 leader、follower 和observer 三个角色,每个角色的含义如下:
Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态
follower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。
observer:观察者角色,用户接收客户端的请求,并将写请求转发给leader,同时同步 leader状态,但是不参与投票。observer目的是扩展系统,提高伸缩性。
client:客户端角色,用于向zookeeper 发起请求。
3.zookeeper的工作流程
Zookeeper 修改数据的流程:Zookeeper 集群中每个Server 在内存中存储了一份数据,在Zookeeper 启动时,将从实例中选举一个 Server 作为 leader,Leader 负责处理数据更新等操作,当且仅当大多数 Server 在内存中成功修改数据,才认为数据修改成功。
Zookeeper 写的流程为:客户端 C1ient 首先和一个 Server 或者 0bserve 通信,发起写请求然后 Server 将写请求转发给 Leader,Leader 再将写请求转发给其它 Server,其它 Server 在接收到写请求后写入数据并响应 Leader,Leader 在接收到大多数写成功回应后,认为数据写成功,最后响应 cient,完成一次写操作过程。
四、Zookeeper 在Kafka 中的作用
1.Broker 注册
Broker 是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker 管理起来,此时就使用到了 Zookeeper。在 Zookeeper 上会有一个专门用来进行 Broker服务器列表记录的节点:
/brokers/ids
每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到/brokers/ids 下创建属于自己的节点,如/brokers/ids/[0...N]。
Kafka 使用了全局唯一的数字来指代每个 Broker 服务器,不同的 Broker 必须使用不同的 BrokerID 进行注册,创建完节点后,每个 Broker 就会将自己的 IP 地址和端口信息记录到该节点中去。其中,Broker 创建的节点类型是临时节点,一旦 Broker 宕机,则对应的临时节点也会被自动删除。
2.Topic 注册
在 Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护,由专门的节点来记录如:borkers/topics
Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login 和/brokers/topics/search 等。Broker 服务器启动后,会到对应Topic 节点(/brokers/topics)上注册自己的 Broker ID 并写入针对该 Topic 的分区总数,如/brokers/topics/login/3->2,这个节点表示 Broker ID为3的一个 Broker 服务器,对于"login"这个 Topic 的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
3.生产者负载均衡
由于同一个 Topic 消息会被分区并将其分布在多个 Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上,那么如何实现生产者的负载均衡,Kafka 支持传统的四层负载均衡,也支持 Zookeeper 方式实现负载均衡。
(1)四层负载均衡
根据生产者的 IP 地址和端口来为其确定一个相关联的 Broker。通常,一个生产者只会对应单个 Broker,然后该生产者产生的消息都发往该 Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的 TCP 连接,只需要和 Broker 维护单个 TCP 连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个 Broker 的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的 Broker 接收到的消息总数差异巨大,同时,生产者也无法实时感知到 Broker的新增和删除。
(2)使用 Zookeeper 进行负载均衡
由于每个 Broker 启动时,都会完成 Broker 注册过程,生产者会通过该节点的变化来动态地感知到 Broker 服务器列表的变更,这样就可以实现动态的负载均衡机制。
4.消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker 服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的-个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不扰。
5.记录消息分区与消费者的关系
消费组(Consumer Group)下有多个 Consumer(消费者)。对于每个消费者组(consumerGroup),Kafka 都会为其分配一个全局唯一的 Group ID,Group 内部的所有消费者共享该 ID。订阅的 topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他 group)
同时,Kafka为每个消费者分配一个 Consumer ID,通常采用"Hostname:UUID"形式表示。在 Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在Zookeeper 上记录消息分区与Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其 Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:consumers/[group_id]/owners/[topic]/[broker id-partition id]其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该消息分区上消费者的 Consumer ID。
6.消息消费进度 offset 记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度 Offset记录到 Zookeeper 上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。0ffset 在 Zookeeper 中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker id-partition id]节点内容就是 Offset 的值。
7.消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤如下:
(1)注册到消费者分组
每个消费者服务器启动时,都会到 Zookeeper 的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic 信息写入该临时节点。
(2)对消费者分组中的消费者的变化注册监听
每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids 节点注册子节点变化的 Watcher 监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。
(3)对 Broker 服务器变化注册监听
消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负均衡。
(4)进行消费者负载均衡
为了让同一个 Topic 下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或 Broker 服务器发生变更,会发出消费者负载均衡。