Spark项目通用开发框架

news2024/9/19 10:45:48

文章目录

  • 1. 大数据项目结构
  • 2. 类说明
    • 2.1 公共接口类
    • 2.2 TaskNameEnum指定每个任务的名称
    • 2.3 TaskRunner中编写任务的业务逻辑
  • 3. 任务执行脚本

每个公司内部都有一套自己的架子,一般新人来了就直接在已有的架子上开发业务。
以下仅仅作为记录下自己使用的架子,不作为任何推荐,也不认为这样的组织结构就是好用的。

1. 大数据项目结构

项目的整体组织结构
在这里插入图片描述


目录说明
annotation自定义注解Runner和Task。
app用来放整个项目的各个任务。
test1和test2是具体开发的业务任务。
baseBaseRunner和BaseTask是两个基础类
enums用来定义任务的别名
FeatureContextApp主类在目录中的位置保持不变,如果移动,会影响扫描task和Runner

2. 类说明

2.1 公共接口类

package com.king.ml.base

import com.king.ml.enums.TaskNameEnum
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime
import org.springframework.util.StopWatch

import scala.util.{Failure, Success, Try}

trait BaseTask extends Logging with Serializable {

  def taskName: TaskNameEnum.Value
  def initConf(sparkConf: SparkConf = new SparkConf()): SparkConf = sparkConf
  var runtime: StopWatch = _


  def around(implicit spark: SparkSession, currDate: DateTime = DateTime.now): Unit = {
    before
    Try {
      Class.forName(spark.conf.get("task.runner"))
        .newInstance()
        .asInstanceOf[BaseRunner]
        .run
    } match {
      case Success(_) => after
      case Failure(_) => afterThrowException
    }

  }


  private def before(implicit spark: SparkSession, currDate: DateTime): Unit = {

    val taskName = spark.conf.get("task.runner")
    println("开始执行任务 ...["+taskName+"]")
    runtime = new StopWatch(taskName)
    runtime.start(taskName)

  }

  private def after(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    runtime.stop()
    println("任务执行结束 ...["+ taskName+"],共耗时:" + runtime.getTotalTimeSeconds +"秒")
  }

  private def afterThrowException(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val taskName = spark.conf.get("task.runner")
    runtime.stop()
    println("任务执行异常 ...[" + taskName + "],共耗时:" + runtime.getTotalTimeSeconds + "秒")
  }
}


通过一个公共的接口记录每个任务执行的具体日志信息。

在这里插入图片描述

2.2 TaskNameEnum指定每个任务的名称

  
object TaskNameEnum extends Enumeration {

  def getEnumType(source:String):TaskNameEnum.Value = {
    val values =TaskNameEnum.values.toList.filter(_.toString.toUpperCase == source.toUpperCase)
    values.length match {
      case 1 => values.head
      case _ => throw new IllegalArgumentException("该任务不存在")
    }
  }

  val Test1 = Value("ods.ods_test1")
  val Test2 = Value("ods.ods_test2")

}

这里的Test1和Test2表示任务的名称。

2.3 TaskRunner中编写任务的业务逻辑

package com.king.ml.app.test1

import com.king.ml.annotation.Runner
import com.king.ml.base.BaseRunner
import com.king.ml.enums.TaskNameEnum
import org.apache.spark.sql.SparkSession
import org.joda.time.DateTime


@Runner
class Test1TaskRunner extends BaseRunner{
  override def taskName: TaskNameEnum.Value = TaskNameEnum.Test1

  override def run(implicit spark: SparkSession, currDate: DateTime): Unit = {
    val cnt = spark.table("ods.ods_test1").count()
    println("===>总记录数为:")
    println("===>" + cnt)

  }
}

3. 任务执行脚本

在执行脚本中,任务主程序名不需要改变,只需要给任务传参枚举中任务名的值即可。

spark-submit \
--name 'test-ml' \
--master yarn \
--deploy-mode client \
--conf spark.port.maxRetries=100 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.executor.memoryOverhead=5120 \
--queue root.production \
--driver-memory 2g  --num-executors 2 --executor-memory 2g --executor-cores 1 \
--class com.king.ml.app.FeatureContextApp \
./ml/ml-demo.jar "ods.ods_test1"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1916862.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

低代码平台赋能企业全面数字化转型

引言:在当今这个日新月异的数字化时代,企业正面临着前所未有的机遇与挑战。为了保持竞争力并实现可持续发展,企业亟需进行全面的数字化转型。而低代码平台作为数字化转型的重要工具,正以其独特的优势赋能企业,推动其向…

SQL Server 查询死锁以及解决死锁的基本知识(图文)

目录 1. 基本知识2. 查看和解锁被锁的表3. 查看和处理数据库堵塞 1. 基本知识 在 SQL Server 中,死锁是指两个或多个进程互相等待对方持有的资源,从而无法继续执行的现象 要解决死锁问题,首先需要识别并分析死锁的发生原因,然后…

C++基础语法:链表和数据结构

前言 "打牢基础,万事不愁" .C的基础语法的学习 引入 链表是最基础的数据集合,对标数组.数组是固定长度,随机访问,链表是非固定长度,不能随机访问.数组查找快,插入慢;链表是插入快,查找慢. 前面推导过"数据结构算法数据集合".想建立一个数据集合,就要设计数…

K8S中部署 Nacos 集群

1. 准备 GitK8Skubectlhelm 咱也没想到 K8S 部署系列能搞这么多次,我一个开发天天干运维的活,前端后端运维测试工程师实至名归。 2. 方案选择 https://github.com/nacos-group/nacos-k8s 我替你们看了一下,有好几种方式能部署&#xff…

