spark12-13-14

news2025/1/11 1:49:36

12. Task线程安全问题

12.1 现象和原理

在一个Executor可以同时运行多个Task,如果多个Task使用同一个共享的单例对象,如果对共享的数据同时进行读写操作,会导致线程不安全的问题,为了避免这个问题,可以加锁,但效率变低了,因为在一个Executor中同一个时间点只能有一个Task使用共享的数据,这样就变成了串行了,效率低!

12.2 案例

定义一个工具类object,格式化日期,因为SimpleDateFormat线程不安全,会出现异常

Scala
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD: RDD[Long] = lines.map(e => {
  //将字符串转成long类型时间戳
  //使用自定义的object工具类
  val time: Long = DateUtilObj.parse(e)
  time
})

val res = timeRDD.collect()
println(res.toBuffer)

Scala
object DateUtilObj {

  //多个Task使用了一个共享的SimpleDateFormat,SimpleDateFormat是线程不安全

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  //线程安全的
  //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }

}

上面的程序会出现错误,因为多个Task同时使用一个单例对象格式化日期,报错,如果加锁,程序会变慢,改进后的代码:

Scala
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD = lines.mapPartitions(it => {
  //一个Task使用自己单独的DateUtilClass实例,缺点是浪费内存资源
  val dataUtil = new DateUtilClass
  it.map(e => {
    dataUtil.parse(e)
  })
})

val res = timeRDD.collect()
println(res.toBuffer)

Scala
class DateUtilClass {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }
}

改进后,一个Task使用一个DateUtilClass实例,不会出现线程安全的问题。

13. 累加器

累加器是Spark中用来做计数功能的,在程序运行过程当中,可以做一些额外的数据指标统计

需求:在处理数据的同时,统计一下指标数据,具体的需求为:将RDD中对应的每个元素乘以10,同时在统计每个分区中偶数的数据

13.1 不使用累加器的方案

需要多次触发Action,效率低,数据会被重复计算

Scala
/**
 * 不使用累加器,而是触发两次Action
 */
object C12_AccumulatorDemo1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //对数据进行转换操作(将每个元素乘以10),同时还要统计每个分区的偶数的数量
    val rdd2 = rdd1.map(_ * 10)
    //第一次触发Action
    rdd2.saveAsTextFile("out/111")

    //附加的指标统计
    val rdd3 = rdd1.filter(_ % 2 == 0)
    //第二个触发Action
    val c = rdd3.count()
    println(c)
  }
}

13.2 使用累加器的方法

触发一次Action,并且将附带的统计指标计算出来,可以使用Accumulator进行处理,Accumulator的本质数一个实现序列化接口class,每个Task都有自己的累加器,避免累加的数据发送冲突

Scala
object C14_AccumulatorDemo3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //在Driver定义一个特殊的变量,即累加器
    //Accumulator可以将每个分区的计数结果,通过网络传输到Driver,然后进行全局求和
    val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
    val rdd2 = rdd1.map(e => {
      if (e % 2 == 0) {
        accumulator.add(1)  //闭包,在Executor中累计的
      }
      e * 10
    })

    //就触发一次Action
    rdd2.saveAsTextFile("out/113")

    //每个Task中累计的数据会返回到Driver吗?
    println(accumulator.count)
  }
}

14. StandAlone的两种执行模式

spark自动的StandAlone集群有两种运行方式,分别是client模式和cluster模式,默认使用的是client模式。两种运行模式的本质区别是,Driver运行在哪里了

14.1 什么是Driver

Driver本意是驱动的意思(类似叫法的有MySQL的连接驱动),在就是与集群中的服务建立连接,执行一些命令和请求的。但是在Spark的Driver指定就是SparkContext和里面创建的一些对象,所有可以总结为,SparkContext在哪里创建,Driver就在哪里。Driver中包含很多的对象实例,有SparkContext,DAGScheduler、TaskScheduler、ShuffleManager、BroadCastManager等,Driver是对这些对象的统称。

14.2 client模式

Driver运行在用来提交任务的SparkSubmit进程中,在Spark的stand alone集群中,提交spark任务时,可以使用cluster模式即--deploy-mode client (默认的)

 

 

