Elasticsearch 集成--Flink 框架集成

news2024/12/29 7:51:18

一、Flink 框架介绍

       Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。
但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没
有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而
凸显的更加明显:
  •  数据精准一次性处理(Exactly-Once
  • 乱序数据,迟到数据
  •  低延迟,高吞吐,准确性
  •  容错性
        Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在
Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。
慢慢地,随着这些问题的解决, Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公
司在 2015 年改进 Flink ,并创建了内部分支 Blink ,目前服务于阿里集团内部搜索、推荐、
广告和蚂蚁等大量核心实时业务。

二、框架集成

2.1创建 Maven 项目

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project
        xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lun.es</groupId>
    <artifactId>flink-elasticsearch</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.11.1</version>
        </dependency>
    </dependencies>
</project>

功能实现

package com.xmx.es;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkElasticsearchSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
        //httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

        // use a ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        return Requests.indexRequest()
                                .index("my-index")
                                //.type("my-type")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // provide a RestClientFactory for custom configuration on the internally createdREST client
        // esSinkBuilder.setRestClientFactory(
        // restClientBuilder -> {
        // restClientBuilder.setDefaultHeaders(...)
        // restClientBuilder.setMaxRetryTimeoutMillis(...)
        // restClientBuilder.setPathPrefix(...)
        // restClientBuilder.setHttpClientConfigCallback(...)
        // }
        // );
        source.addSink(esSinkBuilder.build());
        env.execute("flink-es");
    }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/939612.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

青蛙趣味支付页html源码

青蛙因生活所迫卖儿卖HTML单页源码.zip - 蓝奏云 源码自适应窗口&#xff0c;电脑和手机&#xff0c; 适合作为网站下载页&#xff0c;用于增加支付率 &#xff08;终于写好了&#xff0c;不太想写&#xff09;

3.3 运算符和表达式

前言&#xff1a; 几乎每一个程序都需要进行运算&#xff0c;对数据进行加工处理&#xff0c;否则程序就没有意义了。要进行运算&#xff0c;就需规定可以使用的运算符。C语言的运算符范围很宽&#xff0c;把除了控制语句和输入输出以外几乎所有的基本操作都作为运算符处理&am…

【百度之星2023】初赛第一场 补题(部分)

目录 BD202301 公园BD202302 蛋糕划分解法1TODO 解法2 TODO BD202303 第五维度TODO BD202304 流水线搭积木BD202305 糖果促销 不幸因为码蹄集客户端的bug&#xff0c;导致没法正常参与比赛&#xff0c;只好事后补了 BD202301 公园 样例输入&#xff1a; 4 4 3 1 2 8 8 1 4 2 …

AD如何进行汉化

AD如何进行汉化 通过安装好AD后&#xff0c;默认都是英文界面模式&#xff0c;如果想汉化为中文模式&#xff0c;需要点击“DXP”->“参数选择”&#xff0c;打开界面如下&#xff1a; 然后将上图“本地化”下面的方框勾选上&#xff0c;点击“应用”&#xff0c;“确定”…

Java通过报表技术JXL和POI实现Excel导入导出操作

前言 报表[forms for reporting to the higher organizations]&#xff0c;就是向上级报告情况的表格。简单的说&#xff1a;报表就是用表格、图表等格式来动态显示数据&#xff0c;可以用公式表示为&#xff1a;“报表 多样的格式 动态的数据”。 注意&#xff1a;使用附件…

删除流氓360首页

不管你使用什么浏览器都很容易中招360给你自动设置的流氓首页&#xff0c;流氓厂石锤了。 你在浏览器设置新的首页一样无效&#xff0c;比如 完全没有卵用&#xff0c;以前这样是可以生效的&#xff0c;最近几天突然不行了&#xff0c;这简直流氓的不行&#xff0c;而且 细心…

【Java 中级】一文精通 Spring MVC - JSON 处理(九)

&#x1f449;博主介绍&#xff1a; 博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家&#xff0c;WEB架构师&#xff0c;阿里云专家博主&#xff0c;华为云云享专家&#xff0c;51CTO 专家博主 ⛪️ 个人社区&#x…

【rar转zip】WinRAR转换压缩包格式

不知道大家有没有遇到需要转换压缩包格式的问题&#xff0c;今天想和大家分享rar压缩包改成zip格式的方法。 方法一&#xff1a; 直接修改rar压缩包的后缀名变为zip&#xff0c;就可以修改压缩包文件格式了 方法二&#xff1a; 先将rar压缩包解压出来&#xff0c;然后再将解…

Stable Diffusion WebUI 整合包

