如何通过 Apache Camel 将数据导入 Elasticsearch

news2025/1/22 21:12:15

作者:来自 Elastic Andre Luiz

使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实现一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。

什么是 Apache Camel?

Apache Camel 是一个开源集成框架,可简化不同系统的连接,使开发人员可以专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”,它定义了消息从源到目的地所遵循的路径,可能包括转换、验证和过滤等中间步骤。

Apache Camel 架构

Camel 使用 “components- 组件” 连接不同的系统和协议,例如数据库和消息传递服务,并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计,使配置和管理复杂集成变得更加容易,高效且可扩展。

使用 Elasticsearch 和 Apache Camel

我们将演示如何配置一个简单的 Java 应用程序,该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。

1. 添加依赖项

配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此我们必须导入 camel-elasticsearch 组件,并且版本必须与 camel-core 库相同。

如果你想使用 Java 低级 Rest 客户端,则必须使用 Elasticsearch 低级 Rest 客户端组件。

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-core</artifactId>
   <version>4.7.0</version>
</dependency>

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-elasticsearch</artifactId>
   <version>4.7.0</version>
</dependency>

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-jackson</artifactId>
   <version>4.7.0</version>
</dependency>

<dependency>
   <groupId>co.elastic.clients</groupId>
   <artifactId>elasticsearch-java</artifactId>
   <version>8.14.3</version>
</dependency>

2. 配置和运行 Camel 上下文

配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文,该类是定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,你应该阅读有关如何配置组件和启用基本身份验证的文档,称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。

public class ESComponent {

    public static ElasticsearchComponent getInstance() {
        var elasticsearch = new ElasticsearchComponent();
        elasticsearch.setHostAddresses("localhost:9200");
        return elasticsearch;
    }

    public static String getName() {
        return "elasticsearch";
    }
}

然后将该组件添加到 Camel 上下文中,使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。

try (var context = new DefaultCamelContext()) {
   context.addComponent(ESComponent.getName(), ESComponent.getInstance());
   context.addRoutes(new OperationBulkRoute());
   context.start();
}

随后,将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。

3. 配置 Camel 路由

数据索引

我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条消息合并为一条,从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小,即批量将一次索引 500 部电影。

路由 Elasticsearch 操作 bulk:

String URI_BULK_OPERATION = String
       .format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
               IndexOperationConfig.BULK_OPERATION,
               INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder {
   private static final Log log = LogFactory.getLog(OperationBulkRoute.class);
   private static final int BULK_SIZE = 500;

   @Override
   public void configure() {
       from("file:src/main/resources?fileName=movies.json&noop=true")
               .routeId("route-bulk-ingest")
               .unmarshal().json()
               .split(body())
               .aggregate(constant(true), new BulkAggregationStrategy())
               .completionSize(BULK_SIZE)
               .to(URI_BULK_OPERATION)
               .process(exchange -> {
                   var body = exchange.getIn().getBody(String.class);
                   log.info(String.format("Response: %s", body));
               })
               .end();
   }
}

这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。

数据更新

下一个路由是更新文档。我们在上一步中索引了一些电影,现在我们将创建新的路由,通过参考代码搜索文档,然后更新评级字段。

我们设置了一个 Camel 上下文 (DefaultCamelContext),其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。

try (var context = new DefaultCamelContext()) {
    context.addComponent(ESComponent.getName(), ESComponent.getInstance());
    context.addRoutes(new IngestionRoute());
    context.start();
    ProducerTemplate producerTemplate = context.createProducerTemplate();
    producerTemplate.sendBody("direct:update-ingestion", documentCode);
    Thread.sleep(5000);
}

接下来,我们有 IngestionRoute,它是此流程的输入端点。该路由执行几个流水线操作。首先,在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档,其中 SearchByCodeProcessor 根据代码组装查询。然后,检索到的文档由 UpdateRatingProcessor 处理,它将结果转换为 Movie 对象,将电影评级(movie rating)更新为特定值,并准备将更新后的文档发送回 Elasticsearch 进行更新。

public class IngestionRoute extends RouteBuilder {
    private static final Log log = LogFactory.getLog(IngestionRoute.class);