注意:spark-shell只能以client模式运行,不能以cluster模式运行,因为提交任务的命令行客户端和SparkContext必须在同一个进程中。

 

14.3 cluster模式

Driver运行在Worker启动的一个进程中,这个进程叫DriverWapper,在Spark的stand alone集群中,提交spark任务时,可以使用cluster模式即--deploy-mode cluster

特点:Driver运行在集群中,不在SparkSubmit进程中,需要将jar包上传到hdfs中

Shell
spark-submit --master spark://node-1.51doit.cn:7077 --class cn._51doit.spark.day01.WordCount --deploy-mode cluster hdfs://node-1.51doit.cn:9000/jars/spark10-1.0-SNAPSHOT.jar hdfs://node-1.51doit.cn:9000/wc hdfs://node-1.51doit.cn:9000/out002

 

cluster模式的特点:可以给Driver更灵活的指定一些参数,可以给Driver指定内存大小,cores的数量

如果一些运算要在Driver进行计算,或者将数据收集到Driver端,这样就必须指定Driver的内存和cores更大一些

Shell
# 指定Driver的内存,默认是1g
--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
# 指定Driver的cores,默认是1
--driver-cores NUM          Number of cores used by the driver, only in cluster mode (Default: 1).

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/688167.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

操作系统—中断和异常、磁盘调度算法、操作系统其他内容

异常 时常由CPU*执行指令的内部事件引起,比如非法操作码、地址越界、算术溢出等,还有缺页异常、除0异常。同时,他会发送给内核,要求内核处理这些异常。 外中断 狭义上的中断指的就是外中断。由CPU执行指令以外的事件引起&#…

linux高并发网络编程开发(广播-组播-本地套接字)14_tcp udp使用场景,广播通信流程,组播通信流程,本地套接字通信流程,epoll反应堆模型

01 tcp udp使用场景 1.tcp使用场景 对数据安全性要求高的时候  登录数据的传输  文件传输http协议  传输层协议-tcp 2.udp使用场景 效率高-实时性要求比较高  视频聊天  通话有实力的大公司  使用upd  在应用层自定义协议,做数据校验 02 广播通信流程 广播…

LLM 开发实战系列 | 01:API进行在线访问和部署

在本文中,我们将使用Python编程语言来展示如何调用OpenAI的GPT-3.5模型。在开始之前,请确保您已经注册了OpenAI API并获得了访问凭证。 环境准备 下载python 方法1:官网 www.python.org 从最开始的开始,先到Python官网下载一个…

零基础自学:2023年的今天,请谨慎进入网络安全行业

前言 2023年的今天,慎重进入网安行业吧,目前来说信息安全方向的就业对于学历的容忍度比软件开发要大得多,还有很多高中被挖过来的大佬。 理由很简单,目前来说,信息安全的圈子人少,985、211院校很多都才建…

Linux中安装部署docker

目录 什么是docker系统环境要求安装和使用docker 什么是docker Docker是一个开源的容器化平台,用于帮助开发者更轻松地构建、打包、分发和运行应用程序。它基于容器化技术,利用操作系统层级的虚拟化来隔离应用程序和其依赖的环境。通过使用Docker&#…

javaEE进阶 -初识框架

目录 1.为什么要学框架? 框架的优点展示 2、项目的开发 2.1 Servlet 项目的开发 2.2 Spring Boot 项目开发 3 、Spring Boot编写代码 4、 Spring Boot 运行项目 5、验证程序 6、发布项目 主要讲解 四个框架。 1、Spring 2、Spring Boot 3、Spring NVC 4、…

别只盯着Docker了,这十大容器运行时错过后悔

文章目录 一、Docker二、Containerd三、CRI-O四、Firecracker五、gVisor六、Kata七、Lima八、Lxd九、rkt十、runC如何选择适合自己的容器运行时? MCNU云原生,文章首发地,欢迎微信搜索关注,更多干货,第一时间掌握&#…

Apikit 自学日记:数据结构

您可以将API文档中的重复部分提取出来成为数据结构,方便其他文档中复用。当数据结构发生改变时,所有引用了该数据结构的API文档会同步发生改变。 创建数据结构 进入数据结构管理页面,点击 添加数据结构 按钮,输入相关内容并保存…

