使用 Confluent Cloud 的 Elasticsearch Connector 部署 Elastic Agent

news2025/1/27 7:41:45

作者:来自 Elastic Nima Rezainia

Confluent Cloud 用户现在可以使用更新后的 Elasticsearch Sink Connector 与 Elastic Agent 和 Elastic Integrations 来实现完全托管且高度可扩展的数据提取架构。

Elastic 和 Confluent 是关键的技术合作伙伴,我们很高兴宣布对该合作伙伴关系进行新的投资。Confluent 的 Kafka 是许多企业摄取架构的关键组件,它确保客户能够保证将关键的可观察性和安全性数据传输到他们的 Elasticsearch 集群中。我们一直致力于改进我们产品的结合方式。借助 Elastic Agent 的新 Kafka 输出和 Confluent 新改进的 Elasticsearch Sink Connectors,无缝地从边缘收集数据、通过 Kafka 将其传输到 Elasticsearch 集群从未如此简单。

在这篇博客中,我们研究了一种将 Elastic Agent 与 Confluent Cloud 的 Kafka 产品集成的简单方法,以减轻摄取业务关键数据的运营负担。

Elastic Agent 和 Confluent Cloud 的优势

Elastic Agent 和 Confluent Cloud 的更新版 Elasticsearch Sink connector 结合使用时,可为各种规模的组织提供无数优势。这种组合解决方案可以灵活地以高效且有弹性的方式处理任何类型的数据采集工作负载。

完全托管

Elastic Cloud Serverless 和 Confluent Cloud 结合使用时,可为用户提供完全托管的服务。这使得部署和采集几乎无限量的数据变得毫不费力,而无需担心节点、集群或扩展。

完全支持 Elastic 集成

300 多个 Elastic 集成中的任何一个都完全支持通过 Kafka 发送数据。在这篇博文中,我们概述了如何在两个平台之间建立连接。这可确保你能从我们在内置警报、SLO、AI 助手等方面的投资中受益。

解耦架构

Kafka 充当数据源(如 Elastic Agent 和 Logstash)和 Elasticsearch 之间的弹性缓冲区,将数据生产者与消费者解耦。这可以显著降低总体拥有成本,因为你可以根据典型的数据摄取量(而不是最大摄取量)调整 Elasticsearch 集群的大小。它还可以确保系统在数据量激增时具有弹性。

完全控制你的数据

借助我们新的 “Output per Integration - 每个集成输出” 功能,客户现在可以使用同一代理将不同的数据发送到不同的目的地。客户可以轻松地将安全日志直接发送到 Confluent Cloud/Kafka,这可以提供交付保证,同时将不太重要的应用程序日志和系统指标直接发送到 Elasticsearch。

部署参考架构

在以下部分中,我们将引导你了解使用 Confluent Cloud 的 Elasticsearch Sink Connector 将 Confluent Kafka 与 Elastic Agent 和 Elasticsearch 集成的方法之一。与任何流式传输和数据收集技术一样,根据特定用例,可以通过多种方式配置管道。这篇博文将重点介绍一个简单的架构,可以将其用作更复杂部署的起点。

该架构的一些亮点包括:

  • Elastic Agents 上的动态 Kafka 主题选择
  • Elasticsearch Sink Connectors,用于从 Confluent Kafka 到 Elasticsearch 的完全托管传输
  • 利用 Elastic 的 300 多个集成处理数据


先决条件

在开始之前,请确保你在 Confluent Cloud 中部署了 Kafka 集群,在 Elastic Cloud 中部署了 Elasticsearch 集群或项目,并安装并注册了 Elastic Agent。

为 Elastic Agent 配置 Confluent Cloud Kafka 集群

导航到 Confluent Cloud 中的 Kafka 集群,然后选择 “Cluster Settings”。找到并记下 Bootstrap Server 地址,稍后在 Fleet 中创建 Kafka 输出时我们将需要此值。

导航到左侧导航菜单中的主题并创建两个主题:

  • 名为 logs 的主题
  • 名为 metrics 的主题