    @Override
    public void configure() throws Exception {

        from("direct:update-ingestion")
                .pipeline()
                .to("direct:search-by-id")
                .to(URI_SEARCH_OPERATION)
                .to("direct:update-rating")
                .to(URI_UPDATE_OPERATION)
                .process(exchange -> {
                    var body = exchange.getIn().getBody(String.class);
                    log.info(String.format("Response: %s", body));
                })
                .end();

        from("direct:search-by-id")
                .process(new SearchByCodeProcessor());

        from("direct:update-rating")
                .process(new UpdateRatingProcessor());
    }
}

SearchByCodeProcessor 处理器仅配置为执行搜索查询:

public class SearchByCodeProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        var code = exchange.getIn().getBody();

        String query = "{\n" +
                "  \"query\": {\n" +
                "   \"term\": {\n" +
                "     \"code\": {\n" +
                "       \"value\":" + code + "\n" +
                "     }\n" +
                "   }\n" +
                "  }\n" +
                "}";
        exchange.setProperty("document_code", code);
        exchange.getIn().setBody(query);
    }
}

UpdateRatingProcessor 处理器负责更新评级字段。

public class UpdateRatingProcessor implements Processor {

    private final ObjectMapper objectMapper;

    public UpdateRatingProcessor() {
        this.objectMapper = new ObjectMapper();
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override
    public void process(Exchange exchange) throws Exception {

        HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class);
        var code = Long.parseLong(exchange.getProperty("document_code").toString());

        if (response != null && response.hits() != null) {

            var documents = parseToMovies(response);

            var optionalMovie = documents.stream()
                    .filter(document -> code == (document.getSource().getCode())).findAny();

            optionalMovie.ifPresent(document -> {
                document.getSource().setRating(13.0);
                Map<String, Object> updateMap = new HashMap<>();
                updateMap.put("doc", document.getSource());
                exchange.getIn().setHeader("indexId", document.getId());
                exchange.getIn().setBody(updateMap);
            });
        }
    }

数据删除

最后,配置删除文档的路由。在这里,我们将使用文档的 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建新路由来执行此操作,如下所示。

路由从 direct:op-delete 端点开始,该端点作为入口点。当需要删除文档时,将在消息正文中收到其标识符 (_id)。然后,路由使用 simple("${body}") 将 indexId 标头设置为该标识符的值,这会从消息正文中提取 _id。

public class OperationDeleteRoute extends RouteBuilder {
   private static final Log log = LogFactory.getLog(OperationDeleteRoute.class);

   @Override
   public void configure() {
       from("direct:op-delete")
               .routeId("route-delete")
               .setHeader("indexId", simple("${body}"))
               .to(URI_DELETE_OPERATION)
               .process(exchange -> {
                   var body = exchange.getIn().getBody(String.class);
                   log.info(String.format("Response: %s", body));
               })
               .end();
       ;
   }
}
String URI_DELETE_OPERATION = String
       .format("elasticsearch://elasticsearch?operation=%s&indexName=%s",
               IndexOperationConfig.DELETE_OPERATION,
               INDEX_NAME);

最后,消息被定向到URI_DELETE_OPERATION指定的端点,该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。
现在我们已经创建了路由,我们可以创建一个 Camel 上下文(DefaultCamelContext),它被配置为包含 Elasticsearch 组件。

try (var context = new DefaultCamelContext()) {
   context.addComponent(ESComponent.getName(), ESComponent.getInstance());
   context.addRoutes(new OperationDeleteRoute());
   context.start();
   ProducerTemplate producerTemplate = context.createProducerTemplate();
   producerTemplate.sendBody("direct:op-delete", documentId);
}

接下来,将 OperationDeleteRoute 类定义的删除路由(delete route)添加到上下文中。初始化上下文后,使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点,从而触发删除路由。

结论

Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取,利用 Camel 的灵活性来定义可以处理不同数据操作场景(例如索引、更新和删除)的路由。通过此设置,你可以以可扩展的方式编排和自动化复杂流程,确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。

参考资料

