2023_Spark_实验三十二:消费Kafka数据并保存到MySQL中

news2025/1/10 2:45:30

实验目的:掌握Scala开发工具消费Kafka数据,并将结果保存到关系型数据库中

实验方法:消费Kafka数据保存到MySQL中

实验步骤:

一、创建Job_ClickData_Process

代码如下:

package exams

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.collection.mutable

/**
 * @projectName sparkGNU2023  
 * @package exams  
 * @className exams.Job_ClickData_Process  
 * @description ${description}  
 * @author pblh123
 * @date 2023/12/20 15:42
 * @version 1.0
 *
 */
    
object Job_ClickData_Process {

  def main(args: Array[String]): Unit = {
    //  1. 创建spark,sc,sparkstreaming对象
    if (args.length != 3) {
      println("您需要输入三个参数")
      System.exit(5)
    }
    val musrl: String = args(0)
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(musrl)
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ckeckpointdir: String = args(1)
    val ssc = new StreamingContext(sc, Seconds(5)) //连续流批次处理的大小

    //  2. 代码主体
//    设置ckeckpoint目录
    ssc.checkpoint(ckeckpointdir)

    //准备kafka的连接参数
    val kfkbst: String = args(2)
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> kfkbst,
      "group.id" -> "SparkKafka",
      //latest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最新/或最后的位置开始消费
      //earliest表示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,就从最开始/最早的位置开始消费
      //none示如果记录了偏移量的位置,就从记录的位置开始消费,如果没有记录,则报错
      "auto.offset.reset" -> "latest", //偏移量的重置位置
      "enable.auto.commit" -> (false: java.lang.Boolean), //是否自动提交偏移量
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topics: Array[String] = Array("RealDataTopic")

    //从mysql中查询出offsets:Map[TopicPartition, Long]
    val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtils.getOffsetMap("SparkKafka", "RealDataTopic")
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetsMap.size > 0) {
      println("MySql记录了offset信息,从offset处开始消费")
      //连接kafka的消息
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap)
      )
    } else {
      println("MySql没有记录了offset信息,从latest处开始消费")
      //连接kafka的消息
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    }

    //实时处理数据并手动维护offset
    val valueDS = kafkaDS.map(_.value()) //_表示从kafka中消费出来的每一条数据
    valueDS.print()
    kafkaDS.map(_.value())
    valueDS.foreachRDD(rdd => {
      rdd.foreachPartition(lines => {
        //将处理分析的结果存入mysql
        /*
        DROP TABLE IF EXISTS `job_real_time`;
        CREATE TABLE `job_real_time` (
        `datetime` varchar(8) DEFAULT NULL COMMENT '日期',
        `job_type` int(2) DEFAULT NULL COMMENT '1代表新招聘岗位,0代表找工作的人',
        `job_id` int(8) DEFAULT NULL COMMENT '岗位ID,匹配岗位名称',
        `count` int(8) DEFAULT NULL COMMENT '企业新增岗位数和找工作的人数'
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

        */
        //1.开启连接
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.137.110:3306/bigdata19?characterEncoding=UTF-8&serverTimezone=UTC", "lh", "Lh123456!")
        //2.编写sql并获取ps
        val sql: String = "replace into job_real_time(datetime,job_type,job_id,count) values(?,?,?,?)"
        val ps: PreparedStatement = conn.prepareStatement(sql)
        //3.设置参数并执行
        for (line <- lines) {
          var item = line.split(" ")
          ps.setString(1, item(0).toString)
          ps.setInt(2, item(1).toInt)
          ps.setInt(3, item(2).toInt)
          ps.setInt(4, item(3).toInt)
          ps.executeUpdate()
        }
        //4.关闭资源
        ps.close()
        conn.close()
      })
    })

    //手动提交偏移量
    kafkaDS.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        //获取偏移量
        val offsets: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        OffsetUtils.saveOffsets(groupId = "SparkKafka", offsets)
      }
    })

    //开启sparkstreaming任务并等待结束,关闭ssc,sc
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    sc.stop()
  }

}

二、编写模拟点击量并消费Kafka数据

启动zookeeper集群

zk.sh start

启动kafka集群

kf.sh start

检查模拟的实时数据是否正常更新

不断正常更新的情况下,启动flume采集real-time-data.log的实时数据

启动flume

在mysql数据库中准备偏移表与实时数据表

