Elasticsearch 集成---Spark Streaming 框架集成

news2025/1/11 2:29:56

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.es</groupId>
    <artifactId>es-sparkstreaming</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch的客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- elasticsearch依赖2.x的log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>com.fasterxml.jackson.core</groupId>-->
        <!--            <artifactId>jackson-databind</artifactId>-->
        <!--            <version>2.11.1</version>-->
        <!--        </dependency>-->
        <!--        &lt;!&ndash; junit单元测试 &ndash;&gt;-->
        <!--        <dependency>-->
        <!--            <groupId>junit</groupId>-->
        <!--            <artifactId>junit</artifactId>-->
        <!--            <version>4.12</version>-->
        <!--        </dependency>-->
    </dependencies>
</project>

2.功能实现

package com.atguigu.es

import org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentType

object SparkStreamingESTest {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        ds.foreachRDD(
            rdd => {
                rdd.foreach(
                    data => {
                        val client = new RestHighLevelClient(
                            RestClient.builder(new HttpHost("localhost",9200, "http"))
                        )

                        val ss = data.split(" ")

                        val request = new IndexRequest()
                        request.index("product").id(ss(0))
                        val json =
                            s"""
                              | {  "data" : "${ss(1)}" }
                              |""".stripMargin
                        request.source(json, XContentType.JSON)

                        val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)
                        println(response.getResult)
                        client.close()
                    }
                )
            }
        )

        ssc.start()
        ssc.awaitTermination()
    }
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/927969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络地址转换NAT-动态NAT的使用范围和配置-思科EI,华为数通

网络地址转换NAT-动态NAT的使用范围和配置 什么是动态NAT&#xff1f; 使用公有地址池&#xff0c;并以先到先得的原则分配这些地址。当具有私有 IP 地址的主机请求访问 Internet 时&#xff0c;动态 NAT 从地址池中选择一个未被其它主机占用的 IP 地址一对一的转化。当数据会话…

操作无法完成错误0x0000709的解决方法分享,教你快速修复错误代码问题

在使用计算机时&#xff0c;我们有时会遇到各种错误代码。其中之一是错误代码0x0000709&#xff0c;表示操作无法完成。这个错误代码可能由多种原因引起&#xff0c;但幸运的是&#xff0c;我们可以采取一些措施来解决它。本文将介绍错误代码0x0000709的含义&#xff0c;提供几…

使用vlc在线播放rtsp视频url

1. 2. 3. 工具链接&#xff1a; https://download.csdn.net/download/qq_43560721/88249440

人效九宫格城市沙龙暨《人效九宫格白皮书》发布会 —上海站,圆满结束

8月11日&#xff0c;在上海龙之梦万丽酒店&#xff0c;由盖雅工场主办的人效九宫格城市沙龙暨《人效九宫格白皮书》发布会 —上海站&#xff0c;圆满结束。 近百位来自多个行业的企业管理者及人力资源从业者汇聚一堂&#xff0c;共同探讨企业如何将盈利模式从数量增长转为质量增…

生成式 AI 在 Gartner 的 2023 年炒作周期中备受关注

原创 | 文 BFT机器人 01 背景 Gartner&#xff0c; Inc. 在其最新的 2023 年新兴技术炒作周期中&#xff0c;将生成人工智能(AI)定位于膨胀期望的顶峰&#xff0c;预计它将在未来两到五年内带来转型效益。这种人工智能变体是更广泛的新兴人工智能趋势的一部分&#xff0c;预示…

挂耳式运动耳机哪个款式戴着跑步舒服、挂耳式运动耳机推荐

对于和我一样热爱健身和运动的人来说&#xff0c;音乐就像一种调动情绪的"兴奋剂"&#xff0c;在戴上耳机、聆听着动感的音乐时&#xff0c;我们能够感受到肌肉的收缩&#xff0c;完全沉浸在自己的世界中。这种状态让我们的训练状态达到巅峰&#xff0c;快乐倍增。因…

暖手宝方案

充电暖手宝因为它的便携性&#xff0c;既能供暖又能当充电宝使用而备受人们喜爱。航誉微推出充电暖手宝方案&#xff0c;主控芯片为航誉微单片机HU系列&#xff0c;具有智能温控功能&#xff0c;可定制冷光显示屏。一、暖手宝方案原理 目前&#xff0c;市场常见的暖手宝大致有三…

大数据领域都有什么发展方向

近年来越来越多的人选择大数据行业&#xff0c;大数据行业前景不错薪资待遇好&#xff0c;各大名企对于大数据人才需求不断上涨。 大数据从业领域很宽广&#xff0c;不管是科技领域还是食品产业&#xff0c;零售业等都是需要大数据人才进行大数据的处理&#xff0c;以提供更好…

钡铼技术BL102 PLC网关案例:远程调试西门子PLC程序

