Spark-Streaming集成Kafka

news2024/12/24 8:44:54

 Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们一起来看看吧。

一、创建一个Direct Stream

导入相关maven依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.5.3</version>
</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

class KafkaDriectStream {
  def main(args: Array[String]): Unit = {

    // 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDriectStream")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topicA", "topicB")
    val inputDStream :InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    inputDStream.map(record => (record.key, record.value))
  }
}

如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理,这将需要更改代理上的group.max.session.timeout.ms。

二、executor选择适合分区处理

新的Kafka Consumer API会将消息预取到缓冲区中。因此,出于性能原因,Spark集成Kafka时最好将缓存的Consumer 保留在executor上(而不是为每个批次重新创建它们)。

在大多数情况下,应该使用LocationStrategies.PreferConsistent。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上,则使用PreferBrokers,这将在该分区的Kafka leader上安排分区。最后,如果分区之间的负载严重偏差,请使用PreferFixed。这允许指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。

Consumer 缓存的默认最大大小为64。如果处理超过(64个executor数量)的Kafka分区,可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity设置。

如果想禁用Consumer 的缓存,可以将spark.streaming.kafka.consumer.cache.enabled 设置成false

缓存由topic分区和group.id控制,因此对createDirectStream的每次调用使用单独的 group.id

三、根据topic、partition、offset创建RDD

// 导入依赖关系并创建kafka-params,例如第一步:创建Direct Stream

val offsetRanges = Array(
  // topic, partition, 包含起始offset, 不包含结束offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

//根据kafka TopicPartition 中的一段数据来创建一个RDD,这是不是为了实现微批来提供支持呢
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

请注意,这里不能指定broker来消费,因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker,需要将其与元数据绑定到一起。

四、获取offset

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

请注意,HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功,而不是在后面的方法链中完成。因为一旦发生shuffle和重分区,RDD分区和Kafka分区之间的一对一关系就会遭到破坏。

五、存储offset

在kafka中为了实现精确一次的语义,必须把结果处理和offset放到一个事务中去处理,在与spark streaming集成时也不例外。必须在幂等输出之后存储offset,或者将offset与输出一起存储在原子事务中。

offset可以存储在spark的checkpoint中,也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是,无论应用程序代码发生什么变化,Kafka都是一个持久的存储。但是,Kafka不是事务性的,程序的输出必须仍然是幂等的。注意,在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // 一段时间后,在输出完成之后,提交offset
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,才能成功得到CanCommitOffsets ,而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。

// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // 开启事务

  // 更新结果
  // 更新offset

  // 结束事务
}

六、官方例子

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <groupId> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <groupId> is a consumer group name to consume from topics
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, groupId, topics) = args

    // 以2秒的批处理间隔创建上下文
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    //指定kafka、topic信息创建direct kafka stream
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // 获取一行数据并进行分割、统计、打印
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    //启动计算
    ssc.start()
    ssc.awaitTermination()
  }
}

该例子消费Kafka中一个或多个topic的消息并进行单词统计,需要三个参数:1、Kafka broker的列表,2、消费者组,3、以逗号分隔的topic列表

1、创建2个topic

kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2

3、向topic推送数据

kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、查看结果


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264633.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

「下载」智慧城市包括哪些方面:大数据公共服务平台、城市运行指挥中心、城市综合治理平台、城市体检综合运营平台解决方案

在当今信息化高速发展的时代&#xff0c;智慧城市已成为全球城市发展的新趋势。系列全面而创新的智慧城市解决方案&#xff0c;旨在助力城市实现智慧化转型&#xff0c;提升城市管理效率&#xff0c;增强市民生活质量。 智慧城市最新解决方案&#xff0c;标准规范顶层设计指南、…

ChatGPT生成接口文档实践案例(二)

不难发现&#xff0c;两个方案都出色地完成了接口文档的生成&#xff0c;但笔者更喜欢Response 2的表达&#xff0c;因为其描述更加全面。 还可以让ChatGPT生成符合OpenAPI 3.0规范的接口文档&#xff0c;以便于项目相关成员阅读&#xff0c;如图5-13所示。 为什么要生成OpenAP…

【解决】Linux更新系统内核后Nvidia-smi has failed...

问题概述 由于服务器(操作系统为 RedHat 9)宕机&#xff0c;重启后&#xff0c;系统内核自动更新了&#xff0c;然后输入 nvidia-smi 发现报了下面的异常&#xff1a; NVIDIA-SMI has failed because it couldnt communicate with the NVIDIA driver. Make sure that the late…

Docker Compose 安装 Harbor

我使用的系统是rocky Linux 9 1. 准备环境 确保你的系统已经安装了以下工具&#xff1a; DockerDocker ComposeOpenSSL&#xff08;用于生成证书&#xff09;#如果不需要通过https连接的可以不设置 1.1 安装 Docker 如果尚未安装 Docker&#xff0c;可以参考以下命令安装&…

PCIe_Host驱动分析_设备枚举

往期内容 本文章相关专栏往期内容&#xff0c;PCI/PCIe子系统专栏&#xff1a; 嵌入式系统的内存访问和总线通信机制解析、PCI/PCIe引入 深入解析非桥PCI设备的访问和配置方法 PCI桥设备的访问方法、软件角度讲解PCIe设备的硬件结构 深入解析PCIe设备事务层与配置过程 PCIe的三…

【CVE-2024-53375】TP-Link Archer系列路由器认证操作系统命令注入(内附远离和代码利用)

