Spark Transformation转换算子和Action行动算子

news2024/10/1 7:31:40

1、Transformation转换算子

RDD整体上分为Value类型、双Value类型和Key-Value类型

1.1,Value类型

 1.1.1,map()映射

 

object value01_map {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

        // 3.2 调用map方法,每个元素乘以2
        val mapRdd: RDD[Int] = rdd.map(_ * 2)

        // 3.3 打印修改后的RDD中数据
        mapRdd.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 1.1.2、mapPartitions()以分区为单位执行Map

object value02_mapPartitions {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)

        // 3.2 调用mapPartitions方法,每个元素乘以2
        val rdd1 = rdd.mapPartitions(x=>x.map(_*2))

        // 3.3 打印修改后的RDD中数据
        rdd1.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 1.1.3,map()和mapPartitions()区别

1.1.4,mapPartitionsWithIndex()带分区号 

 

object value03_mapPartitionsWithIndex {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)

        // 3.2 创建一个RDD,使每个元素跟所在分区号形成一个元组,组成一个新的RDD
        val indexRdd = rdd.mapPartitionsWithIndex( (index,items)=>{items.map( (index,_) )} )

        //扩展功能:第二个分区元素*2,其余分区不变
// 3.3 打印修改后的RDD中数据
        indexRdd.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

1.1.5,flatMap()压平

 

object value04_flatMap {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val listRDD=sc.makeRDD(List(List(1,2),List(3,4),List(5,6),List(7)), 2)

        // 3.2 把所有子集合中数据取出放入到一个大的集合中
        listRDD.flatMap(list=>list).collect.foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

1.1.6,glom()分区转换数组

object value05_glom {

def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd = sc.makeRDD(1 to 4, 2)

        // 3.2 求出每个分区的最大值  0->1,2   1->3,4
        val maxRdd: RDD[Int] = rdd.glom().map(_.max)

        // 3.3 求出所有分区的最大值的和 2 + 4
        println(maxRdd.collect().sum)

        //4.关闭连接
        sc.stop()
    }
}

 

1.1.7,groupBy()分组

object value06_groupby {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd = sc.makeRDD(1 to 4, 2)

        // 3.2 将每个分区的数据放到一个数组并收集到Driver端打印
        rdd.groupBy(_ % 2).collect().foreach(println)

        // 3.3 创建一个RDD
        val rdd1: RDD[String] = sc.makeRDD(List("hello","hive","hadoop","spark","scala"))

        // 3.4 按照首字母第一个单词相同分组
        rdd1.groupBy(str=>str.substring(0,1)).collect().foreach(println)

        sc.stop()
    }
}

groupBy会存在shuffle过程

shuffle:将不同的分区数据进行打乱重组的过程

shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。

1.1.8,GroupBy之WordCount

object value06_groupby {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val strList: List[String] = List("Hello Scala", "Hello Spark", "Hello World")
        val rdd = sc.makeRDD(strList)

        // 3.2 将字符串拆分成一个一个的单词
        val wordRdd: RDD[String] = rdd.flatMap(str=>str.split(" "))

        // 3.3 将单词结果进行转换:word=>(word,1)
        val wordToOneRdd: RDD[(String, Int)] = wordRdd.map(word=>(word, 1))

        // 3.4 将转换结构后的数据分组
        val groupRdd: RDD[(String, Iterable[(String, Int)])] = wordToOneRdd.groupBy(t=>t._1)

        // 3.5 将分组后的数据进行结构的转换
        val wordToSum: RDD[(String, Int)] = groupRdd.map {
            case (word, list) => {
                (word, list.size)
            }
        }

        wordToSum.collect().foreach(println)

        sc.s

 扩展复杂版WordCount

val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2)))

1.1.9,filter()过滤

object value07_filter {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4),2)

        //3.1 过滤出符合条件的数据
        val filterRdd: RDD[Int] = rdd.filter(_ % 2 == 0)

        //3.2 收集并打印数据
        filterRdd.collect().foreach(println)

        //4 关闭连接
        sc.stop()
    }
}

 