网口PLC 远程下载 1、打开网关配置软件&#xff0c;点击“搜索”&#xff0c;搜索局内网网关BL102 ​2、搜索到的网关设备&#xff0c;选择要配置的设备&#xff0c;双击登录 ​3、输入登录密码登录&#xff0c;默认是123456 ​4、配置网关网口采集PLC&#xff0c;远程下载暂…

行业首家·合规典范|昂首资本携手菲律宾警察局,树立经纪商合规经营典范

Anzo Capital 昂首资本携手菲律宾达沃市警察局长阿尔贝托P卢帕兹受邀参加由 AFP-PNP Southern Mindanao Press Corps( 菲律宾武装部队(AFP)和菲律宾国家警察(PNP)南部棉兰老岛记者团)举办的新闻发布会。 本次新闻发布会在菲律宾达沃市皇家曼达亚酒店举行&#xff0c;Anzo Cap…

STM32 + RTThread + UGUI

一、概述 开发板&#xff1a;STM32F103C8T6显示器&#xff1a;ST7735SRT-Thread&#xff1a;5.0.0 玩过 GUI 的小伙伴都知道&#xff0c;界面的显示是一个个像素点组合起来的&#xff0c;那么直接构建出来炫酷的 GUI 还是相对比较困难的&#xff0c;所以我们一般都会使用一些…

反射机制-体会反射的动态性案例(尚硅谷Java学习笔记)

// 举例01 public class Reflect{ // 静态性 public Person getInstance(){return new Person(); }// 动态性 public T<T> getInstance(String className) throws Exception{Calss clzz Class.forName(className);Constructor con class.getDeclaredConstructor();con…

如何赋能音频行业更多创新 荔枝集团邀请华为云专家共探AIGC新动向

在互联网新时代&#xff0c;随着AIGC和大语言模型的技术突破&#xff0c;为音频产品提供了更多创新的可能性。8月25日下午&#xff0c;一场有关AIGC的技术交流论坛在位于广州的荔枝集团总部召开&#xff0c;并邀请华为云AI领域技术专家进行分享。华为云AIGC首席架构师&#xff…

网深科技与中科方德完成兼容性认证

网深科技的产品NetInside可观测性分析平台与国产中科方德主流操作系统完成兼容性适配&#xff0c;系统名称&#xff1a;方德高可信服务器操作系统V4.0&#xff0c;系统运行稳定&#xff0c;性能卓越&#xff0c;完美兼容&#xff0c;能够为广大用户提供灵活、专业、直观可视性&…

centos下配置SFTP且限制用户访问目录

一、SFTP使用场景 ftp是大多数网站的文件传输选择工具&#xff0c;但ftp并不是非常安全&#xff0c;并且在centos上搭建的vsftpd也非常的不稳定&#xff0c;偶尔会出现权限问题&#xff0c;例如500、或是账号密码不正确等等。 而SFTP是基于默认的22端口&#xff0c;是ssh内含…

23款奔驰GLS450升级原厂电动吸合门,体验绅士的关门状态

电吸门的工作原理是在门框(或门板边缘)上安装一个电磁线圈。当门打开时&#xff0c;电流会流过线圈&#xff0c;形成电磁场。这样&#xff0c;由于磁力的作用&#xff0c;当门靠近门框关闭时&#xff0c;门会自动关闭。 另外&#xff0c;电吸门也有有用的一面。如果下车&#…

生成地图展示【Python思路】

# 1.导包 import json from pyecharts.charts import Map #导入关于编写地图的包 from pyechart.options import * #全局设置# 2.得到地图对象 map Map()# 3.打开事先准备好的JSON数据文件 f open("D:/Typora 记事本/notebook/Python/Exercise_data/疫情.txt",&…

江西武功山旅游攻略(周末两日游)

一、 往返路线 1: 出发路线 周五晚上上海出发坐火车&#x1f684;到江西萍乡(11.5小时,卧铺550左右) 打车到江西武功山景区,120-150元左右,人均30元,1小时10分左右到达 或者 &#x1f697;到达萍乡北之后 出站后步行200米到长途汽车站&#xff0c;乘旅游巴士直达武功山游…

Centos 7.6 安装mongodb

以下是在CentOS 7.6上安装MongoDB的步骤&#xff1a; 打开终端并以root用户身份登录系统。 创建一个新的MongoDB存储库文件 /etc/yum.repos.d/mongodb-org-4.4.repo 并编辑它。 sudo vi /etc/yum.repos.d/mongodb-org-4.4.repo在编辑器中&#xff0c;添加下面的内容到文件中并…

GB28181开发------录像查询(设备视音频文件检索)

文件检索主要用区域、设备、录像时间段、录像地点、录像内容为条件进行查询,用 Message消息发 送检索请求和返回查询结果,传送结果的 Message消息可以发送多条,应支持附录 N 多响应消息传输 的要求。文件检索请求和应答命令采用 MANSCDP协议格式定义,详细描述见 A.2.4文件目录检…