XXL-JOB任务调度

简介: XXL-JOB 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。 官网:https://www.xuxueli.com/xxl-job/ 以下业务场景可用任务解决 某电商平台需要每天上午10点,下午3点,晚上8点发…

2023 高质量 Java 面试题集锦:高级 Java 工程师面试八股汇总

人人都想进大厂,当然我也不例外。早在春招的时候我就有向某某某大厂投岗了不少简历,可惜了,疫情期间都是远程面试,加上那时自身也有问题,导致屡投屡败。突然也意识到自己肚子里没啥货,问个啥都是卡卡卡卡&a…

炫技亮点 Websocket集群解决方案汇总

文章目录 问题方案方案一:~~Session共享~~(不可行)方案二:负载均衡器(状态路由)方案三:广播机制(异步方式 - 建议)方案四:路由转发(同步方式&…

【JS经验分享】你真的会写JS吗?满满干货,建议读三遍(2)

大家好,最近准备总结一下JS的经验,分享分享,有不对的欢迎讨论哈~ JS作为前端的基础技能,每一位前端开发都要运用熟练,但你真的会写JS吗?js全称JavaScript,是运行在浏览器上的脚本语言&#xff0…

【高危】Nuxt.js <3.4.3 远程代码执行漏洞(POC公开)

漏洞描述 Nuxt.js(简称 Nuxt)是一个基于 Vue.js 的通用应用框架,用于构建服务端渲染的应用程序(SSR)和静态生成的网站。 Nuxt.js 3.4.3之前版本中的 test-component-wrapper 组件的动态导入函数存在代码注入漏洞,当服务器在开发…

Java集合流式编程

一、简介 1、什么是集合流式编程 集合流式编程(Stream API)是Java 8引入的一个功能强大的特性,它提供了一种更简洁、更高效的方式来操作集合数据。它的设计目标是让开发者能够以一种更声明式的风格来处理集合数据,减少了显式的迭…

Ubuntu部署jmeter与ant

为了整合接口自动化的持续集成工具,我将jmeter与ant都部署在了Jenkins容器中,并配置了build.xml 一、ubuntu部署jdk 1:先下载jdk-8u74-linux-x64.tar.gz,上传到服务器,这里上传文件用到了ubuntu 下的 lrzsz。 ubunt…

WordPress 备份插件 BackUpWordPress

WordPress备份是一件必不可少的事情,毕竟自己辛辛苦苦花了很多时间精力写得博客,经验总结,必须保留传承。WordPress备份可以在发生灾难性情况(比如劫持或意外锁定)下迅速恢复,确保了网站安全。 BackUpWord…

揭示不断增长的预切蔬菜市场:深入研究行业驱动因素和挑战

随着现代社会的快节奏和人们生活压力的增加,越来越多的人选择预制菜作为饮食解决方案,预制菜已经成为餐饮行业的新兴赛道。预制菜的优点包括方便快捷、卫生安全、节省时间、质量可靠,以及丰富的菜品选择和灵活的烹饪和食用方式,满…

基于SpringCloud微服务流动资金贷款业务系统设计与实现

一、引言 由于传统的贷款业务系统并不能够顺应时代的变化,同时在一定程度上对业务发展进行了限制,所以为了适应时代的发展,信息贷款业务应该能够被产品化、丰富化,同时还需要制定一套特定的流程来满足新时代用户的需求。流程化的规范管理是当今银行业务发展的必然趋势,研究并开…

基于Stable Diffusion的2D游戏关卡生成【实战】

接下来的几篇文章将与常规主题有所不同(这是在从事通用机器人技术的职业中吸取的教训)。 相反,我决定利用我的一些新空闲时间 1 边做边学,并使用所有酷孩子都在谈论的一些很酷的新 ML。 推荐:用 NSDT设计器 快速搭建可…

分割回文串-ii

分割回文串-ii 题目链接:分割回文串-ii 思路:分割字符串s,使得子串都是回文串,最后获得最小分割次数。那么我们可以不断把字符串缩短,判断子串是否可以被分割成回文串,并且最小分割次数。这就是子问题分割…