1.1.10,sample()采样

object value08_sample {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(1 to 10)

        // 3.2 打印放回抽样结果
        rdd.sample(true, 0.4, 2).collect().foreach(println)

        // 3.3 打印不放回抽样结果
        rdd.sample(false, 0.2, 3).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 

1.1.11,distinct()去重

object value09_distinct {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val distinctRdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2,9,6,1))

        // 3.2 打印去重后生成的新RDD
        distinctRdd.distinct().collect().foreach(println)

        // 3.3 对RDD采用多个Task去重,提高并发度
        distinctRdd.distinct(2).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 

1.1.12,coalesce()重新分区

Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。

1、不执行Shuffle方式

object value10_coalesce {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建一个RDD
        //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)

        //3.1 缩减分区
        //val coalesceRdd: RDD[Int] = rdd.coalesce(2)

        //4. 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
        //4.1 缩减分区
        val coalesceRdd: RDD[Int] = rdd.coalesce(2)

        //5 打印查看对应分区数据
        val indexRdd: RDD[Int] = coalesceRdd.mapPartitionsWithIndex(
            (index, datas) => {
                // 打印每个分区数据,并带分区号
                datas.foreach(data => {
                    println(index + "=>" + data)
                })
                // 返回分区的数据
                datas
            }
        )
        indexRdd.collect()

        //6. 关闭连接
        sc.stop()
    }
}

 2、执行Shuffle方式

//3. 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
//3.1 执行shuffle
val coalesceRdd: RDD[Int] = rdd.coalesce(2, true)

3、Shuffle原理

 

 

1.1.13,repartition()重新分区(执行Shuffle)


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object value11_repartition {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
        //3.1 缩减分区
        //val coalesceRdd: RDD[Int] = rdd.coalesce(2,true)
        
        //3.2 重新分区
        val repartitionRdd: RDD[Int] = rdd.repartition(2)

        //4 打印查看对应分区数据
        val indexRdd: RDD[Int] = repartitionRdd.mapPartitionsWithIndex(
            (index, datas) => {
                // 打印每个分区数据,并带分区号
                datas.foreach(data => {
                    println(index + "=>" + data)
                })
                // 返回分区的数据
                datas
            }
        )
        indexRdd.collect()

        //6. 关闭连接
        sc.stop()
    }
}

 

1.1.14,coalesce和repartition区别

1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

2)repartition实际上是调用的coalesce,进行shuffle。源码如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}

3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object value11_repartition {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 3)
        //3.1 合并分区(没有shuffle)
        // coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的
        //val pRdd: RDD[Int] = rdd.coalesce(4)

        //3.2 重新分区(有shuffle)
        val pRdd: RDD[Int] = rdd.repartition(4)

        //4 打印查看对应分区数据
        val indexRdd: RDD[Int] = pRdd.mapPartitionsWithIndex(
            (index, datas) => {
                // 打印每个分区数据,并带分区号
                datas.foreach(data => {
                    println(index + "=>" + data)
                })
                // 返回分区的数据
                datas
            }
        )
        indexRdd.collect()

        //6. 关闭连接
        sc.stop()
    }
}

1.1.15,sortBy()排序

object value12_sortBy {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        // 3.1 创建一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 3, 4, 6, 5))
        // 3.2 默认是升序排
        val sortRdd: RDD[Int] = rdd.sortBy(num => num)
        sortRdd.collect().foreach(println)

        // 3.3 配置为倒序排
        val sortRdd2: RDD[Int] = rdd.sortBy(num => num, false)
        sortRdd2.collect().foreach(println)

        // 3.4 创建一个RDD
        val strRdd: RDD[String] = sc.makeRDD(List("1", "22", "12", "2", "3"))

        // 3.5 按照字符的int值排序
        strRdd.sortBy(num => num.toInt).collect().foreach(println)

        // 3.5 创建一个RDD
        val rdd3: RDD[(Int, Int)] = sc.makeRDD(List((2, 1), (1, 2), (1, 1), (2, 2)))

