Kafka/Spark-01消费topic到写出到topic

news2025/1/21 0:52:45

1 Kafka的工具类

1.1 从kafka消费数据的方法

  1. 消费者代码
  def getKafkaDStream(ssc : StreamingContext , topic: String  , groupId:String  ) ={
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))
    kafkaDStream
  }
  1. 注意点
  • consumerConfigs是定义的可变的map的类型的,具体如下
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](
    // kafka集群位置

    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),

    // kv反序列化器
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    // groupId
    // offset提交  自动 手动
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    //自动提交的时间间隔
    //ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG
    // offset重置  "latest"  "earliest"
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
    // .....
  )
  • consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参

  • 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下

在`KafkaUtils.createDirectStream`方法中,后续传递的参数的含义如下:

1. `ssc`:这是一个`StreamingContext`对象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:这是一个位置策略,用于指定Kafka消费者的位置策略。`PreferConsistent`表示优先选择分区分布均匀的消费者。
3. `ConsumerStrategies.Subscribe[String, String]`:这是一个消费者策略,用于指定Kafka消费者的订阅策略。`Subscribe[String, String]`表示按照指定的泛型主题字符串数组订阅消息,键和值的类型都为`String`。
4. `Array(topic)`:这是一个字符串数组,用于指定要订阅的Kafka主题。
5. `consumerConfigs`:这是一个`java.util.Properties`类型的对象,其中配置了一些Kafka消费者的属性。

总之,在`KafkaUtils.createDirectStream`方法中,这些参数组合被用于创建一个Kafka直连流(Direct Stream),该流可以直接从Kafka主题中消费消息,并将其转换为`InputDStream[ConsumerRecord[String, String]]`类型的DStream。

在这里插入图片描述

  • Subscribe传参需要指定泛型,这边指定string,表示指定主题的键和值的类型,即Array(topic), consumerConfigs传参是string

在这里插入图片描述

  • 最后方法返回一个kafkaDStream

1.2 kafka的生产数据的方法

  1. 生产者代码
  • 创建与配置
/**
    * 生产者对象
    */
  val producer : KafkaProducer[String,String] = createProducer()

  /**
    * 创建生产者对象
    */
  def createProducer():KafkaProducer[String,String] = {
    val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]
    //生产者配置类 ProducerConfig
    //kafka集群位置
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
    //producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))
    //kv序列化器
    producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")
    //acks
    producerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")
    //batch.size  16kb
    //linger.ms   0
    //retries
    //幂等配置
    producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)
    producer
  }
  • 生产方法
  /**
    * 生产(按照默认的黏性分区策略)
    */
  def send(topic : String  , msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , msg ))
  }

  /**或者!
    * 生产(按照key进行分区)
    */
  def send(topic : String  , key : String ,  msg : String ):Unit = {
    producer.send(new ProducerRecord[String,String](topic , key ,  msg ))
  }
  • 关闭生产
/**
    * 关闭生产者对象
    */
  def close():Unit = {
    if(producer != null ) producer.close()
  }

  /**
    * 刷写 ,将缓冲区的数据刷写到磁盘
    *
    */
  def flush(): Unit ={
    producer.flush()
  }

2 消费数据

2.1 消费到数据

单纯的使用返回的ConsumerRecord不支持序列化,没有实现序列化接口

在这里插入图片描述

因此需要转换成通用的jsonobject对象

//3. 处理数据
    //3.1 转换数据结构
    val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(
      consumerRecord => {
        //获取ConsumerRecord中的value,value就是日志数据
        val log: String = consumerRecord.value()
        //转换成Json对象
        val jsonObj: JSONObject = JSON.parseObject(log)
        //返回
        jsonObj
      }
    )

2.2 数据分流发送到对应topic

  1. 提取错误数据并发送到对应的topic中
