SparkStreaming学习——读取socket的数据和kafka生产者的消息

news2025/1/4 18:39:17

目录

一、Spark Streaming概述

二、添加依赖

三、配置log4j

1.依赖下载好后打开IDEA最左侧的外部库

2.找到spark-core

3.找到apache.spark目录

4.找到log4j-defaults.properties文件

5.将该文件放在资源目录下,并修改文件名

6.修改log4j.properties第19行的内容

四、Spark Streaming读取Socket数据流

1.代码编写

2.开启nc -lk

3.启动Scala程序

五、Spark Streaming读取kafka消息

1.代码编写

2.开启生产者sparkkafkastu并生产消息

3. 运行scala代码


一、Spark Streaming概述

        Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的RDD如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

         Spark Streaming与Flink的区别:Spark Streaming是基于秒级别,而Flink是基于毫秒级别,是真正的实时流,Spark Streaming属于伪实时。因此,在选择实时流计算框架时,如果对实时速度要求不高的话,选择Spark Streaming基本足够。

        Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据。

        应用于 DStream 上的转换操作都会转换为底层RDD上的操作。如对行 DStream中的每个RDD应用flatMap操作以生成单词 DStream 的RDD。

二、添加依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.1.2</spark.version>
    <mysql.version>8.0.29</mysql.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--  https://mvnrepository.com/artifact/org.apache.spark/spark-core  -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

三、配置log4j

1.依赖下载好后打开IDEA最左侧的外部库

2.找到spark-core

3.找到apache.spark目录

4.找到log4j-defaults.properties文件

5.将该文件放在资源目录下,并修改文件名

6.修改log4j.properties第19行的内容

log4j.rootCategory=ERROR, console

四、Spark Streaming读取Socket数据流

1.代码编写

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")
    // 定义流,采集周期3秒
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // TODO 配置数据源为指定机器和端口
    val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("lxm147", 8888)
    // TODO 业务处理
    val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)

    // TODO 输出结果
    wordCountStream.print()
    // TODO 启动采集器
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

2.开启nc -lk

3.启动Scala程序

五、Spark Streaming读取kafka消息

1.代码编写

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1")
    )
    
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      // 如果没有topic需要创建
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1
      ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams)
    )

    // KeyValue(key,value)
    val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordCountStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

2.开启生产者sparkkafkastu并生产消息

3. 运行scala代码

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/462753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue.js学习-1

一、Vue.js环境准备 官网地址&#xff1a;Vue.js - 渐进式 JavaScript 框架 | Vue.js (vuejs.org) Vue.js v2教程&#xff1a;Vue.js (vuejs.org) 在浏览器中安装Vue调试工具&#xff1a;Installation | Vue Devtools (vuejs.org) VSCode安装见这里&#xff1a; 下载vue.j…

每日学术速递4.26

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.CV 1.AutoNeRF: Training Implicit Scene Representations with Autonomous Agents 标题&#xff1a;AutoNeRF&#xff1a;使用自主代理训练隐式场景表示 作者&#xff1a;Pierre Marz…

macOS 13.4Beta 3(22F5049e)发布

系统介绍 4 月 26 日消息&#xff0c;苹果今日向 Mac 电脑用户推送了 macOS 13.4 开发者预览版 Beta 3 更新&#xff08;内部版本号&#xff1a;22F5049e&#xff09;&#xff0c;本次更新距离上次发布隔了 14 天。 macOS Ventura 带来了台前调度、连续互通相机、FaceTime 通…

Go | 一分钟掌握Go | 5 - 切片

作者&#xff1a;Mars酱 声明&#xff1a;本文章由Mars酱编写&#xff0c;部分内容来源于网络&#xff0c;如有疑问请联系本人。 转载&#xff1a;欢迎转载&#xff0c;转载前先请联系我&#xff01; 说明 切片和数组有点像&#xff0c;对于我的理解就是声明了固定长度的就是数…

「 Redis 」RDB和AOF持久化全面解析

「 Redis 」RDB和AOF持久化全面解析 参考&鸣谢 【说透Redis】10分钟彻底理解Redis的持久化机制&#xff1a;RDB和AOF 程序员读书 AOF 持久化是怎么实现的&#xff1f; xiaolinCoding Redis持久化之RDB与AOF 的区别 1024下午茶 文章目录 「 Redis 」RDB和AOF持久化全面解析前…

设计模式之解释器模式(C++)

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 一、解释器模式是什么&#xff1f; 解释器模式是一种行为型的软件设计模式&#xff0c;定义了一个解释器&#xff0c;来解释给定语…

C语言函数大全-- q 开头的函数

C语言函数大全 本篇介绍C语言函数大全-- q 开头的函数 1. qsort 1.1 函数说明 函数声明函数功能void qsort(void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *));用于将指定数组按指定顺序进行排序 参数&#xff1a; base &#xff1a; 指…

2023年,企业如何做好团队知识管理?