接下来,导航到左侧导航菜单中的 API Keys:

  1. 单击 + 添加 API Key
  2. 选择 Service Account  API key type
  3. 为此 API Key 提供一个有意义的名称
  4. 授予 metrics 和 logs 主题的密钥写入权限
  5. 创建密钥

请记录提供的 Key 和 Secret,我们稍后在 Fleet 中配置 Kafka 输出时会用到它们。

配置 Elasticsearch 和 Elastic Agent

在本节中,我们将配置 Elastic Agent 以将数据发送到 Confluent Cloud 的 Kafka 集群,并将配置 Elasticsearch 以便它可以从 Confluent Cloud Elasticsearch Sink Connector 接收数据。

配置 Elastic Agent 以将数据发送到 Confluent Cloud

Elastic Fleet 简化了向 Kafka 和 Confluent Cloud 发送数据的过程。使用 Elastic Agent,可以轻松将 Kafka “输出” 附加到来自代理的所有数据,也可以将其仅应用于来自特定数据源的数据。

在左侧导航中找到 Fleet,单击 “Settings” 选项卡。在 “Settings” 选项卡上,找到 “Output” 部分,然后单击 “dd Output”。

执行以下步骤来配置新的 Kafka output:

  • 为 output 提供 Name
  • 将 Type 设置为 Kafka
  • 使用我们之前记下的 Bootstrap Server 地址填充主机字段。
  • 在身份验证下,使用 API 密钥填充用户名,使用我们之前记下的密码填充密码

  • 在 Topics 下,选择 Dynamic Topic,并将 Topic from field 设置为 data_stream.type

  • 单击 “Save and apply settings”

接下来,我们将导航到 Fleet 中的 “Agent Policies” 选项卡,然后单击以编辑我们要将 Kafka 输出附加到的代理策略。打开代理策略后,单击 “Settings” 选项卡,然后将 Output for integrations 和 Output for agent monitoring 更改为我们刚刚创建的 Kafka 输出。

为每个 Elastic 集成选择一个输出:要设置用于特定数据源的 Kafka 输出,请参阅集成级输出文档。

关于主题选择的说明:data_stream.type 字段是一个保留字段,如果我们发送的数据是日志,Elastic Agent 会自动将其设置为 logs,如果我们发送的数据是指标,则设置为 metrics。使用 data_stream.type 启用 Dynamic Topic 选择将导致 Elastic Agent 自动将指标路由到 metrics 主题,并将日志路由到 logs 主题。有关主题选择的信息,请参阅 Kafka 输出的主题设置文档。

在 Elasticsearch 中配置发布端点

接下来,我们将为 Confluent Cloud Sink Connector 设置两个发布端点(数据流),以便在将文档发布到 Elasticsearch 时使用:

  • 我们将创建一个数据流 logs-kafka.reroute-default 用于处理 logs
  • 我们将创建一个数据流 metrics-kafka.reroute-default 用于处理 metrics

如果我们让这些数据流中的数据保持原样,数据虽然可用,但我们会发现数据未经解析且缺乏重要的丰富内容。因此,我们还将创建两个索引模板和两个摄取管道,以确保数据由我们的 Elastic Integrations 处理。

创建 Elasticsearch 索引模板和摄取管道

以下步骤使用 Kibana 中的 Dev Tools,但所有这些步骤都可以通过 REST API 或使用 Stack Management 中的相关用户界面完成。

首先,我们将创建用于处理 logs 的索引模板和摄取管道:

PUT _index_template/logs-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-kafka.reroute"
    }
  },
  "index_patterns": [
    "logs-kafka.reroute-default"
  ],
  "data_stream": {}
}
PUT _ingest/pipeline/logs-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{
  
  {data_stream.dataset}}"
        ],
        "namespace": [
          "{
  
  {data_stream.namespace}}"
        ]
      }
    }
  ]
}

接下来,我们将创建用于处理 metrics 的索引模板和提取管道:

PUT _index_template/metrics-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "metrics-kafka.reroute"
    }
  },
  "index_patterns": [
    "metrics-kafka.reroute-default"
  ],
  "data_stream": {}
}
PUT _ingest/pipeline/metrics-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{
  
  {data_stream.dataset}}"
        ],
        "namespace": [
          "{
  
  {data_stream.namespace}}"
        ]
      }
    }
  ]
}

