目录
一、消息队列的优缺点
二、常用MQ
1. Kafka
2. RocketMQ
3. RabbitMQ
4. ActiveMQ
5. ZeroMQ
6. MQ选型对比
适用场景——从公司基础建设力量角度出发
适用场景——从业务场景角度出发
四、基本概念和操作
1. kafka常用术语
2. kafka常用指令
3. 单播消息(group.id相同)
4. 多播消息(group.id不相同)
5. 消费组
编辑6. 稀疏索引
五、kafka集群
1. 集群搭建
2. 集群启动和验证
3. Topic的意义
4. Topic和Partition
5. 分区
6. 副本
7. 集群操作指令
8. 多分区&多副本
9. 多分区消费组
10. Controller
11. Rebalance机制
12. HW和LEO
六、高频面试题
1. 如何防止消息丢失?
2. 如何防止消息的重复消费?
3. 如何做到顺序消费?
4. 如何解决消息积压的问题?
5. 如何实现延迟队列?
6. Kafka如何做到单机上百万的高吞吐量呢?
7. Kafka高吞吐——非零拷贝技术
8. Kafka高吞吐——零拷贝技术
一、消息队列的优缺点
消息队列的优点:
- ① 实现系统解耦
- ② 实现异步调用
- ③ 流量削峰
消息队列的缺点:
- ① 系统可用性降低
- ② 提升系统的复杂度
- ③ 数据一致性问题
二、常用MQ
1. Kafka
Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实 现为一个分布式的日志提交系统(a distributed commit log),之后成为Apache项目的一 部分。Kafka性能高效、可扩展良好并且可持久化。它的分区特性,可复制和可容错都是 其不错的特性。
优点: 客户端语言丰富:支持Java、.Net、PHP、Ruby、Python、Go等多种语言 高性能:单机写入TPS约在100万条/秒,消息大小10个字节; 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积; 消费者采用Pull方式获取消息。消息有序,通过控制能够保证所有消息被消费且仅被消费一次; 在日志领域比较成熟,被多家公司和多个开源项目使用。有管理界面Kafka-Manager;
缺点: Kafka单机超过64个队列/分区时,Load时会发生明显的飙高现象。队列越多,负载越高,发送消息响 应时间变长; 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 社区更新较慢。
2. RocketMQ
RocketMQ出自阿里的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自 己的一些改进,消息可靠性上比Kafka更好。RocketMQ在阿里内部被广泛应用在订单, 交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
优点:单机支持1万以上持久化队列; RocketMQ的所有消息都是持久化的,先写入系统PAGECACHE,然后刷盘,可以保证内存与磁盘都有 一份数据,而访问时,直接从内存读取。 模型简单,接口易用(JMS的接口很多场合并不太实用); 性能非常好,可以允许大量堆积消息在Broker中; 支持多种消费模式,包括集群消费、广播消费等; 各个环节分布式扩展设计,支持主从和高可用;开发度较活跃,版本更新很快。
缺点:支持的客户端语言不多,目前是Java及C++,其中C++还不成熟; RocketMQ社区关注度及成熟度也不及Kafka; 没有Web管理界面,提供了一个 CLI (命令行界面) 管理工具带来查询、管理和诊断各种问题; 没有在MQ核心里实现JMS等接口;
3. RabbitMQ
RabbitMQ于2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企 业消息系统,是当前最主流的消息中间件之一。它提供了多种技术可以让你在性能和可靠 性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制;灵 活的路由,消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提 供了多种内置交换机类型。
优点:由于Erlang语言的特性,消息队列性能较好,支持高并发; 健壮、稳定、易用、跨平台、支持多种语言、文档齐全; 有消息确认机制和持久化机制,可靠性高; 高度可定制的路由; 管理界面较丰富,在互联网公司也有较大规模的应用,社区活跃度高。
缺点:实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易 于使用和部署,但是使得其运行速度较慢,因为中央节点增加了延迟,消息封装后也比较大;需要学习比 较复杂的接口和协议,学习和维护成本较高。 尽管结合 Erlang 语言本身的并发优势,性能较好,但是不利于做二次开发和维护;
4. ActiveMQ
ActiveMQ是由Apache出品,ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到 企业的应用环境中,并有许多高级功能。
优点:可以用JDBC:可以将数据持久化到数据库。虽然使用JDBC会降低ActiveMQ的性能,但是数据库一直都 是开发人员最熟悉的存储介质; 支持JMS规范:支持JMS规范提供的统一接口; 支持自动重连和错误重试机制; 有安全机制:支持基于shiro,jaas等多种安全配置机制,可以对Queue/Topic进行认证和授权; 监控完善:拥有完善的监控,包括WebConsole,JMX,Shell命令行,Jolokia的RESTful API; 界面友善:提供的WebConsole可以满足大部分情况,还有很多第三方的组件可以使用,比如hawtio;
缺点:社区活跃度不及RabbitMQ高; 根据其他用户反馈,会出莫名其妙的问题,会丢失消息; 目前重心放到activemq6.0产品Apollo,对5.x的维护较少; 不适合用于上千个队列的应用场景;
5. ZeroMQ
号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普 通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字 的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议 (TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用 于node与node间的通信,node可以是主机或者是进程。
引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的 一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列 库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈 的一部分,之后进入Linux内核”。
与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不 是一个服务器,更像一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通 讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。
6. MQ选型对比
总结:消息队列的选型需要根据具体应用需求而定,ZeroMQ小而美,RabbitMQ大而稳,Kakfa和RocketMQ快而强劲。
适用场景——从公司基础建设力量角度出发
中小型软件公司,建议选RabbitMQ 首先:erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便。他的弊端 也很明显,虽然RabbitMQ是开源的,然而国内有几个能定制化开发erlang的程序员呢?所幸, RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重 要。
其次:不考虑Kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大, 选消息中间件,应首选功能比较完备的,所以kafka排除。 最后:不考虑RocketMQ的原因是,RocketMQ是阿里出品,如果阿里放弃维护RocketMQ, 中小型公司一般抽不出人来进行RocketMQ的定制化开发,因此不推荐。
大型软件公司 根据具体使用在RocketMQ和kafka之间二选一。 一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。 针对RocketMQ,大型软件公司也可以抽出人手对RocketMQ进行定制化开发,毕竟国内有 能力改JAVA源码的人,还是相当多的。 至于kafka,根据业务场景选择,如果有日志采集功能,肯定是首选kafka了。
适用场景——从业务场景角度出发
RocketMQ定位于非日志的可靠消息传输(日志场景也OK),目前RocketMQ在阿里集团被 广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主 要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集 和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求, 适合产生大量数据的互联网服务的数据收集业务。
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主 要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更 多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要 求还在其次。
三、安装
四、基本概念和操作
1. kafka常用术语
名词 | 解释 |
Broker | 【节点】一个Kafka节点就是一个Broker,一个和多个Broker可以组成 一个Kafka集群 |
Topic | 【主题】Kafka根据topic对消息进行归类,发布到kafka集群的每套消 息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的 |
Producer | 【生产者】用于向Kafka中发送消息 |
Consumer | 【消费者】从Kafka中获取消息 |
Consumer Group | 【消费组】每个Consumer都会归属于一个消费组,一条消息可以同时 被多个不同的消费组消费,但是只能被一个消费组中的消费者消费 |
Partition | 【分片】物理上的概念,可以将一个topic上的数据拆分为多分放到 Partition中,每个Patition内部的消息是有序的。 |
2. kafka常用指令
创建主题
- ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse -- partitions 1 --replication-factor 1
查看主题
- ./kafka-topics.sh --list --bootstrap-server localhost:9092
开启消息生产端
- ./kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092
开启消息消费端
- ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092
- ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 -- from-beginning
消息是会被存储在kafka中的文件里的,并且是顺序存储的,消息有偏移量的概念,所以我们可以指定偏移量去读取某个位置的消息。
3. 单播消息(group.id相同)
一个消费组里,只会有一个消费者能够消费到某个topic中的消息。
首先,打开两个窗口,分别执行如下语句开启消费端,那么就在“museGroup1”消费组中创 建了两个Consumer
- ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 -- consumer-property group.id=museGroup1
然后:Producer端发送3条消息,我们发现,只有一个Consumer收到了消息。
4. 多播消息(group.id不相同)
当业务场景中,需要同一个topic下的消息被多个消费者消费,那么我们可以采用创建多个消费组的方 式,那么这种方式就是多播消息。打开两个窗口,分别执行如下指令:
- ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer property group.id=museGroup1
- ./kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --consumer property group.id=museGroup2
最后,Producer端发送3条消息,我们发现,两个Consumer都收到了消息。如下所示:
5. 消费组
查看当前主题下有哪些消费组
- ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查看消费组中的具体信息
- ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group museGroup1 --describe
其中,展现出来的信息有如下含义:
- CURRENT-OFFSET:当前消费组已消费消息的偏移量。
- LOG-END-OFFSET:主题对应分区消息的结束偏移量(水位——HW)。
- LAG:当前消费组堆积未消费的消息数量。
__consumer_offsets-N
kafka默认创建了一个拥有50个分区的主题,名称为:“__consumer_offsets”。
consumer会将消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去 的时候,【key】=consumerGroupID+topic+分区号,【value】=当前offset的值。
Kafka会定期清理topic里的消息,最后就保留最新的那条数据。可以通过如下公式确定 consumer消费的offset要提交到哪个__consumer_offsets hash(consumerGroupID)% 主题“__consumer_offsets”的分区数量
6. 稀疏索引
五、kafka集群
1. 集群搭建
创建kafka-cluster目录,并解压kafka_2.13-3.0.0.tgz为3份kafka
修改kafka1的server.propertis配置文件
- broker.id=10 listeners=PLAINTEXT://localhost:9093 log.dirs=/Users/muse/Lesson/ServerContext/kafka-cluster/kafka1/kafka-logs
修改kafka2的server.propertis配置文件
- broker.id=11 listeners=PLAINTEXT://localhost:9094 log.dirs=/Users/muse/Lesson/ServerContext/kafka-cluster/kafka2/kafka-logs
修改kafka3的server.propertis配置文件
- broker.id=12 listeners=PLAINTEXT://localhost:9095 log.dirs=/Users/muse/Lesson/ServerContext/kafka-cluster/kafka3/kafka-logs
2. 集群启动和验证
启动kafka1,kafka2和kafka3
- ./kafka1/bin/kafka-server-start.sh -daemon ./kafka1/config/server.properties
- ./kafka2/bin/kafka-server-start.sh -daemon ./kafka2/config/server.properties
- ./kafka3/bin/kafka-server-start.sh -daemon ./kafka3/config/server.properties
验证3个Broker是否启动成功
- 首先,可以通过ps -ef | grep kafka来查看进程是否启动
- 其次:在zookeeper中查看/brokers/ids下中是否有相应的brokerId目录生成
3. Topic的意义
试想一下,当我们要尝试发送 /消费消息的时候需要注意什 么呢?“这有啥需要注意的, 发送不就得了!” 结果,我们发现了一个非常重 大的问题,大家都往Kafka中 发送消息,所有的消息都混合 在了一起,就类似所有快递公 司的快递员(Producer端)把 快递都扔到了一个大仓库里, 结果,去取快递的小伙伴们 (Consumer端)面对堆积如 山且混乱不堪的“快递山”—— 疯了。。。
4. Topic和Partition
5. 分区
一个主题中的消息量是非常大的,因此可以通过分区的设置,来分布式存储这些消息。 并且分区也可以提供消息并发存储的能力。
6. 副本
如果分片挂掉了,数据就丢失了。那么为了提高系统的可用性。我们把分片复制多个,这 就是副本了。但是,副本的产生,也会随之带来数据一致性的问题,即:有的副本写数据 成功,但是有的副本写数据失败。
Leader:kafka的读写操作都发生在leader上,leader负责把数据同步给follower。 当leader挂掉了,那么经过主从选举,从多个follower中选举产生一个新的leader。
Follower:follower接收leader同步过来的数据,它不提供读写(主要是为了保证多副本数据与消费 的一致性)
7. 集群操作指令
为名称为muse-rp的Topic创建2个分区(--partitions)3个副本(--replication-factor)
- ./kafka-topics.sh --create --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095 --partitions 2 --replication-factor 3
删除Topic
- ./kafka-topics.sh --delete --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095
查看分区和副本分布情况
- ./kafka-topics.sh --describe --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095
消息发送和消费
- ./kafka-console-producer.sh --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095
- ./kafka-console-consumer.sh --topic muse-rp --bootstrap-server localhost:9093 localhost:9094 localhost:9095 --consumer-property group.id=museGroup1
8. 多分区&多副本
为名称为muse-rp的Topic创建---------------------------- 2个分区(--partitions) 3个副本(--replication-factor)
9. 多分区消费组
一个partition只能被一个消费组中的一个消费者消费,这样设计的目的是保证消息的有序 性,但是在多个partition的多个消费者消费的总顺序性是无法得到保证的。
partition的数量决定了消费组中Consumer的数量,建议同一个消费组中的Consumer数量 不要超过partition的数量,否则多余的Consumer就无法消费到消息了。
但是,如果消费者挂掉了,那么就会触发rebalance机制,会由其他消费者来消费该分区。
10. Controller
Kafka集群中的Broker在ZK中创建临时序号节点,序号最小的节点也就是最先创建的那个 节点,将作为集群的Controller,负责管理整个集群中的所有分区和副本的状态。
Controller控制器的作用如下:
- ① 当某个分区的leader副本出现故障时,由控制器负责为该分区选举出新的leader副本。
- ② 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数 据信息。
- ③ 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让 新分区被其他节点感知到。
11. Rebalance机制
当消费者没有指明分区消费时,消费组里的消费者和分区关系发生了变化,那么就会触发 rebalance机制。这个机制会重新调整消费者消费哪个分区。
在触发rebalance机制之前,消费者消费哪个分区有3种策略:
1> range:通过公式来计算某个消费者消费哪个分区。
2> 轮询:大家轮流对分区进行消费。
3> sticky:在触发rebalance之后,在消费者消费的原分区不变的基础上进行调整。
12. HW和LEO
HW(HighWatermark)俗称高水位,取一个partition对应的ISR中最小的LEO(log-end offset)作为HW;
Consumer最多只能消费到HW所在的位置,每个副本都有HW,Leader和Follower各自负 责更新自己的HW的状态。
对于Leader新写入的消息,Consumer不能立刻消费,Leader会等待该消息被所有ISR中的 副本同步后更新HW,此时消息才能被Consumer所消费。这样就能保证如果Leader所在的 broker失效,该消息仍然可以从新选举的Leader中获取。
具体逻辑请看下一张图:
六、高频面试题
1. 如何防止消息丢失?
针对发送方:将ack设置为1或者-1/all可以防止消息丢失,如果要做到99.9999%,ack要设 置为all,并把min.insync.replicas配置成分区备份数
针对接收方:把自动提交修改为手动提交(enable-auto-commit)。
2. 如何防止消息的重复消费?
一条消息被消费者消费多次,如果为了不消费到重复的消息,我们需要在消费 端增加幂等性处理,例如:
- ① 通过mysql插入业务id作为主键,因为主键具有唯一性,所以一次只能插 入一条业务数据。
- ② 使用redis或zk的分布式锁,实现对业务数据的幂等操作。
3. 如何做到顺序消费?
针对发送方:在发送时将ack配置为非0,确保消息至少同步到leader之后再返回ack继续发 送。但是,只能保证分区内部的消息是顺序的,而无法保证一个Topic下的多个分 区总的消息是有序的。
针对接收方:消息发送到一个分区中,只配置一个消费组的消费者来接收消息,那么这个 Consumer所接收到的消息就是有顺序的了,不过这也就牺牲掉了性能。
4. 如何解决消息积压的问题?
消息积压会导致很多问题,比如:磁盘被打满、Producer发送消息导致kafka 性能过慢,然后就有可能发生服务雪崩。解决的方案如下所示:
① 提升一个Consumer的处理能力。即:在一个消费者中启动多个线程,让 多线程加快消费的速度。
② 提升总体Consumer的处理能力。启动多个消费组,增加Consumer的数量 从而提高消费能力。
③ 如果业务运行,设定某个时间内,如果消息仍没有被消费,那么Consumer收到消息后,直接废弃掉,不执行下面的业务逻辑。
5. 如何实现延迟队列?
应用场景:订单创建成功后如果超过30分钟没有付款,则需要取消订单。
- ① 创建一个表示“订单30分钟未支付”的Topic,如:order_not_paid_30min, 表示延迟30分钟的消息队列。
- ② Producer发送消息的时候,消息内容要带上订单生成的时间create_time。
- ③ Consumer消费Topic中的消息,如果发现now减去create_time不足30分钟, 则不去消费;记录当前的offset,不去消费当前以及之后的消息。
- ④ 通过记录的offset去获取消息,如果发现消息已经超过30分钟且订单状态 是“未支付”,那么则将订单状态设置为“取消”,然后获取下一个offset的 消息。
6. Kafka如何做到单机上百万的高吞吐量呢?
- 写入数据:主要是依靠页面缓存技术 + 磁盘顺序写实现的。
- 读取数据:主要依靠零拷贝技术实现的。
7. Kafka高吞吐——非零拷贝技术
通过下图过程的描述,很明显可以看到存在两次没必要的copy,一次是从OS Cache里拷 贝到Kafka进程的缓存里,接着又从Kafka进程的缓存里拷贝回OS的Socket缓存里。而且 为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是Kafka进程在执行,一 会儿上下文切换到操作系统来执行,所以这种方式来读取数据是比较消耗性能的。
8. Kafka高吞吐——零拷贝技术
Kafka为了解决非零拷贝这个问题,在读数据的时候是引入零拷贝技术。也就是说,通过 OS的sendfile技术直接让OS Cache中的数据发送到网卡后传输给下游的消费者,中间跳 过了两次拷贝数据的步骤。通过零拷贝技术,就不需要把OS Cache里的数据拷贝到应用 缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