2023_Spark_实验二十四:SparkStreaming读取Kafka数据源:使用Direct方式

news2025/1/7 5:58:00

SparkStreaming读取Kafka数据源:使用Direct方式

一、前提工作

  • 安装了zookeeper

  • 安装了Kafka

  • 实验环境:kafka + zookeeper + spark

  • 实验流程

二、实验内容

实验要求:实现的从kafka读取实现wordcount程序

启动zookeeper

zk.sh start

# zk.sh脚本 参考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

启动Kafka

kf.sh start

# kf.sh 参照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

 (测试用,实验不做)创建Kafka主题,如test,可参考:Kafka的安装与基本操作

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

--bootstrap-server  连接的Kafka Broker主机名称和端口号

--create 创建主题

--describe 查看主题详细描述

# 创建kafka主题测试
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2

# 再次查看first主题的详情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

启动Kafka控制台生产者,可参考:Kafka的安装与基本操作

# 创建kafka生产者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

创建maven项目

添加kafka依赖

       <!--- 添加streaming依赖 --->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>${spark.version}</version>
        </dependency>

       <!--- 添加streaming kafka依赖 --->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
            <version>3.4.1</version>
        </dependency>

编写程序,如下所示:

package exams

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

import java.lang

/**
 * @projectName SparkLearning2023  
 * @package exams  
 * @className exams.SparkStreamingReadKafka  
 * @description ${description}  
 * @author pblh123
 * @date 2023/12/1 15:19
 * @version 1.0
 *
 */
    
object SparkStreamingReadKafka {
  def main(args: Array[String]): Unit = {

    //  1. 创建spark,sc对象
    if (args.length != 2) {
      println("您需要输入一个参数")
      System.exit(5)
    }
    val musrl: String = args(0)
    val spark: SparkSession = new SparkSession.Builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master(musrl)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // 生成streamingContext对象
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    //  2. 代码主体
    val bststrapServers = args(1)
    val kafkaParms: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> bststrapServers, //kafka列表
      "key.deserializer" -> classOf[StringDeserializer], k和v 的序列化类型
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream", //消费者组
      "auto.offset.reset" -> "latest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读
      "enable.auto.commit" -> (true: java.lang.Boolean) // 消费者不自动提交偏移量
    )

    val topics = Array("gnutest2", "t100")
    // createDirectStream: 主动拉取数据
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParms)
    )
    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))

    //kafka 是一个key value 格式的, 默认key 为null ,一般用不上
    val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // 打印
    resultRDD.print()

    //  3. 关闭sc,spark对象
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    sc.stop()
    spark.stop()
  }
}

配置输入参数

生产者追加数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1277263.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

西南科技大学模拟电子技术实验三(BJT单管共射放大电路测试)预习报告

一、计算/设计过程 说明:本实验是验证性实验,计算预测验证结果。是设计性实验一定要从系统指标计算出元件参数过程,越详细越好。用公式输入法完成相关公式内容,不得贴手写图片。(注意:从抽象公式直接得出结果,不得分,页数可根据内容调整) 二、画出并填写实验指导书上…

数据结构 - 堆:TOP-K问题

问题描述 TOP-K问题&#xff1a;即求数据结合中前K个最大的元素或者最小的元素&#xff0c;一般情况下数据量都比较大 比如&#xff1a;专业前10名、世界500强、富豪榜、游戏中前100的活跃玩家等 对于Top-K问题&#xff0c;能想到的最简单直接的方式就是排序&#xff0c;但是&…

使用drawio图表,在团队中,做计划,设计和跟踪项目

使用drawio图表&#xff0c;在团队中&#xff0c;做计划&#xff0c;设计和跟踪项目 drawio是一款强大的图表绘制软件&#xff0c;支持在线云端版本以及windows, macOS, linux安装版。 如果想在线直接使用&#xff0c;则直接输入网址draw.io或者使用drawon(桌案), drawon.cn内部…

六、三台主机免密登录和时钟同步

目录 1、免密登录 1.1 为什么要免密登录 1.2 免密 SSH 登录的原理

Sass 同时导出JavaScript 和 CSS变量

Sass 官网 安装插件 注意 sass-loader 版本没设太高&#xff0c;否则会报错 Syntax Error: TypeError: this.getOptions is not a function npm i sass sass-loader10 -D创建 Sass 文件 variables.module.scss。注意这里是 module.scss&#xff1a; 否则报错 Cant find st…

安卓小程序与编译抓包

APK小程序渗透测试 查找bp的证书 在浏览器中打开bp代理&#xff0c;然后在网页中搜索hppps://burp 点击高级——接受风险并继续 拿到证书 将浏览器信任证书 打开设置 搜索证书——查看证书 点击导入——导入证书 证书验证成功后&#xff0c;访问网页&#xff08;吾爱破解&a…

linux上编写进度条

目录 一、预备的两个小知识1、缓冲区2、回车与换行 二、倒计时程序三、编写入门的进度条四、编写一个正式的五、模拟实现和下载速度相关的进度条 一、预备的两个小知识 1、缓冲区 首先认识一下缓冲区&#xff1a;先写一个.c文件如下&#xff1a; 我们执行一下这个程序时&…

