StructuredStreaming (一)

news2024/11/17 6:28:19

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc

2、启动 

nc -lk 9999

案例:wordcount

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # 创建一个sparkSession对象
    spark = SparkSession.builder.appName("socketDemo").getOrCreate()

    socketDf = spark.readStream.format("socket") \
        .option("host", "bigdata01") \
        .option("port", 9999) \
        .load()
    
   


    # 处理
    # 方式一:使用dsl语法
    splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))
    resultDf1 = splitDf.groupBy("word").count()
    
    # 方式二:使用sql
    socketDf.createOrReplaceTempView("wordcount")
    resultDf2 = spark.sql("""
    with t1 as( 
    select num from wordcount lateral view explode(split(value," ")) c as num
    )select num,count(*) counts from t1 group by num;
    """)
    
    
    
    # 下面的就是sink的写法 后续会写

    query1 = resultDf1.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
        
    query2 = resultDf2.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start() \
        .awaitTermination()

    spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import os


from pyspark.sql import SparkSession

from pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructType

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # 创建一个sparkSession对象
    spark = SparkSession.builder.appName("socketDemo").getOrCreate()
    # score_schema = StructType([
    #     StructField(name="stu_id", dataType=IntegerType(), nullable=False),
    #     StructField(name="subject_name", dataType=StringType(), nullable=True),
    #     StructField(name="score", dataType=DoubleType(), nullable=True)
    # ])
    score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())

    socketDf = spark.readStream.format("csv") \
        .option("sep", ";") \
        .schema(score_schema) \
        .load("../../resources/input1")


    socketDf.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .start() \
        .awaitTermination()

    spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络HTTP——针对实习面试

目录 计算机网络HTTP什么是HTTP?HTTP和HTTPS有什么区别?分别说明HTTP/1.0、HTTP/2.0、HTTP/3.0请说明访问网页的全过程请说明HTTP常见的状态码Cookie和Session有什么区别?HTTP请求方式有哪些?请解释GET和POST的区别?HT…

转轮数组(C语言实现)

题目介绍 方法一我们可以先把数字1 2 3 4逆转一下&#xff0c;第二步我们可以逆转一下5 6 7&#xff0c; 最后整体逆置一下就会变成上面的数字。 void reverse(int* nums, int begin, int end) {while (begin < end){int tmp nums[begin];nums[begin] nums[end];nums[en…

Ubuntu 的 ROS 操作系统 turtlebot3 gazebo仿真

引言 TurtleBot3 Gazebo仿真环境是一个非常强大的工具&#xff0c;能够帮助开发者在虚拟环境中测试和验证机器人算法。 Gazebo是一个开源的3D机器人仿真平台&#xff0c;它能支持物理引擎&#xff0c;允许机器人在虚拟环境中模拟和测试。结合ROS&#xff0c;它能提供一个完整的…

uniapp vuex的使用

实现组件全局&#xff08;数据&#xff09;管理的一种机制&#xff0c;可以方便的实现组件之间共享数据&#xff0c;不同于上述三种传递值的方式。 可以把vuex当成一个store仓库&#xff0c;可以集中管理共享的数据&#xff0c;并且存储在vuex中的数据都是响应式的&#xff0c…

uniapp适配暗黑模式配置plus.nativeUI.setUIStyle适配DarkMode配置

uniapp适配暗黑模式配置 目录 uniapp适配暗黑模式配置setUIStyleDarkMode 适配app-plus manifest.json配置theme.json配置pages.json配置页面切换代码实现同步手机暗黑配置额外适配 参考官方文档&#xff1a;https://uniapp.dcloud.net.cn/tutorial/darkmode.html 主要用到api…

element ui table进行相同数据合并单元格

示例如图 //要合并的项&#xff08;自定义&#xff09; const columnArr ["dq","sj","xj","zj","zjj","zjfzr","nhxm","nhsjh","nhsfzh","","",""…

uniapp 实现 ble蓝牙同时连接多台蓝牙设备,支持app、苹果(ios)和安卓手机,以及ios连接蓝牙后的一些坑

首先对 uniapp BLE蓝牙API进行封装 这里我封装了一个类&#xff1a;bluetoothService.js 代码&#xff1a; import { throttle } from lodash export default class Bluetooth {constructor() {this.device {};this.connected false;// 使用箭头函数绑定类实例的上下文&am…

51单片机应用开发(进阶)---模块化编程

实现目标 1、掌握.h 文件的格式、extern 的用法&#xff1b; 2、握模块化编程方法步骤&#xff1b; 3、具体实现&#xff1a;&#xff08;1&#xff09;提供一个C文件&#xff0c;将其按照功能模块进行模块化。 一、为什么要进行模块化编程&#xff1f; 传统的编程方式&…

arkUI:水果选择与管理:基于 ArkUI 的长按编辑功能实现