jsonObjDStream.foreachRDD(
      rdd => {

        rdd.foreachPartition(
          jsonObjIter => {
            for (jsonObj <- jsonObjIter) {
              //分流过程
              //分流错误数据
              val errObj: JSONObject = jsonObj.getJSONObject("err")
              if(errObj != null){
                //将错误数据发送到 DWD_ERROR_LOG_TOPIC
                MyKafkaUtils.send(DWD_ERROR_LOG_TOPIC ,  jsonObj.toJSONString )
              }else{
                  
              }
            }
          }
        }	
  1. 将公共字段和页面数据发送到DWD_PAGE_DISPLAY_TOPIC
else{
                // 提取公共字段
                val commonObj: JSONObject = jsonObj.getJSONObject("common")
                val ar: String = commonObj.getString("ar")
                val uid: String = commonObj.getString("uid")
                val os: String = commonObj.getString("os")
                val ch: String = commonObj.getString("ch")
                val isNew: String = commonObj.getString("is_new")
                val md: String = commonObj.getString("md")
                val mid: String = commonObj.getString("mid")
                val vc: String = commonObj.getString("vc")
                val ba: String = commonObj.getString("ba")
                //提取时间戳
                val ts: Long = jsonObj.getLong("ts")
                // 页面数据
                val pageObj: JSONObject = jsonObj.getJSONObject("page")
                if(pageObj != null ){
                  //提取page字段
                  val pageId: String = pageObj.getString("page_id")
                  val pageItem: String = pageObj.getString("item")
                  val pageItemType: String = pageObj.getString("item_type")
                  val duringTime: Long = pageObj.getLong("during_time")
                  val lastPageId: String = pageObj.getString("last_page_id")
                  val sourceType: String = pageObj.getString("source_type")

                  //封装成PageLog,这边还写了bean实体类去接收
                  var pageLog =
                    PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)
                  //发送到DWD_PAGE_LOG_TOPIC
                  MyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean没有set和get方法,这边是直接操作字段
                }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/996966.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

centos7安装airflow2.7.1

python3安装 版本 Python-3.9.17 ./configure --prefix/usr/local/python3 make && make install随后用ln -s短链接python3和pip3 airflow安装 版本2.7.1 export AIRFLOW_HOME~/airflow编写一个sh文件 AIRFLOW_VERSION2.7.1# Extract the version of Python you…

C 风格文件输入/输出---直接输入/输出---(std::fread)---(std::fwrite)

C 标准库的 C I/O 子集实现 C 风格流输入/输出操作。 <cstdio> 头文件提供通用文件支持并提供有窄和多字节字符输入/输出能力的函数&#xff0c;而 <cwchar>头文件提供有宽字符输入/输出能力的函数。 从直接输入/输出 文件读取 std::fread 从给定输入流 stream …

基于ESP32设计可以通过 WiFi 控制的基于 ESP32 的定制四轴飞行器

介绍 我想选择一个涉及物联网概念的项目,例如无线通信和服务器端脚本编写。我最终决定建造一架四轴飞行器,使用定制的机载飞行控制器进行飞行,该控制器通过 WiFi 从触摸屏平板电脑接收操纵命令。该项目的最终目标是让四轴飞行器从相机图像中跟踪移动物体并跟随它。 硬件 对…

正则表达式:实数

正则表达式&#xff1a;实数 校验字符串&#xff0c;为有效的实数。 可以为&#xff1a;正数或负数&#xff1b; 可以为&#xff1a;整数或小数&#xff1b; 但是&#xff0c;不可以为非数值型的字符串&#xff0c;不可以是一连串的“0” 。 原始正则表达式 ^-?(0|[1-9]\d…

高可用Kuberbetes部署Prometheus + Grafana

概述 阅读官方文档部署部署Prometheus Grafana GitHub - prometheus-operator/kube-prometheus at release-0.10 环境 步骤 下周官方github仓库 git clone https://github.com/prometheus-operator/kube-prometheus.git git checkout release-0.10 进入工作目录 cd kube…

GDB的C++调试方法

本文记录基础的GDB调试过程&#xff0c;包含指令如下&#xff1a; 文章目录 准备编译文件GDB启动GDB开启代码行设置断点运行程序查看pc的指令查看监视的变量以及断点设置快照checkpoint实验1实验2 nextnextistepx/i $pcfinishinfo break 和 delete Numrefbreak col. if condit…

浅谈原型链

一.在掌握原型链之前首先要了解这三点 1.每个函数都有prototype这个属性我们称为原型对象 2.每个对象都有__proto__这个属性 3.对象的__proto__可以访问原型对象上的方法和变量,如果访问不了,就会向上进行查找,直到找不到为止,会出现报错的情况l。 二.例子 1.代码: let arr …

云计算与虚拟化

一、概念 什么是云计算&#xff1f; 云计算&#xff08;cloud computing&#xff09;是分布式计算的一种&#xff0c;指的是通过网络“云”将巨大的数据计算处理程序分解成无数个小程序&#xff0c;然后&#xff0c;通过多部服务器组成的系统进行处理和分析这些小程序得到结果…

基于folium绘制黑河腾冲线,胡焕庸线

背景 黑河腾冲线&#xff0c;又名胡焕庸线&#xff0c;是我们人口密度分布的的近似分界线。今天基于folium&#xff0c;使用python来绘制这条线。 代码 # -*- coding:UTF-8 -*-# region 引入必要依赖 from selfPyTools.mapModule import * # endregion# 准备一个地图类对象,…

学习Bootstrap 5的第十一天

折叠 基础的折叠 在 Bootstrap 5 中&#xff0c;折叠效果可以通过添加特定的属性和类来轻松实现内容的显示和隐藏。具体步骤如下&#xff1a; 1、创建一个可折叠的元素&#xff0c;通常使用 <div> 标签&#xff0c;并为其添加 .collapse 类&#xff0c;以指示它是可折…

智能化时代前端开发使用Amazon CodeWhisperer在vscode中编写代码

目录 一、概述 1.Amazon CodeWhisperer使用您的 AI 编码配套应用程序更快、更安全地构建应用程序。 2.CodeWhisperer 经过数十亿行代码的训练&#xff0c;可以根据您的评论和现有代码实时生成从代码片段到全函数的代码建议。绕过耗时的编码任务&#xff0c;加速使用不熟悉的 …

【自学开发之旅】Flask-数据查询-数据序列化-数据库关系(四)

db.session ProductInfo.query filter() 灵活查询 filter_by() limit() 限制输出条目 offset() 偏移量 order_by() 排序 group_by() 分组聚合 <模型类>.query.<过滤方法> 过滤方法 查询方法 “牛”字开头且&#xff08;“,”默认&#xff09;价格大于5的 &g…

JS判断当前是早上,中午,下午还是晚上

<!DOCTYPE html> <html><head><meta charset"utf-8" /><title></title></head><body><div></div><script>function getTimeState() {// 获取当前时间let timeNow new Date();// 获取当前小时let…

Pytest系列-fixture的详细使用和结合conftest.py的详细使用(3)

介绍 前面一篇讲了setup、teardown可以实现在执行用例前或结束后加入一些操作&#xff0c;但这种都是针对整个脚本全局生效的。 Fixture是pytest的非常核心功能之一&#xff0c;在不改变被装饰函数的前提下对函数进行功能增强&#xff0c;经常用于自定义测试用例前置和后置工作…

【C++】string类模拟实现上篇(附完整源码)

目录 前言1. string的基本结构2. 构造函数、析构函数2.1 构造函数的实现2.1.1带参构造函数 2.2析构函数2.3无参构造函数2.4无参和带参构造函数合并 3. string的遍历3.1 operator[ ]3.2迭代器模拟实现 (简单实现&#xff09;3.3 const迭代器模拟实现 4. 数据的增删查改4.1 reser…

最经典的解析LSA数据库(第六课)

初步认识OSPF的大致内容(第三课)_IHOPEDREAM的博客-CSDN博客 1 OSPF 工作过程 建立领居表 同步数据库 今天来 说一说数据库概念 计算路由表 2 什么是数据库&#xff1f; 数据库是一个组织化的数据集合&#xff0c;用于存储、管理和检索数据。它是一个可访问的集合&#x…

[SICTF 2023 #Round2] Crypto,PWN,Reverse

似乎很久没写了。 周五到周日&#xff0c;两天的这个比赛&#xff0c;有些东西还真是头回用&#xff0c;值得纪录一下。 Crypto 密码这块这届还是比较简单的&#xff0c;没有复杂的题&#xff0c;但量大分多。 【签到】古典大杂烩 给了一堆emoji的图 &#x1f429;&#x…

英国私校的艺术奖学金有哪些?申请要求和申请流程详解!

众所周知&#xff0c;英国私校不仅学术拔尖&#xff0c;在对学生艺术方面的培养也是毫不逊色的。几乎打开每一所英国私校的官网&#xff0c;都可以看到学校罗列的提供的各类课外艺术活动的精彩照片。      每个英国私校除了课后开设的五花八门的兴趣课外&#xff0c;还有各…

【项目 计网12】4.32UDP通信实现 4.33广播 4.34组播 4.35本地套接字通信

文章目录 4.32UDP通信实现udp_client.cudp_server.c 4.33广播bro_server.cbro_client.c 4.34组播multi_server.cmulti_client.c 4.35本地套接字通信ipc_server.cipc_client.c 4.32UDP通信实现 udp_client.c #include <stdio.h> #include <stdlib.h> #include <…

2023-09-10 LeetCode每日一题(课程表 II)

2023-09-10每日一题 一、题目编号 210. 课程表 II二、题目链接 点击跳转到题目位置 三、题目描述 现在你总共有 numCourses 门课需要选&#xff0c;记为 0 到 numCourses - 1。给你一个数组 prerequisites &#xff0c;其中 prerequisites[i] [ai, bi] &#xff0c;表示在…