Spark–steaming

news2025/4/23 6:10:43

实验项目:

找出所有有效数据,要求电话号码为11位,但只要列中没有空值就算有效数据。 按地址分类,输出条数最多的前20个地址及其数据。

代码讲解: 导包和声明对象,设置Spark配置对象和SparkContext对象。 使用Spark SQL语言进行数据处理,包括创建数据库、数据表,导入数据文件,进行数据转换。 筛选有效数据并存储到新表中。 按地址分组并统计出现次数,排序并输出前20个地址。 代码如下 import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object Demo { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo") val spark = SparkSession.builder().enableHiveSupport() .config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse").config(sparkConf).getOrCreate() spark.sql(sqlText = "create database spark_sql_2") spark.sql(sqlText = "use spark_sql_2") //创建存放原始数据的表 spark.sql( """ |create table user_login_info(data string |row format delimited |""".stripMargin) spark.sql(sqlText = "load data local inpath 'Spark-SQL/input/user_login_info.json' into table user_login_info") //利用get_json_object将数据做转换 spark.sql( """ |create table user_login_info_1 |as |select get_json_object(data,'$.uid') as uid, |get_json_object(data,'$.phone') as phone, |get_json_object(data,'$.addr') as addr from user_login_info |""".stripMargin) spark.sql(sqlText = "select count(*) count from user_login_info_1").show() //获取有效数据 spark.sql( """ |create table user_login_info_2 |as |select * from user_login_info_1 |where uid != ' ' and phone != ' ' and addr != ' ' |""".stripMargin) spark.sql(sqlText = "select count(*) count from user_login_info_2").show() //获取前20个地址 spark.sql( """ |create table hot_addr |as |select addr,count(addr) count from user_login_info_2 |group by addr order by count desc limit 20 |""".stripMargin) spark.sql(sqlText = "select * from hot_addr").show() spark.stop() } }

 

Spark Streaming介绍 Spark Streaming概述: 用于流式计算,处理实时数据流。 支持多种数据输入源(如Kafka、Flume、Twitter、TCP套接字等)和输出存储位置(如HDFS、数据库等)。

Spark Streaming特点: 易用性:支持Java、Python、Scala等编程语言,编写实时计算程序如同编写批处理程序。 容错性:无需额外代码和配置即可恢复丢失的数据,确保实时计算的可靠性。 整合性:可以在Spark上运行,允许重复使用相关代码进行批处理,实现交互式查询操作。

Spark Streaming架构: 驱动程序(StreamingContext)处理数据并传给SparkContext。 工作节点接收和处理数据,执行任务并备份数据到其他节点。 背压机制协调数据接收能力和资源处理能力,避免数据堆积和资源浪费。 Spark Streaming实操 词频统计案例: 使用ipad工具向999端口发送数据,Spark Streaming读取端口数据并统计单词出现次数。 代码配置包括设置关键对象、接收TCP套接字数据、扁平化处理、累加相同键值对、分组统计词频。 启动和运行: 启动netpad发送数据,Spark Streaming每隔三秒收集和处理数据。 代码中没有显式关闭状态,流式计算默认持续运行,确保数据处理不间断。 DStream创建 DStream创建方式: RDD队列:通过SSC创建RDD队列,将RDD推送到队列中作为DStream处理。 自定义数据源:下节课详细讲解。

RDD队列案例: 循环创建多个RDD并推送到队列中,使用Spark Streaming处理RDD队列进行词频统计。 代码包括配置对象、创建可变队列、转换RDD为DStream、累加和分组统计词频。 代码如下 import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) val lineStreams = ssc.socketTextStream("node01",9999) val wordStreams = lineStreams.flatMap(_.split(" ")) val wordAndOneStreams = wordStreams.map((_,1)) val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_) wordAndCountStreams.print() ssc.start() ssc.awaitTermination() } }

 

