Flink 输出至 Elasticsearch

news2024/11/25 20:51:55

【1】引入pom.xml依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest {
  def main(args: Array[String]): Unit = {
    // 创建一个流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //转换
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      .map( data => {
        var dataArray = data.split(",")
        SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
      })

    //定义一个 HttpHosts
    val httpHost = new util.ArrayList[HttpHost]()
    //默认 9200 我的修改为了 9201
    httpHost.add(new HttpHost("192.168.1.12",9200,"http"))
    httpHost.add(new HttpHost("127.0.0.1",9200,"http"))
    //定义一个 ElasticSearchFuntion 操作 es的function
    val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      //element 每一条数据 通过 index 发送
      override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
        //包装写入 es 的数据
        val dataSource = new util.HashMap[String,String]()
        dataSource.put("sensor_id",element.id)
        dataSource.put("temp",element.temperature.toString)
        dataSource.put("ts",element.timestamp.toString)

        //index
        val indexRequest = Requests.indexRequest()
            .index("sensor_temp")
            .`type`("readingdata")
            .source(dataSource)
        index.add(indexRequest)
        println("saved successfully " + element.toString)
      }
    }
    //输出值 es
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())
    env.execute("es")
  }
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1343043.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数组形式的整数加法C语言❤

一、题目&#xff1a; 整数的 数组形式 是按照从左到右的顺序表示其数字的数组。num 例如&#xff0c;对于 &#xff0c;数组形式是 。num 1321[1,3,2,1] 给定 &#xff0c;整数的 数组形式 &#xff0c;和整数 &#xff0c;返回 整数 num k 的 数组形式 。numk 示例 1&…

猫咪吃哪种冻干最好?推荐新手养猫五款口碑最好主食冻干猫粮牌子

现在越来越多的铲屎官关注猫咪的食品选择&#xff0c;而冻干猫粮一直是热门话题。其中主食冻干的肉含量很高&#xff0c;富含猫咪成长所需的蛋白质、维生素等营养物质。而且冻干工艺还保留了食材的原始风味&#xff0c;复水后可以恢复鲜肉的口感&#xff0c;猫咪很喜欢吃&#…

用于IT管理的COBIT

随着世界的不断发展和变化&#xff0c;企业必须像冲浪者一样乘风破浪&#xff0c;适应社会不断更新的浪潮&#xff0c;拥抱新技术。信息技术&#xff08;IT&#xff09;已成为大多数企业运营的支柱&#xff0c;对战略决策、客户互动和整体效率都起了一定的影响作用。然而&#…

《Spring Cloud学习笔记:分布式事务Seata》

1.分布式事务理论基础 1.1.本地事务 本地事务&#xff0c;也就是传统的单机事务&#xff0c;在传统的数据库事务中&#xff0c;必须要满足ACID四个原则&#xff1a; 1.2.分布式事务 分布式事务&#xff0c;就是指不是在单个服务或单个数据库架构下产生的事务。 分布式事务是…

polar CTF上传

WEB-上传 一、查看题目信息 二、漏洞分析 经过上传测试发现&#xff0c;这题过滤掉了<?&#xff0c;这样正常的一句话木马就没法上传&#xff0c;这里可以用utf-16编码绕过。因为utf-16占utf-8的两倍长度&#xff0c;上传时默认检测为utf-8,从而就能绕过检测成功上传。 同…

BOM和DOM有什么区别和联系

BOM (Browser Object Model) 和 DOM (Document Object Model) 都是与 Web 开发相关的术语&#xff0c;它们分别代表了浏览器对象模型和文档对象模型。 BOM 是浏览器对象模型的缩写&#xff0c;它提供了一组用于操作浏览器窗口、浏览器历史记录、浏览器的位置等浏览器相关对象的…

C语言中关于while语句的理解以及getchar和putchar

while是一个循环语句&#xff0c;关于while的一些理解可以看下面这串代码 #include <stdio.h> int main() {int i 0;scanf("%d", &i);printf("输入十以内的数字&#xff0c;从输入的数字开始一直数到十&#xff1a;");while (i<10){printf(…

05-C++ 类和对象-继承

类与对象-03 继承与派生 1. 继承的概念 c最重要的特征是代码重用&#xff0c;通过继承机制可以利用已有的数据类型&#xff0c;来定义新的数据类型&#xff0c;新的类不仅拥有旧类的成员&#xff0c;还拥有新定义的成员。 一个 B 类继承于 A 类&#xff0c;或称从类 A 派生…

【操作系统】不同操作系统内核架构分析