现在网络上出现的各种整合包只是整合了运行 Stable Diffusion WebUI&#xff08;以下简称为 SD-WebUI&#xff09;必需的 Python 和 Git 环境&#xff0c;并且预置好模型&#xff0c;有些整合包还添加了一些常用的插件&#xff0c;其实际与手动进行本地部署并没有区别。 不过&a…

热红外成像技术:未来将有更多技术突破推动应用发展

一、国外发展现状 热红外成像技术在国外得到了广泛的研究和应用。国外的研究机构和企业注重热红外成像技术在军事、环境监测、医疗等领域的应用研究&#xff0c;其中美国、欧洲和日本等国家在热红外成像技术方面处于领先地位。 美国在热红外成像技术方面拥有多个研究机构和公司…

后端面试话术集锦第三篇:spring cloud 相关面试话术

🚗后端面试集锦目录 💖后端面试话术集锦第一篇:spring面试话术💖 💖后端面试话术集锦第二篇:spring boot面试话术💖 💖后端面试话术集锦第三篇:spring cloud面试话术💖 💖后端面试话术集锦第四篇:ElasticSearch面试话术💖 1. 什么是Springcloud Spring …

全景图像生成算法

摘要 全景图像生成是计算机视觉领域的一个重要研究方向。本文对五种经典的全景图像生成算法进行综述&#xff0c;包括基于相机运动估计的算法、基于特征匹配的算法、基于图像切割的算法、基于多项式拟合的算法和基于深度学习的算法。通过对这些算法的原理、优缺点、适用场景等…

最新Burp Suite插件详解

Burp Suite中的插件 Burp Suite中存在多个插件&#xff0c;通过这些插件可以更方便地进行安全测试。插件可以在“BApp Store”&#xff08;“Extender”→“BApp Store”&#xff09;中安装&#xff0c;如图3-46所示。 下面列举一些常见的Burp Suite插件。 1&#xff0e;Act…

怎么在手机上开启提词器?这个方法轻松解决

在现代科技高度发达的时代&#xff0c;手机已经成为了人们生活中不可或缺的一部分。其中&#xff0c;开启提词器也是一种非常重要的工具&#xff0c;在我们需要面对手机录制讲解视频或者其他的一些演讲时&#xff0c;如果屏幕上可以有提词器那真是泰裤辣&#xff01;那么怎么在…

MySQL binlog的几种日志录入格式以及区别

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;CSDN领军人物&#xff0c;全栈领域优质创作者✌&#xff0c;CSDN博客专家&#xff0c;阿里云社区专家博主&#xff0c;2023年6月CSDN上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;历任核心研发工程师…

不能打电话的流量卡都是物联卡?别胡说八道了!

可能大家对流量卡有一种误区&#xff0c;认为只要是不能打电话&#xff0c;不能发信息的流量卡都是物联卡&#xff0c;其实这种理解是错的。今天小编要为流量卡正名。 为什么要说大家对流量卡有误区呢&#xff0c;其实&#xff0c;目前市面上常见的纯流量卡有两种&#xff0c;…

《C和指针》笔记12: 存储类型(自动变量、静态变量和寄存器变量)

文章目录 1. 自动变量&#xff08;auto&#xff09;1.1 自动变量的初始化 2. 静态变量&#xff08;static&#xff09;2.1 静态变量的初始化 3. 寄存器变量&#xff08;register&#xff09; 1. 自动变量&#xff08;auto&#xff09; 在代码块内部声明的变量的缺省存储类型是…

linux下安装Mycat

1 官网下载mycat 官方网站&#xff1a; 上海云业网络科技有限公司http://www.mycat.org.cn/ github地址&#xff1a; MyCATApache GitHubMyCATApache has 34 repositories available. Follow their code on GitHub.https://github.com/MyCATApache 2 Mycat安装 1 把MyCat…

新方案unity配表工具

工具下载&#xff1a;网盘链接 工具结构&#xff1a;针对每张表格生成一个表格类&#xff0c;其中默认包含一个list和字典类型参数记录表格数据&#xff0c;初始化项目时将list中的数据转为按id索引的dictionary&#xff0c;用于访问数据。额外包含一个同名Temp后缀的类&#…

AI助力智能安检,基于图像目标检测实现危险品X光智能安全检测系统

基于AI相关的技术来对一些重复性的但是又比较重要的工作来做智能化助力是一个非常有潜力的场景&#xff0c;关于这方面的项目开发实践在我之前的文章中也有不少的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a;《AI助力智能安检&#xff0c;基于目标检测模型实现…