CVE-2024-53375 TP-Link Archer系列路由器认证操作系统命令注入 受影响的设备 使用 HomeShield 功能的 TP-Link 设备容易受到此漏洞的影响。这包括 TP-Link Archer 系列的多款路由器。 经过测试 Archer AXE75(EU)_V1_1.2.2 Build 20240827(发布日期 2024 年 11 月 4 日)…

SpringBoot 自动装配原理及源码解析

目录 一、引言 二、什么是 Spring Boot 的自动装配 三、自动装配的核心注解解析 3.1 SpringBootApplication 注解 &#xff08;1&#xff09;SpringBootConfiguration&#xff1a; &#xff08;2&#xff09;EnableAutoConfiguration&#xff1a; &#xff08;3&#xf…

2025系统架构师(一考就过):案例题之一:嵌入式架构、大数据架构、ISA

一、嵌入式系统架构 软件脆弱性是软件中存在的弱点(或缺陷)&#xff0c;利用它可以危害系统安全策略&#xff0c;导致信息丢失、系统价值和可用性降低。嵌入式系统软件架构通常采用分层架构&#xff0c;它可以将问题分解为一系列相对独立的子问题&#xff0c;局部化在每一层中…

单片机上电后程序不运行怎么排查问题?

1.电源检查。使用电压表测量单片机的电源电压是否正常&#xff0c;确保电压在规定的范围内&#xff0c;如常见的5V。 2.复位检查。检查复位引脚的电压是否正常&#xff0c;在单片机接通电源时&#xff0c;复位引脚通常会有一个高电平&#xff0c;按下复位按钮时&#xff0c;复位…

初学stm32 --- 外部中断

目录 STM32 IO 口中断基础知识 相关库函数&#xff1a; 使用 IO 口外部中断的一般步骤 STM32 IO 口中断基础知识 STM32 的每个 IO 都可以作为外部中断的中断输入口。STM32F103 的中断控制器支持 19 个外部中断/事件请求。每个中断设有状态位&#xff0c;每个中断/事件都有独立…

c++------------------函数

函数定义 语法格式 函数定义包括函数头和函数体。函数头包含返回类型、函数名和参数列表。函数体是用花括号{}括起来的代码块&#xff0c;用于实现函数的功能。例如&#xff0c;定义一个计算两个整数之和的函数&#xff1a; int add(int a, int b) {return a b; }这里int是返回…

【java基础系列】实现一个简单的猜数字小游戏

主要是用的java中的键盘录入和随机数两个api&#xff0c;实现这种人机交互的小游戏&#xff0c;可以用来锻炼基础算法思维 实现效果 实现代码 package com.gaofeng.day10;import java.util.Random; import java.util.Scanner;/*** author gaofeng* date 2024-12-22 - 9:21*/ …

helm的介绍和安装

1 helm概述 1.1 资源对象难以管理的问题 helm是k8s资源清单的管理工具&#xff0c;它就像Linux下的包管理器&#xff0c;比如centos的yum&#xff0c;ubuntu的apt helm&#xff1a;命令行工具&#xff0c;主要用于k8s的chart的创建&#xff0c;打包&#xff0c;发布和管理。…

AI,cursor快速上手思维导图

https://cursor101.com/zh/tutorial/learn-cursor-tab

ESP32S3 使用LVGL驱动LCD屏(ST7789主控)

ESP32S3 使用LVGL驱动LCD屏&#xff08;ST7789主控&#xff09; 目录 1 分析原理图 2 驱动、点亮LCD(ST7789) 2.1 在工程中添加目录、文件 2.2 添加esp_lvgl_port组件 2.3 对工程进行必要的配置 2.4 编写必要代码 3 烧录、验证 1 分析原理图 要使用SOC驱动LCD屏&#…

【hackmyvm】Zday靶机wp

HMVrbash绕过no_root_squash静态编译fogproject 1. 基本信息^toc 这里写目录标题 1. 基本信息^toc2. 信息收集2.1. 端口扫描2.2. 目录扫描 3. fog project Rce3.1. ssh绕过限制 4. NFS no_root_squash5. bash运行不了怎么办 靶机链接 https://hackmyvm.eu/machines/machine.ph…

neo4j console 报错

项目场景&#xff1a; neo4j 开启失败 问题描述 在终端打开 neo4j 失败打开cmd, 输入: neo4j console 报错 原因分析&#xff1a; 1 可能是没有配置环境变量2 当前脚本的执行策略有问题 解决方案&#xff1a; 解决没有配置环境变量 添加环境变量 在path路径中将变量添加进去…

范德蒙矩阵(Vandermonde 矩阵)简介:意义、用途及编程应用

参考&#xff1a; Introduction to Applied Linear Algebra – Vectors, Matrices, and Least Squares Stephen Boyd and Lieven Vandenberghe 书的网站: https://web.stanford.edu/~boyd/vmls/ Vandermonde 矩阵简介&#xff1a;意义、用途及编程应用 在数学和计算科学中&a…

编译原理复习---正则表达式+有穷自动机

适用于电子科技大学编译原理期末考试复习。 1. 正则表达式 正则表达式&#xff08;Regular Expression&#xff0c;简称regex或regexp&#xff09;是一种用于描述、匹配和操作文本模式的强大工具。它由一系列字符和特殊符号组成&#xff0c;这些字符和符号定义了一种搜索模式…

CAD跨图纸复制与粘贴怎么操作?教程来了

在过去&#xff0c;图纸的复制粘贴工作大多依赖于电脑完成&#xff0c;手机则因运行内存等硬件限制&#xff0c;难以像电脑那样轻松实现多图同开&#xff0c;以及图纸内容的跨图复制粘贴。为解决这一痛点&#xff0c;CAD看图王手机端推出了跨图复制与粘贴功能&#xff0c;为用户…