【Spark系列3】RDD源码解析实战

news2025/3/11 15:20:50

本文主要讲

1、什么是RDD

2、RDD是如何从数据中构建

一、什么是RDD?

RDD:弹性分布式数据集,Resillient Distributed Dataset的缩写。

个人理解:RDD是一个容错的、并行的数据结构,可以让用户显式的将数据存储到磁盘和内存中,并能控制数据的分区。同时RDD还提供一组丰富的API来操作它。本质上,RDD是一个只读的分区集合,一个RDD可以包含多个分区,每个分区就是一个dataset片段。RDD可以互相依赖

二、RDD是如何从数据中构建

2.1、RDD源码

Internally, each RDD is characterized by five main properties

  • A list of pattitions

  • A function for computing each split

  • A list of dependencies on each RDDs

  • optionally, a partitioner for key-value RDDs(e.g. to say that RDD is hash-partitioned)

  • optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

RDD基本都有这5个特性:

1、每个RDD 都会有 一个分片列表。 就是可以被切分,和hadoop一样,能够被切分的数据才能并行计算

2、有一个函数计算每一个分片。这里是指下面会提到的compute函数

3、对其他RDD的依赖列表。依赖区分宽依赖和窄依赖

4、可选:key-value类型的RDD是根据hash来分区的,类似于mapreduce当中的partitioner接口,控制哪个key分到哪个reduce

5、可选:每一个分片的有效计算位置(preferred locations),比如HDFS的block的所在位置应该是优先计算的位置

2.2、宽窄依赖

如果一个RDD的每个分区最多只能被一个Child RDD的一个分区所使用, 则称之为窄依赖(Narrow dependency), 如果被多个Child RDD分区依赖, 则称之为宽依赖(wide dependency)

例如 map、filter是窄依赖, 而join、groupby是宽依赖

2.3、源码分析

RDD的5个特征会对应到源码中的 4个方法 和一个属性

RDD.scala是一个总的抽象,不同的子类会对下面的方法进行定制化的实现。比如compute方法,不同子类在实现的时候是不同的。

// 该方法只会被调用一次。由子类实现,返回这个RDD下的所有Partition
protected def getPartitions: Array[Partition]
​
// 该方法只会被调用一次。计算该RDD和父RDD的关系
protected def getDenpendencies: Seq[Dependency[_]] = deps
​
//对分区进行计算,返回一个可遍历的结果
def compute(split: Partition, context: TaskContext): Iterator[T]
​
//可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置
protected def getPreferredLocations(split: Partition): Seq(String)= Nil
​
// 可选的,分区的方法,针对第4点,控制分区的计算规则
@transient val partitioner: Option[Partitioner] = None

拿官网上的workcount举例:

val textFile = sc.textFile("文件目录/test.txt")
val counts = textFile.flatMap(line => line.split(" "))
                 .filter(_.length >= 2)
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

这里涉及到几个RDD的转换

1、textfile是一个hadoopRDD经过map转换后的MapPartitionsRDD,

2、经过flatMap后仍然是一个MapPartitionsRDD

3、经过filter方法之后生成了一个新的MapPartitionRDD

4、经过map函数之后,继续是一个MapPartitionsRDD

5、经过最后一个reduceByKey编程了ShuffleRDD

文件分为一个part1,part2,part3经过spark读取之后就变成了HadoopRDD,再按上面流程理解即可

2.3.1、代码分析:SparkContext 类

本次只看textfile方法,注释上说明

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
​
读取text文本从hdfs上、本地文件系统,或者hadoop支持的文件系统URI中, 返回一个String类型的RDD

看代码:

hadoopFile最后返回的是一个HadoopRDD对象,然后经过map变换后,转换成MapPartitionsRDD,鱿鱼HadoopRDD没有重写map函数,所以调用的是父类的RDD的map

def textFile(path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped() // 忽略不看
    
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
      .map(pair => pair._2.toString).setName(path)
  }

看下hadoopFile方法

1、广播hadoop的配置文件

2、设置文件的输入格式之类的,也决定的文件的读取方式

3、new HadoopRDD,并返回