关于 rerouting 的说明:下面是一个实际示例,其中与 Linux 网络指标相关的文档将首先进入 metrics-kafka.reroute-default,然后此 Ingest Pipeline 将检查该文档并发现 data_stream.dataset 设置为 system.network,data_stream.namespace 设置为 default。它将使用这些值将文档从 metrics-kafka.reroute-default 重新路由到 metrics-system.network-default,然后由系统集成进行处理。

配置 Confluent Cloud Elasticsearch Sink 连接器

现在是时候配置 Confluent Cloud Elasticsearch Sink 连接器了。我们将执行以下步骤两次并创建两个单独的连接器,一个用于 logs 的连接器,一个用于 metrics 的连接器。如果所需的设置不同,我们将突出显示正确的值。

导航到 Confluent Cloud 中的 Kafka 集群,然后从左侧导航菜单中选择 Connectors。在 Connectors 页面上,从可用的连接器目录中选择 Elasticsearch Service Sink。

Confluent Cloud 为用户提供了配置连接器的简化工作流程。这里我们将逐步介绍该过程的每个步骤:

步骤 1:主题选择

首先,我们将根据要部署的连接器选择连接器将从中消费数据的主题:

  • 部署用于 logs 的 Elasticsearch Sink 连接器时,请选择 logs 主题。
  • 部署用于 metrics 的 Elasticsearch Sink 连接器时,请选择 metrics 主题。

步骤 2:Kafka 凭据

选择 KAFKA_API_KEY 作为集群身份验证模式。提供前面提到的 API Key 和 Secret

我们收集所需的 Confluent Cloud Cluster 信息。

步骤 3:身份验证

提供我们的 Elasticsearch 集群的 Elasticsearch Endpoint 地址作为连接 URI。连接用户和连接密码是 Elasticsearch 中帐户的身份验证信息,Elasticsearch Sink Connector 将使用这些信息将数据写入 Elasticsearch。

步骤 4:配置

在此步骤中,我们将 Input Kafka record value format 设置为 JSON。接下来,展开 Advanced Configuration。

  1. 我们将 Data Stream Dataset 设置为 kafka.reroute
  2. 我们将根据要部署的连接器设置 Data Stream Type:
    • 在为 logs 部署 Elasticsearch Sink 连接器时,我们将 Data Stream Type 设置为 logs
    • 在为 metrics 部署 Elasticsearch Sink 连接器时,我们将Data Stream Type 设置为 metrics
  3. 其他设置的正确值将取决于具体环境。

步骤 5:调整大小

在此步骤中,请注意 Confluent Cloud 为我们的部署提供了建议的最少任务数。遵循此处的建议是大多数部署的良好起点。

步骤 6:检查和启动

查看 Connector configuration 和 Connector pricing 部分,如果一切正常,则是时候单击 continue 并启动连接器了!连接器可能会报告为配置,但很快就会开始使用来自 Kafka 主题的数据并将其写入 Elasticsearch 集群。

你现在可以导航到 Kibana 中的发现并找到流入 Elasticsearch 的日志!还请查看 Confluent Cloud 为你的新 Elasticsearch Sink Connector 部署提供的实时指标。

如果你只部署了第一个 logs 接收器连接器,你现在可以重复上述步骤来部署第二个 metrics 接收器连接器。

享受完全托管的数据采集架构

如果你遵循上述步骤,那么恭喜你。你已成功:

  1. 配置 Elastic Agent 以将日志和指标发送到 Kafka 中的专用主题
  2. 在 Elasticsearch 中创建发布端点(数据流),专用于处理来自 Elasticsearch Sink Connector 的数据
  3. 配置托管 Elasticsearch Sink Connector 以使用来自多个主题的数据并将该数据发布到 Elasticsearch

接下来,你应该启用其他集成,部署更多 Elastic Agent,在 Kibana 中探索你的数据,并享受 Elastic Serverless 和 Confluent Cloud 完全托管的数据采集架构带来的好处!

