大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新

news2024/11/13 12:49:46

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成了如下的内容:

  • Spark Streaming Kafka
  • 自定义管理Offset Scala代码实现
    在这里插入图片描述

Offset 管理

Spark Streaming 集成Kafka,允许从Kafka中读取一个或者多个Topic的数据,一个Kafka Topic包含一个或者多个分区,每个分区中的消息顺序存储,并使用offset来标记消息位置,开发者可以在Spark Streaming应用中通过offset来控制数据的读取位置。
Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的,如果在应用停止或者报错退出之前将Offset持久化保存,该消息就会丢失,那么Spark Streaming就没有办法从上次停止或保存的位置继续消费Kafka中的消息。

Spark Streaming 与 Kafka 的集成

Spark Streaming 可以通过 KafkaUtils.createDirectStream 直接与 Kafka 集成。这种方式不会依赖于 ZooKeeper,而是直接从 Kafka 分区中读取数据。
在这种直接方式下,Spark Streaming 依赖 Kafka 的 API 来管理和存储消费者偏移量(Offsets),默认情况下偏移量保存在 Kafka 自身的 __consumer_offsets 主题中。

使用 Redis 管理 Offsets

Redis 作为一个高效的内存数据库,常用于存储 Spark Streaming 中的 Kafka 偏移量。
通过手动管理偏移量,你可以在每批次数据处理后,将当前批次的 Kafka 偏移量存储到 Redis 中。这样,在应用程序重新启动时,可以从 Redis 中读取最后处理的偏移量,从而从正确的位置继续消费 Kafka 数据。

实现步骤

从 Redis 获取偏移量

应用启动时,从 Redis 中读取上次处理的偏移量,并从这些偏移量开始消费 Kafka 数据。

处理数据

通过 Spark Streaming 处理从 Kafka 消费到的数据。

保存偏移量到 Redis

每处理完一批数据后,将最新的偏移量存储到 Redis 中。这样,如果应用程序崩溃或重启,可以从这个位置继续消费。

自定义Offsets:根据Key从Redis获取Offsets 处理完更新Redis

添加依赖

<!-- jedis -->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.9.0</version>
</dependency>

服务器上我们需要有:
Redis服务启动
在这里插入图片描述
Kafka服务启动
在这里插入图片描述
编写代码,实现的主要逻辑如下所示:

package icu.wzk

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("args").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("KafkaDStream3")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val groupId: String = "wzkicu"
    val topics: Array[String] = Array("spark_streaming_test01")
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)

    // 从 Kafka 获取 Offsets
    val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffsetsFromRedis(topics, groupId)

    // 创建 DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // DStream 转换&输出
    dstream.foreachRDD {
      (rdd, time) =>
        if (!rdd.isEmpty()) {
          // 处理消息
          println(s"====== rdd.count = ${rdd.count()}, time = $time =======")
          // 将 Offsets 保存到 Redis
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          OffsetsRedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
        }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}

代码中我们封装了一个工具类:

package icu.wzk

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsRedisUtils {

  private val config = new JedisPoolConfig
  private val redisHost = "h121.wzk.icu"
  private val redisPort = 6379

  config.setMaxTotal(30)
  config.setMaxIdle(10)

  private val pool= new JedisPool(config, redisHost, redisPort, 10000)
  private val topicPrefix = "kafka:topic"

  private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId"
  private def getRedisConnection: Jedis = pool.getResource

  // 从Redis中获取Offsets
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map {
      topic =>
        import scala.collection.JavaConverters._
        jedis.hgetAll(getKey(topic, groupId))
          .asScala
          .map {
            case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
          }
    }
    jedis.close()
    offsets.flatten.toMap
  }

  // 将 Offsets 保存到 Redis
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    val jedis: Jedis = getRedisConnection
    ranges
      .map(range => (range.topic, range.partition -> range.untilOffset))
      .groupBy(_._1)
      .map {
        case (topic, buffer) => (topic, buffer.map(_._2))
      }
      .foreach {
        case (topic, partitionAndOffset) =>
          val offsets: Array[(String, String)] = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
          import scala.collection.JavaConverters._
          jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava)
      }

    jedis.close()
  }

}

