文章摘要
本文整理自 ApacheCon Asia 上,StreamNative 工程师付睿的分享《Use Apache Pulsar Functions in a Cloud-Native way》。本文将介绍在云原生环境中使用 Pulsar Functions 的实践,以及基于 Pulsar Functions 和 Kubernetes 的项目 Function Mesh。希望本文可以为 Pulsar Functions 用户带来新的认知,并为有此类需求的用户传递 Pulsar Functions 的价值。
作者简介
付睿,StreamNative 软件工程师,Apache Pulsar Committer,主要工作方向围绕 Pulsar Functions、Pulsar IO Connector 和 Serverless 等相关开发工作。
导读
本文分为四部分。第一部分将从 Apache Pulsar 社区版本在云原生的现状切入,介绍用户使用 Pulsar Functions 遇到的挑战。第二部分将介绍 StreamNative 基于 Pulsar Functions 和 Kubernetes 开源的 Operator——Function Mesh,以及 Function Mesh 如何将 Pulsar Functions 以更延伸的云原生方式运行在 Kubernetes 上。第三部分将分享 Kubernetes 带给 Pulsar Functions 的新能力、扩展 Pulsar Functions 可能性。最终,我们将介绍 StreamNative 对 Function Mesh 的未来规划。
Apache Pulsar in Kubernetes
首先为大家介绍 Apache Pulsar 在云原生环境的现状。近年来,越来越多的企业与开发者因良好的性能选择使用 Pulsar。
• 高吞吐、低延迟:根据 2022 Apache Pulsar 与 Kafka 对比报告,即使集群中有上万的 Topic 或 Partition,Pulsar 依然可以在对消息进行持久化的前提下实现高吞吐和低延迟,性能超出当前流行的其他消息系统。
• 弹性扩缩容:Pulsar 另一个吸引人的特点是非凡的弹性。在用户需要扩容集群、增加新的节点时,无论 BookKeeper 集群还是 Broker 集群,都可以立即将新的工作负载并分配到新的节点,而无需等待集群中已有的数据重新分布。该特性对运维十分友好,大大降低了集群扩缩容的复杂度和风险。
• 原生支持多个大规模分布式高可用数据存储重要的特性:包括多租户、异步复制、分层存储等,非常适合对大规模流消息进行长期持久存储。
• 丰富的生态系统:StreamNative Hub 上汇总了大部分 Pulsar 开源周边生态,包含多个 IO Connector、插件协议等,可以轻松地将 Pulsar 连接到其他系统,实现消息的流动。Pulsar 也与当前最先进的数据处理深度整合,如 Flink、Spark 和 Presto,方便用户对 Pulsar 内数据进行复杂的分析。
虽然以上特性都是开源的,但是在实际生产环境中部署私有化的 Pulsar 集群且充分发挥其能力并不是容易的事情。如图所示,最小的 Pulsar 集群包含一个 ZooKeeper 集群用于存储元数据、一个 BookKeeper 集群作为分布式存储系统以及一个 Broker 集群提供消息和流的能力;如果需要将 Pulsar 暴露到外部网络,那么还需要一个 Proxy 集群。
越来越多的用户选择在云原生环境中部署和使用 Pulsar,比如在 Kubernetes 环境中,然而 Pulsar 并非简单的系统,需要一种简单且通用的方式在云原生环境下支持 Pulsar 的部署。目前综合 Pulsar 社区和商业支持等层面,很多用户会选择社区或者自己维护的 Helm Chart 以及相关 Operator 来在 Kubernetes 上部署 Pulsar 集群。
当然,Pulsar 诞生之初就是为了解决消息领域存在的痛点,打通云与消息系统生态。本文接下来介绍 Pulsar 计算生态中的 Pulsar Functions。
Pulsar Functions
常用的大数据计算分为三种:
• 交互式查询:常见的场景是基于 Presto 进行计算。
• 批/流处理:常用的系统有 Flink 和 Spark 等。
• IO Connector:Pulsar 提供了 Sink 和 Source Connector 支持,让引擎可以理解 Pulsar Schema 并把 Pulsar Topic 当作表来读取并使用。
Pulsar Functions 是轻量级的计算框架,其应用场景不同于以上几个计算方式复杂的计算场景。下图直观地展示了 Pulsar Functions 的运行方式,通过左侧 Pulsar Source 订阅 Topic,然后通过用户自定义 Function 计算进入右侧的 Pulsar Sink 把计算结果写回 Topic。其内部相当于把用户常用消息进行简单的计算,并提供 Function 抽象。这样可以把用户常用的创建、管理、调度副本数等基础功能作为基础设施提供给用户。
注意:对消息领域不熟的人可以将 Pulsar Topic 理解为管道。其实 Topic 是管道的抽象,它可以作为载体让所有的数据通过 Topic 进行缓存。生产者将消息写入 Topic,消费者按照生产者写入的顺序消费消息。
Pulsar Functions 诞生的目的不是为了提供复杂的计算引擎,而是为了将 Serverless 理念与消息系统结合,让 Pulsar 在数据端能够处理轻量级数据,常见的场景如 ETL 和 Real-time Aggregation(实时聚类)大约占总体使用场景的 60% -70%、占 IoT 场景的 90%。其他场景包括微服务、Routing、Filtering 和 Enrichment 等。对于简单的场景,Pulsar Functions 处理就可以让用户无需构建复杂的集群,在 Pulsar 消息端就可以进行简单的计算,节省传输和计算资源。
Pulsar Functions Worker
Broker 是提供消息和流服务的组件,它如何对 Function 进行调度和管理、如何提供相应 API?此处系统引入了新的组件 Pulsar Functions Worker。Pulsar Functions Worker 依赖 Topic 存储元数据,并通过生产者和消费者特性来实现主从架构,如用户分配和调度等。同时,Pulsar Functions Worker 提供完整的 REST API 为 Function 全生命周期管理提供相关接口,这些 API 也集成在 Pulsar Admin 等工具中。用户在使用 Pulsar Functions 时,Broker 和 Functions Worker 一起运行,这也是社区在 Helm Chart 上提供的模式。此模式的优点是易于部署和管理,适合资源有限、非深度使用 Function 的用户场景。
如果用户需要系统有更高的隔离性、避免 Functions Worker 影响集群的运行(深度或重度 Function 用户),那么可以选择将 Functions Worker 作为单独的集群来运行 Function。但是这种方式缺少云原生部署与管理的最佳实践,需要用户花费更多的时间和精力配置和日常运维。
StreamNative 团队验证过此模式,我们将在后文为大家介绍相关的问题与解决方案。
Function Runtime
Pulsar Functions Worker 支持用不同的方式运行用户提交的 Function。通常情况下,Functions Worker 可以单独或和 Broker 共同运营用户提供的 Function,也就是线程和进程的模式。Functions Worker 包含 Kubernetes Runtime 实现,可以将 Function 打包成 StatefulSets 提交给 Kubernetes 集群运行来更好地运行在云原生环境中。
下图展示了一个常见的 Pulsar 集群。用户使用 Kubernetes Runtime 时所提交的 Function 并不运行在 Broker 和 Functions Worker 容器中,而是单独以 StatefulSets 的方式运行在 Kubernetes 集群内。这种方式可以隔离资源并避免 Broker 和 Functions Worker 同时运行带来的安全风险。
Pulsar Functions 遇到的挑战
随着 Pulsar 越来越壮大,Pulsar Functions 的用户越来越多,在众多使用场景中,Pulsar Functions 遇到了以下挑战:
• 不一致性
• 元数据 Topic 可能导致 Broker 循环宕机
• 如何基于 Pulsar Topic 调度
虽然 Kubernetes Runtime 为 Pulsar Functons 带来云原生的支持,但是由于 Broker 和 Functions Worker 部署在一起,每个 Broker 都有一个 Functions Worker,对应的 Function 汇集了所有管理与运维的接口。用户提交 Function 到 Functions Worker 来把 Function 的资源保存到 Topic,调度时 Kubernetes 去 Topic 读取元数据等资源并把数据部署为 StatefulSets。这个过程中存在一些问题。元数据存储在 Pulsar 内,Functions Worker 启动后读取 Topic 元数据,如果对应的 Broker 没有启动或不可用就会导致循环宕机。服务 Functions Worker 元数据的 Broker 启动后 Function 才会开始运行。
以上过程中元数据分为两部分,提交到 Functions Worker 的元数据部署在 Pulsar 上,同时会有一部分落入 Kubernetes,元数据管理复杂。以一个简单的场景举例,当用户使用 kubectl 直接管理已提交到 Kubernetes 的 Function StatefulSets 时,Functions Worker 没有协调的机制。
除了上述几点,Pulsar Functions 通过 Kubernetes Runtime 还会遇到以下挑战:
• 非云原生:难以使用 Kubernetes 功能。扩容、动态管理、弹性伸缩等功能是 Kubernetes 自带的优势,通过 Kubernetes Runtime 难以享受这些优势。
• Function 实例身份验证刷新:针对目前 Kubernetes Runtime 的局限性,用户提交 Function 只能使用 Token 与 Broker 认证鉴权,每隔一段时间就会因为 Token 过期而无法正常启动系统。为了解决这个问题,社区在 Pulsar Admin 上增加了 Update Auth 选项,帮助用户主动更新过期的 Token,但依然需要用户手动执行相关命令来保持 Token 的有效性。
• 难以管理复杂作业:用户处理简单的并非只使用一个 Function,很多场景下需要将多个 Function 串联,甚至将 Source、Sink 和 Function 连在一起来将多个 Function 作为整体来运营或管控。旧有模式需要维护多个命令然后串联多个 Function 来维护 Topic,Function 之间的关系非常复杂,为管理与运维带来压力。
经历了 Pulsar Function 在生产环境中遇到的多个问题后,我们找到了更好的方案把 Pulsar Function 带入云原生环境,以便更好地使用 Kubernetes 特性并帮助用户更简单地管理复杂的 Function 场景。这就是 Function Mesh 诞生的背景。
应对挑战,Function Mesh 设计原理
Function Mesh 的主要目的不是支持更复杂的、对所有计算通用的框架,而是通过更方便的工具帮助用户管理、使用 Pulsar Functions,让 Pulsar Functions 以云原生的方式运行在云原生框架下。
以将 Functions 串联起来作为整体服务用户的需求为例,StreamNative 两年前曾在社区发起一个 PIP[1],初步想法是提供一个统一的组件让用户描述输入和输出的关系,这样通过 yaml 文件等描述就可以直观地观察 Function 之间的逻辑。假如将 PIP 和 Kubernetes 整合,就可以结合 Kubernetes 自带的调度和弹性的策略为用户提供更好的管理与使用体验。从以上两个优点出发,StreamNative 基于 Kubernetes 框架提出了开源的 Function Mesh Operator 的设想。
下面将介绍几个 Function Mesh 的核心概念。
Kubernetes Operator
首先根据 Operator 分析 Function Mesh CRD 和相应的 Controller。
Function Mesh CRD
Kubernetes Operator 可以解决两个问题,描述与提交 Function、调度 Function。描述与提交 Function 以 Kubernetes CRD 为核心,Function、Sink 和 Source 都通过 CRD 的方式进行描述,比如如何并发、自动化扩缩容、需要的资源、输入与输出的 Topic 信息等。这些描述可以让用户以 Kubernetes 的形式来表达 Function,以此打通 Function 在 Kubernetes 环境的描述和提交。下图为 CRD 描述 Function 配置的示例。
为了解决前文提到的复杂场景下多 Function 串联问题,还额外提交了 Function Mesh CRD,用户可以在单独的资源中描述复杂的计算场景,通过 Pulsar Topic 把多个 Function、Sink 和 Source 串联起来。下图为 CRD 描述 Function Mesh 配置的示例。
下图为典型的 Function Mesh 应用场景。假设这是一个常见的 CDC 场景,首先使用 MongoDB Connector 在 Function Mesh 中描述 Namespace Source,需要多个 Function 串联对 DB Message 做 ETL、Filtering、Routing 等处理,最后将结果通过 Sink Connector 将消息传递到下游 MySQL。通过 Function Mesh,用户可以直接描述完整过程并一次性提交到集群中运行。
Controller
当用户把资源提交到 Kubernetes 后,Kubernetes 如何启用 Function?此处需要 Controller 组件。Controller 是 Kubernetes Control Plane 的扩展,可以直接与 Kubernetes API 交互,将用户在 CRD 中的配置映射到对应的 Kubernetes 资源上,也可以在集群生命周期中进行管理。Controller 可以将 Operational Knowledge 转换为 Program,在需要的时候可操作集群,其角色类似工程师,但是更高效更迅速。
Controller 可以观察 Kubernetes 资源的事件并在发生变化时触发调节。此调节基于资源的实际状态和用户对资源的期望状态,每次调节都试图减少实际状态和期望状态之间的差距,条件允许的话,资源每次都可以被调节到期望状态。用户也可以依赖 Timer 来触发调节。Controller 的特性让管理自定义 CRD 更加灵活和强大。
下图展示了 Kubernetes Operator 运行的完整流程。用户可以通过 yaml 文件基于 CRD 定义资源,并使用 kubectl 工具资源提交到 Kubernetes 集群,Kubernetes API 会利用对应的 Controller 调动内部资源,同时监控资源变化。如果有 CRD 变化,那么会根据变化更改信息,这样 Function 和 Pulsar 集群的信息和关系会更加清晰。Pulsar 集群不会保存或管理任何关于 Function 的元数据,只作为数据的出口提供管道服务。
Function Runner
第二个核心概念是 Function Runner,即用户提交的 Function 运行在哪些容器中。Pulsar Function 支持多语言的开发和运行时,目前支持的语言有 Java、Python 和 Go,这些语言在运行时通常会与 Pulsar Image 打包分发。如果在 Function Mesh 中每个 Function 运行的容器都用 Pulsar 的话将产生很大的开销。此外,Function 可能来自第三方程序,Pulsar Image 在 Pulsar 2.10 版本前默认在 Root 权限下运行有安全隐患。
基于以上考虑,StreamNative 在设计时将不同语言的运行时都作为独立 Runner Image 打包,并且将 Java Runner Image 和 StreamNative CI 集成来支持 Sink 和 Source Connector,这样在 StreamNative 开发与认证的 Connector 中都有对应的独立 Image,方便用户在 Function Mesh 中直接使用。Runner Image 让用户可以自定义提交 Function 的方式,既可以选择使用 Runner Image 将 Function 和 Image 直接打包到新的 Image 中,然后提交到 Function Mesh 执行;也可以直接和 Pulsar Package Management Service 交互,将 Package 上传到 Pulsar 的服务中,由 Function Mesh 调度,将 Function 下载到 Runner Pod 执行。
Mesh Worker Service
Function Mesh 的设计初衷是帮助用户更方便地使用 Pulsar Functions。Pulsar Functions 用户可以通过 Functions Worker REST API 访问数据,Function Mesh 向前兼容,基于现有 Kubernetes API 的实现打通 Pulsar Admin,依照之前管理和使用 Pulsar Functions 的方式继续管理和使用 Function Mesh。如果不习惯 CRD 的提交方式,用户依然可以通过 Mesh Worker Service 拥有和以前一样的操作体验。
StreamNative 团队在 Pulsar 2.8 版本提出了将 Mesh Worker Service 抽象为接口的方案,基于接口可以把 Kubernetes API 作为独立的 Worker Service 实现。用户只需要把新的 Worker Service 用和 Pulsar Functions Worker 以同样的方式部署到集群,就可以继续使用 Pulsar Admin 和 Pulsarctl 等管理 Function Mesh 中的 Function。考虑到特殊场景与 Kubernetes 配置,StreamNative 团队在 Mesh Worker Service 提供了更多的可自定义配置选项,方便用户在提交 Function 时可以更好地使用 Kubernetes 原生的能力。
下图罗列了现阶段 Pulsar Functions 和 Function Mesh Worker Service 接口的差别。目前 Mesh Worker Service 已实现了大部分基础管理接口,如 Create、Delete 和 Update。Stateful Function 相关接口正处于开发阶段,近期将支持实现。下图中 Start / Stop / Restart 和 Trigger 两个功能还未实现支持,Start / Stop / Restart 很难延伸到当前 Kubernetes 场景中支持 Scale to 0 等操作,需要其他组件如 Knative 等才能在 Kubernetes 场景中支持该功能。在不支持 Scale to 0 的前提下,该功能类似于 Delete。StreamNative 希望在 Kubernetes 实现原生支持 Scale to 0 后再开发相应的 API 组件。
Pulsar Functions Feature Matrix
以上介绍了 Function Mesh 基本概念,下面对比 Pulsar Functions 和 Function Mesh 支持功能的情况。功能矩阵表格的内容来自 [PIP 108] Pulsar Feature Matrix (Client and Function)[2],下表将 Pulsar Functions 和 Function Mesh 对比来确保 Function Mesh 支持和 Pulsar Functions 同样的功能,现在 Function Mesh 支持包括 Schema、消息的加解密、资源限制、用户自定义配置、Secret 的管理和使用以及状态函数等功能,唯一在开发进程中的功能是对 Window Function 的支持(目前已经支持 Window Function)。
相关资源
• 访问官网[3]了解并使用 Function Mesh。网站内有完整的教程与技术文档帮助用户上手体验 Function Mesh。
• 访问 Function Mesh、Runner Image 和 Function Mesh Worker Service 等开源 GitHub 代码仓库。
· https://github.com/streamnative/function-mesh
· https://github.com/streamnative/function-mesh-worker-service
• 通过 OLM(Operator Lifecycle Manager)或 Helm Chart 等方式安装 Function Mesh Operator。
• https://operatorhub.io/operator/function-mesh
• https://artifacthub.io/packages/helm/function-mesh/function-mesh-operator
• Function Mesh Operator 已经取得 Openshift 的认证,用户可以通过 Openshift 中直接使用经过认证的 Function Mesh Operator。
云原生环境中的 Pulsar Functions
上文介绍了 Function Mesh 如何解决 Pulsar Functions 在云原生环境下遇到的问题。接下来介绍将 Pulsar Functions 带入云原生场景后迸发的新魅力。
自动扩缩容
Function Mesh 允许用户在 CRD 层面上直接定义自动扩缩容的策略,Kubernetes HPA 允许用户基于 CPU 内存或自定义 Metrics 来实现不同层面的自动扩缩容。借助 Prometheus 和 Prometheus Metrics Adapter 组件将 Pulsar Topic Metrics 或 Function Metrics 作为 HPA 参考,就可以成功实现对不同场景下波峰波谷的动态响应。
当单一部分的 Function 消息压力越来越大时,HPA 会及时地根据对应的 Metrics 执行扩容,在很短的时间内就可以将副本数扩充到 10 个。当 Function 的消息压力降低后,HPA 也会进行缩容,恢复为 1 个副本。基于业务和消息压力自动扩缩容其实很难在 Pulsar Functions 上单独实现,通过 Function Mesh 可以轻松实现。
网络安全
在 Kubernetes 中有很多 Pod 对 Pod 和 Pod 对外部的通讯。对 Function 来说,在很多场景中需要执行外部代码,如果可以在网络层面进行限制,就可以更大程度保证集群的安全。Function Mesh 刚刚完成了和 Istio 的集成工作,用户可以方便地借助 Istio 的能力,通过 Authorization Policy[4] 来定义 Pod 网络规则。比如在示例中,用户可以只允许 Function 和 Pulsar Broker Pod 通讯,而避免 Function 接触到其他的服务如 BookKeeper。
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
...
spec:
rules:
- from:
- source:
principals:
- demo/s/demo/sa/cluster-broker
selector:
matchLabls:
cloud.streamnative.io/pulsar -cluster:
clustercloud.streamnative.io/role: pulsar-function
安全
在大的安全层面,可以借助 Kubernetes 自身的安全配置来进一步加强 Pulsar Functions 安全性,比如通过 SecurityContext 定义用户文件系统权限、配置进程用户权限;直接使用 Kubernetes Secret 为每一个 Function 配置 Service Account 来避免 Pulsar Functions 对 Kubernetes API 越权访问。我们在此介绍了很多安全相关的特性,这也是团队在构建 StreamNative Cloud 时优先考虑的问题,并在每个安全组件上都做了很多工作,比如在 Runner Image 层面引入 Root Image、在 Controller 层面确保 Function 可以以非 Root 用户权限运行、为 Function 配置单独的 Service Account、允许用户配置每一个 Function 和 Pulsar Broker 鉴权参数。Function Mesh 让用户以更安全的方式使用 Pulsar Functions,这也是 StreamNative 在开发 Function Mesh 的过程中发掘的另一个愿景。
周边生态
Function Mesh 把 Pulsar Function 带入 Kubernetes 生态中,意味着用户可以将 Kubernetes 中有趣的能力带入 Pulsar Function,比如 KEDA 可以更好地帮助 Pulsar Functions 进行扩缩容,甚至 Scale to 0;Buildpacks 可以让 Function Mesh 在运行时构建 Function Image,允许用户直接上传代码或者通过 GitHub 仓库提交 Pulsar Functions;甚至可以结合 Krustlet 把 WebAssembly 和 Rust 带入 Pulsar 中。Kubernetes 生态让 Pulsar Functions 拥有了更多的可能性,可以使其更好地服务于更多的场景。
总结与未来规划
Function Mesh 让用户更容易地管理 Pulsar Functions,也让 Pulsar Functions 享受到全能力的 Kubernetes 调度器。Function Mesh 将 Pulsar Functions 带入云原生环境,让 Function 以第一公民的身份运行在云原生环境中,充分享受 Kubernetes 生态的优势。通过 Function Mesh 可以将 Pulsar 集群做不同的分割来把 Function 分布到不同的集群,比如将 Function 单独运行在计算密集型的服务集群上,这是对集群资源调度的很大的优势。
最后介绍 StreamNative 对 Function Mesh 未来规划:
• 除了和生态更多的融合,StreamNative 也计划从 Operator 层面提高 Function Mesh 的能力,比如可监测性、自动扩缩容等。
• 与 Pulsar Functions 功能对齐,保证用户体验一致。
• 提供更好的工具协助用户编排 Function Mesh 资源,可以更简单地构建完整的 Workflow。
• 支持 VPA,Function 可以在资源上动态扩缩容。
引用链接
[1]
PIP: https://github.com/apache/pulsar/wiki/PIP-66%3A-Pulsar-Function-Mesh[2]
[PIP 108] Pulsar Feature Matrix (Client and Function): https://github.com/apache/pulsar/wiki/PIP-108:-Pulsar-Feature-Matrix-(Client-and-Function)[3]
官网: https://functionmesh.io/[4]
Authorization Policy: https://istio.io/latest/docs/reference/config/security/authorization-policy/
▼ 关注「Apache Pulsar」,获取干货与动态 ▼
👇🏻 加入 Apache Pulsar 中文交流群 👇🏻