水果选择与管理&#xff1a;基于 ArkUI 的长按编辑功能实现 1 主要内容说明2 相关内容2.1 相关内容2.1.1 源码1内容的相关说明2.1.1.1 数据结构与状态管理2.1.1.2 添加水果功能2.1.1.3 水果列表展示2.1.1.4 长按进入编辑模式2.1.1.5 复选框的多选功能2.1.1.6 删除水果功能2.1.1…

小程序20-样式:自适应尺寸单位 rpx

手机设备的宽度逐渐多元化&#xff0c;也就需要开发者开发过程中&#xff0c;去适配不同屏幕宽度的手机&#xff0c;为了解决屏幕适配问题&#xff0c;微信小程序推出了 rpx 单位 rpx&#xff1a;小程序新增的自适应单位&#xff0c;可以根据不同设备的屏幕宽度进行自适应缩放 …

unity3d————Resources异步加载

知识点一&#xff1a;Resources异步加载是什么&#xff1f; 在Unity中&#xff0c;资源加载可以分为同步加载和异步加载两种方式。同步加载会在主线程中直接进行&#xff0c;如果加载的资源过大&#xff0c;可能会导致程序卡顿&#xff0c;因为从硬盘读取数据到内存并进行处理…

C#/WinForm拖拽文件上传

一、首先创建一个上传文件的类&#xff0c;继承Control类&#xff0c;如下&#xff1a; public class UploadControl : Control{private Image _image;public UploadControl(){this.SetStyle(ControlStyles.UserPaint | //控件自行绘制&#xff0c;而不使用操作系统的绘制Cont…

2024 同一个网段,反弹shell四种方法【linux版本】bash、python、nc、villian反弹shell图解步骤

实验环境准备&#xff08;同一个网段下&#xff0c;我是桥接的虚拟机&#xff09; 一、bash反弹shell 二、python反弹shell 三、nc反弹shell 四、villain反弹shell 实验环境准备&#xff08;同一个网段下&#xff0c;我是桥接的虚拟机&#xff09; 一台kali的linux(攻击者)…

FPGA使用Verilog实现CAN通信

FPGA实现CAN通信&#xff08;Verilog&#xff09; 1.作者使用的方法是通过FPGA芯片&#xff08;如Xilinx公司的型号为XC7K325TFFG676-2&#xff09;控制SJA1000T芯片&#xff08;CAN控制器芯片&#xff09;实现CAN通信&#xff0c;如下图所示&#xff1a; 2.熟悉连接方式之后&…

已解决:spark代码中sqlContext.createDataframe空指针异常

这段代码是使用local模式运行spark代码。但是在获取了spark.sqlContext之后&#xff0c;用sqlContext将rdd算子转换为Dataframe的时候报错空指针异常 Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.Nu…

jenkins用户在执行scp的时候如何做免密登录

一、背景 在jenkins job中执行scp的shell命令&#xff0c;当然不希望每次输入密码&#xff0c;另外处于出于安全考虑&#xff0c;也不建议在scp命令中指定。 所以&#xff0c;我们需要对远程机器进行免密登录。 本文遇到的问题是&#xff0c;在jenkins机器上执行scp已做到了…

HarmonyOS ArkUI(基于ArkTS) 开发布局 (中)

HarmonyOS ArkUI(基于ArkTS) 开发布局 &#xff08;上&#xff09; 四 层叠布局 (Stack) 层叠布局&#xff08;StackLayout&#xff09;用于在屏幕上预留一块区域来显示组件中的元素&#xff0c;提供元素可以重叠的布局。层叠布局通过Stack容器组件实现位置的固定定位与层叠&…

无线网络信号 6G、5G和2.4G 的一些小科普

无线网络信号划分为6G、5G和2.4G这几类信号&#xff0c;它们各自有不同的用途和区别&#xff1a; 1、 2.4G无线技术 - 用途&#xff1a;2.4G无线技术广泛应用于智能家居、物联网、WLAN和蓝牙设备等。它是一个全球性的工作频段&#xff0c;适用于低速率的应用&#xff0c;如普通…

什么是GCP kunernetes的Node Taints and Tolerations

在Kubernetes中&#xff0c;Node taints和Pod tolerations是两个相关的功能&#xff0c;它们用于控制Pods的调度&#xff0c;以确保Pods不会调度到不适当的节点上。以下是这两个概念的详细解释&#xff1a; Node Taints&#xff08;节点污点&#xff09; 定义&#xff1a;Node…

ROS进阶:使用URDF和Xacro构建差速轮式机器人模型

前言 本篇文章介绍的是ROS高效进阶内容&#xff0c;使用URDF 语言&#xff08;xml格式&#xff09;做一个差速轮式机器人模型&#xff0c;并使用URDF的增强版xacro&#xff0c;对机器人模型文件进行二次优化。 差速轮式机器人&#xff1a;两轮差速底盘由两个动力轮位于底盘左…