我们启动后,如图所示:
在这里插入图片描述
这里我使用Redis查看当前的存储情况:
在这里插入图片描述
可以看到当前已经写入了,我们继续启动 KafkaProducer工具,继续写入数据。
可以看到,已经统计到数据了。
在这里插入图片描述
我们继续查看当前的Redis中的数据:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2080517.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IngsollRang拧紧Insight IC-D控制器维修 系统参数设置

——设置菜单 Setup&#xff08;设置&#xff09;菜单及其子菜单用于编写拧紧策略并设置许多重要的系统参数。 在Setup&#xff08;设置&#xff09;菜单中&#xff0c;创建基本拧紧策略。 除策略外&#xff0c;您可以使用Setup&#xff08;设置&#xff09;菜单来设置时间、显…

堆和栈的概念和区别

文章目录 堆和栈的概念和区别栈 (Stack)堆 (Heap)详细描述补充说明逃逸分析 (Escape Analysis)栈上分配 (Stack Allocation)堆碎片化 (Heap Fragmentation) 堆和栈的概念和区别 堆和栈的概念和区别【改编自博客】 在说堆和栈之前&#xff0c;我们先说一下JVM&#xff08;虚拟…

家里两个路由器IP地址一样吗?‌IP地址冲突怎么办?‌

在家庭网络环境中&#xff0c;‌随着智能设备的不断增多和网络需求的日益提升&#xff0c;‌很多家庭选择使用两个或更多的路由器来扩展网络覆盖、‌提高网络性能。‌然而&#xff0c;‌在设置和使用多个路由器的过程中&#xff0c;‌一个常见且令人困惑的问题是&#xff1a;‌…

C++常见面试题(面试中总结)

文章目录 原文章链接1、回调函数的了解&#xff1f;2、递归算法解释&#xff1f;3、内存对齐解释&#xff1f;4、一种排序算法解释&#xff08;快速排序&#xff09;5、什么是多态&#xff1f;6、基类为什么需要虚析构函数&#xff1f;7、new和malloc的区别&#xff1f;8、指针…

ubuntu中安装Mysql以及使用Navicat远程连接的详细步骤【图文教程】

安装步骤 注意&#xff1a;建议大家都安装Ubuntu22.04的版本&#xff0c;在该版本下再安装MySQL8.0版本的数据库。 1查看当前是否安装了MySQL程序 $ dpkg -l |grep mysql 执行以上命令&#xff0c;如果执行后什么都没有&#xff0c;则进入到MySQL的安装步骤 2如果执行以上…

MATLAB进阶:应用微积分

今天我们继续学习matlab中的应用微积分 求导&#xff08;微分&#xff09; 1、数值微分 n维向量x(xi&#xff0c;x,… x)的差分定义为n-1维向量△x(X2-X1&#xff0c;X3-X2&#xff0c;…&#xff0c;Xn- Xn-1)。 diff(x) 如果x是向量&#xff0c;返回向量x的差分如果x是矩…

初识Linux · 有关gcc/g++

目录 前言&#xff1a; 1 gcc和g 2 翻译过程 2.1 预处理 2.2 编译 2.3 汇编 2.4 链接 前言&#xff1a; 继上文介绍了vim 和 yum&#xff0c;相当于介绍了 文本编译器&#xff0c;我们可以利用vim写代码&#xff0c;那么写代码的我们了解了&#xff0c;现在应该了解编译…

R语言统计分析——如何选择最佳回归模型

参考资料&#xff1a;R语言实战【第2版】 尝试获取一个回归方程时&#xff0c;实际上你就面对着从众多可能的模型中做选择的问题。是不是所有的变量都要包括&#xff1f;还是去掉那个对预测贡献不显著的变量&#xff1f;是否需要添加多项式项和/或交互项来提高拟合度&#xff1…

.NET WPF 抖动动画

.NET WPF 抖动动画 Demo Code <!-- 水平抖动 --> <Button Content"Hello World"><Button.RenderTransform><TranslateTransform x:Name"translateTransform" /></Button.RenderTransform><Button.Triggers><Even…

SP: eric