极狐Gitlab安装部署

GitLab 是一个基于 Git 的开源 DevOps 平台,提供代码仓库管理、CI/CD(持续集成和持续交付)、项目管理、监控和安全等功能。它集成了多种工具,帮助开发团队在一个平台上进行代码开发、测试、部署和运维。以下是 GitLab 的主要功能和…

LLM - 绝对与相对位置编码 与 RoPE 旋转位置编码 源码

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/140281680 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 Transformer 是基于 MHSA (多头自注意力),然而,MHSA 对于位置是不敏感…

自定义类型:联合体

像结构体一样,联合体也是由一个或者多个成员组成,这些成员可以是不同的类型。 联合体类型的声明 编译器只为最⼤的成员分配⾜够的内存空间。联合体的特点是所有成员共⽤同⼀块内存空间。所以联合体也叫:共⽤体。 输出结果: 联合体…

AcWing 3381:手机键盘

【题目来源】https://www.acwing.com/problem/content/3384/【题目描述】 请你计算按照手机键盘(9键输入法)输入字母的方式,键入给定字符串(由小写字母构成)所花费的时间。 具体键入规则和花费时间如下描述&#xff1a…

科普文:Java对象在堆中的内存结构

概叙 今天来讲些抽象的东西 -- 对象头,因为我在学习的过程中发现很多地方都关联到了对象头的知识点,例如JDK中的 synchronized锁优化 和 JVM 中对象年龄升级等等。 对象内存构成# Java 中通过 new 关键字创建一个类的实例对象,对象存于内存的…

【人工智能】-- 反向传播

个人主页:欢迎来到 Papicatch的博客 课设专栏 :学生成绩管理系统 专业知识专栏: 专业知识 文章目录 🍉引言 🍉反向传播 🍈定义 🍈反向传播的作用 🍍参数优化 🍍学…

软件测试学习之-ADB命令

ADB命令 adb工具即Android Debug Bridge(安卓调试桥) tools。它就是一个命令行窗口,用于通过电脑端与模拟器或者真实设备交互。在某些特殊的情况下进入不了系统,adb就派上用场啦! Android程序的开发通常需要使用到一…

解决Anaconda下载pytorch常见问题

1.问题一 安装完Anaconda后,输入conda命令,出现 conda不是内部或外部命令,也不是可运行的程序 或批处理文件。 分析原因:未配置环境到系统变量 解决方法:将Anaconda安装路径和Anaconda目录下的Scripts文件的路径配…

本地部署,GFPGAN: 实用的面部修复算法

目录 什么是 GFPGAN? 技术原理 主要功能 应用场景 本地安装 运行结果 结语 Tip: 在图像处理和计算机视觉领域,面部修复是一个重要且具有挑战性的研究方向。随着深度学习技术的不断进步,许多新的算法被提出,用于…

Linux笔记之使用系统调用sendfile高速拷贝文件

Linux笔记之使用系统调用sendfile高速拷贝文件 code review! 文章目录 Linux笔记之使用系统调用sendfile高速拷贝文件sendfile 性能优势sendfile 系统调用优点:缺点: cp 命令优点:缺点: 实际测试:拷贝5.8个G的文件&a…

《Windows API每日一练》9.1.5 自定义资源

自定义资源(Custom Resources)是在 Windows 程序中使用的一种资源类型,用于存储应用程序特定的数据、图像、音频、二进制文件等。通过自定义资源,开发者可以将应用程序所需的各种资源文件集中管理和存储,便于在程序中访…

开源可视化Flutter图表库:Graphic

Graphic:用Graphic绘制数据的无限可能- 精选真开源,释放新价值。 概览 Graphic,这个基于Flutter的图表库,以其源自《The Grammar of Graphics》的灵感,为数据可视化提供了一种全新的方法。它不仅仅是一个工具&#xf…

安全策略与用户认证综合实验

一、实验拓扑 二、实验需求 1,DMZ区内的服务器,办公区仅能在办公时间内(9:00-18:00)可以访问,生产区的设备全天可以访问. 2,生产区不允许访问互联网,办公区和游客区允许访问互联网 3,办公区设备10.0.2.10不允许访问DMz区的FTP服务器和HTTP服务器,仅能ping通10.0.3.10 4,办公区…

【运维】docker批量删除临时镜像(两种方式)

docker批量删除Tag<none>的临时镜像 在开发的时候&#xff0c;需要经常发布开发包&#xff0c;在使用docker build构建镜像的时候&#xff0c;同一个版本经常会使用相同tag&#xff0c;频繁打包一段时间后&#xff0c;本地会出现很多Tag<none>的临时镜像&#xff…

WordPress知识付费系统+自动采集插件

采集功能&#xff1a; 1.支持分类替换 将主站同步过来的文章分类进行替换 2.支持自定义文章作者&#xff08;选择多个作者则同步到的文章作者将会随机分配&#xff09; 3.支持添加黑名单分类 添加后 如果同步过来的文章包含黑名单分类将不会发布文章 4.自动检测同步&#xf…

禁止使用存储过程

优质博文&#xff1a;IT-BLOG-CN 灵感来源 什么是存储过程 存储过程Stored Procedure是指为了完成特定功能的SQL语句集&#xff0c;经编译后存储在数据库中&#xff0c;用户可通过指定存储过程的名字并给定参数&#xff08;如果该存储过程带有参数&#xff09;来调用执行。 …