【Spark分布式内存计算框架——Spark Streaming】8. Direct 方式集成底层原理 集成Kafka 0.10.x

news2025/1/22 18:55:01

Direct 方式集成底层原理

SparkStreaming集成Kafka采用Direct方式消费数据,如下三个方面优势:

第一、简单的并行度(Simplified Parallelism)

  • 读取topics的总的分区数目 = 每批次RDD中分区数目;
  • topic中每个分区数据 被读取到 RDD中每个分区进行处理
    在这里插入图片描述

第二、高效(Efficiency)

  • 处理数据比使用Receiver接收数据高效很多
  • 使用Receiver接收数据的时候,要将数据存储到Executor、为了可靠性还需要将数据存储文件系统中WAL

第三、Exactly-once semantics

  • 能保证一次性语义,从Kafka消费数据仅仅被消费一次,不会重复消费或者不消费
  • 在Streaming数据处理分析中,需要考虑数据是否被处理及被处理次数,称为消费语义
    • At most once:最多一次,比如从Kafka Topic读取数据最多消费一次,可能出现不消费,此时数据丢失;
    • At least once:至少一次,比如从Kafka Topic读取数据至少消费一次,可能出现多次消费数据;
    • Exactly once:精确一次,比如从Kafka topic读取数据当且仅当消费一次,不多不少,最好的状态
      在这里插入图片描述

深入剖析SparkStreaming采用Direct方式消费Kafka数据,底层原理:

官方:this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly
defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s
simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from
a file system).

示意图如下:
在这里插入图片描述

采用Direct方式消费数据时,需要设置每批次处理数据的最大量,防止【波峰】时数据太多,导致批次数据处理有性能问题:

  • 参数:spark.streaming.kafka.maxRatePerPartition
  • 含义:Topic中每个分区每秒中消费数据的最大值
  • 举例说明:
    • BatchInterval:5s、Topic-Partition:3、maxRatePerPartition: 10000
    • 最大消费数据量:10000 * 3 * 5 = 150000 条

4.3 集成Kafka 0.10.x

使用Kafka 0.10.+提供新版本Consumer API集成Streaming,实时消费Topic数据,进行处理。
文档:http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
添加相关Maven依赖:

<!-- Spark Streaming 与Kafka 0.10.0 集成依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>

目前企业中基本都使用New Consumer API集成,优势如下:
第一、类似 Old Consumer API中Direct方式

  • 直接到Kafka Topic中依据偏移量范围获取数据,进行处理分析;
  • The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach;

第二、简单并行度1:1

  • 每批次中RDD的分区与Topic分区一对一关系;
  • It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata;
  • 获取Topic中数据的同时,还可以获取偏移量和元数据信息;

工具类KafkaUtils中createDirectStream函数API使用说明(函数声明):
在这里插入图片描述

具体演示案例代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Streaming通过Kafka New Consumer消费者API获取数据
*/
object StreamingSourceKafka {
def main(args: Array[String]): Unit = {
// 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// TODO: 2. 读取Kafka Topic中数据
/*
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
*/
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
/*
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]
): ConsumerStrategy[K, V]
*/
// ii.读取哪些Topic数据
val topics = Array("wc-topic")
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0001",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用新消费者API获取数据,类似于Direct方式
val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc, locationStrategy, consumerStrategy
)
// 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(kafkaRDD => {
val resultRDD: RDD[(String, Int)] = kafkaRDD
.map(record => record.value()) // 获取Message数据
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD
})
// 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.foreachRDD{ (rdd, time) =>
val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO: 先判断RDD是否有数据,有数据在输出哦
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
}
// 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/377699.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

布局三八女王节,巧借小红书数据分析工具成功引爆618

对于小红书“她”经济来说&#xff0c;没有比三八节更好的阵地了。伴随三八女王节逐渐临近&#xff0c;各大品牌蓄势待发&#xff0c;这场开春后第一个S级大促活动&#xff0c;看看品牌方们可以做什么&#xff1f; 洞察流量&#xff0c;把握节点营销时机 搜索小红书2023年的三…

学员作品|微博“绿洲”APP产品分析

一产品架构1. 产品功能架构图绿洲的主要功能模块可以拆分为六部分&#xff1a;首页、发现、发布动态、个人中心、水滴、消息。整体功能架构图如下&#xff1a;2. 用户使用路径图用于浏览动态&#xff1a;用于发布动态&#xff1a;新用户引导路径&#xff1a;二市场分析1. 产品定…

【Linux】Linux中gcc/g++的使用

本期主题&#xff1a;程序的编译过程和gcc/g的使用博客主页&#xff1a;小峰同学分享小编的在Linux中学习到的知识和遇到的问题小编的能力有限&#xff0c;出现错误希望大家不吝赐&#x1f341; 1.背景知识 预处理&#xff08;进行宏替换&#xff0c;去注释&#xff0c;头文件的…

索引、索引失效、索引的存储

文章目录1.索引种类2.创建索引注意点3.索引失效的情况4.为什么索引使用B树存储&#xff08;1&#xff09;如果使用数组来存储索引&#xff08;2&#xff09;如果使用二叉查找树来存储索引&#xff08;3&#xff09;如果使用平衡二叉查找树来存储索引&#xff08;4&#xff09;如…

同为(TOWE)防雷产品助力福建移动南平分公司防雷改造

01 公司简介中国移动通信集团福建有限公司南平分公司属于福建移动地级分公司&#xff0c;所属行业为电信、广播电视和卫星传输服务。现已建成覆盖范围广、业务品种多、通信质量高的综合通信网络&#xff0c;具备行业领先的经营管理制度。移动通信大楼的综合防雷及地接系统&…