靶机搭建 靶机下载地址 在Virtualbox中打开下载好的靶机&#xff0c;网络配置修改为桥接模式&#xff0c;启动靶机即可。 信息收集 主机发现 nmap 192.168.31.0/24 -Pn -T4 靶机IP&#xff1a;192.168.31.244 端口扫描 nmap 192.168.31.244 -A -p- -T4 根据端口扫描结果…

Linux驱动学习之内核poll阻塞

在linux系统编程课程中学习过多路IO复用&#xff0c;简单来说就三个函数select&#xff0c;poll&#xff0c;epoll。 对于select 此函数是跨平台的&#xff0c;可以在windows&#xff0c;Linux中使用。 对于poll与epoll 只能在linux平台下使用&#xff0c; epoll底层实现是一个…

ArcGIS应用指南:近邻分析(点匹配到最近线段上)

近邻分析通常用于确定一个要素集中的要素与另一个要素集中最近要素的距离。当涉及到点匹配到最近的线时&#xff0c;这种分析可以用来确定每个点到最近线段的距离及位置&#xff0c;也就是我们常说的点匹配到最近线上&#xff0c;可以参考官方文档&#xff1a;近邻分析 (Covera…

EmguCV学习笔记 VB.Net 6.S 特别示例

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 EmguCV是一个基于OpenCV的开源免费的跨平台计算机视觉库,它向C#和VB.NET开发者提供了OpenCV库的大部分功能。 教程VB.net版本请访问…

深度学习基础(Datawhale X 李宏毅苹果书AI夏令营)

深度学习基础(Datawhale X 李宏毅苹果书AI夏令营) 3.1局部极小值和鞍点 3.1.1. 优化失败问题 在神经网络中&#xff0c;当优化到梯度为0的地方&#xff0c;梯度下降就无法继续更新参数了&#xff0c;训练也就停下来了&#xff0c;如图&#xff1a; 梯度为0的情况包含很多种情…

Anaconda3简介与安装步骤

目录 Anaconda3简介与功能 1.Anaconda3简介 2.主要功能和特点 3.使用场景 4.总结 Anaconda3安装 1.Anaconda3下载 1.1我的百度网盘 1.2官网下载 1.2.1访问官网 1.2.2输入邮箱 1.2.3登录你的邮箱下载&#xff08;你的噶&#xff09; 2.安装 2.1双击安装 2.2选择安…

分块矩阵的转置

证明 则 证明&#xff1a;令&#xff0c;有&#xff0c;对它做一个分块使得和后面的分块矩阵中的是同型矩阵&#xff0c;要证明&#xff08;任意的&#xff09;&#xff0c;需要证明1&#xff09;是一个的矩阵 2&#xff09;任意的 首先证明1&#xff09;我们先定义两个函…

HarmonyOS ArkUI工程框架解析

通过 HarmonyOS Developer 官网我们可以了解 ArkUI 是一套声明式开放框架&#xff0c;开发者可以基于 ArkTS 语法设计一套极简的 DSL 以及丰富的 UI 组件完成跨设备的界面开发。 那么 ArkUI 是如何实现这一套声明式开放框架的呢&#xff1f;本文将通过分析开源的 HarmonyOS 渲染…

记录devtmpfs:error mounting -2问题的解决

ext4文件系统制作有问题. 重新制作文件系统烧录 /dev文件夹丢失

软考攻略/超详细/系统集成项目管理工程师/基础知识分享04

第二章 信息技术发展 2.1信息技术及其发展 2.1.1 计算机软硬件&#xff08;了解&#xff09; 在许多情况下&#xff0c;计算机的某些功能既可以由硬件实现&#xff0c;也可以由软件来实现。 1、计算机硬件 计算机硬件主要分为&#xff1a;控制器、运算器、存储器、输入设备和…

开发中如何在运行/调试时将项目热部署到Tomcat

这里有一篇不错的博客&#xff0c;可以参考 http://t.csdnimg.cn/oWcgm 正常情况下&#xff0c;我们将web项目打包成war包后&#xff0c;需要放到tomcat的webapps路径下&#xff0c;然后启动tomcat&#xff0c;才能正常访问。但是这在开发阶段是极为不便的。因此可以使用两种方…