原文:Deploying Elastic Agent with Confluent Cloud's Elasticsearch Connector — Elastic Observability Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2283844.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JDK17 HashMap

HashMap ArrayList用动态数组存放元素,而HashMap用动态数组(桶)存储键值对。 如果两个键值对映射到桶同一个索引,则称为散列冲突。HashMap采用拉链法解决冲突,即桶中每个索引指向一个链表或者红黑树,多个键…

ansible自动化运维实战--通过role远程部署nginx并配置(8)

文章目录 1、准备工作2、创建角色结构3、编写任务4、准备配置文件(金甲模板)5、编写变量6、编写处理程序7、编写剧本8、执行剧本Playbook9、验证-游览器访问每台主机的nginx页面 在 Ansible 中,使用角色(Role)来远程部…

Python网络自动化运维---用户交互模块

文章目录 目录 文章目录 前言 实验环境准备 一.input函数 代码分段解析 二.getpass模块 前言 在前面的SSH模块章节中,我们都是将提供SSH服务的设备的账户/密码直接写入到python代码中,这样很容易导致账户/密码泄露,而使用Python中的用户交…

计算机的错误计算(二百二十二)

摘要 利用大模型化简计算 实验表明,虽然结果正确,但是,大模型既绕了弯路,又有数值计算错误。 与前面相同,再利用同一个算式看看另外一个大模型的化简与计算能力。 例1. 化简计算摘要中算式。 下面是与一个大模型的…

mybatis(78/134)

前天学了很多&#xff0c;关于java的反射机制&#xff0c;其实跳过了new对象&#xff0c;然后底层生成了字节码&#xff0c;创建了对应的编码。手搓了一遍源码&#xff0c;还是比较复杂的。 <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE …

数据分箱 baggingboosting onehot独热编码 woe编码 sklearn的ensemble(集成学习)

目录 数据分箱就是将连续变量离散化。 bagging&boosting onehot独热编码 独热编码的结果如下&#xff1a; woe编码 WOE编码的基本原理 步骤一&#xff1a;计算WOE 步骤二&#xff1a;应用WOE WOE编码的优点 示例 数据示例 步骤一&#xff1a;计算每个类别的违约…

企业微信开发010_使用WxJava企业微信开发框架_封装第三方应用企业微信开发003_并且实现多企业授权访问---企业微信开发012

继续来看吧,上一节,已经把config部分,代码都拿过来了: 并且把企业微信第三方应用开发部分,对应的config的配置,mutiltp 代码拿过来了,并且把yml中的配置也给出了. 然后,这里说一下config中的内容,到时候自己看也可以看懂 其实就是封装了,当系统启动,加载企微模块,这个时候,会…

Office2021下载与安装保姆级教程【Office Tool Plus】

Office Tool Plus安装Office2021 下载Office Tool Plus安装OfficeⅠ. 清除旧版本Ⅱ. 配置安装参数Ⅲ. 安装许可证Ⅳ. 激发&#xff08;JH&#xff09;Office 本文介绍使用Office Tool Plus工具下载、安装、部署Office 2021全过程。 下载Office Tool Plus OfficeToolPlus是一个…

Unity在WebGL中拍照和录视频

原工程地址https://github.com/eangulee/UnityWebGLRecoder Unity版本2018.3.6f1&#xff0c;有点年久失修了 https://github.com/xue-fei/Unity.WebGLRecorder 修改jslib适配了Unity2021 效果图 录制的视频 Unity在WebGL中拍照和录视频

【外文原版书阅读】《机器学习前置知识》1.线性代数的重要性,初识向量以及向量加法

目录 ​编辑 ​编辑 1.Chapter 2 Why Linear Algebra? 2.Chapter 3 What Is a Vector? 个人主页&#xff1a;Icomi 大家好&#xff0c;我是Icomi&#xff0c;本专栏是我阅读外文原版书《Before Machine Learning》对于文章中我认为能够增进线性代数与机器学习之间的理解的…

SpringBoot开发(二)Spring Boot项目构建、Bootstrap基础知识