一、内核架构与操作系统性能之间的关系的分析 1. 适用性和专业化&#xff1a; 不同的内核架构往往会有不同的设计目标和优化点。例如&#xff0c;实时操作系统&#xff08;RTOS&#xff09;和通用操作系统&#xff08;像Linux或Windows&#xff09;在设计时就有不同的重点&am…

数据结构--二叉搜索树的实现

目录 1.二叉搜索树的概念 2.二叉搜索树的操作 二叉搜索树的插入 中序遍历(常用于排序) 二叉搜索树的查找 二叉搜索树的删除 完整二叉树代码&#xff1a; 二叉搜索树的应用 key/value搜索模型整体代码 1.二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一…

【第十三课】Trie字符串统计(acwing-835 / 二维数组的含义 / c++代码)

思想 Trie树在我们之前学习树的时候简单提过一嘴。 Trie树也称为前缀树或字典树&#xff0c;是一种用于高效存储和查找字符串的数据结构。Trie树的主要思想是利用字符串之间的公共前缀来节省存储空间&#xff0c;提高查询效率。 节点表示&#xff1a;Trie树中的每个节点代表一…

[Angular] 笔记 16:模板驱动表单 - 选择框与选项

油管视频&#xff1a; Select & Option (Template Driven Forms) Select & Option 在 pokemon.ts 中新增 interface: export interface Pokemon {id: number;name: string;type: string;isCool: boolean;isStylish: boolean;acceptTerms: boolean; }// new interface…

JavaScript(简写js)常用事件举例演示

目录 1.窗口事件onblur :失去焦点onfocus:获得焦点onload:窗口加载事件onresize:窗口大小缩放事件 二、表单事件oninput &#xff1a;当文本框内容改变时 &#xff0c;立即将改变内容 输出在控制台onchange&#xff1a; 内容改变事件onclick&#xff1a;鼠标单击时触发此事件 三…

OR-NeRF论文笔记

OR-NeRF论文笔记 文章目录 OR-NeRF论文笔记论文概述Abstract1 Introduction2 Related Work3 Background4 Method4.1 Multiview Segmentation4.2 Scene Object Removal 5 ExperimentsDatasetsMetricsMultiview SegmentationScene Object Removal 6 Conclusion 论文概述 目的&am…

resnet18

ResNet18的基本含义是&#xff0c;网络的基本架构是ResNet&#xff0c;网络的深度是18层。但是这里的网络深度指的是网络的权重层&#xff0c;也就是包括池化&#xff0c;激活&#xff0c;线性层。而不包括批量化归一层&#xff0c;池化层。 transforms.RandomCrop(32, pa…

悔不该用中文作为Windows的用户名啊~

前言 汉字在中华文明已经有了几千年的历史&#xff0c;小伙伴们所使用名字更是伴随了自己一生。所以小白们在拿到自己的新电脑&#xff0c;总会想着把自己的中文名字设置为电脑的用户名&#xff0c;这样更能显示出那是自己的专属电脑&#xff01; 一开始小白也是这么想的&…

Unity中Shader裁剪空间推导(在Shader中使用)

文章目录 前言一、在Shader中使用转化矩阵1、在顶点着色器中定义转化矩阵2、用 UNITY_NEAR_CLIP_VALUE 区分平台矩阵3、定义一个枚举用于区分当前是处于什么相机 二、我们在DirectX平台下&#xff0c;看看效果1、正交相机下2、透视相机下3、最终代码 前言 在上一篇文章中&…

迁移Ubuntu报错问题

问题描述&#xff1a; 使用LxRunOffline-v3.5.0-mingw迁移Ubuntu至非系统盘时&#xff0c;出现如下报错 ‘Couldn’t set the case sensitive attribute of the directory “\?\C:\Users\xxx\AppData\Local\Packages\CanonicalGroupLimited.UbuntuonWindows_79rhkp1fndgsc\Loc…

基于策略模式和简单工厂模式实现zip、tar、rar、7z四种压缩文件格式的解压

推荐语 这篇技术文章深入探讨了基于策略模式和简单工厂模式实现四种常见压缩文件格式的解压方法。通过阅读该文章&#xff0c;你将了解到如何利用这两种设计模式来实现灵活、可扩展的解压功能&#xff0c;同时适应不同的压缩文件格式。如果你对设计模式和文件处理感兴趣或刚好…

【ES】es介绍,使用spring-boot-starter-data-elasticsearch整合的ES来进行操作Es

文章目录 倒排索引&#xff08;Inverted Index&#xff09;和正排索引&#xff08;Forward Index&#xff09;es和MySQL对比IK分词器的总结mapping映射使用springboot整合的ES来进行操作Es1. 实体类中添加注解2. 编写Repository层3. 通过Repository进行增删改查 倒排索引&#…