        // 3.6 先按照tuple的第一个值排序,相等再按照第2个值排
        rdd3.sortBy(t=>t).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 

1.1.16,pipe()调用脚本

 1.2,双Value类型交互

 1.2.1,union()并集

object DoubleValue01_union {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd1: RDD[Int] = sc.makeRDD(1 to 4)

        //3.2 创建第二个RDD
        val rdd2: RDD[Int] = sc.makeRDD(4 to 8)

        //3.3 计算两个RDD的并集
        rdd1.union(rdd2).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 

 1.2.2,subtract ()差集

object DoubleValue02_subtract {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(1 to 4)

        //3.2 创建第二个RDD
        val rdd1: RDD[Int] = sc.makeRDD(4 to 8)

        //3.3 计算第一个RDD与第二个RDD的差集并打印
        rdd.subtract(rdd1).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 

 1.2.3,intersection()交集

object DoubleValue03_intersection {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd1: RDD[Int] = sc.makeRDD(1 to 4)

        //3.2 创建第二个RDD
        val rdd2: RDD[Int] = sc.makeRDD(4 to 8)

        //3.3 计算第一个RDD与第二个RDD的差集并打印
        rdd1.intersection(rdd2).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 

 1.2.4, zip()拉链

 

object DoubleValue04_zip {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd1: RDD[Int] = sc.makeRDD(Array(1,2,3),3)

        //3.2 创建第二个RDD
        val rdd2: RDD[String] = sc.makeRDD(Array("a","b","c"),3)

        //3.3 第一个RDD组合第二个RDD并打印
        rdd1.zip(rdd2).collect().foreach(println)

        //3.4 第二个RDD组合第一个RDD并打印
        rdd2.zip(rdd1).collect().foreach(println)

        //3.5 创建第三个RDD(与1,2分区数不同)
        val rdd3: RDD[String] = sc.makeRDD(Array("a","b"),3)

        //3.6 元素个数不同,不能拉链
        // Can only zip RDDs with same number of elements in each partition
        rdd1.zip(rdd3).collect().foreach(println)

        //3.7 创建第四个RDD(与1,2分区数不同)
        val rdd4: RDD[String] = sc.makeRDD(Array("a","b","c"),2)

        //3.8 分区数不同,不能拉链
        // Can't zip RDDs with unequal numbers of partitions: List(3, 2)
        rdd1.zip(rdd4).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

1.3,Key-Value类型

1.3.1,partitionBy()按照K重新分区

object KeyValue01_partitionBy {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

        //3.2 对RDD重新分区
        val rdd2: RDD[(Int, String)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

        //3.3 查看新RDD的分区数
        println(rdd2.partitions.size)

        //4.关闭连接
        sc.stop()
    }
}

 HashPartitioner源码解读

class HashPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    
    def numPartitions: Int = partitions
    
    def getPartition(key: Any): Int = key match {
        case null => 0
        case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
    }
    
    override def equals(other: Any): Boolean = other match {
        case h: HashPartitioner =>
            h.numPartitions == numPartitions
        case _ =>
            false
    }
    
    override def hashCode: Int = numPartitions
}

自定义分区器

object KeyValue01_partitionBy {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
        //3.2 自定义分区
        val rdd3: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))

        //4 打印查看对应分区数据
        val indexRdd: RDD[(Int, String)] = rdd3.mapPartitionsWithIndex(
            (index, datas) => {
                // 打印每个分区数据,并带分区号
                datas.foreach(data => {
                    println(index + "=>" + data)
                })
                // 返回分区的数据
                datas
            }
        )

        indexRdd.collect()

        //5.关闭连接
        sc.stop()
    }
}

// 自定义分区
class MyPartitioner(num: Int) extends Partitioner {
    // 设置的分区数
    override def numPartitions: Int = num

    // 具体分区逻辑
    override def getPartition(key: Any): Int = {

        if (key.isInstanceOf[Int]) {

            val keyInt: Int = key.asInstanceOf[Int]
            if (keyInt % 2 == 0)
                0
            else
                1
        }else{
            0
        }
    }
}


1.3.2,reduceByKey()按照K聚合V

object KeyValue02_reduceByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))

        //3.2 计算相同key对应值的相加结果
        val reduce: RDD[(String, Int)] = rdd.reduceByKey((x,y) => x+y)

        //3.3 打印结果
        reduce.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.3,groupByKey()按照K重新分组

