【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式

news2025/1/18 14:56:13

集成方式

Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套,
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。

方式一:Kafka 0.8.x版本

  • 老的Old Kafka Consumer API
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html
  • 老的Old消费者API,有两种方式:
    • 第一种:高级消费API(Consumer High Level API),Receiver接收器接收数据
    • 第二种:简单消费者API(Consumer Simple Level API) ,Direct 直接拉取数据

方式二:Kafka 0.10.x版本

  • 新的 New Kafka Consumer API
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html
  • 核心API:KafkaConsumer、ConsumerRecorder
    在这里插入图片描述

两种方式区别

使用Kafka Old Consumer API集成两种方式,虽然实际生产环境使用Direct方式获取数据,但是在面试的时候常常问到两者区别。
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html

  • Receiver-based Approach:
    • 基于接收器方式,消费Kafka Topic数据,但是企业中基本上不再使用;
    • Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启多个,再手动合并数据(union),再进行处理,很麻烦;
    • Receiver那台机器挂了,可能会丢失数据,所以需要开启WAL(预写日志)保证数据安全,那么效率又会降低;
    • Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在zookeeper,由Receiver维护;
    • Spark在消费的时候为了保证数据不丢也会在Checkpoint中存一份offset,可能会出现数据不一致;
  • Direct Approach (No Receivers):
    • 直接方式,Streaming中每批次的每个job直接调用Simple Consumer API获取对应Topic数据,此种方式使用最多,面试时被问的最多;
    • Direct方式是直接连接kafka分区来获取数据,从每个分区直接读取数据大大提高并行能力
    • Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况;
    • 当然也可以自己手动维护,把offset存在MySQL、Redis和Zookeeper中;

上述两种方式区别,如下图所示:
在这里插入图片描述

4.2 Direct 方式集成

使用Kafka Old Consumer API方式集成Streaming,采用Direct方式,调用Old Simple Consumer API,
文档:
http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers

编码实现
Direct方式会定期地从Kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围,在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。
在这里插入图片描述
Direct方式没有receiver层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,再根据设定的maxRatePerPartition来处理每个batch。较于Receiver方式的优势:

  • 其一、简化的并行:Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据;
  • 其二、高效:no need for Write Ahead Logs;
  • 其三、精确一次:直接使用simple Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录。

添加相关MAVEN依赖:

<!-- Spark Streaming 集成Kafka 0.8.2.1 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.5</version>
</dependency>

提供工具类KafkaUtils专门从Kafka消费数据,其中函数【createDirectStream】消费数据:
在这里插入图片描述
创建Topic及Console控制台发送数据,命令如下:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic wc-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.itcast.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic wc-topic --broker-list node1.itcast.cn:9092
# Consumer
kafka-console-consumer.sh --topic wc-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning

具体演示代码如下:

import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 集成Kafka,采用Direct式读取数据,对每批次(时间为1秒)数据进行词频统计,将统计结果输出到控制台
*/
object StreamingKafkaDirect {
def main(args: Array[String]): Unit = {
// 1、构建流式上下文实例对象StreamingContext,用于读取流式的数据和调度Batch Job执行
val ssc: StreamingContext = {
// a. 创建SparkConf实例对象,设置Application相关信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
//TODO: 设置每秒钟读取Kafka中Topic最大数据量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 创建StreamingContext实例,传递Batch Interval(时间间隔:划分流式数据)
val context: StreamingContext = new StreamingContext(sparkConf, Seconds(5))
// c. 返回上下文对象
context
}
// TODO: 2、从流式数据源读取数据
/*
def createDirectStream[
K: ClassTag, // Topic中Key数据类型
V: ClassTag,
KD <: Decoder[K]: ClassTag, // 表示Topic中Key数据解码(从文件中读取,反序列化)
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)]
*/
// 表示从Kafka Topic读取数据时相关参数设置
val kafkaParams: Map[String, String] = Map(
"bootstrap.servers" -> "node1.itcast.cn:9092",
// 表示从Topic的各个分区的哪个偏移量开始消费数据,设置为最大的偏移量开始消费数据
"auto.offset.reset" -> "largest"
)
// 从哪些Topic中读取数据
val topics: Set[String] = Set("wc-topic")
// 采用Direct方式从Kafka 的Topic中读取数据
val kafkaDStream: DStream[(String, String)] = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, //
kafkaParams, //
topics
)
// 3、对接收每批次流式数据,进行词频统计WordCount
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(rdd => {
rdd
// 获取从Kafka读取数据的Message
.map(tuple => tuple._2)
// 过滤“脏数据”
.filter(line => null != line && line.trim.length > 0)
// 分割为单词
.flatMap(line => line.trim.split("\\s+"))
// 转换为二元组
.mapPartitions{iter => iter.map(word => (word, 1))}
// 聚合统计
.reduceByKey((a, b) => a + b)
})
// 4、将分析每批次结果数据输出,此处答应控制台
resultDStream.foreachRDD{ (rdd, time) =>
// 使用lang3包下FastDateFormat日期格式类,属于线程安全的
val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO: 先判断RDD是否有数据,有数据在输出哦
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
}
// 5、对于流式应用来说,需要启动应用,正常情况下启动以后一直运行,直到程序异常终止或者人为干涉
ssc.start() // 启动接收器Receivers,作为Long Running Task(线程) 运行在Executor
ssc.awaitTermination()
// 结束Streaming应用执行
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

从WEB UI监控页面可以看出如下信息:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/378538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis学习笔记(八) —— 字段名和属性不一致的情况下,如何处理映射关系

EmpMapper.java /** * 根据id查询员工信息 * param empId * return */ Emp getEmpByEmpId(Param("empId") Integer empId);EmpMapper.xml <?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//D…

Android仿QQ未读消息拖拽粘性效果

效果图原理分析首先是在指定某个位置画一个圆出来&#xff0c;手指按到这个圆的时候再绘制一个可以根据手指位置移动的圆&#xff0c;随着手指的移动两个圆逐渐分离&#xff0c;分离的过程中两圆中间出现连接带&#xff0c;随着两圆圆心距的增大&#xff0c;半径也是根据某一比…

LeetCode经典例题|134. 加油站|运用坐标系数学思维一步解决

134. 加油站 这道题刚看很容易就想到了暴力或者回溯剪枝。 这是一个有增有减的过程&#xff0c;就好像坐标系上的一个个点&#xff0c;连在一起形成一条上下起伏的折线。 1. 做坐标轴 比如 gas [1,2,3,4,5], cost [3,4,5,1,2] 从0号汽车站开始出发&#xff0c;一直到回到起…

计算机网络(第三版) 胡亮 课后习题第四章答案

计算机网络&#xff08;第三版&#xff09; 胡亮 课后习题第四章答案 1、数据链路层的任务和功能是什么&#xff1f; 数据链路层的任务是提供两个相邻的网络节点或主机及其相连的网络节点之间的可靠通信。 数据链路层的主要服务功能是线路规程、差错控制和流量控制。 2、什么是…

第四阶段08-基于element-ui的vue2.0脚手架(续)

42. VUE脚手架项目嵌套路由 在配置路由&#xff08;配置/src/router/index.js&#xff09;时&#xff0c;如果配置的路由对象是routes常量的直接数组元素&#xff0c;则此路由配置的视图会显示在App.vue的<router-view/>中。 在设计视图时&#xff0c;可能会出现<ro…

深度学习基础实例与总结

一、神经网络 1 深度学习 1 什么是深度学习&#xff1f; 简单来说&#xff0c;深度学习就是一种包括多个隐含层 (越多即为越深)的多层感知机。它通过组合低层特征&#xff0c;形成更为抽象的高层表示&#xff0c;用以描述被识别对象的高级属性类别或特征。 能自生成数据的中…

Spring常见面试题汇总(超详细回答)

1.什么是Spring框架&#xff1f;Spring框架是一个开源的Java应用程序开发框架&#xff0c;它提供了很多工具和功能&#xff0c;可以帮助开发者更快地构建企业级应用程序。通过使用Spring框架&#xff0c;开发者可以更加轻松地开发Java应用程序&#xff0c;并且可以更加灵活地组…

C++学习笔记

C学习笔记 学习视频资料 随手记 vector是一个能够存放任意类型的动态数组 unordered_set 不能放重复元素的容器 emplace放入 std::cin.get(); //使控制器不会立马关闭&#xff0c;保持窗口打开 总是通过 const引用传递对象 默认 动态链接 static 静态链接&#xff08;只在当…

WebRTC Qos策略

1.WebRTC 用于提升 QoS 的方法&#xff1a;NACK、FEC、SVC、JitterBuffer、IDR Request、PACER、Sender Side BWE、VFR&#xff08;动态帧率调整策略&#xff09;https://blog.csdn.net/CrystalShaw/article/details/80432267丢包重传NACK&#xff1a;一种通知技术&#xff0c;…

【办公类-19-02】Python批量制作word文本框的名字小标签,用A4word打印(植物角、家长会、值日生)

背景需求&#xff1a; 2月28日去小班带班&#xff0c;看到班主任制作了一些小手印花束作为家长会的家长座位提示&#xff0c;上面贴着“”圆形白色的幼儿名字贴”。 我立刻想起了制作的过程——在word中插入文本框&#xff0c;然后复制无数个文本框&#xff0c;摆好位置&#…

Win11的两个实用技巧系列之设置系统还原点的方法、安全启动状态开启方法

Win11如何设置系统还原点?Win11设置系统还原点的方法很多用户下载安装win11后应该如何创建还原点呢&#xff1f;现在我通过这篇文章给大家介绍一下Win11如何设置系统还原点&#xff1f;在Windows系统中有一个系统还原功能可以帮助我们在电脑出现问题的时候还原到设置的时间上&…

spring循环依赖debug源码【Java面试第三季】

spring循环依赖debug源码【Java面试第三季】前言spring循环依赖debug源码35_spring循环依赖debug源码01全部Debug断点36_spring循环依赖debug源码0237_spring循环依赖debug源码0338_spring循环依赖debug源码04最后前言 本文是4.Spring【Java面试第三季】的小节 spring循环依赖…

守护进程与TCP通讯

目录 一.守护进程 1.1进程组与会画 1.2守护进程 二.创建守护进程 setsid函数&#xff1a; 三. TCP通讯流程 3.1三次握手&#xff1a; 3.2 数据传输的过程 3.3四次挥手 一.守护进程 1.1进程组与会画 进程组&#xff1a;进程组由一个进程或者多个进程组成&#xff0c;每…

谷歌优化排名怎么做出来的?谷歌排名多久做上去?

本文主要分享谷歌排名的算法机制&#xff0c;让你很容易地用更短的时间把Google的自然排名做到首页。 本文由光算创作&#xff0c;有可能会被剽窃和修改&#xff0c;我们佛系对待这种行为吧。 谷歌优化排名怎么做出来的&#xff1f; 答案是&#xff1a;持续更新原创优质内容…

让WPF中的DataGrid像Excel一样可以筛选

在默认情况下&#xff0c;WPF提供的DataGrid仅拥有数据展示等简单功能&#xff0c;如果要实现像Excel一样复杂的筛选过滤功能&#xff0c;则相对比较麻烦。本文以一个简单的小例子&#xff0c;简述如何通过WPF实话DataGrid的筛选功能&#xff0c;仅供学习分享使用&#xff0c;如…

唯品会财报:一面骄阳,一面寒霜

配图来自Canva可画 在互联网技术飞速发展、物流基础设施日益完善&#xff0c;以及消费者购物习惯不断改变等多重因素的共同影响下&#xff0c;电商行业实现了蓬勃发展。得益于此&#xff0c;电商行业也跑出了许多知名电商品牌&#xff0c;其中既有淘宝、京东、拼多多等综合型电…

SSRF漏洞原理、危害以及防御与修复

一、SSRF漏洞原理漏洞概述SSRF&#xff08;Server-side Request Forge&#xff0c;服务端请求伪造&#xff09;是一种由攻击者构造形成由服务端发起请求的安全漏洞。一般情况下&#xff0c;SSRF攻击的目标是从外网无法访问的内部系统。正是因为它是由服务端发起的&#xff0c;所…

Java开发 - Elasticsearch初体验

目录 前言 什么是es&#xff1f; 为什么要使用es&#xff1f; es查询的原理&#xff1f; es需要准备什么&#xff1f; es基本用法 创建工程 添加依赖 创建操作es的文件 使用ik分词插件 Spring Data 项目中引入Spring Data 添加依赖 添加配置 创建操作es的业务逻…

深度学习常用的激活函数总结

各种激活函数总结 目录一、sigmoid二、tanh![在这里插入图片描述](https://img-blog.csdnimg.cn/a0d92552edf8464db793fdd2f2b75cb5.png)三、ReLU系列1.原始ReLU2.ReLU改进&#xff1a;Leaky ReLU四、swish五、GeLU一、sigmoid 优点&#xff1a; 1.可以将任意范围的输出映射到 …

高通平台开发系列讲解(Sensor篇)AlsPs的工作原理及介绍

文章目录 一、什么是ALS?二、什么是距感(PS)?三、AlsPs的工作原理四、AlsPs的特性五、距感的校准参数说明六、光感的校准参数说明沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇文章将介绍 AlsPs 的工作原理及介绍。 一、什么是ALS? 光感的英文叫做Ambient Li…