def hadoopFile[K, V](path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
​
    // 做一些校验
    FileSystem.getLocal(hadoopConfiguration)
​
    // A Hadoop configuration can be about 10 KiB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

2.3.2、源码分析:HadoopRDD类

先看注释

An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (org.apache.hadoop.mapred).

看注释可以知道,HadoopRDD是一个专为Hadoop(HDFS、Hbase、S3)设计的RDD。使用的是以前的MapReduce 的API来读取的。

HadoopRDD extends RDD[(K, V)] 重写了RDD中的三个方法

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {}
​
override def getPartitions: Array[Partition] = {}
​
override def getPreferredLocations(split: Partition): Seq[String] = {}

分别来看一下

HadoopRDD#getPartitions

1、读取配置文件

2、通过inputFormat自带的getSplits方法来计算分片,获取所有的Splits

3、创建HadoopPartition的List并返回

这里是不是可以理解,Hadoop中的一个分片,就对应到Spark中的一个Partition

override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // 通过配置的文件读取方式获取所有的Splits
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      // 创建Partition的List
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        // 创建HadoopPartition
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      异常处理
    }
}

HadoopRDD#compute

compute的作用主要是 根据输入的partition信息生成一个InterruptibleIterator。

iter中的逻辑主要是

1、把Partition转成HadoopPartition,通过InputSplit创建一个RecordReader

2、重写Iterator的getNext方法,通过创建的reader调用next方法读取下一个值

compute方法通过Partition来获取Iterator接口,以遍历Partition的数据

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {...}
    new InterruptibleIterator[(K, V)](context, iter)
  }
 override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
​
 val iter = new NextIterator[(K, V)] {
​
      //将compute的输入theSplit,转换为HadoopPartition
      val split = theSplit.asInstanceOf[HadoopPartition]
      ......
      //c重写getNext方法
      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case _: EOFException if ignoreCorruptFiles => finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        (key, value)
      }
     }
}

HadoopRDD#getPreferredLocations

getPreferredLocations方法比较简单,直接调用SplitInfoReflections下的inputSplitWithLocationInfo方法获得所在的位置。

override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
    case Some(c) =>
      try {
        val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
        val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
        Some(HadoopRDD.convertSplitLocationInfo(infos))
      } catch {
        case e: Exception =>
          logDebug("Failed to use InputSplitWithLocations.", e)
          None
      }
    case None => None
  }
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}

2.3.3、源码分析:MapHadoopRDD类
An RDD that applies the provided function to every partition of the parent RDD.

经过RDD提供的function处理后的 父RDD 将会变成MapHadoopRDD

MapHadoopRDD重写了父类的partitioner、getPartitions和compute方法

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
  override def getPartitions: Array[Partition] = firstParent[T].partitions
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}

在partitioner、getPartitions、compute中都用到了一个firstParent函数,可以看到,在MapPartition中并没有重写partitioner和getPartitions方法,只是从firstParent中取了出来

再看下firstParent是干什么的,其实就是取的父依赖

/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}

再看一下MapPartitionsRDD继承的RDD,它继承的是RDD[U] (prev),这里的prev指的是我们的HadoopRDD,也就是说HadoopRDD变成了我们这个MapPartitionRDD的OneToOneDependency依赖,OneToOneDependency是窄依赖

def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))

再来看map方法

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 * 通过将函数应用于新RDD的所有元素,返回新的RDD。
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

flatMap方法

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

filter方法

/**
  * Return a new RDD containing only the elements that satisfy a predicate.
  * 返回仅包含满足表达式 的元素的新RDD。
  */
 def filter(f: T => Boolean): RDD[T] = withScope {
   val cleanF = sc.clean(f)
   new MapPartitionsRDD[T, T](
     this,
     (context, pid, iter) => iter.filter(cleanF),
     preservesPartitioning = true)
 }

观察代码发现,他们返回的都是MapPartitionsRDD对象,不同的仅仅是传入的function不同而已,经过前面的分析,这些都是窄依赖

注意:这里我们可以明白了MapPartitionsRDD的compute方法的作用了:

1、在没有依赖的条件下,根据分片的信息生成遍历数据的iterable接口

2、在有前置依赖的条件下,在父RDD的iterable接口上给遍历每个元素的时候再套上一个方法

2.3.4、源码分析:PairRDDFunctions 类

接下来,该reduceByKey操作了。它在PairRDDFunctions里面

reduceByKey稍微复杂一点,因为这里有一个同相同key的内容聚合的一个过程,它调用的是combineByKey方法。

