SparkStreaming在实时处理的两个场景示例

news2024/12/27 4:21:48

简介

Spark Streaming是Apache Spark生态系统中的一个组件,用于实时流式数据处理。它提供了类似于Spark的API,使开发者可以使用相似的编程模型来处理实时数据流。

Spark Streaming的工作原理是将连续的数据流划分成小的批次,并将每个批次作为RDD(弹性分布式数据集)来处理。这样,开发者可以使用Spark的各种高级功能,如map、reduce、join等,来进行实时数据处理。Spark Streaming还提供了内置的窗口操作、状态管理、容错处理等功能,使得开发者能够轻松处理实时数据的复杂逻辑。

Spark Streaming支持多种数据源,包括Kafka、Flume、HDFS、S3等,因此可以轻松地集成到各种数据管道中。它还能够与Spark的批处理和SQL引擎进行无缝集成,从而实现流式处理与批处理的混合使用。
在这里插入图片描述

本文以 TCP、kafka场景讲解spark streaming的使用

消息队列下的信息铺抓

类似消息队列的有redis、kafka等核心组件。
本文以kafka为例,向kafka中实时抓取数据,

pom.xml中添加以下依赖

<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- Kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

    <!-- Spark Streaming Kafka Connector -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- PostgreSQL JDBC -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.24</version>
    </dependency>
</dependencies>

创建项目编写以下代码实现功能

package org.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class SparkStreamingKafka {
    public static void main(String[] args) throws InterruptedException {
        // 创建 Spark 配置
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_kafka")
                .setMaster("local[*]")
                .setExecutorEnv("setLogLevel", "ERROR");//设置日志等级为ERROR,避免日志增长导致的磁盘膨胀

        // 创建 Spark Streaming 上下文
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次

        // 创建 Spark SQL 会话
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();


        // 设置 Kafka 相关参数
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        // auto.offset.reset可指定参数有
        // latest:从分区的最新偏移量开始读取消息。
        // earliest:从分区的最早偏移量开始读取消息。
        // none:如果没有有效的偏移量,则抛出异常。
        kafkaParams.put("enable.auto.commit", true);  //采用自动提交offset 的模式
        kafkaParams.put("auto.commit.interval.ms",2000);//每隔离两秒提交一次commited-offset
        kafkaParams.put("group.id", "spark_kafka"); //消费组名称


        // 创建 Kafka stream
        Collection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称
        JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams)  //订阅kafka
        );

        //定义数据结构
        StructType schema = new StructType()
                .add("key", DataTypes.LongType)
                .add("value", DataTypes.StringType);

        kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
            // 转换为 DataFrame
            Dataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {
                return RowFactory.create(record.offset(), record.value());  //将偏移量和value聚合
            }), schema);

            // 写入到 PostgreSQL
            df.write()
                    //选择写入数据库的模式
                    .mode(SaveMode.Append)//采用追加的写入模式
                    //协议
                    .format("jdbc")
                    //option 参数
                    .option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL
                    //确定表名
                    .option("dbtable", "public.spark_kafka")//指定表名
                    .option("user", "postgres") // PostgreSQL 用户名
                    .option("password", "postgres") // PostgreSQL 密码
                    .save();
        });
        // 启动 Spark Streaming
        streamingContext.start();
        // 等待 Spark Streaming 应用程序终止
        streamingContext.awaitTermination();
    }
}

在执行代码前,向创建名为spark_kafka的topic

kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

向spark_kafka 主题进行随机推数

kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

运行过程中消费的offset会一直被提交到每一个分区
在这里插入图片描述

此时在数据库中查看,数据已经实时落地到库中
在这里插入图片描述

TCP

TCP环境下,实时监控日志的输出,可用于监控设备状态、环境变化等。当监测到异常情况时,可以实时发出警报。

package org.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class SparkStreamingKafka {
    public static void main(String[] args) throws InterruptedException {
        // 创建 Spark 配置
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_kafka") // 设置应用程序名称
                .setMaster("local[*]") // 设置 Spark master 为本地模式,[*]表示使用所有可用核心

                // 设置日志等级为ERROR,避免日志增长导致的磁盘膨胀
                .setExecutorEnv("setLogLevel", "ERROR");

        // 创建 Spark Streaming 上下文
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次

        // 创建 Spark SQL 会话
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();


        // 设置 Kafka 相关参数
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服务器地址
        kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器类
        kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器类
        kafkaParams.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始消费消息
        kafkaParams.put("enable.auto.commit", true);  // 采用自动提交 offset 的模式
        kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔两秒提交一次 committed-offset
        kafkaParams.put("group.id", "spark_kafka"); // 消费组名称


        // 创建 Kafka stream
        Collection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称
        JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams)  // 订阅 Kafka
        );

        // 定义数据结构
        StructType schema = new StructType()
                .add("key", DataTypes.LongType)
                .add("value", DataTypes.StringType);

        kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
            // 转换为 DataFrame
            Dataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {
                return RowFactory.create(record.offset(), record.value());  // 将偏移量和 value 聚合
            }), schema);

            // 写入到 PostgreSQL
            df.write()
                    // 选择写入数据库的模式
                    .mode(SaveMode.Append) // 采用追加的写入模式
                    // 协议
                    .format("jdbc")
                    // option 参数
                    .option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL
                    // 确定表名
                    .option("dbtable", "public.spark_kafka") // 指定表名
                    .option("user", "postgres") // PostgreSQL 用户名
                    .option("password", "postgres") // PostgreSQL 密码
                    .save();
        });
        // 启动 Spark Streaming
        streamingContext.start();
        // 等待 Spark Streaming 应用程序终止
        streamingContext.awaitTermination();
    }
}