启动Job_ClickData_Process方法消费kafka数据并保存到mysql中


 

检查mysql表是否存入数据

实验结果:通过scala开发spark代码实现消费kafka数据存储到MySQL中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1325665.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++初阶】第一站:C++入门基础(下)

前言&#xff1a; 紧接着上两篇文章&#xff0c;c入门基础(上)&#xff1a;C入门基础(上) c入门基础(中)&#xff1a;C入门基础(中) 继续补充完c初阶入门基础的知识点&#xff0c;本章知识点包括&#xff1a; 引用和指针的区别、内联函数、auto关键字(C11)、基于范围的for循环…

开源投票微信小程序源码系统+超强的盈利模式+礼物道具刷不停+完整的代码包 附带安装部署教程

微信小程序的日益普及&#xff0c;越来越多的企业和开发者开始关注如何利用小程序进行业务拓展和创新。投票微信小程序源码系统就是在这样的背景下应运而生。这款源码系统旨在为广大开发者提供一个功能强大、易于扩展的投票系统&#xff0c;以满足各种行业和场景的需求。 以下…

vp与vs联合开发-Ini配置文件

1.*.ini文件是Initialization file的缩写&#xff0c;即为初始化文件&#xff0c;是Windows系统配置文件所采用的存储格式&#xff0c;统管Windows的各项配置&#xff0c; 2.可以用来存放软件信息、注册表信息等 3.可以使用代码方式和手动编辑操作 &#xff0c;一般不用直接编辑…

工业互联网平台存在意义是什么?国内有哪些工业互联网平台?

一、工业互联网平台存在意义是什么&#xff1f; 工业互联网平台是一个连接设备与服务、数据与人的跨行业、跨领域的全新工业平台。工业互联网平台利用了互联网、物联网、大数据、AI等技术&#xff0c;集成各类工业设备&#xff0c;不断采集和分析数据&#xff0c;以实现设备状…

分享一些实用工具和学习网站

1 前言 虽然已经工作过几年的时间了&#xff0c;但是学习的脚步是不能停止的&#xff0c;对于学习&#xff0c;特别是自学&#xff0c;善于搜索网上的一些资源来辅助&#xff0c;还是非常有必要的&#xff0c;下面我就把这几年私藏的各种资源&#xff0c;网站贡献出来给你们。…

外汇天眼:五大常见交易心理陷阱,你中了几个?

抗争的心理 我们从小所受的教育都是建立在抗争和竞争的基础上&#xff0c;如征服自然&#xff0c;战胜各种艰难障碍&#xff0c;与困难搏斗。 这种意识已经深深地扎根于我们的内在。 当我们进入证券市场还是带着这样的心理意识。 我们常常看到一些各行各业的精英来到证券市场…

SAP ABAP 双击回应

导语&#xff1a;最近在做带自定义屏幕的功能开发&#xff0c;用户希望点击屏幕上的内容进行穿透。 一、需求 用户希望点击下面屏幕上的客户代码&#xff0c;穿透到BP界面&#xff0c;这种功能在SAP标准的屏幕上屡见不鲜&#xff0c;研究一下了&#xff0c;这个属于屏幕上的【…

算法----Dota2 参议院

题目 Dota2 的世界里有两个阵营&#xff1a;Radiant&#xff08;天辉&#xff09;和 Dire&#xff08;夜魇&#xff09; Dota2 参议院由来自两派的参议员组成。现在参议院希望对一个 Dota2 游戏里的改变作出决定。他们以一个基于轮为过程的投票进行。在每一轮中&#xff0c;每…

64. 最小路径和已解答 23.12.07(一)补

给定一个包含非负整数的 m x n 网格 grid &#xff0c;请找出一条从左上角到右下角的路径&#xff0c;使得路径上的数字总和为最小。 说明&#xff1a;每次只能向下或者向右移动一步。 示例 1&#xff1a; 输入&#xff1a;grid [[1,3,1],[1,5,1],[4,2,1]] 输出&#xff1a;7…

数据结构和算法-二叉排序树

文章目录 二叉排序树总览二叉排序树的定义二叉排序树的查找二叉排序树的插入二叉排序树的构造二叉排序树的删除删除的是叶子节点删除的是只有左子树或者只有右子树的节点删除的是有左子树和右子树的节点 查找效率分析查找成功查找失败 小结 二叉排序树 总览 二叉排序树的定义 …