/**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
​
    /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   泛型函数,将每个key的元素 通过自定义的聚合 来组合到一起
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   *
   * Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   *
   * @note V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]).
   */
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    // 判断keyclass是不是array类型,如果是array并且在两种情况下throw exception。
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    //虽然不太明白,但是此处基本上一直是false,感兴趣的看后面的参考文章
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // 默认走这个方法
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

2.3.5、源码分析:ShuffledRDD类

看上面代码最后传入了self和partitioner ,并set了三个值,shuffled过程暂时不做解析。这里看下ShuffledRDD的依赖关系(getDependencies方法),它是一个宽依赖

override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

总结:我们讲了RDD的基本组成结构,也通过一个wordcount程序举例来说明代码是如果运行的,希望大家可以从源码入手,学习spark,共勉!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1416606.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FPGA HDMI IP之DDC(本质I2C协议)通道学习

目的&#xff1a; 使用KingstVIS逻辑分析仪软件分析HDMI的DDC通道传输的SCDC数据&#xff08;遵循I2C协议&#xff09;&#xff0c;同时学习了解SCDC的寄存器与I2C通信协议。 部分英文缩写&#xff1a; HDMIHigh Definition Multi-media Interface高清多媒体接口DDCDisplay Dat…

css文本水波效果

<!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>文本水波效果</title><style>* {mar…

网际协议 IP、IP地址

目录 网际协议 IP 虚拟互连网络 使用中间设备进行互连 IP 地址 IP 地址及其表示方法 ​编辑 IP 地址采用 2 级结构 IP 地址的编址方法 分类的 IP 地址 各类 IP 地址的指派范围 一般不使用的特殊的 IP 地址 IPv4网络中的地址类型 分类的 IP 地址的优点和缺点 划分子网…

unity学习笔记----游戏练习05

一、阳光的收集和搜集动画开发 1.收集阳光的思路&#xff1a;当鼠标点击到阳光的时候&#xff0c;就可以进行收集了。可以通过为添加一个碰撞器来检测Circle Collider 2D 编写脚本&#xff1a; 在SunManager中写一个增加阳光的方法 //增加阳光 public void AddSubSun(in…

C++笔记之奇异递归模板模式CRTP(Curiously Recurring Template Pattern)和静态多态

C++笔记之奇异递归模板模式CRTP(Curiously Recurring Template Pattern)和静态多态 —— 杭州 2024-01-28 code review! 文章目录 C++笔记之奇异递归模板模式CRTP(Curiously Recurring Template Pattern)和静态多态一.CRTP二.CRTP 的基本特征表现:基类是一个模板类;派生…

(免费领源码)java#Springboot#mysql旅游景点订票系统68524-计算机毕业设计项目选题推荐

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作…

java eazyexcel 实现excel的动态多级联动下拉列表(2)使用MATCH+OFFSET函数

原理 同样是将数据源放到一个新建的隐藏的sheet中&#xff0c;第一行是第一个列表的数据&#xff0c;第二行是每一个有下级菜单的菜单&#xff0c;他下面的行就是他下级菜单的每一值使用MATCH函数从第二行找到上级菜单对应的列根据OFFSET函数从2中获取的列&#xff0c;取得下级…

Linux 系统相关的命令

参考资料 Linux之chmod使用【linux】chmod命令详细用法 目录 一. 系统用户相关1.1 查看当前访问的主机和用户1.2 切换用户1.2.1 设置root用户密码1.2.2 普通用户和root用户切换 1.4 系统状态1.4.1 vmstat 查看当前系统的状态1.4.2 history 查看系统中输入过的命令 二. 系统文件…

【Demo】基于CharacterController组件的角色控制

项目介绍 项目名称&#xff1a;Demo1 项目版本&#xff1a;1.0 游戏引擎&#xff1a;Unity2020.3.26f1c1 IDE&#xff1a;Visual Studio Code 关键词&#xff1a;Unity3D&#xff0c;CharacterController组件&#xff0c;角色控制&#xff0c;自定义按键&#xff0c;Scrip…

Spring 的存储和获取Bean

文章目录 获取 Spring 上下文对象的方式存储 Bean 对象的方式类注解配置扫描路径&#xff08;必须&#xff09;Controller&#xff08;控制器存储&#xff09;Service&#xff08;服务&#xff09;Repository&#xff08;持久层&#xff09;Component&#xff08;工具&#xff…

༺༽༾ཊ—Unity之-04-工厂方法模式—ཏ༿༼༻

首先创建一个项目&#xff0c; 在这个初始界面我们需要做一些准备工作&#xff0c; 建基础通用文件夹&#xff0c; 创建一个Plane 重置后 缩放100倍 加一个颜色&#xff0c; 任务&#xff1a;使用工厂方法模式 创建 飞船模型&#xff0c; 首先资源商店下载飞船模型&#xff0c…

【程序员英语】【美语从头学】初级篇(入门)(笔记)Lesson10(电话会话Ⅱ)

《美语从头学初级入门篇》 注意&#xff1a;被 删除线 划掉的不一定不正确&#xff0c;只是不是标准答案。 文章目录 Lesson 10 Telephone Conversation Ⅱ 电话会话&#xff08;二&#xff09;会话A会话B笔记I would like to do&#xff08;Id like to to do&#xff09;我想…

颠覆半导体:煤炭变身新材料,或将现身下一代CPU

《IEEE Spectrum》报道&#xff0c;一组研究人员正在探索将煤炭作为下一代二维晶体管绝缘材料的潜在替代品&#xff0c;以取代现有的金属氧化物薄膜。如果煤炭能够成功替代现代金属氧化物晶体管&#xff0c;那么这对于半导体行业来说将具有重大意义。 半导体器件正常工作需要依…

【数据结构和算法】--- 二叉树(5)--二叉树OJ题

目录 一、二叉树OJ题1.1 单值二叉树1.2 检查两颗树是否相同1.3 对称二叉树1.4 另一颗树的子树1.5 平衡二叉树 二、概念选择题 一、二叉树OJ题 1.1 单值二叉树 题目描述&#xff1a; 如果二叉树每个节点都具有相同的值&#xff0c;那么该二叉树就是单值二叉树。只有给定的树是…

AIGC知识速递——Google的Bert模型是如何fine-tuning的?

Look&#xff01;&#x1f440;我们的大模型商业化落地产品&#x1f4d6;更多AI资讯请&#x1f449;&#x1f3fe;关注Free三天集训营助教在线为您火热答疑&#x1f469;&#x1f3fc;‍&#x1f3eb; 选择合适的预训练模型&#xff1a; 从预训练的BERT模型开始&#xff0c;例如…

解决PyCharm的Terminal终端conda环境默认为base无法切换的问题

问题描述 在使用PyCharm的Terminal终端时&#xff0c;打开的默认环境为base。 在使用切换命令时&#xff0c;依旧无法解决。 解决方法 1、输入以下命令以查看conda的配置信息&#xff1a; conda config --show2、在输出中找到 auto_activate_base 的行&#xff0c;发现被…

【Linux 基础】常用基础指令(上)

文章目录 一、 创建新用户并设置密码二、ls指令ls指令基本概念ls指令的简写操作 三、pwd指令四、cd指令五、touch指令六、rm指令七、mkdir指令八、rmdir 指令 一、 创建新用户并设置密码 ls /home —— 查看存在多少用户 whoami —— 查看当前用户名 adduser 用户名 —— 创建新…

防御保护--第一次实验

目录 一&#xff0c;vlan的划分及在防火墙上创建单臂路由 二&#xff0c;创建安全区域 三&#xff0c;配置安全策略 四&#xff0c;配置认证策略 五&#xff0c;配置NAT策略 1.将内网中各个接口能够ping通自己的网关 2..生产区在工作时间内可以访问服务器区&#xff0c;仅…

解密人工智能:探索机器学习奥秘

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 机器学习的定义二. 机器学习的发展历程三. 机器学习的原理四. 机器学习的分类…

探索Pyecharts之美-绘制多彩旭日图的艺术与技巧【第37篇—python:旭日图】

文章目录 引言准备工作绘制基本旭日图调整颜色和样式添加交互功能定制标签和标签格式嵌套层级数据高级样式与自定义进阶主题&#xff1a;动态旭日图数据源扩展&#xff1a;外部JSON文件总结 引言 数据可视化在现代编程中扮演着重要的角色&#xff0c;而Pyecharts是Python中一个…