团队知识管理是一个组织管理中非常重要的组成部分。成熟的企业通常会非常注重团队知识管理的实践&#xff0c;以提高团队的协作效率和整体绩效。本文将介绍成熟企业如何做好团队知识管理&#xff0c;以提高企业的竞争力和创新能力。 一、了解团队知识管理的重要性 团队知识管…

【网络进阶】五种IO网络模型(二)

文章目录 1. 多路复用IO2. 异步IO3. 信号驱动IO 1. 多路复用IO I/O多路复用这个术语可能对一些人来说比较陌生&#xff0c;但提到select/epoll&#xff0c;就容易理解了。在某些场景下&#xff0c;这种I/O方式也被称为事件驱动I/O&#xff08;event-driven I/O&#xff09;。我…

[Gitops--5]APISIX

APISIX Apache APISIX是一款开源的高性能,动态云原生网关.Apache APISIX当前已经覆盖了API网关,LB,Ingress,Service,Mesh等多种场景 1. APISIX部署 使用Helm Chart部署Apache APISIX Ingress Controller Apache APISIX Ingress Controller目前和Apache APISIX网关是强关联的…

基于html+css的图展示34

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

GoodSync 数据自动同步备份工具VS傲梅轻松备份系统数据备份工具 哪款更好?

备份和同步软件的使用越来越广泛&#xff0c;因为在这个数据驱动的时代&#xff0c;数据的备份和恢复非常重要。在这里我想向大家推荐两款备份和同步软件——GoodSync和傲梅轻松备份。 GoodSync是一款备份和同步软件&#xff0c;它可以在多个设备之间同步文件、文件夹、照片、音…

在电脑上剪辑视频用什么软件 如何在电脑上剪辑视频

工作中需要剪辑视频的场景越来越多了&#xff0c;视频剪辑已经成了打工人必备技能之一。但对很多新手小白来说&#xff0c;剪辑视频看起来比较困难&#xff0c;那可能是没有找到合适的软件和方法&#xff0c;下面就为大家介绍在电脑上剪辑视频用什么软件&#xff0c;如何在电脑…

Xcode14 设置Display Name不生效问题

一、前言 早在Xcode13苹果就对Info.plist做了一次大改革&#xff0c;新建的OC项目默认Info.plist文件是“空的”&#xff0c;Swift项目甚至干脆连Info.plist文件都没有了&#xff0c;苹果这样做是为了建立一个新的Info.plist管理方式&#xff0c;把Info.plist物理文件中的配置…

对数据库中存储的程序进行现代化改造,以使用 Amazon Aurora PostgreSQL 联合查询、pg_cron 和 Amazon Lambda

作为数据库迁移和现代化的一部分&#xff0c;您可以继续使用存储的程序和调度作业&#xff0c;将远程实例中的数据整合到集中式数据存储中。 Amazon Schema Conversion Tool&#xff08;Amazon SCT&#xff09;可帮助您将传统的 Oracle 和 SQL Server 函数转换为其等效的开源函…

PDF 预览和下载你是怎么实现的?

前言 在开发过程中要求对 PDF 类型的发票提供 预览 和 下载 功能&#xff0c;**PDF** 类型文件的来源又包括 H5 移动端 和 **PC 端**&#xff0c;而针对这两个不同端的处理会有些许不同&#xff0c;下文会有所提及。 针对 PDF 预览 的文章不在少数&#xff0c;但似乎都没有提…

排查和解决CentOS系统上Nacos服务启动报错“java.net.UnknownHostException: jmenv.tbsite.net“问题

背景 环境是CentOS7操作系统&#xff0c;nacos服务宕掉了&#xff0c;启动服务的时候报错。 Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.boot.web.servlet.FilterRegistrationBean]: Factory method ‘di…

太酷啦,Transformer 的有效上下文长度可扩展至百万级

夕小瑶科技说 原创作者 | 智商掉了一地、iven 用 Recurrent Memory Transformer 架构&#xff1a;可输入长度取决于内存大小 Transformer 因其在自然语言处理领域的成功应用而备受瞩目&#xff0c;同时在计算机视觉领域的研究中&#xff0c;诸多的多模态大模型如 ViT、CLIP、BL…

iperf3使用教程

文章目录 简介1.下载2. 使用测试实例&#xff1a;案例一 TCP通信测试案例二&#xff1a;UDP测试案例三 传输东西进行测试 iperfs3简单使用样例 简介 iPerf3是用于主动测试IP网络上最大可用带宽的工具。它支持时序、缓冲区、协议&#xff08;TCP&#xff0c;UDP&#xff0c;SCT…

软件测试方法——等价类划分法详解

1、等价类划分法的介绍和概念 划分 指互不相交的一组子集&#xff0c;这些子集的并是整个集合。 对测试的意义&#xff1a;完备性和无冗余性。 等价类 等价类是指某个输入域的子集合。在该子集合中&#xff0c;各个输入数据对于揭露程序中的错误都是等效的&#xff0c;具有等…