结果展示: 展示了词频统计的结果,验证了Spark Streaming的正确性和有效性。 自定义数据源的实现 需要导入新的函数并继承现有的函数。 创建数据源时需选择class而不是object。 在class中定义on start和on stop方法,并在这些方法中实现具体的功能。 类的定义和初始化 类的定义包括数据类型的设定,如端口号和TCP名称。 使用extends关键字继承父类的方法。 数据存储类型设定为内存中保存。 数据接收和处理 在on start方法中创建新线程并调用接收数据的方法。 连接到指定的主机和端口号,创建输入流并转换为字符流。 逐行读取数据并写入到spark stream中,进行词频统计。 数据扁平化和词频统计 使用block map进行数据扁平化处理。 将原始数据转换为键值对形式,并根据相同键进行分组和累加。 输出词频统计结果。 程序终止条件 设定手动终止和程序异常时的终止条件。 在满足终止条件时输出结果并终止程序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2340554.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习训练中的显存溢出问题分析与优化:以UNet图像去噪为例

最近在训练一个基于 Tiny-UNet 的图像去噪模型时,我遇到了经典但棘手的错误: RuntimeError: CUDA out of memory。本文记录了我如何从复现、分析,到逐步优化并成功解决该问题的全过程,希望对深度学习开发者有所借鉴。 训练数据&am…

如何修复WordPress中“您所关注的链接已过期”的错误

几乎每个管理WordPress网站的人都可能遇到过“您关注的链接已过期”的错误,尤其是在上传插件或者主题的时候。本文将详细解释该错误出现的原因以及如何修复,帮助您更好地管理WordPress网站。 为什么会出现“您关注的链接已过期”的错误 为了防止资源被滥…

从零开始搭建Django博客①--正式开始前的准备工作

本文主要在Ubuntu环境上搭建,为便于研究理解,采用SSH连接在虚拟机里的ubuntu-24.04.2-desktop系统搭建的可视化桌面,涉及一些文件操作部分便于通过桌面化进行理解,最后的目标是在本地搭建好系统后,迁移至云服务器并通过…

健身房管理系统(springboot+ssm+vue+mysql)含运行文档

健身房管理系统(springbootssmvuemysql)含运行文档 健身房管理系统是一个全面的解决方案,旨在帮助健身房高效管理其运营。系统提供多种功能模块,包括会员管理、员工管理、会员卡管理、教练信息管理、解聘管理、健身项目管理、指导项目管理、健身器材管理…

Java从入门到“放弃”(精通)之旅——继承与多态⑧

Java从入门到“放弃”(精通)之旅🚀——继承与多态⑧ 一、继承:代码复用的利器 1.1 为什么需要继承? 想象一下我们要描述狗和猫这两种动物。如果不使用继承,代码可能会是这样: // Dog.java pu…

DeepSeek开源引爆AI Agent革命:应用生态迎来“安卓时刻”

开源低成本:AI应用开发进入“全民时代” 2025年初,中国AI领域迎来里程碑事件——DeepSeek开源模型的横空出世,迅速在全球开发者社区掀起热潮。其R1和V3模型以超低API成本(仅为GPT-4o的2%-10%)和本地化部署能力&#x…

使用 LangChain + Higress + Elasticsearch 构建 RAG 应用

RAG(Retrieval Augmented Generation,检索增强生成) 是一种结合了信息检索与生成式大语言模型(LLM)的技术。它的核心思想是:在生成模型输出内容之前,先从外部知识库或数据源中检索相关信息&…

Self-Ask:LLM Agent架构的思考模式 | 智能体推理框架与工具调用实践

作为程序员,我们习惯将复杂问题分解为可管理的子任务,这正是递归和分治算法的核心思想。那么,如何让AI模型也具备这种结构化思考能力?本文深入剖析Self-Ask推理模式的工作原理、实现方法与最佳实践,帮助你构建具有清晰…

安装 vmtools

第2章 安装 vmtools 1.安装 vmtools 的准备工作 1)现在查看是否安装了 gcc ​ 查看是否安装gcc 打开终端 输入 gcc - v 安装 gcc 链接:https://blog.csdn.net/qq_45316173/article/details/122018354?ops_request_misc&request_id&biz_id10…

【论文阅读20】-CNN-Attention-BiGRU-滑坡预测(2025-03)