1. Spring Boot项目构建 1.1. 简介 基于官方网站https://start.spring.io进行项目的创建. 1.1.1. 简介 Spring Boot是基于Spring4框架开发的全新框架&#xff0c;设计目的是简化搭建及开发过程&#xff0c;并不是对Spring功能上的增强&#xff0c;而是提供了一种快速使用Spr…

【PyTorch】4.张量拼接操作

个人主页&#xff1a;Icomi 在深度学习蓬勃发展的当下&#xff0c;PyTorch 是不可或缺的工具。它作为强大的深度学习框架&#xff0c;为构建和训练神经网络提供了高效且灵活的平台。神经网络作为人工智能的核心技术&#xff0c;能够处理复杂的数据模式。通过 PyTorch&#xff0…

新电脑安装系统找不到硬盘原因和解决方法来了

有不少网友反馈新电脑采用官方u盘方式装win10或win100出现找不到硬盘是怎么回事&#xff1f;后来研究半天发现是bios中开启了rst(vmd)模式。如果关闭rst模式肯定是可以安装的&#xff0c;但这会影响硬盘性能&#xff0c;有没有办法解决开启rst模式的情况安装win10或win11呢&…

「 机器人 」仿生扑翼飞行器中的“被动旋转机制”概述

前言 在仿生扑翼飞行器的机翼设计中,模仿昆虫翼的被动旋转机制是一项关键技术。其核心思想在于:机翼旋转角度(攻角)并非完全通过主动伺服来控制,而是利用空气动力和惯性力的作用,自然地实现被动调节。以下对这种设计的背景、原理与优势进行详细说明。 1. 背景:昆虫的被动…

Android GLSurfaceView 覆盖其它控件问题 (RK平台)

平台 涉及主控: RK3566 Android: 11/13 问题 在使用GLSurfaceView播放视频的过程中, 增加了一个播放控制面板, 覆盖在视频上方. 默认隐藏setVisibility(View.INVISIBLE);点击屏幕再显示出来. 然而, 在RK3566上这个简单的功能却无法正常工作. 通过缩小视频窗口可以看到, 实际…

【C++】类和对象(五)

1、初始化列表 作用&#xff1a;C提供了初始化列表语法&#xff0c;用来初始化属性。 语法&#xff1a; 构造函数&#xff08;&#xff09;&#xff1a;属性1&#xff08;值1&#xff09;&#xff0c;属性2&#xff08;值2&#xff09;...{}示例&#xff1a; #include<i…

Maven的下载安装配置

maven的下载安装配置 maven是什么 Maven 是一个用于 Java 平台的 自动化构建工具&#xff0c;由 Apache 组织提供。它不仅可以用作包管理&#xff0c;还支持项目的开发、打包、测试及部署等一系列行为 Maven的核心功能 项目构建生命周期管理&#xff1a;Maven定义了项目构建…

Mysql主从复制+MHA实验笔记[特殊字符]

目录 基本概念 工作原理 优势 环境准备&#xff1a;四台centos-其中三台mysql&#xff0c;一台MHA 配置一主两从 安装MHA 配置无密码认证 配置MHA 模拟master故障 基本概念 MySQL 主从复制&#xff1a;是 MySQL 数据库中实现数据冗余、数据备份和高可用性的重要技术手…

面向长文本的多模型协作摘要架构:多LLM文本摘要方法

多LLM摘要框架在每轮对话中包含两个基本步骤:生成和评估。这些步骤在多LLM分散式摘要和集中式摘要中有所不同。在两种策略中,k个不同的LLM都会生成多样化的文本摘要。然而在评估阶段,多LLM集中式摘要方法使用单个LLM来评估摘要并选择最佳摘要,而分散式多LLM摘要则使用k个LLM进行…

Python中容器类型的数据(上)

若我们想将多个数据打包并且统一管理&#xff0c;应该怎么办? Python内置的数据类型如序列(列表、元组等)、集合和字典等可以容纳多项数据&#xff0c;我们称它们为容器类型的数据。 序列 序列 (sequence) 是一种可迭代的、元素有序的容器类型的数据。 序列包括列表 (list)…