SparkStreaming入门

news2025/1/10 15:36:56

概述

实时/离线

  • 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
  • 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。

SparkStreaming简介

  1. 支持的输入源:Kafka, Flume, HDFS等
  2. 数据输入后,可以用RDD处理数据
  3. 结果可以保存在很多地方,比如HDFS,数据库等

SparkStreaming架构

DStream

SparkCore的基本单位RDD
SparkSQL的基本单位是DataFreme, DataSet
Spark Streaming的基本单位是Dstream

每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。

架构图

在这里插入图片描述
由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。

DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。

Hello World案例

  1. 添加依赖
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.3.1</version>
 </dependency>
</dependencies>
  1. 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.ArrayList;
import java.util.HashMap;


public class Test01_HelloWorld {
    public static void main(String[] args) throws InterruptedException {
        // 创建流环境
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));

        // 创建配置参数
        HashMap<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        // 需要消费的主题
        ArrayList<String> strings = new ArrayList<>();
        strings.add("topic_db");

        JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));

        JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                String[] words = consumerRecord.value().split(" ");
                return Arrays.stream(words).iterator();
            }
        });
		
        flatMap .print();
        // 执行流的任务
        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();//线程阻塞
    }
}

window算子窗口操作

由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。

窗口时长:计算内容的时间范围
滑动步长:隔多久触发一次计算

//4 添加窗口 窗口大小12s 滑动步长6s
        JavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));

        //5 对加过窗口的数据流进行计算
        JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1117131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【微信小程序】后台数据交互于WX文件使用

目录 一、前期准备 1.1 数据库准备 1.2 后端数据获取接口编写 1.3 前端配置接口 1.4 封装微信的request请求 二、WXS文件的使用 2.1 WXS简介 2.2 WXS使用 三、后台数据交互完整代码 3.1 WXML 3.2 JS 3.3 WXSS 效果图 一、前期准备 1.1 数据库准备 创建数据库&…

【JavaEE初阶】 定时器详解与实现

文章目录 &#x1f334;定时器是什么&#x1f38b;Java标准库中的定时器&#x1f332;模拟实现定时器&#x1f6a9;定时器的构成&#x1f4cc;第一步&#xff1a;MyStack类的建立&#x1f4cc;第二步&#xff1a;创建MyTimer类&#x1f4cc;第三步&#xff1a;解决相关问题 &am…

ant利用pathconvert将资源集合转换为路径方式

ant可以利用pathconvert 任务&#xff0c;将内嵌的资源集合转换为某一平台的路径方式。pathconvert可以通过属性property设置一个属性名称&#xff0c;将转换后的路径保存到属性中。 例如&#xff0c;下面的代码&#xff1a; <project name"demo_project"><…

2023年中国一次性医用内窥镜市场发展现状分析:相关产品进入上市高峰期[图]

基于对减少交叉感染风险和维护成本的需求等因素&#xff0c;一种新兴的、耗材化的一次性内窥镜可以避免因重复使用产品而导致的感染问题和高额的清洗消毒费用&#xff0c;从而提高患者的安全性并帮助医疗机构节省运营成本。 一次性和可重复使用医用内窥镜特点对比 资料来源&am…

C++前缀和算法:合并石头的最低成本原理、源码及测试用例

本文涉及的基础知识点 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 动态规划&#xff0c;日后完成。 题目 有 n 堆石头排成一排&#xff0c;第 i 堆中有 stones[i] 块石头。 每次 移动 需要将 连续的 k 堆石头合并为一堆&#xff0c;而…

asp.net企业招聘管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio计算机毕业设计

一、源码特点 asp.net 企业招聘管理系统 是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语 言开发 asp.net企业招聘管理系统 二、功…

SpringBoot常见异步编程,你会多少?

微信公众号访问地址&#xff1a;SpringBoot常见异步编程&#xff0c;你会多少&#xff1f; 近期热推文章&#xff1a; 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、基于Redis…

线性代数2:梯队矩阵形式