这篇论文主要探讨了基于深度学习的滑坡位移预测模型,结合了MT-InSAR(多时相合成孔径雷达干涉测量)观测数据,提出了一种具有可解释性的滑坡位移预测方法。 [1] Zhou C, Ye M, Xia Z, et al. An interpretable attention-based deep…

滑动窗口学习

2090. 半径为 k 的子数组平均值 题目 问题分析 给定一个数组 nums 和一个整数 k,需要构建一个新的数组 avgs,其中 avgs[i] 表示以 nums[i] 为中心且半径为 k 的子数组的平均值。如果在 i 前或后不足 k 个元素,则 avgs[i] 的值为 -1。 思路…

# 基于PyTorch的食品图像分类系统:从训练到部署全流程指南

基于PyTorch的食品图像分类系统:从训练到部署全流程指南 本文将详细介绍如何使用PyTorch框架构建一个完整的食品图像分类系统,涵盖数据预处理、模型构建、训练优化以及模型保存与加载的全过程。 1. 系统概述 本系统实现了一个基于卷积神经网络(CNN)的…

v-html 显示富文本内容

返回数据格式&#xff1a; 只有图片名称 显示不出完整路径 解决方法&#xff1a;在接收数据后手动给img格式的拼接vite.config中的服务器地址 页面&#xff1a; <el-button click"">获取信息<el-button><!-- 弹出层 --> <el-dialog v-model&…

【数学建模】孤立森林算法:异常检测的高效利器

孤立森林算法&#xff1a;异常检测的高效利器 文章目录 孤立森林算法&#xff1a;异常检测的高效利器1 引言2 孤立森林算法原理2.1 核心思想2.2 算法流程步骤一&#xff1a;构建孤立树(iTree)步骤二&#xff1a;构建孤立森林(iForest)步骤三&#xff1a;计算异常分数 3 代码实现…

<项目代码>YOLO小船识别<目标检测>

项目代码下载链接 YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0…

Crawl4AI:打破数据孤岛,开启大语言模型的实时智能新时代

当大语言模型遇见数据饥渴症 在人工智能的竞技场上&#xff0c;大语言模型&#xff08;LLMs&#xff09;正以惊人的速度进化&#xff0c;但其认知能力的跃升始终面临一个根本性挑战——如何持续获取新鲜、结构化、高相关性的数据。传统数据供给方式如同输血式营养支持&#xff…

【Spring Boot】MyBatis多表查询的操作:注解和XML实现SQL语句

1.准备工作 1.1创建数据库 &#xff08;1&#xff09;创建数据库&#xff1a; CREATE DATABASE mybatis_test DEFAULT CHARACTER SET utf8mb4;&#xff08;2&#xff09;使用数据库 -- 使⽤数据数据 USE mybatis_test;1.2 创建用户表和实体类 创建用户表 -- 创建表[⽤⼾表…

[Android]豆包爱学v4.5.0小学到研究生 题目Ai解析

拍照解析答案 【应用名称】豆包爱学 【应用版本】4.5.0 【软件大小】95mb 【适用平台】安卓 【应用简介】豆包爱学&#xff0c;一般又称河马爱学教育平台app,河马爱学。 关于学习&#xff0c;你可能也需要一个“豆包爱学”这样的AI伙伴&#xff0c;它将为你提供全方位的学习帮助…

Qt开发:软件崩溃时,如何生成dump文件

文章目录 一、程序崩溃时如何自动生成 Dump 文件二、支持多线程中的异常捕获三、在 DLL 中使用 Dump 捕获四、封装成可复用类五、MiniDumpWriteDump函数详解 一、程序崩溃时如何自动生成 Dump 文件 步骤一&#xff1a;包含必要的头文件 #include <Windows.h> #include …

普罗米修斯Prometheus监控安装(mac)

普罗米修斯是后端数据监控平台&#xff0c;通过Node_exporter/mysql_exporter等收集数据&#xff0c;Grafana将数据用图形的方式展示出来 官网各平台下载 Prometheus安装&#xff08;mac&#xff09; &#xff08;1&#xff09;通过brew安装 brew install prometheus &…