第八节 构造器和this关键字、封装

构造器的作用 定义在类中的&#xff0c;可以用于初始化一个类的对象&#xff0c;并返回对象的地址。 构造器的注意事项 1.任何类定义出来&#xff0c;默认就自带了无参数构造器&#xff0c;写不写都有。 2.一旦定义了有参数构造器&#xff0c;那么无参数构造器就没有了&#xf…

互联网人看一看,这些神器你用过哪些?

很多小伙伴在剪辑视频的过程中经常可以看到一些语音素材&#xff0c;经常刷视频的小伙伴也可以看到很多视频中经常出现一些AI合成的声音或者音效&#xff0c;这些配音可以给视频增添很多亮点&#xff01;那么大家都是怎么将文字转语音的呢&#xff1f;今天给大家分享5款非常专业…

【Git】Git常用命令及练习—Git环境配置

目录 一、Git环境配置 1.1基本配置 1.2为常用指令配置别名&#xff08;可选&#xff09; 1.3 解决GitBash乱码问题 二、Git命令详细操作 1.获取本地仓库 2.基础操作命令及练习 基础操作练习 3.分支命令及练习 开发中分支使用原则与流程 分支练习 &#x1f49f; 创作不…

Ubuntu20.04安装Cuckoo

参考链接&#xff1a; &#xff08;1&#xff09;【主要参考】http://www.wityx.com/post/134851_1_1.html &#xff08;2&#xff09;【主要参考】在Ubuntu18.04上搭建Cuckoo Sandbox2.0.7 https://www.jianshu.com/p/4dd6373fa206 &#xff08;3&#xff09;与Cuckoo的斗智斗…

Kafka入门(五)

下面聊聊Kafka常用命令 1、Topic管理命令 以topic&#xff1a;test_1为例 1.1、创建topic ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test_1参数说明&#xff1a; –bootstrap-server&#xff1a;…

【论文笔记】Decoupling Representation and Classifier for Long-Tailed Recognition

这一篇其实并不是提出什么新的东西&#xff0c;而且是做了点类似综述的技术调用实验。省流&#xff1a;T-normalization最好用 摘要 现状&#xff1a;Existing solutions usually involve class-balancing strategies, e.g. by loss re-weighting, data re-sampling, or tran…

【操作方法】windows防火墙添加出入站规则方法

【操作方法】windows防火墙添加出入站规则方法说明一、入站规则1.打开防火墙&#xff0c;点击“高级设置”2.点击“入站规则”后点击“新建规则”3.例3.1选择“端口”3.2添加需要放通的端口3.3选择操作动作为“允许连接”3.4选择应用区域&#xff0c;此处我选择所有区域二、出站…

Linux: malloc()的指向指针发生指向偏移后,释放前需要将指针指向复原。

Linux: malloc()的指向指针发生指向偏移后&#xff0c;释放前需要将指针指向复原。 #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <time…

A Contextual-Bandit Approach to Personalized News Article Recommendation-论文学习

A Contextual-Bandit Approach to Personalized News Article Recommendation-论文学习 github地址&#xff1a;bandit-learning 摘要 该算法根据用户和文章的上下文信息依次选择文章为用户服务&#xff0c;同时根据用户点击反馈调整其文章选择策略&#xff0c;以最大化用户…

不是,到底有多少种图片懒加载方式?

一、也是我最开始了解到的 js方法&#xff0c;利用滚动事件&#xff0c;判断当时的图片位置是否在可视框内&#xff0c;然后进行渲染。 弊端&#xff1a;代码冗杂&#xff0c;你还要去监听页面的滚动事件&#xff0c;这本身就是一个不建议监听的事件&#xff0c;即便是我们做了…

Qt音视频开发18-不同视频打开无缝切换

一、前言 在轮询视频的时候&#xff0c;通常都是需要将之前的视频全部关闭&#xff0c;然后打开下一组视频&#xff0c;在这个切换的过程中&#xff0c;如果是按照常规的做法&#xff0c;比如先关闭再打开新的视频&#xff0c;肯定会出现空白黑屏之类的过度空白区间&#xff0…

【解决办法】windows防火墙出入站规则放通telnet方法

【操作方法】windows防火墙出站规则放通telnet方法一、出站规则1.新建出站规则中选择“程序”2.选择路径&#xff0c;点击“下一页”3.选择“允许连接”4.选择所有区域二、入站规则注&#xff1a;打开防火墙添加出入站规则参考【操作方法】windows防火墙添加出入站规则方法 一、…

Learining C++ No.12【vector】

引言&#xff1a; 北京时间&#xff1a;2023/2/27/11:42&#xff0c;高数考试还在进行中&#xff0c;我充分意识到了学校的不高级&#xff0c;因为题目真的没什么意思&#xff0c;虽然挺平易近人&#xff0c;但是……&#xff0c;考试期间时间比较放松&#xff0c;所以不能耽误…

通过python技术获取甲流分布数据

近期&#xff0c;多地学校出现因甲流导致的班级停课&#xff0c;儿科甲流患者就诊量呈数倍增长。此轮甲流为何如此严重&#xff1f;感染甲流之后会出现哪些症状&#xff1f; 经过专家的介绍甲流之所以这么严重有这些原因导致的。一、疫情完全放开后很多孩子不戴口罩了&#x…

CData Drivers for Cassandra Crack

CData Drivers for Cassandra Crack Cassandra JDBC驱动程序允许用户连接Cassandra的实时数据。它允许从任何能够支持JDBC连接的应用程序直接连接。它将Java应用程序与实时Cassandra和NoSQL以及云服务和数据库连接起来。用户可以使用ApacheCassandra&#xff0c;因为在本例中&a…