在10.0.0.108 打开9999端口键入数值 ,使其被spark接收到并进行运算

nc -lk 9999

开启端口可以键入数值 此时会在IDEA的控制台显示其计算值
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1484654.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis04 发布与订阅

一种消息通信模式&#xff1a;发布者&#xff08;pub&#xff09;发布消息&#xff0c;订阅者&#xff08;sub&#xff09;接收消 息。 redis客户端可以订阅任意数量的频道。 发布订阅流程图 发布和订阅实现 打开两个窗口订阅channel1频道&#xff0c;再打开一个窗口向chan…

ssh连接github报错:ssh_exchange_identification: Connection closed by remote host

报错截图&#xff1a;ssh_exchange_identification: Connection closed by remote host 配置步骤&#xff1a; 生成新的 SSH 密钥并将其添加到 ssh-agent - GitHub 文档 这个报错是我在寒武纪MLU270的dcoker环境中遇到的&#xff0c;但是在windows系统中可以通过SSH的方式git…

Excel 按奇数偶数列处理数据

目录 一. 需求背景1.1 获取偶数列的数据1.2 奇偶列数据互换 二. 解决方式2.1 为列添加奇偶辅助列2.2 通过公式将奇偶列互换 一. 需求背景 1.1 获取偶数列的数据 ⏹ 最近在整理歌单&#xff0c;发现部分歌曲没有歌词&#xff0c;于是打算自己制作一份。 从网上找到了歌词&…

从下一代车规MCU厘清存储器的发展(1)

目录 1.车规MCU制程工艺朝28nm进发 2.MCU存储器概述 3.MCU大厂的选择 3.1 瑞萨自研STT-MRAM 3.2 ST专注PCM 3.3 英飞凌和台积电联手RRAM 3.4 NXP如何计划eNVM 4.小结 1.车规MCU制程工艺朝28nm进发 随着英飞凌发布了关于AURIX TC4xx系列即将量产的新闻&#xff0c;国际…

RS编码的FPGA实现

RS编码&#xff0c;即Reed-solomon codes&#xff0c;是一类纠错能力很强的特殊的非二进制BCH码&#xff08;BCH码是一种有限域中的线性分组码&#xff0c;具有纠正多个随机错误的能力&#xff09;。对于任选正整数S可构造一个相应的码长为nqS-1的 q进制BCH码&#xff0c;而q作…

机器学习,数学统计常用数学符号

通用字体&#xff0c;符号规则 x x x 标量 x \boldsymbol{x} x 向量 X \boldsymbol{X} X 随机向量、矩阵 χ \chi \quad χ 集合 x ^ \hat{x} x^ 估计或近似值 x ∗ x^* x∗ 最优值 x ˉ \bar{x} xˉ 平均值 常见的数学符号 ∀ \forall \quad ∀ 对任意 ∃ \exists ∃ 存在 ∝…

青少年CTF擂台挑战赛 2024 #Round 1 Web方向题解 WP 全

EasyMD5 题目描述&#xff1a;php没有难题 考点总结&#xff1a;脑洞题目&#xff0c;不如我出&#xff08;狗头 只允许两个都上传pdf文件。 文件还不能太大了。burp多次发包发现要求两个pdf内容不一样 不一样时候&#xff0c;提示我们MD5碰撞。 科学计数法绕过 PHP的后门 …

快递平台独立版小程序源码|带cps推广营销流量主+前端

源码介绍&#xff1a; 快递代发快递代寄寄件小程序可以对接易达云洋一级总代 快递小程序&#xff0c;接入云洋/易达物流接口&#xff0c;支持选择快递公司&#xff0c;三通一达&#xff0c;极兔&#xff0c;德邦等&#xff0c;功能成熟 如何收益: 1.对接第三方平台成本大约4元…

