【Spark分布式内存计算框架——Spark Streaming】9. 获取偏移量 应用案例:百度搜索风云榜(上)

news2024/9/29 21:23:27

4.4 获取偏移量

当SparkStreaming集成Kafka时,无论是Old Consumer API中Direct方式还是New Consumer API方式获取的数据,每批次的数据封装在KafkaRDD中,其中包含每条数据的元数据信息。
在这里插入图片描述
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#obtaining-offsets
具体说明如下:
在这里插入图片描述
获取偏移量信息代码如下:
在这里插入图片描述
代码演示获取每批次RDD中对应Kafka分区中数据偏移量信息:
在这里插入图片描述
具体演示代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
/**
* 集成Kafka,实时消费Topic中数据,获取每批次数据对应Topic各个分区数据偏移量
*/
object StreamingKafkaOffset {
def main(args: Array[String]): Unit = {
// TODO: 1. 构建StreamingContext流式上下文实例对象
val ssc: StreamingContext = {
// a. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
val context = new StreamingContext(sparkConf, Seconds(5))
// c. 返回
context
}
// 2. 读取Kafka Topic中数据
/*
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
*/
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
/*
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object]
): ConsumerStrategy[K, V]
*/
// ii.读取哪些Topic数据
val topics = Array("wc-topic")
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0001",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用新消费者API获取数据,类似于Direct方式
val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc, locationStrategy, consumerStrategy
)
// TODO:其一、定义数组存储每批次数据对应RDD中各个分区的Topic Partition中偏移量信息
var offsetRanges: Array[OffsetRange] = Array.empty
// 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(kafkaRDD => {
// TODO:其二、直接从Kafka获取的每批次KafkaRDD中获取偏移量信息
offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
val resultRDD: RDD[(String, Int)] = kafkaRDD
.map(record => record.value()) // 获取Message数据
// 过滤不合格的数据
.filter(line => null != line && line.trim.length > 0)
// 按照分隔符划分单词
.flatMap(line => line.trim.split("\\s+"))
// 转换数据为二元组,表示每个单词出现一次
.map(word => (word, 1))
// 按照单词分组,聚合统计
.reduceByKey((tmp, item) => tmp + item)
resultRDD
})
// 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.foreachRDD{ (rdd, time) =>
val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// 先判断RDD是否有数据,有数据在输出
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
// TODO: 其三、当DStream进行Output操作完成以后,更新偏移量至外部存储系统(如Zookeeper、Redis等)
for (offsetRange <- offsetRanges) {
println(s"topic: ${offsetRange.topic} partition: ${offsetRange.partition} offsets:${o
ffsetRange.fromOffset} to ${offsetRange.untilOffset}")
}
}
// 5. 对于流式应用来说,需要启动应用
ssc.start()
// 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
ssc.awaitTermination()
// 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

运行Streaming应用程序,控制台输出结果如下:
在这里插入图片描述

5. 应用案例:百度搜索风云榜