图片来自 Europeana on Unsplash 一、前言 欢迎阅读的系列文章的第二篇文章&#xff0c;内容是线性代数的基础知识&#xff0c;线性代数是机器学习背后的基础数学。在我之前的文章中&#xff0c;我介绍了线性方程和系统、矩阵符号和行缩减运算。本文将介绍梯队矩阵形式&#xf…

2023年中国养殖渔船产业链、市场规模及发展趋势分析[图]

养殖渔船行业是指涉及水产养殖活动的渔船制造、运营和相关服务的产业。这个行业将渔船和水产养殖业结合起来&#xff0c;包括生产和维护用于养殖水域中养殖活动的各种船只&#xff0c;如养殖网船、渔业养殖船、水产养殖工作船等。 养殖渔船行业产业链 资料来源&#xff1a;共研…

2023年中国车用冲压模具行业特征、竞争现状及行业市场规模分析[图]

汽车冲压件模具具有尺寸大、型面复杂、精度要求高等特点&#xff0c;属于技术密集型产品。汽车冲压模具能快速精密地把材料直接加工成零件或半成品并通过焊接、铆接、拼装等工艺装配成零部件&#xff0c;冲压模具的设计开发和加工能力对汽车冲压零部件产品总制造成本、质量及性…

SpringBoot(二)集成 Quartz:2.5.4

Quartz是一个广泛使用的开源任务调度框架&#xff0c;用于在Java应用程序中执行定时任务和周期性任务。它提供了强大的调度功能&#xff0c;允许您计划、管理和执行各种任务&#xff0c;从简单的任务到复杂的任务。 以下是Quartz的一些关键特点和功能&#xff1a; 灵活的调度器…

海外问卷调查是不是真的能赚钱?

海外问卷调查是不是真的能赚钱&#xff1f;我来告诉你&#xff0c;我在橙河网络这家公司干了两年半的问卷调查&#xff0c;可以明确地告诉你&#xff1a;海外问卷调查确实可以赚钱&#xff0c;真的&#xff01; 海外问卷调查这个项目&#xff0c;在国内已经存在了很长时间&…

KVM动态在线迁移实操笔录

环境介绍 一台NFS&#xff08;192.168.184.132&#xff09; 一台KVM-a&#xff08;192.168.184.133&#xff09; 一台KVM-b&#xff08;192.168.184.134&#xff09; NFS配置 [rootlocalhost ~]# setenforce 0 //关闭selinux [rootlocalhost ~]# service iptables stop [root…

电子元器件网络变压器(网络滤波器 ̖ 脉冲变压器)的EMI产生原因

Hqst华强盛&#xff08;盈盛电子&#xff09;导读&#xff1a;网络变压器&#xff08;网络滤波器 ̖ 脉冲变压器&#xff0c;以下称网络变压器&#xff09;在工作过程中会产生电磁场&#xff0c;这可能会导致电磁干扰&#xff08;EMI&#xff09;。EMI会影响设备的性能和可靠性…

基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

基于Java的图书馆借阅管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

设计模式:模板模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)

简介&#xff1a; 模板模式&#xff0c;它是一种行为型设计模式&#xff0c;它定义了一个操作中的算法的框架&#xff0c;将一些步骤延迟到子类中实现&#xff0c;使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 通俗地说&#xff0c;模板模式就是将某一行…

微信小程序数据交互------WXS的使用

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《Spring与Mybatis集成整合》《Vue.js使用》 ⛺️ 越努力 &#xff0c;越幸运。 1.数据库连接 数据表结构&#xff1a; 数据测式&#xff1a; 2.后台配置 pom.xml <?xml version&quo…

重磅发布!RflySim Cloud 智能算法云仿真平台亮相,助力大规模集群算法高效训练

RflySim Cloud智能算法云仿真平台&#xff08;以下简称RflySim Cloud平台&#xff09;是由卓翼智能及飞思实验室为无人平台集群算法验证、大规模博弈对抗仿真、人工智能模型训练等前沿研究领域研发的平台。主要由环境仿真模块、物理效应计算模块、多智能体仿真模块、分布式网络…