  • Apache Camel
  • Apache Camel 架构
  • 聚合 Apache Camel
  • 文件组件
  • Elasticsearch 组件


准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训何时开始!

原文:https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2123299.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5G网络建设

题目描述 现需要在基城市进行5G网络建设&#xff0c;已经选取N个地点设置5G基站&#xff0c;编号固定为1到N&#xff0c;接下来需要各个基站之间使用光纤进行连接以确保基 站能互联互通&#xff0c;不同基站之间假设光纤的成本各不相同&#xff0c;且有些节点之间已经存在光纤…

8个动态着陆页案例及最佳实践

动态着陆页是一种让市场营销人员在不必因成百上千变量而抓狂的情况下&#xff0c;利用个性化力量的绝佳方式&#xff0c;从而让他们能够扩大努力并增长业务。使用像光年AI这样的平台&#xff0c;可以更方便地实现这一目标。 在这篇文章中&#xff0c;您将了解到&#xff1a; …

非监督式机器学习:群集

聚类分析是一种非监督式机器学习形式&#xff0c;在此形式下&#xff0c;基于观察值的数据值或特征的相似性&#xff0c;将观察值分组到群集中。 这种就是非监督式机器学习&#xff0c;因为它不使用先前已知的标签值来训练模型。 在聚类分析模型中&#xff0c;标签是群集&#…

【Nacos】健康检查与环境隔离

1. 健康检测 1.1 两种健康检查机制 Nacos作为注册中心,需要感知服务的健康状态,才能为服务调用方提供良好的服务 Nacos 中提供了两种健康检查机制: 1. 客户端主动上报机制 客户端通过心跳上报方式告知服务端(nacos注册中心)健康状态,默认心跳间隔5秒:nacos会在超过15秒未收…

内网穿透之EW使用、判断服务器是否出网

环境搭建 使用的是下面文章的环境 记一次学习--内网穿透-CSDN博客 ew代理 然后同样通过thinkphp漏洞写入文件&#xff0c;然后通过蚁剑连接 然后上传ew的Linux版本&#xff0c;然后加权执行 一层代理 正向代理 设置正向代理&#xff08;在ubuntu上&#xff09;&#xff0…

React 发现无webpack相关的配置的目录,使用eject进行创建, 安装插件需要进行配置

React 发现无webpack相关的配置的目录&#xff0c;进行创建&#xff0c; 安装插件需要进行配置 react脚手架将webpack相关的配置隐藏起来了&#xff0c;如果想要看到webpack的配置可以执行package.json文件中的一个脚本&#xff1a;“eject”: “react-scripts eject”&#x…

流媒体之HLS协议(其三)

欢迎诸位来阅读在下的博文~ 在这里&#xff0c;在下会不定期发表一些浅薄的知识和经验&#xff0c;望诸位能与在下多多交流&#xff0c;共同努力&#xff01; 江山如画&#xff0c;客心如若&#xff0c;欢迎到访&#xff0c;一展风采 文章目录 前期博客参考书籍一、HLS协议简…

TwinCAT3 实时核中ADS实现C++ server、clinet数据传输

一、基本概念 ADS &#xff1a;Automation Device Specification&#xff0c;ADS设备间进行通信的协议规范。协议定义了ADS device之间如何寻址对方、ADS device之间可以执行哪些操作、执行这些操作需要哪些参数&#xff0c;以及操作完成后如何返回结果等。从编程角度看&#…

SVM 监督学习

一、分类问题 利用一条直线分类存在很多问题 二、SVM 支持向量机 其核心思想是通过在特征空间中找到一个最优的超平面来进行分类&#xff0c;并且间隔最大。分类面尽可能远离样本点&#xff0c;宽度越大越好。 适用于中小型复杂数据集的分类。 三、硬间隔和软间隔 硬&#x…

Android Studio -> Android Studio 获取release模式和debug模式的APK

Android Studio上鼠标修改构建类型 Release版本 激活路径&#xff1a;More tool windows->Build Variants->Active Build Variant->releaseAPK路径&#xff1a;Project\app\build\intermediates\apk\app-release.apk Debug版本 激活路径&#xff1a;More tool w…

linux上使用rpm的方式安装mysql

1.从mysql官网上下载需要的版本&#xff0c;根据操作系统版本&#xff0c;CPU架构&#xff0c;下载让rpm bundle,这个版本是个完整版&#xff0c;包含其他所有版本 上传到服务器的一个目录&#xff0c;进行解压 执行tar -xvf mysql*.tar tar -xvf mysql*.tar 2.卸载老版本m…

【Canvas与电脑桌面】用六角回旋镖铺满一个平面(1920*1080)

【成图】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>六角回旋镖桌面1920x1080</title><style type"text/cs…

动手学深度学习(pytorch)学习记录27-深度卷积神经网络(AlexNet)[学习记录]

目录 创建模型读取数据集训练AlexNet AlexNet 是由 Alex Krizhevsky、Ilya Sutskever 和 Geoffrey Hinton 在 2012 年提出的深度卷积神经网络&#xff0c;它在当年的 ImageNet 大规模视觉识别挑战赛&#xff08;ILSVRC&#xff09;中取得了显著的成绩&#xff0c;从而引起了深度…

动手学深度学习(pytorch土堆)-02TensorBoard的使用

1.可视化 代码使用了 torch.utils.tensorboard 将数据记录到 TensorBoard 以便可视化。具体来说&#xff0c;它将标量数据记录到目录 logs 中&#xff0c;使用的是 SummaryWriter 类。 代码分解如下&#xff1a; SummaryWriter("logs")&#xff1a;初始化一个 Ten…

常用的 git命令的使用

一. 简介 本文简单学习一下&#xff0c;在从远程仓库中拉取代码&#xff0c;或者向远程仓库提交代码时&#xff0c;经常用到的一些 git命令。 二. git的其他命令的使用 1. 重新提交代码的命令 当已经提交过一笔代码&#xff0c;并经过了 CI自动化编译通过。这时可能发现…

WebAPI(二)、DOM事件监听、事件对象event、事件流、事件委托、页面加载与滚动事件、client,offset

文章目录 一、 DOM事件1. 事件监听2. 事件类型(1)、鼠标事件(2)、焦点事件(3)、键盘事件(4)、文本事件 3. 事件对象(1)、获取事件对象(2)、事件对象常用属性 4. 环境对象 this5. 回调函数 二、 DOM事件进阶1. 事件流(1)、 捕获阶段(2)、 冒泡阶段(3)、 阻止冒泡(4) 、阻止元素默…

python绘制3D瀑布图

成品&#xff1a; 代码&#xff1a; def line_3d(x, y, z, x_label_indexs):"""在y轴的每个点&#xff0c;向x轴的方向延伸出一个折线面&#xff1a;展示每个变量的时序变化。x: x轴&#xff0c;时间维&#xff0c;右边。y: y轴&#xff0c;变量维&#xff0c;…

前端:JavaScript 实现类

文章目录 1. Es6-类-class2. Es6-class 实现继承3. Es6-class 静态属性和私有属性4. Es5-寄生组合式继承 1. Es6-类-class 类是创建对象的模板&#xff0c;用代码封装数据以处理该数据&#xff0c;js中的类建立在原型上。 如何定义类&#xff0c;首先需要关键字 class&#x…

C++之打造my vector篇

目录 前言 1.参照官版&#xff0c;打造vector的基本框架 2.丰富框架&#xff0c;实现接口方法 基本的迭代器实现 数据的[]访问 容量和数据空间的改变 vector空间大小的返回与判空 数据的增删 数据打印 拷贝构造和赋值重载 3.扩展延伸&#xff0c;深度理解代码 迭代器…

iText2KG:显著降低LLM构建知识图谱时的幻觉现象

1. 当前知识图谱构建存在的问题 知识图谱通过捕捉实体之间的关系来构建知识的结构化表示&#xff0c;在分析文本数据集和从结构化异构数据中推断知识方面具有显著优势。比如&#xff0c;知识图谱能够融合来自多个来源的不同数据&#xff0c;提供一个具有凝聚力的信息视角。还能…