object KeyValue03_groupByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))

        //3.2 将相同key对应值聚合到一个Seq中
        val group: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        
        //3.3 打印结果
        group.collect().foreach(println)
        
        //3.4 计算相同key对应值的相加结果
        group.map(t=>(t._1,t._2.sum)).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.4,reduceByKey和groupByKey区别

1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。

2)groupByKey:按照key进行分组,直接进行shuffle。

3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。

1.3.5,aggregateByKey()按照K处理分区内和分区间逻辑

 

object KeyValue04_aggregateByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)

        //3.2 取出每个分区相同key对应值的最大值,然后相加
        rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.6,foldByKey()分区内和分区间相同的aggregateByKey()

object KeyValue05_foldByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val list: List[(String, Int)] = List(("a",1),("a",1),("a",1),("b",1),("b",1),("b",1),("b",1),("a",1))
        val rdd = sc.makeRDD(list,2)

        //3.2 求wordcount
        //rdd.aggregateByKey(0)(_+_,_+_).collect().foreach(println)

        rdd.foldByKey(0)(_+_).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.7,combineByKey()转换结构后分区内和分区间操作

3)需求说明:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key对应值的总和以及key出现的次数,再相除得到结果)

 

object KeyValue06_combineByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 创建第一个RDD
        val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
        val input: RDD[(String, Int)] = sc.makeRDD(list, 2)

        //3.2 将相同key对应的值相加,同时记录该key出现的次数,放入一个二元组
        val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
            (_, 1),
            (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
            (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
        )

        //3.3 打印合并后的结果
        combineRdd.collect().foreach(println)

        //3.4 计算平均值
        combineRdd.map {
            case (key, value) => {
                (key, value._1 / value._2.toDouble)
            }
        }.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.8,reduceByKey、aggregateByKey、foldByKey、combineByKey

 


1.3.9,sortByKey()按照K进行排序

object KeyValue07_sortByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))

        //3.2 按照key的正序(默认顺序)
        rdd.sortByKey(true).collect().foreach(println)

        //3.3 按照key的倒序
        rdd.sortByKey(false).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 1.3.10,mapValues()只对V进行操作

object KeyValue08_mapValues {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))

        //3.2 对value添加字符串"|||"
        rdd.mapValues(_ + "|||").collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.11,join()连接 将相同key对应的多个value关联在一起

需求说明:创建两个pairRDD,并将key相同的数据聚合到一个元组。
object KeyValue09_join {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))

        //3.2 创建第二个pairRDD
        val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))

        //3.3 join操作并打印结果
        rdd.join(rdd1).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

 


1.3.12,cogroup() 类似全连接,但是在同一个RDD中对key聚合

操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。

object KeyValue10_cogroup {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c")))

        //3.2 创建第二个RDD
        val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1,4),(2,5),(3,6)))

        //3.3 cogroup两个RDD并打印结果
        rdd.cogroup(rdd1).collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

2 Action行动算子

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

2.1,reduce()聚合

1)函数签名:def reduce(f: (T, T) => T): T

2)功能说明:f函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

3)需求说明:创建一个RDD,将所有元素聚合得到结果 

object action01_reduce {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

        //3.2 聚合数据
        val reduceResult: Int = rdd.reduce(_+_)
        println(reduceResult)

        //4.关闭连接
        sc.stop()
    }
}


2.2,collect()以数组的形式返回数据集

 

1)函数签名:def collect(): Array[T]

2)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。

注意:所有的数据都会被拉取到Driver端,慎用 

3)需求说明:创建一个RDD,并将RDD内容收集到Driver端打印

object action02_collect {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

        //3.2 收集数据到Driver
        rdd.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}


2.3,count()返回RDD中元素个数

1)函数签名:def count(): Long

