作者:来自 Elastic Nima Rezainia
Confluent Cloud 用户现在可以使用更新后的 Elasticsearch Sink Connector 与 Elastic Agent 和 Elastic Integrations 来实现完全托管且高度可扩展的数据提取架构。
Elastic 和 Confluent 是关键的技术合作伙伴,我们很高兴宣布对该合作伙伴关系进行新的投资。Confluent 的 Kafka 是许多企业摄取架构的关键组件,它确保客户能够保证将关键的可观察性和安全性数据传输到他们的 Elasticsearch 集群中。我们一直致力于改进我们产品的结合方式。借助 Elastic Agent 的新 Kafka 输出和 Confluent 新改进的 Elasticsearch Sink Connectors,无缝地从边缘收集数据、通过 Kafka 将其传输到 Elasticsearch 集群从未如此简单。
在这篇博客中,我们研究了一种将 Elastic Agent 与 Confluent Cloud 的 Kafka 产品集成的简单方法,以减轻摄取业务关键数据的运营负担。
Elastic Agent 和 Confluent Cloud 的优势
Elastic Agent 和 Confluent Cloud 的更新版 Elasticsearch Sink connector 结合使用时,可为各种规模的组织提供无数优势。这种组合解决方案可以灵活地以高效且有弹性的方式处理任何类型的数据采集工作负载。
完全托管
Elastic Cloud Serverless 和 Confluent Cloud 结合使用时,可为用户提供完全托管的服务。这使得部署和采集几乎无限量的数据变得毫不费力,而无需担心节点、集群或扩展。
完全支持 Elastic 集成
300 多个 Elastic 集成中的任何一个都完全支持通过 Kafka 发送数据。在这篇博文中,我们概述了如何在两个平台之间建立连接。这可确保你能从我们在内置警报、SLO、AI 助手等方面的投资中受益。
解耦架构
Kafka 充当数据源(如 Elastic Agent 和 Logstash)和 Elasticsearch 之间的弹性缓冲区,将数据生产者与消费者解耦。这可以显著降低总体拥有成本,因为你可以根据典型的数据摄取量(而不是最大摄取量)调整 Elasticsearch 集群的大小。它还可以确保系统在数据量激增时具有弹性。
完全控制你的数据
借助我们新的 “Output per Integration - 每个集成输出” 功能,客户现在可以使用同一代理将不同的数据发送到不同的目的地。客户可以轻松地将安全日志直接发送到 Confluent Cloud/Kafka,这可以提供交付保证,同时将不太重要的应用程序日志和系统指标直接发送到 Elasticsearch。
部署参考架构
在以下部分中,我们将引导你了解使用 Confluent Cloud 的 Elasticsearch Sink Connector 将 Confluent Kafka 与 Elastic Agent 和 Elasticsearch 集成的方法之一。与任何流式传输和数据收集技术一样,根据特定用例,可以通过多种方式配置管道。这篇博文将重点介绍一个简单的架构,可以将其用作更复杂部署的起点。
该架构的一些亮点包括:
- Elastic Agents 上的动态 Kafka 主题选择
- Elasticsearch Sink Connectors,用于从 Confluent Kafka 到 Elasticsearch 的完全托管传输
- 利用 Elastic 的 300 多个集成处理数据
先决条件
在开始之前,请确保你在 Confluent Cloud 中部署了 Kafka 集群,在 Elastic Cloud 中部署了 Elasticsearch 集群或项目,并安装并注册了 Elastic Agent。
为 Elastic Agent 配置 Confluent Cloud Kafka 集群
导航到 Confluent Cloud 中的 Kafka 集群,然后选择 “Cluster Settings”。找到并记下 Bootstrap Server 地址,稍后在 Fleet 中创建 Kafka 输出时我们将需要此值。
导航到左侧导航菜单中的主题并创建两个主题:
- 名为 logs 的主题
- 名为 metrics 的主题
接下来,导航到左侧导航菜单中的 API Keys:
- 单击 + 添加 API Key
- 选择 Service Account API key type
- 为此 API Key 提供一个有意义的名称
- 授予 metrics 和 logs 主题的密钥写入权限
- 创建密钥
请记录提供的 Key 和 Secret,我们稍后在 Fleet 中配置 Kafka 输出时会用到它们。
配置 Elasticsearch 和 Elastic Agent
在本节中,我们将配置 Elastic Agent 以将数据发送到 Confluent Cloud 的 Kafka 集群,并将配置 Elasticsearch 以便它可以从 Confluent Cloud Elasticsearch Sink Connector 接收数据。
配置 Elastic Agent 以将数据发送到 Confluent Cloud
Elastic Fleet 简化了向 Kafka 和 Confluent Cloud 发送数据的过程。使用 Elastic Agent,可以轻松将 Kafka “输出” 附加到来自代理的所有数据,也可以将其仅应用于来自特定数据源的数据。
在左侧导航中找到 Fleet,单击 “Settings” 选项卡。在 “Settings” 选项卡上,找到 “Output” 部分,然后单击 “dd Output”。
执行以下步骤来配置新的 Kafka output:
- 为 output 提供 Name
- 将 Type 设置为 Kafka
- 使用我们之前记下的 Bootstrap Server 地址填充主机字段。
- 在身份验证下,使用 API 密钥填充用户名,使用我们之前记下的密码填充密码
- 在 Topics 下,选择 Dynamic Topic,并将 Topic from field 设置为 data_stream.type
- 单击 “Save and apply settings”
接下来,我们将导航到 Fleet 中的 “Agent Policies” 选项卡,然后单击以编辑我们要将 Kafka 输出附加到的代理策略。打开代理策略后,单击 “Settings” 选项卡,然后将 Output for integrations 和 Output for agent monitoring 更改为我们刚刚创建的 Kafka 输出。
为每个 Elastic 集成选择一个输出:要设置用于特定数据源的 Kafka 输出,请参阅集成级输出文档。
关于主题选择的说明:data_stream.type 字段是一个保留字段,如果我们发送的数据是日志,Elastic Agent 会自动将其设置为 logs,如果我们发送的数据是指标,则设置为 metrics。使用 data_stream.type 启用 Dynamic Topic 选择将导致 Elastic Agent 自动将指标路由到 metrics 主题,并将日志路由到 logs 主题。有关主题选择的信息,请参阅 Kafka 输出的主题设置文档。
在 Elasticsearch 中配置发布端点
接下来,我们将为 Confluent Cloud Sink Connector 设置两个发布端点(数据流),以便在将文档发布到 Elasticsearch 时使用:
- 我们将创建一个数据流 logs-kafka.reroute-default 用于处理 logs
- 我们将创建一个数据流 metrics-kafka.reroute-default 用于处理 metrics
如果我们让这些数据流中的数据保持原样,数据虽然可用,但我们会发现数据未经解析且缺乏重要的丰富内容。因此,我们还将创建两个索引模板和两个摄取管道,以确保数据由我们的 Elastic Integrations 处理。
创建 Elasticsearch 索引模板和摄取管道
以下步骤使用 Kibana 中的 Dev Tools,但所有这些步骤都可以通过 REST API 或使用 Stack Management 中的相关用户界面完成。
首先,我们将创建用于处理 logs 的索引模板和摄取管道:
PUT _index_template/logs-kafka.reroute
{
"template": {
"settings": {
"index.default_pipeline": "logs-kafka.reroute"
}
},
"index_patterns": [
"logs-kafka.reroute-default"
],
"data_stream": {}
}
PUT _ingest/pipeline/logs-kafka.reroute
{
"processors": [
{
"reroute": {
"dataset": [
"{
{data_stream.dataset}}"
],
"namespace": [
"{
{data_stream.namespace}}"
]
}
}
]
}
接下来,我们将创建用于处理 metrics 的索引模板和提取管道:
PUT _index_template/metrics-kafka.reroute
{
"template": {
"settings": {
"index.default_pipeline": "metrics-kafka.reroute"
}
},
"index_patterns": [
"metrics-kafka.reroute-default"
],
"data_stream": {}
}
PUT _ingest/pipeline/metrics-kafka.reroute
{
"processors": [
{
"reroute": {
"dataset": [
"{
{data_stream.dataset}}"
],
"namespace": [
"{
{data_stream.namespace}}"
]
}
}
]
}
关于 rerouting 的说明:下面是一个实际示例,其中与 Linux 网络指标相关的文档将首先进入 metrics-kafka.reroute-default,然后此 Ingest Pipeline 将检查该文档并发现 data_stream.dataset 设置为 system.network,data_stream.namespace 设置为 default。它将使用这些值将文档从 metrics-kafka.reroute-default 重新路由到 metrics-system.network-default,然后由系统集成进行处理。
配置 Confluent Cloud Elasticsearch Sink 连接器
现在是时候配置 Confluent Cloud Elasticsearch Sink 连接器了。我们将执行以下步骤两次并创建两个单独的连接器,一个用于 logs 的连接器,一个用于 metrics 的连接器。如果所需的设置不同,我们将突出显示正确的值。
导航到 Confluent Cloud 中的 Kafka 集群,然后从左侧导航菜单中选择 Connectors。在 Connectors 页面上,从可用的连接器目录中选择 Elasticsearch Service Sink。
Confluent Cloud 为用户提供了配置连接器的简化工作流程。这里我们将逐步介绍该过程的每个步骤:
步骤 1:主题选择
首先,我们将根据要部署的连接器选择连接器将从中消费数据的主题:
- 部署用于 logs 的 Elasticsearch Sink 连接器时,请选择 logs 主题。
- 部署用于 metrics 的 Elasticsearch Sink 连接器时,请选择 metrics 主题。
步骤 2:Kafka 凭据
选择 KAFKA_API_KEY 作为集群身份验证模式。提供前面提到的 API Key 和 Secret
我们收集所需的 Confluent Cloud Cluster 信息。
步骤 3:身份验证
提供我们的 Elasticsearch 集群的 Elasticsearch Endpoint 地址作为连接 URI。连接用户和连接密码是 Elasticsearch 中帐户的身份验证信息,Elasticsearch Sink Connector 将使用这些信息将数据写入 Elasticsearch。
步骤 4:配置
在此步骤中,我们将 Input Kafka record value format 设置为 JSON。接下来,展开 Advanced Configuration。
- 我们将 Data Stream Dataset 设置为 kafka.reroute
- 我们将根据要部署的连接器设置 Data Stream Type:
- 在为 logs 部署 Elasticsearch Sink 连接器时,我们将 Data Stream Type 设置为 logs
- 在为 metrics 部署 Elasticsearch Sink 连接器时,我们将Data Stream Type 设置为 metrics
- 其他设置的正确值将取决于具体环境。
步骤 5:调整大小
在此步骤中,请注意 Confluent Cloud 为我们的部署提供了建议的最少任务数。遵循此处的建议是大多数部署的良好起点。
步骤 6:检查和启动
查看 Connector configuration 和 Connector pricing 部分,如果一切正常,则是时候单击 continue 并启动连接器了!连接器可能会报告为配置,但很快就会开始使用来自 Kafka 主题的数据并将其写入 Elasticsearch 集群。
你现在可以导航到 Kibana 中的发现并找到流入 Elasticsearch 的日志!还请查看 Confluent Cloud 为你的新 Elasticsearch Sink Connector 部署提供的实时指标。
如果你只部署了第一个 logs 接收器连接器,你现在可以重复上述步骤来部署第二个 metrics 接收器连接器。
享受完全托管的数据采集架构
如果你遵循上述步骤,那么恭喜你。你已成功:
- 配置 Elastic Agent 以将日志和指标发送到 Kafka 中的专用主题
- 在 Elasticsearch 中创建发布端点(数据流),专用于处理来自 Elasticsearch Sink Connector 的数据
- 配置托管 Elasticsearch Sink Connector 以使用来自多个主题的数据并将该数据发布到 Elasticsearch
接下来,你应该启用其他集成,部署更多 Elastic Agent,在 Kibana 中探索你的数据,并享受 Elastic Serverless 和 Confluent Cloud 完全托管的数据采集架构带来的好处!
原文:Deploying Elastic Agent with Confluent Cloud's Elasticsearch Connector — Elastic Observability Labs