关键要点
-
Cloudflare在处理大量数据时使用Kafka集群,开发了一个通用的消息总线集群,以解耦团队、有效扩展和处理数万亿条消息。
-
为了解决事件驱动系统中无结构通信的问题,应建立一个强有力的契约:跨平台数据格式Protobuf帮助Cloudflare实现了这一目标。
-
在开发工具上投资度量指标是至关重要的,可以轻松地发现问题:Cloudflare通过在SDK中增加OpenTracing和Prometheus度量指标来了解系统行为并做出更好的决策,尤其是在事故发生时。
-
为了在SDK的采用和使用中实现一致性并促进最佳实践,优先编写清晰的模式文档非常重要。
-
Cloudflare旨在在灵活性和简洁性之间取得平衡:虽然可配置的设置可能提供更多的灵活性,但简单的设置可以在不同的流程中实现标准化。
Cloudflare在不到六年的时间里,为了服务间通信,向Kafka生成了超过1万亿条消息。随着公司和应用服务团队的壮大,他们不得不调整他们的工具以保持快速交付。
我们将讨论在分布式领域团队中工作的早期阶段,以及如何在Kafka之上构建抽象层来达到1万亿消息的里程碑。
我们还将涵盖近年来由于可扩展性限制而面临的真实事故,并介绍应对不断增长的需求所采取的步骤和模式。
什么是 Cloudflare?
Cloudflare 为其客户提供全球网络,并允许他们保护其网站、API 和互联网流量。
该网络还保护企业网络,使客户能够在边缘运行和部署整个应用程序。
Cloudflare 提供一系列产品,包括 CDN、Zero Trust 和 Cloudflare Workers,以实现这些目标,识别和阻止恶意活动并让客户专注于他们的工作。
图 1:Cloudflare 的全球网络
从工程角度来看,Cloudflare的网络由两个主要组成部分构成:全球边缘网络和Cloudflare控制平面。
网络的重要部分是使用Cloudflare的产品构建的,其中Workers部署在边缘网络上并被用于服务。而控制平面则是一组数据中心,公司在其中运行裸金属上的Kubernetes、Kafka和数据库。通常,所有的Kafka生产者和消费者都部署在Kubernetes上,但具体的部署位置取决于工作负载和期望的结果。
在本文中,我们将重点关注Cloudflare的控制平面,并探讨如何扩展服务间通信和使能工具以支持运营。
Kafka
Apache Kafka是建立在集群概念之上的,集群由多个代理(broker)组成,每个集群都有一个指定的领导代理负责协调工作。在下面的图示中,代理2充当领导代理。
图 2:Kafka 集群
消息被归类为主题(topics),例如用户事件,比如用户创建或用户信息更新。主题然后被划分为分区,这种方法使得Kafka能够水平扩展。在图示中,主题A的分区在两个代理上都有,每个分区都有一个指定的领导代理来确定其"真相来源"。为了确保容错性,分区根据预定的复制因子进行复制,通常最小值为3。发送消息到Kafka的服务称为生产者(producers),而读取消息的服务称为消费者(consumers)。
Cloudflare 工程文化
在过去,Cloudflare使用的是一个单体式的PHP应用程序,但随着公司的发展和多样化,这种方法变得限制性和风险较高。
现在,公司不再强制规定特定的工具或编程语言,而是赋予团队建立和维护自己服务的权力,公司鼓励实验和倡导有效的工具和实践。应用服务团队是工程组织中相对较新的成员,它通过提供预打包的工具来帮助其他团队取得成功,并采用最佳实践。这使得开发团队能够专注于提供价值。
紧密耦合
随着产品的增多,我们需要找到更好的方法来使团队能够以自己的节奏工作,并与同事和工程团队解耦。工程团队也需要更多地控制请求的回退和工作完成的保证。
由于我们已经在运行Kafka集群来处理大量数据,因此我们决定花时间创建一个通用的消息总线集群:入职过程很简单,只需提交一个拉取请求到一个代码库,它会设置新主题所需的一切,包括复制策略、保留期和ACL。下图说明了消息总线集群如何帮助解耦不同的团队。
图 3:通用消息总线集群
例如,三个团队可以发布对审计日志系统感兴趣的消息,而无需了解具体的服务。通过减少耦合,工程团队可以更高效地工作并实现有效的扩展。
非结构化通信
在事件驱动的系统中,为了避免耦合,各个系统不应相互感知。最初,我们没有强制的消息格式,生产团队可以自行决定如何组织消息结构。这可能导致非结构化的通信,并且如果团队没有建立强有力的合同,会增加无法处理的消息的数量。
为了避免非结构化的通信,团队在Kafka生态系统中寻找解决方案,找到了两个可行的选择,Apache Avro和protobuf,最终选择了protobuf。之前我们使用的是JSON,但发现很难确保兼容性,并且与protobuf相比,JSON消息的大小更大。
图 4:protobuf 消息
Protobuf提供严格的消息类型和内置的前向和后向兼容性,同时能够生成多种编程语言的代码,这也是一个重要的优势。团队鼓励对他们的protobuf消息进行详细注释,并使用Uber开源的工具Prototool来检测破坏性变更并强制执行代码风格规范。
图 5:切换到 Protobuf
仅使用 Protobuf 是不够的:不同的团队仍然可以向同一主题发送消息,并且由于格式不一致,消费者可能无法处理它什么是预期的。此外,配置 Kafka 消费者和生产者并非易事,需要对工作负载有复杂的了解。由于大多数团队都在使用 Go,我们决定在 Go 中构建一个“消息总线客户端库”,结合最佳实践并允许团队更快地移动。
为了避免团队向同一主题发出不同的消息,我们做出了有争议的决定(在客户端)为每个主题强制执行一种 protobuf 消息类型。虽然这一决定使采用变得容易,但它导致创建了大量主题,并复制了多个分区,复制因子至少为三。
连接器
团队在简化Kafka基础架构方面取得了显著进展,引入了工具和抽象层,但我们意识到还有其他需要解决的用例和模式,以确保遵循最佳实践:团队开发了连接器框架。
图 6:连接器框架
该框架基于 Kafka 连接器,使工程师能够创建从一个系统读取并将其推送到另一个系统的服务,例如 Kafka 或Quicksilver,Cloudflare 的 Edge 数据库。为了简化这个过程,我们使用Cookiecutter作为服务创建的模板,工程师只需要在命令行界面中输入几个参数即可。
连接器的配置过程简单,可以通过环境变量进行,无需进行任何代码更改。
在下面的示例中,读取器是Kafka,写入器是Quicksilver。连接器设置为从topic 1和topic 2读取数据,并应用函数pf_edge。这是完整的配置,其中还包括指标、警报和所有其他进入生产所需的内容,使团队能够轻松遵循最佳实践。团队还可以注册自定义转换,这将是他们需要编写的唯一代码部分。
图 7:一个简单的连接器
例如,我们在通信偏好服务中使用连接器:如果用户想要在 Cloudflare 仪表板中选择退出营销信息,他们可以与此服务交互以实现此目的。通信偏好升级存储在其数据库中,并向 Kafka 发送一条消息。为确保更改反映在三个不同的源系统中,我们使用单独的连接器将更改同步到交易电子邮件服务、客户管理系统和市场电子邮件系统。这种方法使系统最终保持一致,我们利用 Kafka 提供的保证来确保该过程顺利进行。
图 8:连接器和通信首选项
可见性
随着疫情期间我们的客户基础迅速增长,吞吐量也随之增加,这凸显出我们创建的一些抽象层面的可扩展性问题。
一个例子是我们处理的审计日志,针对我们的Kafka客户:我们建立了一个系统来管理这些日志,允许生产者团队产生事件,而我们监听这些事件,并将数据记录在我们的数据库中。
图 9:为审计日志添加日志推送
我们通过API和名为"日志推送"的集成公开了这些信息,该集成使我们能够将审计日志数据推送到各种数据存储桶,例如Cloudflare R2或Amazon S3。
在疫情期间,我们注册了更多的审计日志,并且客户开始使用我们的API获取最新的数据。由于这种方法不具备可扩展性,我们决定开发一个流水线来解决这个问题,创建一个小型服务来监听审计日志事件并将其转换为适当的格式,直接存储在存储桶中,而不会过载API。
随着我们积累了更多的日志,我们遇到了进一步的问题,无法及时清除日志,导致延迟和违反我们的SLA。由于缺乏工具和仪表板来诊断问题,我们对延迟的原因感到不确定:是从Kafka读取的瓶颈,还是转换过程,或者是将数据保存到数据库的问题?
图 10:瓶颈在哪里?
为了解决这个问题,我们决定通过增加Prometheus指标来增强我们的SDK,使用直方图测量处理消息的每个步骤所花费的时间。这帮助我们确定较慢的步骤,但我们无法确定哪个具体组件在处理特定消息时花费的时间更长。为了解决这个问题,我们研究了OpenTelemetry,并专注于其追踪集成功能:在Kafka上并没有很多良好的OpenTracing集成,并且在生产事故期间在不同服务之间传播追踪信息是具有挑战性的。
通过团队增强SDK以支持OpenTracing,我们能够确定将数据推送到存储桶和从Kafka中读取数据都是瓶颈,我们将修复这些问题放在优先考虑的位置。
图 11:识别瓶颈
将指标添加到 SDK 中,我们能够更好地了解集群和服务的健康状况。
Noisy On-call
我们遇到了一个挑战,由于我们收集了大量的指标数据,导致了一个嘈杂的值班体验,大量与应用程序健康和延迟问题相关的警报不断触发。
图 12:警报管道
基本的警报流水线由Prometheus和AlertManager组成,警报会发送到PagerDuty。由于重新启动或扩缩容服务并不理想,我们决定探索如何利用Kubernetes并实施健康检查。
在Kubernetes中,有三种类型的健康检查:存活检查(liveness)、就绪检查(readiness)和启动检查(startup)。对于Kafka来说,实施就绪检查并不是有用的,因为通常情况下不会暴露HTTP服务器。为了解决这个问题,我们采取了一种替代方法。
图 13:健康检查和 Kafka
当接收到存活检查的请求时,我们尝试与一个Broker进行基本操作,比如列出主题,如果响应成功,则检查通过。然而,有些情况下应用程序仍然健康,但无法生产或消费消息,这促使团队为消费者实施更智能的健康检查。
图 14:健康检查实施
Kafka 的当前偏移量是分区上最后一个可用的偏移量,而提交的偏移量是消费者成功消费的最后一个偏移量。
通过在健康检查期间检索这些偏移量,我们可以确定消费者是否正常运行:如果我们无法检索到偏移量,则可能存在潜在问题,并且消费者被报告为不健康。如果偏移量是可检索的,我们将最后提交的偏移量与当前偏移量进行比较。如果它们相同,则没有附加新消息,并且消费者被认为是健康的。如果最后提交的偏移量不同,我们检查它是否与之前记录的最后提交的偏移量相同,以确保消费者没有卡住并需要重启。这个过程带来了更好的随叫随到体验和更快乐的客户。
无法跟上
我们有一个系统,团队可以在其中为他们的电子邮件系统从 Kafka 生成事件。这些事件包含一个模板,例如“受攻击”模板,其中包含有关受攻击网站和攻击者身份的信息,以及元数据。
我们会监听事件,从他们的注册表中检索电子邮件的模板,丰富它,并将其发送给客户。然而,我们开始遇到负载问题:团队开始看到生产率出现峰值,导致消费滞后并影响重要的 OTP 消息和 SLO。
图 15:消费滞后
批处理
我们开始探索不同的解决方案来解决这个问题,最初的解决方案是增加分区和消费者的数量,但并没有带来显著的改善。
图 16:批处理方法
我们决定实施一种更简单但更有效的方法,批量消费,一次处理一定数量的消息,应用转换,然后分批发送。这被证明是有效的,并使团队能够轻松处理高生产率。
图 17:使用批处理时消费没有滞后
文档
在开发我们的 SDK 时,我们发现许多开发人员在使用它时遇到了问题。有些遇到错误,而另一些则不确定如何实现某些功能或解释特定错误。为解决这个问题,我们在 Google Chat 上创建了频道,用户可以在其中向我们提问。我们有一个人随叫随到,并花时间在我们的 wiki 中记录我们的发现和答案。这有助于改善 SDK 的整体用户体验。
结论
有四个教训需要学习:
-
始终在灵活性和简单性之间找到合适的平衡:虽然可配置的设置可能提供更多的灵活性,但简单的设置可以在不同的流程中实现标准化。
-
可见性:尽早在SDK中添加指标可以帮助团队了解系统的行为并做出更好的决策,特别是在故障发生时。
-
契约:强制执行一个强大、严格的契约,可以很好地了解到主题内部发生的情况,知道谁在写入和读取它。
-
文档记录工作中的好成果,这样就不必花时间回答问题或帮助人们调试生产问题。可以通过Google Chat和维基等渠道实现这一点。
通过遵循这些规则,我们能够改善我们的系统,并使我们的客户在高压情况下感到满意。
作者:Matt Boyle
更多技术干货请关注公号“云原生数据库”
squids.cn,基于公有云基础资源,提供云上 RDS,云备份,云迁移,SQL 窗口门户企业功能,
帮助企业快速构建云上数据库融合生态。