2)功能说明:返回RDD中元素的个数

3)需求说明:创建一个RDD,统计该RDD的条数

object action03_count {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

        //3.2 返回RDD中元素的个数
        val countResult: Long = rdd.count()
        println(countResult)

        //4.关闭连接
        sc.stop()
    }
}

 

2.4,first()返回RDD中的第一个元素

1)函数签名: def first(): T

2)功能说明:返回RDD中的第一个元素

 3)需求说明:创建一个RDD,返回该RDD中的第一个元素

object action04_first {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

        //3.2 返回RDD中元素的个数
        val firstResult: Int = rdd.first()
        println(firstResult)

        //4.关闭连接
        sc.stop()
    }
}

 

2.5,take()返回由RDD前n个元素组成的数组

1)函数签名: def take(num: Int): Array[T]

2)功能说明:返回一个由RDD的前n个元素组成的数组

3)需求说明:创建一个RDD,统计该RDD的条数

object action05_take {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

        //3.2 返回RDD中元素的个数
        val takeResult: Array[Int] = rdd.take(2)
        println(takeResult)

        //4.关闭连接
        sc.stop()
    }
}

 

2.6,takeOrdered()返回该RDD排序后前n个元素组成的数组

1)函数签名: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

2)功能说明:返回该RDD排序后的前n个元素组成的数组

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    ......
    if (mapRDDs.partitions.length == 0) {
        Array.empty
    } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
    }
}

 3)需求说明:创建一个RDD,获取该RDD排序后的前2个元素 

object action06_takeOrdered{

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))

        //3.2 返回RDD中元素的个数
        val result: Array[Int] = rdd.takeOrdered(2)
        println(result)

        //4.关闭连接
        sc.stop()
    }
}

2.7,aggregate()案例

3)需求说明:创建一个RDD,将所有元素相加得到结果

object action07_aggregate {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),8)

        //3.2 将该RDD所有元素相加得到结果
        //val result: Int = rdd.aggregate(0)(_ + _, _ + _)
        val result: Int = rdd.aggregate(10)(_ + _, _ + _)

        println(result)

        //4.关闭连接
        sc.stop()
    }
}

 


2.8,fold()案例

3)需求说明:创建一个RDD,将所有元素相加得到结果

object action08_fold {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

        //3.2 将该RDD所有元素相加得到结果
        val foldResult: Int = rdd.fold(0)(_+_)
        println(foldResult)

        //4.关闭连接
        sc.stop()
    }
}


2.9,countByKey()统计每种key的个数

 1)函数签名:def countByKey(): Map[K, Long]

2)功能说明:统计每种key的个数

3)需求说明:创建一个PairRDD,统计每种key的个数 

object action09_countByKey {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))

        //3.2 统计每种key的个数
        val result: collection.Map[Int, Long] = rdd.countByKey()
        println(result)

        //4.关闭连接
        sc.stop()
    }
}


2.10,save相关算子

1saveAsTextFile(path)保存成Text文件

(1)函数签名

(2)功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

2saveAsSequenceFile(path) 保存成Sequencefile文件

(1)函数签名

(2)功能说明:将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

注意:只有kv类型RDD有该操作,单值的没有

3saveAsObjectFile(path) 序列化成对象保存到文件

(1)函数签名

(2)功能说明:用于将RDD中的元素序列化成对象,存储到文件中。

4)代码实现

object action10_save {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)

        //3.2 保存成Text文件
        rdd.saveAsTextFile("output")

        //3.3 序列化成对象保存到文件
        rdd.saveAsObjectFile("output1")

        //3.4 保存成Sequencefile文件
        rdd.map((_,1)).saveAsSequenceFile("output2")

        //4.关闭连接
        sc.stop()
    }
}

2.11,foreach(f)遍历RDD中每一个元素

3)需求说明:创建一个RDD,对每个元素进行打印 

object action11_foreach {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3具体业务逻辑
        //3.1 创建第一个RDD
        // val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

        //3.2 收集后打印
        rdd.map(num=>num).collect().foreach(println)