泽攸科技SEM台式扫描电子显微镜

泽攸科技是一家国产的科学仪器公司&#xff0c;专注于研发、生产和销售原位电镜解决方案、扫描电镜整机、台阶仪、探针台等仪器。目前台式扫描电镜分为三个系列&#xff1a;ZEM15、ZEM18、ZEM20。 ZEM15台式扫描电镜&#xff1a; ZEM18台式扫描电镜&#xff1a; ZEM20台式扫描…

FA-238V (MHz范围晶体单元微型低轮廓SMD)

FA-238V晶振是EPSON推出的一款额定频率12MHz至15.999MHz的石英晶体谐振器&#xff0c;它能够实现高效、稳定的数据传输&#xff0c;外形尺寸&#xff08;3.2 2.5 0.7mm&#xff09;具有小型超薄、 稳定性好&#xff0c;这种微型化的设计不仅提升了设备的整体性能&#xff0c;…

网络安全知识图谱 图数据库介绍及语法

本体构建: 资产&#xff1a; 系统&#xff0c;软件 威胁&#xff1a; 攻击&#xff1a; 建模&#xff1a; 3个本体 5个实体类型 CWE漏洞库 http://cwe.mitre.org/data/downloads.html CPECP攻击模式分类库 http://capec.mitre.org/data/downloads.html CPE通用组件库 http:…

Java如何将中文转化为拼音

Java中可以使用第三方库pinyin4j来实现中文转拼音。 首先&#xff0c;需要引入pinyin4j的jar包&#xff0c;可以在pinyin4j的官方网站&#xff08;http://pinyin4j.sourceforge.net/&#xff09;下载&#xff0c;也可以通过Maven引入。 Maven引入依赖&#xff1a; <depend…

多表插入、删除操作(批量)——后端

多表插入 场景&#xff1a;当添加一个菜品时&#xff0c;还需要记录菜品的口味信息&#xff0c;因此需要对菜品表&#xff08;dish&#xff09;和口味表&#xff08;dish_flavor&#xff09;同时进行插入操作。 两个表的字段&#xff1a; 代码思路&#xff1a;由DishControll…

用JVS低代码实现业务流程的撤回和重新开始

在当今的数字化时代&#xff0c;业务流程的效率和准确性对于企业的运营至关重要。在实际业务场景中&#xff0c;我们可能需要处理一些复杂的流程&#xff0c;例如申请审批流程、合同签订流程等。这些流程在执行过程中可能会遇到各种情况&#xff0c;例如某个审批步骤需要重新审…

Spring Boot学习随笔- 文件上传和下载(在线打卡、附件下载、MultipartFile)

学习视频&#xff1a;【编程不良人】2021年SpringBoot最新最全教程 第十二章、文件上传、下载 文件上传 文件上传是指将文件从客户端计算机传输到服务器的过程。 上传思路 前端的上传页面&#xff1a;提交方式必须为post&#xff0c;enctype属性必须为multipart/form-data开发…

百度智能云上新:一手实测!0门槛、分钟级打造智能体

“大模型时代真正的价值在于AI原生应用” 继腾讯云推出高性能应用服务HAI&#xff0c;10分钟创建AIGC应用之后&#xff0c;百度云昨天在智算大会上也发布了AI云生应用平台AppBuilder&#xff0c;号称0门槛、分钟级打造智能体agent。AI赛道好不热闹。小编今天就带大家来实测一把…

全光谱护眼灯哪个牌子好?全光谱备考护眼台灯推荐

什么是全光谱&#xff1f;全光谱指的是光谱中包含紫外光、可见光、红外光的光谱曲线&#xff0c;并且在可见光部分中红绿蓝的比例与阳光近似&#xff0c;显色指数接近于100的光谱。太阳光的光谱可以称作全光谱&#xff0c;太阳光的色温是随着四季和早晚时间变化而变化&#xff…

芯知识 | 什么是语音芯片的32Mhz内部振荡及其应用优势

随着科技的飞速发展&#xff0c;语音芯片已经成为现代电子产品中不可或缺的一部分。而在这些芯片的技术参数中&#xff0c;我们常常可以看到“32Mhz内部振荡”这样的描述。那么&#xff0c;究竟什么是语音芯片的32Mhz内部振荡&#xff0c;它又为应用带来了哪些优势呢&#xff1…