百度搜索风云榜(http://top.baidu.com/)以数亿网民的单日搜索行为作为数据基础,以搜索关键词为统计对象建立权威全面的各类关键词排行榜,以榜单形式向用户呈现基于百度海量搜索数据的排行信息,线上覆盖十余个行业类别,一百多个榜单。
在这里插入图片描述
在【热点榜单】中,可以看到依据搜索关键词实时统计各种维度热点,下图展示【实时热点】。
在这里插入图片描述

5.1 业务场景

仿【百度搜索风云榜】对用户使用百度搜索时日志进行分析:【百度搜索日志实时分析】,主要业务需求如下三个方面:
在这里插入图片描述

  • 业务一:搜索日志数据存储HDFS,实时对日志数据进行ETL提取转换,存储HDFS文件系统;
  • 业务二:百度热搜排行榜Top10,累加统计所有用户搜索词次数,获取Top10搜索词及次数;
  • 业务三:近期时间内热搜Top10,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数;

开发Maven Project中目录结构如下所示:
在这里插入图片描述

5.2 初始化环境

编程实现业务之前,首先编写程序模拟产生用户使用百度搜索产生日志数据和创建工具类StreamingContextUtils提供StreamingContext对象与从Kafka接收数据方法。

创建 Topic
启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic search-log-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.itcast.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic search-log-topic --broker-list node1.itcast.cn:9092
# Consumer
kafka-console-consumer.sh --topic search-log-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning

模拟日志数据
模拟用户搜索日志数据,字段信息封装到CaseClass样例类【SearchLog】类,代码如下:

package cn.itcast.spark.app.mock
/**
* 用户百度搜索时日志数据封装样例类CaseClass
* <p>
* @param sessionId 会话ID
* @param ip IP地址
* @param datetime 搜索日期时间
* @param keyword 搜索关键词
*/
case class SearchLog(
sessionId: String, //
ip: String, //
datetime: String, //
keyword: String //
) {
override def toString: String = s"$sessionId,$ip,$datetime,$keyword"
}

模拟产生搜索日志数据类【MockSearchLogs】具体代码如下:

package cn.itcast.spark.app.mock
import java.util.{Properties, UUID}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
/**
* 模拟产生用户使用百度搜索引擎时,搜索查询日志数据,包含字段为:
* uid, ip, search_datetime, search_keyword
*/
object MockSearchLogs {
def main(args: Array[String]): Unit = {
// 搜索关键词,直接到百度热搜榜获取即可
val keywords: Array[String] = Array("罗志祥", "谭卓疑", "当当网", "裸海蝶", "张建国")
// 发送Kafka Topic
val props = new Properties()
props.put("bootstrap.servers", "node1.itcast.cn:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)
val random: Random = new Random()
while (true){
// 随机产生一条搜索查询日志
val searchLog: SearchLog = SearchLog(
getUserId(), //
getRandomIp(), //
getCurrentDateTime(), //
keywords(random.nextInt(keywords.length)) //
)
println(searchLog.toString)
Thread.sleep(10 + random.nextInt(100))
val record = new ProducerRecord[String, String]("search-log-topic", searchLog.toString)
producer.send(record)
}
// 关闭连接
producer.close()
}
/**
* 随机生成用户SessionId
*/
def getUserId(): String = {
val uuid: String = UUID.randomUUID().toString
uuid.replaceAll("-", "").substring(16)
}
/**
* 获取当前日期时间,格式为yyyyMMddHHmmssSSS
*/
def getCurrentDateTime(): String = {
val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
val nowDateTime: Long = System.currentTimeMillis()
format.format(nowDateTime)
}
/**
* 获取随机IP地址
*/
def getRandomIp(): String = {
// ip范围
val range: Array[(Int, Int)] = Array(
(607649792,608174079), //36.56.0.0-36.63.255.255
(1038614528,1039007743), //61.232.0.0-61.237.255.255
(1783627776,1784676351), //106.80.0.0-106.95.255.255
(2035023872,2035154943), //121.76.0.0-121.77.255.255
(2078801920,2079064063), //123.232.0.0-123.235.255.255
(-1950089216,-1948778497),//139.196.0.0-139.215.255.255
(-1425539072,-1425014785),//171.8.0.0-171.15.255.255
(-1236271104,-1235419137),//182.80.0.0-182.92.255.255
(-770113536,-768606209),//210.25.0.0-210.47.255.255
(-569376768,-564133889) //222.16.0.0-222.95.255.255
)
// 随机数:IP地址范围下标
val random = new Random()
val index = random.nextInt(10)
val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
//println(s"ipNumber = ${ipNumber}")
// 转换Int类型IP地址为IPv4格式
number2IpString(ipNumber)
}
/**
* 将Int类型IPv4地址转换为字符串类型
*/
def number2IpString(ip: Int): String = {
val buffer: Array[Int] = new Array[Int](4)
buffer(0) = (ip >> 24) & 0xff
buffer(1) = (ip >> 16) & 0xff
buffer(2) = (ip >> 8) & 0xff
buffer(3) = ip & 0xff
// 返回IPv4地址
buffer.mkString(".")
}
}

运行应用程序,源源不断产生日志数据,发送至Kafka(同时在控制台打印),截图如下:
在这里插入图片描述

StreamingContextUtils 工具类
所有SparkStreaming应用都需要构建StreamingContext实例对象,并且从采用New Kafka Consumer API消费Kafka数据,编写工具类【StreamingContextUtils】,提供两个方法:

方法一:getStreamingContext,获取StreamingContext实例对象
在这里插入图片描述
方法二:consumerKafka,消费Kafka Topic中数据
在这里插入图片描述
具体代码如下:

package cn.itcast.spark.app
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 工具类提供:构建流式应用上下文StreamingContext实例对象和从Kafka Topic消费数据
*/
object StreamingContextUtils {
/**
* 获取StreamingContext实例,传递批处理时间间隔
* @param batchInterval 批处理时间间隔,单位为秒
*/
def getStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
// i. 创建SparkConf对象,设置应用配置信息
val sparkConf = new SparkConf()
.setAppName(clazz.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
// 设置Kryo序列化
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
// ii.创建流式上下文对象, 传递SparkConf对象和时间间隔
val context = new StreamingContext(sparkConf, Seconds(batchInterval))
// iii. 返回
context
}
/**
* 从指定的Kafka Topic中消费数据,默认从最新偏移量(largest)开始消费
* @param ssc StreamingContext实例对象
* @param topicName 消费Kafka中Topic名称
*/
def consumerKafka(ssc: StreamingContext, topicName: String): DStream[ConsumerRecord[String, String]] = {
// i.位置策略
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
// ii.读取哪些Topic数据
val topics = Array(topicName)
// iii.消费Kafka 数据配置参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1.itcast.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id_streaming_0001",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// iv.消费数据策略
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
topics, kafkaParams
)
// v.采用新消费者API获取数据,类似于Direct方式
val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc, locationStrategy, consumerStrategy
)
// vi.返回DStream
kafkaDStream
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/381186.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重磅:Meta未来4年路线图曝光,Quest出货超2000万台

The Verge今天曝光了一份Meta内部AR/VR产品规划图&#xff0c;这份规划图为Meta高管为Reality Labs员工的内部分享&#xff0c;包括了大量AR/VR产品信息&#xff0c;下面我们一起来看看。一&#xff0c;未来四年规划Meta Reality Labs四年规划&#xff1a;1&#xff0c;2023年&…

你知道吗?火狐搜集您的数据?

导读请注意,打包在 Firefox Web 浏览器里面的地理位置服务即使浏览器关闭后也会在后台运行。我们还没有从关于浏览器插件丑闻的消息中平复下来。插件原本目的是保卫隐私&#xff0c;但现在却把信息卖给了第三方公司。然而更令人愤怒的是其规模完全超出我们的预计。MLS MLS&…

报错“FirewallD is not running”怎么办,如何解决?

目录 一、报错详情 二、解决方法—开启防火墙步骤 步骤一&#xff1a;先通过命令查看一下防火墙的状态。 步骤二&#xff1a;开启防火墙。 步骤三&#xff1a;再次查看防火墙状态 一、报错详情 在docker创建redis容器&#xff0c;在进行window访问redis容器端口进行绑定设…

IGKBoard(imx6ull)-ADC编程MQ-2烟雾传感器采样

文章目录1- ADC介绍2- MQ-2烟雾传感器介绍&#xff08;1&#xff09;工作原理&#xff08;2&#xff09;MQ-2应用电路3- MQ-2烟雾传感器硬件连接4- ADC驱动配置5- 编程查看当前浓度1- ADC介绍 ADC是Analog-to-Digital Converter的缩写&#xff0c;指模数转换器。真实世界的模拟…

【C#基础】C# 预处理器指令

序号系列文章8【C#基础】C# 面向对象编程9【C# 基础】C# 异常处理操作10【C#基础】C# 正则表达式文章目录前言1&#xff0c;预处理器指令的概念2&#xff0c;预处理器指令的定义与使用2.1&#xff0c;可为空上下文2.2&#xff0c;定义符号2.3&#xff0c;条件编译2.4&#xff0…

KConfig语言学习(一文全览)

KConfig 语言学习菜单项菜单属性类型定义prompt: 输入提示default: 默认值depends on/requires: 依赖关系select: 反向依赖关系imply: 弱反向依赖关系visible if: 选项可见range: 数据范围help: 帮助信息菜单依赖关系菜单结构关系KConfig语法config: 配置项menuconfig: 配置菜单…

发布依赖到maven仓库

maven中央仓库是一个开放的仓库&#xff0c;所以我们也可以把自己开发的jar推送到远程仓库&#xff0c;这样可以直接引入pom依赖使用我们的库。 准备工作 ● 需要一个github账号&#xff08;程序员必备&#xff09; ● 网络代理&#xff08;涉及到的网站通常没版本在国内直接访…

Computers Graphics(CAG)及Elsevier常见期刊投稿记录

1.期刊地址 Editorial Managerhttps://www.editorialmanager.com/cag/default2.aspx先进行用户注册&#xff0c;登录后进入首页点击Submit New Manuscript开始提交手稿&#xff0c;其他期刊流程相同&#xff0c;CAG所有的投稿注意事项见&#xff1a;Guide for authors - Comp…

数据库连接与properties文件

管理properties数据库&#xff1a; 现在pom文件中加入Druid的坐标&#xff1a; <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.8</version></dependency>配置文件中添加相应的数据&…

拒绝B站邀约,从月薪3k到年薪47W,我的经验值得每一个测试人借鉴

有时候&#xff0c;大佬们总是会特立独行。因为像我这样的常人总是想不通&#xff0c;究竟是怎样的情境&#xff0c;连B站这样的大厂面试都可以推掉&#xff1f; 缘起一通电话&#xff0c;踏出了改变人生轨迹的第一步 我是小瑾&#xff0c;今年28岁&#xff0c;2016年毕业于陕…

线程池的基本认识与使用

线程池的基本认识与使用线程池线程池工作原理&#xff1a;优点&#xff1a;传统的创建线程方式线程池创建线程使用线程池 池化思想&#xff1a;线程池、字符串常量池、数据库连接池可以提高资源的利用率 线程池工作原理&#xff1a; 预先创建多个线程对象 放入线程池种&#…

数据库基础-数据库基本概念(1-1)

你好&#xff0c;欢迎来到数据库基础系列专栏&#xff0c;欢迎留言互动哦~ 目录一、数据库基础1. 数据库基本概念1.1 数据库1.2 什么是数据库管理软件1.3 表1.4 行1.5 列和数据类型1.6 主键1.7 什么是 SQL一、数据库基础 1. 数据库基本概念 1.1 数据库 数据库是一个以某种有…

射频调试的习惯

三月开工了&#xff0c;一个月的调试即将开始。其实调试的重心是测试&#xff0c;核心的推动力是做事的习惯和思维。测试很重要&#xff0c;数据不对&#xff0c;能力和时间都浪费了上面了。测试的问题初步解完了&#xff0c;今天吃饭的时候碰到大领导。领导好忙&#xff0c;我…

SQL报错注入(上)

SQL报错注入报错注入概述报错注入的前提条件Xpath型函数&#xff08;需要数据库版本>5.15&#xff09;extractvalue&#xff08;&#xff09;extractvalue&#xff08;&#xff09;实操![在这里插入图片描述](https://img-blog.csdnimg.cn/5c7bfbc6565045d4bb352448c17f0869…

docker搭建redis集群、哨兵

集群搭建 本机IP 192.168.1.149 分别采用映射 192.168.1.149 的6379 6380 6381 三个端口模拟三台服务器。搭建三主无从的集群 首先可以在本机上创建三份redis.conf配置文件,分别命名为redis1.conf, redis2.conf, redis3.conf &#xff0c;我这里放在/opt/redis/conf/中 redis*.…

MyBatis学习笔记(九) —— 自定义映射resultMap

9、自定义映射resultMap 9.1、resultMap处理字段和属性的映射关系 若字段名和实体类中的属性名不一致&#xff0c;则可以通过resultMap设置自定义映射 resultType 是一个具体的类型。 resultMap 是resultMap的标签。 id 是处理主键和属性的映射关系&#xff1b; result 是处…

FirePower X2 14.0.1 for RAD Studio Alexandria

介绍 FirePower X2 FirePower X2 集成了 RAD Studio 11.0 Alexandria 中的新功能&#xff0c;并预览了我们的新特色组件 TwwDataGrouper。 FirePower X2 还允许您为 Apple 的新 M1 芯片构建应用程序&#xff0c;这样您就可以进一步利用 M1 芯片来提高本机应用程序的性能&#x…

set和map的基本使用

目录 关联式容器 要点分析 键值对 pair介绍 set 模板参数列表&#xff1a; set的构造&#xff1a; 常用接口 操作 multiset map map的构造 插入 make_pair map的迭代器 operator[] multimap multimap中为什么没有重载operator[] 关联式容器 关联式容器也是用…

(五十六)针对主键之外的字段建立的二级索引,又是如何运作的?

上一次我们已经给大家彻底讲透了聚簇索引这个东西&#xff0c;其实聚簇索引就是innodb存储引擎默认给我们创建的一套基于主键的索引结构&#xff0c;而且我们表里的数据就是直接放在聚簇索引里的&#xff0c;作为叶子节点的数据页&#xff0c;如下图。 而且我们现在也对基于主键…

日志框架以及如何使用LogBack记录程序

使用日志框架可以记录一个程序运行的过程和详情&#xff0c;同时便捷地存储到文件里面&#xff0c;并且性能和灵活性都比较好。日志的体系结构包括两类日志规范接口&#xff1a;Commons Logging&#xff0c;简称&#xff1a;JCL&#xff1b;Simple Logging Facade for Java&…