        println("****************")
        //3.3 分布式打印
        rdd.foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380687.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c语言入门-5-字符串

c语言入门-5-字符串正文1、字符串怎么用方式一方式二2、字符串的长度深度解析1 字符串的特性2 \0 的含义3 ascii码表下一篇正文 1、字符串怎么用 方式一 // 字符串的标准使用方式&#xff0c;用char类型的数组表示字符串 #include<stdio.h> int main() {char arr[] &…

语音识别技术对比分析

文章目录一、语音识别产品对比二、百度语音识别产品1、套餐及价格&#xff1a;2、官网3、调研结果三、华为语音识别产品四、阿里云语音识别产品1、套餐及价格&#xff1a;2、官网地址3、调研结果五、科大讯飞语音识别产品1、套餐及价格&#xff1a;2、官网3、调研结果六、有道语…

一、Redis入门概述(是什么,能干嘛,去哪下,怎么玩)

一. redis是什么&#xff1f; Redis:REmote Dictionary Server(远程字典服务器)官方解释&#xff1a; Remote Dictionary Server(远程字典服务)是完全开源的&#xff0c;使用ANSIC语言编写遵守BSD协议&#xff0c;是一个高性能的Key-Value数据库提供了丰富的数据结构&#xff…

何谓dB , dB怎么理解?

dB 是什么单位 ?愈低愈好吗?对于声频 ( 声学及电子声学 ) 方面的单位&#xff0c;它是以分贝(decibel &#xff0c;dB ) 来做结果的。斯多里一生专注于科学,1876 发明电话&#xff0c;我们都知道贝尔发明了电话&#xff0c;然而重要的是&#xff0c;他发现我们人类耳朵对声音…

一文带你了解什么是PACS系统源码

▷ 运维级带三维重建和还原的医院PACS系统有源码&#xff0c;有演示&#xff0c;带使用手册和操作说明书。 ▷ PACS系统及影像存取与传输系统( Picture Archiving and Communication System)&#xff0c;为以实现医学影像数字化存储、诊断为核心任务&#xff0c;从医学影像设备…

uniapp小程序接入腾讯地图sdk

新建一个项目。配置uniapp配置文件设置小程序的appid注意&#xff1a;匿名用户可能存在地理定位失效。查uniapp官网官网->apiuni.getLocation(OBJECT) 获取当前的地理位置、速度。属性&#xff1a;success匿名函数返回值&#xff1a;uni.getLocation({type: gcj02,success: …

工作实战之密码防重放攻击

目录 前言 一、登录认证密码加密 二、bcrypt加密密码不一样&#xff0c;匹配原理 1.程序运行现象 2.原理解释 三、密码防重放 总结 前言 密码重放攻击&#xff1a;请求被攻击者获取&#xff0c;并重新发送给认证服务器&#xff0c;从而达到认证通过的目的 一、登录认证密…

系列八、SQL优化

一、插入数据 如果我们需要一次性往数据库表中插入多条记录&#xff0c;可以从以下三个方面进行优化。1.1、优化方案一&#xff08;批量插入数据&#xff09; Insert into tb_test values(1,Tom),(2,Cat),(3,Jerry); 1.2、优化方案二&#xff08;手动控制事务&#xff09; s…

CEC2005:星雀优化算法(Nutcracker optimizer algorithm,NOA)求解CEC2005(提供MATLAB代码)

一、星雀优化算法NOA 星雀优化算法(Nutcracker optimizer algorithm,NOA)由Mohamed Abdel-Basset等人于2023年提出&#xff0c;该算法模拟星雀的两种行为&#xff0c;即&#xff1a;在夏秋季节收集并储存食物&#xff0c;在春冬季节搜索食物的存储位置。 星鸦单独或成对活动&…

C语言循环语句do while和嵌套循环语句讲解

C do…while 循环 不像 for 和 while 循环&#xff0c;它们是在循环头部测试循环条件。在 C 语言中&#xff0c;do…while 循环是在循环的尾部检查它的条件。 do…while 循环与 while 循环类似&#xff0c;但是 do…while 循环会确保至少执行一次循环。 语法 C 语言中 do…w…

杂谈:数组index问题和对象key问题

面试题一&#xff1a; var arr [1, 2, 3, 4] 问&#xff1a;arr[1] ?; arr[1] ?答&#xff1a;arr[1] 2; arr[1] 2 这里可以再分为两个问题&#xff1a; 1、数组赋值 var arr [1, 2, 3, 4]arr[1] 10; // 数字场景 arr[10] 1; // 字符串场景 arr[a] 1; // 字符串…

索莫菲模型的一些理解 Smomerfeld Model

如何解释传统热容算出来的数值与量子模型下的区别&#xff1f; 因为只有费米能附近的电子才能够进行移动&#xff0c;这个是问题的差别所在 我们下面就来介绍如何求费米能&#xff08;费米能的计算&#xff09; 既然费米能附近的电子很重要&#xff0c;那么附近的电子有多少很…

语义分割前储知识

CNN中的Layers 除了正向传播计算loss&#xff0c;反向传播更新parameters&#xff0c;我们还需要知道参数是如何计算的&#xff0c;这个很重要。 我们这里介绍几个在deep learning中经常用到的几个层&#xff0c;dense layer&#xff08;全连接层&#xff09;&#xff0c;con…

Redis源码---有序集合为何能同时支持点查询和范围查询

目录 前言 Sorted Set 基本结构 跳表的设计与实现 跳表数据结构 跳表结点查询 跳表结点层数设置 哈希表和跳表的组合使用 前言 有序集合&#xff08;Sorted Set&#xff09;是 Redis 中一种重要的数据类型&#xff0c;它本身是集合类型&#xff0c;同时也可以支持集合中…

记一次Nodejs减低npm版本的踩坑日记

使用了npm install -g npm6.4.1指令之后&#xff0c;把npm版本减低了&#xff0c;让后悲催的就来了。 由于npm 6.4.1 已经过时&#xff0c;导致运行npm时出现 npm does not support Node.js v18.14.2 版本不兼容问题 升级npm版本&#xff0c;npm install -g npmlatest 没用还是…

DM-VIO论文翻译

简介 DM-VIO: Delayed Marginalization Visual-Inertial Odometry DM-VIO: 延迟边缘化惯性视觉里程计 花了两天时间捏着鼻子把这篇论文翻译完了&#xff0c;很多术语和状态的表达方式可能是和这个团队以前的工作DSO以及VI-DSO保持了一致&#xff0c;所以看起来很是费劲&#…

STM32开发(17)----CubeMX配置CRC

CubeMX配置CRC前言一、什么是CRC&#xff1f;二、实验过程1.STM32CubeMX配置2.代码实现重载printf3.实验结果总结前言 本章介绍使用STM32CubeMX对CRC进行配置的方法&#xff0c;CRC的目的是保证数据的完整性&#xff0c;所有的STM32芯片都内置了一个硬件的CRC计算模块&#xf…

指针的进阶【下篇】

文章目录&#x1f4c0;8.指向函数指针数组的指针&#x1f4c0;9.回调函数&#x1f4c0;8.指向函数指针数组的指针 &#x1f330;请看代码与注释&#x1f447; int Add(int x, int y) {return x y; } int Sub(int x, int y) {return x - y; } int main() {int (*pf)(int, int…

T3 出行云原生容器化平台实践

作者&#xff1a;林勇&#xff0c;就职于南京领行科技股份有限公司&#xff0c;担任云原生负责人&#xff0c;也是公司容器化项目的负责人。主要负责 T3 出行云原生生态相关的所有工作&#xff0c;如服务容器化、多 Kubernetes 集群建设、应用混部、降本增效、云原生可观测性基…

2023年中小企业实施智能制造的建议

智能制造的载体是制造系统&#xff0c;制造系统从微观到宏观有不同的层次&#xff0c;主要包括制造装备、制造单元、制造车间&#xff08;工厂&#xff09;、制造企业和企业生态等。随着智能制造的深入推进&#xff0c;未来智能制造将向以下五个方向发展。 &#xff08;一&…