windows ce Remote Process Explorer定位程序崩溃地址

windows ce Remote Process Explorer定位程序崩溃地址 一&#xff1a;下载地址二&#xff1a;使用1&#xff09;找到程序基准地址2) 定位程序异常位置 一&#xff1a;下载地址 链接&#xff1a;https://pan.baidu.com/s/1fQVBpputtRmynqa95DaPrg 提取码&#xff1a;cx65 二&a…

hdlbits系列verilog解答(真值表)-50

文章目录 一、问题描述二、verilog源码三、仿真结果一、问题描述 本节我们学习用真值表来描述组合逻辑的行为,通过真值表我们将组合逻辑的每一种输入和输出对应值都罗列出来。 对于一个N个输入的布尔函数,理论上有2的N次方输入组合。下表是一个3输入的例子。 假设现在我们来…

Vue---Echarts

项目需要用echarts来做数据展示&#xff0c;现记录vue3引入并使用echarts的过程。 1. 使用步骤 安装 ECharts&#xff1a;使用 npm 或 yarn 等包管理工具安装 ECharts。 npm install echarts 在 Vue 组件中引入 ECharts&#xff1a;在需要使用图表的 Vue 组件中&#xff0c;引入…

探索数据之美:深入学习Plotly库的强大可视化

1. 引言&#xff1a; Plotly 是一个交互性可视化库&#xff0c;可以用于创建各种漂亮的图表和仪表板。它支持多种编程语言&#xff0c;包括Python、R、JavaScript。在Python中&#xff0c;Plotly提供了Plotly Express和Graph Objects两个主要的绘图接口。 2. Plotly库简介&am…

第九节HarmonyOS 常用基础组件1-Text

一、组件介绍 组件&#xff08;Component&#xff09;是界面搭建与显示的最小单位&#xff0c;HarmonyOS ArkUI声名式为开发者提供了丰富多样的UI组件&#xff0c;我们可以使用这些组件轻松的编写出更加丰富、漂亮的界面。 组件根据功能可以分为以下五大类&#xff1a;基础组件…

优化生产流程,开启智能制造新时代——探索MES生产管理系统的优势

在当今高度竞争且日益全球化的制造业环境中&#xff0c;企业需要不断提升生产效率&#xff0c;同时也要降低成本。作为一种先进的生产管理工具&#xff0c;MES生产管理系统正在全球范围内受到制造业的广泛关注。本文将深入探讨MES生产管理系统的优势以及如何帮助企业实现这些目…

00后卷王真的很卷吗?

前言 都在传00后躺平、整顿职场&#xff0c;但该说不说&#xff0c;是真的卷&#xff0c;感觉我都要被卷废了... 前段时间&#xff0c;公司招了一个年轻人&#xff0c;其中有一个是00后&#xff0c;工作才一年多&#xff0c;直接跳槽到我们公司&#xff0c;薪资据说有18K&…

时序预测 | Python实现TCN时间卷积神经网络时间序列预测(多图,多指标)

时序预测 | Python实现TCN时间卷积神经网络时间序列预测(多图,多指标) 目录 时序预测 | Python实现TCN时间卷积神经网络时间序列预测(多图,多指标)预测效果基本介绍环境准备程序设计参考资料预测效果 基本介绍

分享4个工具,轻松搞定PDF和图像中提取文本

大型语言模型已经席卷了互联网&#xff0c;导致更多的人没有认真关注使用这些模型最重要的部分&#xff1a;高质量的数据&#xff01; 本文旨在提供一些有效从任何类型文档中提取文本的技术。 Python库 本文专注于Pytesseract、easyOCR、PyPDF2和LangChain库。实验数据是一个…

新功能?浅谈nuclei的反制思路

code新功能&#xff1f; 写poc时&#xff0c;习惯性查官方文档的时候&#xff0c;注意到了一个新的功能&#xff1a;code 链接直达&#xff1a;https://docs.projectdiscovery.io/templates/protocols/code 大概翻译下&#xff1a; Nuclei 支持在主机操作系统上执行外部代码。…

MySQL- CRUD-单表查询

一、INSERT 添加 公式 INSERT INTO table_name [(column [, column...])] VALUES (value [, value...]); 示例&#xff1a; CREATE TABLE goods (id INT ,good_name VARCHAR(10),price DOUBLE ); #添加数据 INSERT INTO goods (id,good_name,price ) VALUES (20,华为手机,…

基于linux的C语言环境下开源hashmap的使用与测试

C语言中没有C语言中map键值对容器的数据结构&#xff0c;为在C语言中提供一种hashmap数据结构&#xff0c;并提供hashmap的操作方法&#xff0c;具体包括新建、释放、清除、获得缓存数据量、设置数据、获取数据、浏览数据等操作&#xff0c;基于hashmap的开源代码很丰富&#x…

基于通义千问和向量数据构建问答知识库

参考&#xff1a;Java从0到1构建基于ChatGPT向量数据库的检索增强生成模型RAG-02 - 知乎 (zhihu.com) 1、先开通 阿里云的向量检索服务 如何开通向量检索服务并创建API-KEY_向量检索服务-阿里云帮助中心 (aliyun.com) 按流程申请 最后需要申请API-KEY 安装DashVector SDK M…