vscode如何远程到linux python venv虚拟环境开发?(python虚拟环境、vscode远程开发、vscode远程连接)

文章目录 1. 安装VSCode2. 安装扩展插件3. 配置SSH连接4. 输入用户名和密码5. 打开远程文件夹6. 创建/选择Python虚拟环境7. 安装Python插件 Visual Studio Code (VSCode) 提供了一种称为 Remote Development 的功能&#xff0c;允许用户在远程系统、容器或甚至 Windows 子系统…

Linux:kubernetes(k8s)node节点加入master主节点(3)

Linux&#xff1a;kubernetes&#xff08;k8s&#xff09;搭建mater节点&#xff08;kubeadm&#xff0c;kubectl&#xff0c;kubelet&#xff09;-CSDN博客https://blog.csdn.net/w14768855/article/details/136415575?spm1001.2014.3001.5502 我在上一章部署好了主节点&…

MySQL 数据库表设计和优化

一、数据结构设计 正确的数据结构设计对数据库的性能是非常重要的。 在设计数据表时&#xff0c;尽量遵循一下几点&#xff1a; 将数据分解为合适的表&#xff0c;每个表都应该有清晰定义的目的&#xff0c;避免将过多的数据存储在单个表中。使用适当的数据类型来存储数据&…

鸿蒙Harmony应用开发—ArkTS声明式开发(通用属性:显隐控制)

控制组件是否可见。 说明&#xff1a; 从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 visibility visibility(value: Visibility) 控制组件的显隐。 卡片能力&#xff1a; 从API version 9开始&#xff0c;该接口支持在…

HarmonyOS Full SDK的安装

OpenHarmony的应用开发工具HUAWEI DevEco Studio现在随着OpenHarmony版本发布而发布,只能在版本发布说明中下载,例如最新版本的OpenHarmony 4.0 Release。对应的需要下载DevEco Studio 4.0 Release,如下图。 图片 下载Full SDK主要有两种方式,一种是通过DevEco Studio下载…

2024 年 5 大移动应用安全预测

2024 年已经到来&#xff0c;企业必须为接下来的事情做好准备。为未来做好准备需要回顾过去。企业可以从那里判断自己当前的状态&#xff0c;从而做出准确的预测。 移动应用程序安全仍然是企业关注的一个重要问题&#xff0c;特别是当消费者依赖应用程序来完成更重要的任务时。…

python3装饰器

装饰器 它允许你修改函数或类的行为&#xff0c;而不更改其源代码。实质上&#xff0c;装饰器是接受另一个函数作为参数并返回一个包装原始函数的新函数。这样&#xff0c;你可以在不修改原始函数的情况下&#xff0c;添加一些额外的功能或逻辑。 def time_cost(func):"…

鸿蒙Harmony应用开发—ArkTS声明式开发(通用属性:Popup控制)

给组件绑定popup弹窗&#xff0c;并设置弹窗内容&#xff0c;交互逻辑和显示状态。 说明&#xff1a; 从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 popup弹窗的显示状态在onStateChange事件回调中反馈&#xff0c;其显…

如何修炼成“神医”——《OceanBase诊断系列》之一

本系列是基于OcenaBase 开发工程师在工作中的一些诊断经验&#xff0c;也欢迎大家分享相关经验。 1. 关于神医的故事 扁鹊&#xff0c;中国古代第一个被正史记载的医生&#xff0c;他的成才之路非常传奇。年轻时&#xff0c;扁鹊是一家客栈的主管。有一位名叫长桑君的客人来到…

Vue3_2024_2天【Vue3组合式setup用法及响应式ref和reactive】

第一&#xff1a;浅谈 | 不可不知 1.vue3目录介绍&#xff08;区别Vue2没有的&#xff09; vue3&#xff0c;默认使用ts语言&#xff0c;但是ts一开始无法识别某些文件&#xff0c;这里是系统默认配置&#xff1b; 2.vue2中的入口文件是main.js&#xff0c;而vue3这里的入口文…

2024年2月最新微信域名检测拦截接口源码

这段PHP代码用于检测指定域名列表中的域名是否被封。代码首先定义了一个包含待检测域名的数组 $domainList&#xff0c;然后遍历该数组&#xff0c;对每个域名发送HTTP请求并检查响应内容以判断域名是否被封。 具体步骤如下&#xff1a; 1. 定义待检测的域名列表。 2. 遍历域名…

探索Manticore Search:开源全文搜索引擎的强大功能

在当今信息爆炸的时代&#xff0c;数据的快速检索变得至关重要。无论是在电子商务网站、新闻门户还是企业内部文档&#xff0c;高效的搜索引擎都是确保用户满意度和工作效率的关键因素之一。而在搜索引擎领域&#xff0c;Manticore Search 作为一款开源的全文搜索引擎&#xff…