大数据技术原理与应用期末复习-代码

news2024/12/25 5:18:00

RDD:

// 导入SparkConf和SparkContext类,用于配置和创建Spark上下文
import org.apache.spark.{SparkConf, SparkContext}

// 定义一个名为TopN的对象
object TopN {
  def main(args: Array[String]): Unit = {
    // 创建一个新的SparkConf对象,并设置应用程序名称为"TopN",主节点为"local"
    val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    // 设置日志级别为ERROR,以减少输出的信息量
    sc.setLogLevel("ERROR")
    // 从HDFS读取数据,使用2个分区
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples", 2)  
    // 初始化计数器
    var num = 0
    
    // 对读取的数据进行一系列转换操作:
    // 1. 过滤掉空行或长度为0的行
    // 2. 过滤掉不能被逗号分割成4部分的行
    // 3. 将每一行按逗号分割成两部分
    // 4. 将分割后的第二部分转换为整数
    // 5. 按照第二部分(整数)降序排序
    // 6. 取排序后的前5个元素
    // 7. 遍历这5个元素,打印其索引和值
    val result = lines.filter(line => (line.trim.length > 0) && (line.split(",").length == 4))
                      .map(_.split(",")(2))
                      .map(x => (x.toInt,""))
                      .sortByKey(false)
                      .map(x => x._1)
                      .take(5)
                      .foreach(x => {
                        num = num + 1
                        println(num + "\t" + x)
                      })
  }
}


import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
   
    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)

    // 对读取的数据进行一系列转换操作:
    // 1. 过滤掉空行或长度为0的行
    // 2. 将每一行按逗号分割成两部分:键(key)和值(value)
    // 3. 按键分组
    // 4. 计算每个键对应的最大值和最小值
    // 5. 收集结果并打印
    val result = lines.filter(_.trim.length > 0)
                      .map(line => ("key", line.trim.toInt))
                      .groupByKey()
                      .map(x => {
                        var min = Integer.MAX_VALUE
                        var max = Integer.MIN_VALUE
                        for (num <- x._2) {
                          if (num > max) {
                            max = num
                          }
                          if (num < min) {
                            min = num
                          }
                        }
                        (max, min)
                      })
                      .collect()
                      .foreach(x => {
                        println("max\t" + x._1)
                        println("min\t" + x._2)
                      })
  }
}

案例3:文件排序

任务描述:

有多个输入文件,每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容个数为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。

                                    


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitioner
object FileSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileSort")
    val sc = new SparkContext(conf)
    
    // 设置数据文件路径
    val dataFile = "file:///usr/local/spark/mycode/rdd/data"
    
    // 从指定路径读取数据文件,使用3个分区
    val lines = sc.textFile(dataFile, 3)
    
    // 初始化索引变量
    var index = 0
    
    // 对读取的数据进行一系列转换操作:
    // 1. 过滤掉空行或长度为0的行
    // 2. 将每一行按逗号分割成两部分:键(key)和值(value)
    // 3. 使用HashPartitioner进行分区
    // 4. 按键排序
    // 5. 添加索引并重新组合结果
    val result = lines.filter(_.trim.length > 0)
                      .map(n => (n.trim.toInt, ""))
                      .partitionBy(new HashPartitioner(1))
                      .sortByKey()
                      .map(t => {
                        index += 1
                        (index, t._1)
                      })
    
    // 将处理后的结果保存到指定路径
    result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
  }
}

案例4:二次排序

任务要求:

对于一个给定的文件(数据如file1.txt所示),请对数据进行排序,首先根据第1列数据降序排序,如果第1列数据相等,则根据第2列数据降序排序。

// 定义一个SecondarySortKey类,用于实现自定义排序逻辑
package cn.edu.xmu.spark

class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // 实现compare方法,用于比较两个SecondarySortKey对象
  def compare(other: SecondarySortKey): Int = {
    if (this.first - other.first != 0) {
      this.first - other.first
    } else {
      this.second - other.second
    }
  }
}

// 定义一个SecondarySortApp对象,用于执行主程序
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    // 创建SparkContext上下文
    val sc = new SparkContext(conf)    
    // 读取文件数据
    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)  
    // 将每行数据转换为SecondarySortKey对象
    val pairWithSortKey = lines.map(line => new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt))
    // 按照SecondarySortKey进行排序
    val sorted = pairWithSortKey.sortByKey(false)
    // 提取排序后的结果
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    // 打印排序结果
    sortedResult.collect().foreach(println)
  }
}

案例五:连接操作

任务描述:在推荐领域有一个著名的开放测试集,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt。请编程实现:通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m

import org.apache.spark._
import org.apache.spark.SparkContext._

object SparkJoin {
  def main(args: Array[String]): Unit = {

    // 检查命令行参数的数量是否正确,确保提供三个参数:评分文件路径、电影文件路径、输出路径
    if (args.length != 3) {
      println("usage is SparkJoin <rating> <movie> <output>")
      return
    }
    val conf = new SparkConf().setAppName("SparkJoin").setMaster("local")
    // 创建Spark上下文对象
    val sc = new SparkContext(conf)

    try {
      // 从HDFS文件系统读取评分数据
      val textFile = sc.textFile(args(0))

      // 提取(movieId, rating)键值对
      val ratings = textFile.map(line => {
        val fields = line.split("::")
        (fields(1).toInt, fields(2).toDouble)
      })

      // 计算每个电影的平均评分
      val movieScores = ratings
        .groupByKey() // 将相同电影ID的评分组合在一起
        .map(data => { // 对每个电影ID的评分组计算平均值
          val avg = data._2.sum / data._2.size
          (data._1, avg)
        })

      // 从HDFS文件系统读取电影数据
      val movies = sc.textFile(args(1))
      // 提取(MovieID, MovieName)键值对,并基于MovieID创建键值对
      val moviesKey = movies.map(line => {
        val fields = line.split("::")
        (fields(0).toInt, fields(1)) // (MovieID, MovieName)
      }).keyBy(tup => tup._1)

      // 通过join操作合并电影评分和电影信息,过滤出平均评分大于4.0的电影,并格式化输出
      val result = movieScores
        .keyBy(tup => tup._1) // 基于电影ID创建键值对
        .join(moviesKey) // 将评分与电影信息进行连接
        .filter(f => f._2._1._2 > 4.0) // 过滤出平均评分大于4.0的电影
        .map(f => (f._1, f._2._1._2, f._2._2._2)) // 格式化为 (MovieID, AverageRating, MovieName)

      // 将结果保存到指定的输出路径
      result.saveAsTextFile(args(2))

    } finally {
      // 确保在程序结束时停止Spark上下文
      sc.stop()
    }
  }
}

wordcount两道:

MapReduce实现wordcount

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    // 默认构造函数,无参数。
    public WordCount() {
    }

    public static void main(String[] args) throws Exception {
        // 创建一个配置对象,用于读取命令行参数和配置文件。
        Configuration conf = new Configuration();
        
        // 解析命令行参数,并将非Hadoop通用选项的参数分离出来。
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        
        // 检查输入参数是否正确。至少需要两个参数:一个或多个输入路径和一个输出路径。
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        
        // 初始化一个新的MapReduce作业,并设置其名称为"word count"。
        Job job = Job.getInstance(conf, "word count");
        
        // 设置包含main方法的类作为作业的主类,以便找到相关的Mapper、Reducer和其他资源。
        job.setJarByClass(WordCount.class);
        
        // 设置Mapper类,它负责处理输入数据并生成中间键值对。
        job.setMapperClass(TokenizerMapper.class);
        
        // 设置Combiner类(可选),它在映射阶段后立即对中间结果进行局部聚合,以减少传输的数据量。
        job.setCombinerClass(IntSumReducer.class);
        
        // 设置Reducer类,它负责接收来自Mapper的中间键值对,并执行最终的聚合操作。
        job.setReducerClass(IntSumReducer.class);
        
        // 定义作业的输出格式,指定键和值的类型。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 添加所有提供的输入路径到作业中。最后一个参数总是作为输出路径。
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        
        // 设置输出目录,该目录必须不存在。
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        // 提交作业并等待完成,成功返回0,失败返回1。
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Reducer类用于汇总每个单词出现的次数。
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            
            // 遍历所有的IntWritable值,累加它们以计算总和。
            for (IntWritable val : values) {
                sum += val.get(); // 将IntWritable转换为原始int类型
            }
            
            // 设置sum到result中,以便可以序列化。
            result.set(sum);
            
            // 输出<key, result>对到context,即单词及其出现的次数。
            context.write(key, result);
        }
    }

    // Mapper类负责将输入文本拆分为单词,并为每个单词生成一个计数为1的键值对。
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1); // 代表每个单词出现一次
        private Text word = new Text(); // 用于存储当前处理的单词

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 使用StringTokenizer来分割文本行中的单词。
            StringTokenizer itr = new StringTokenizer(value.toString());
            
            // 对于每一个单词,创建一个<单词, 1>键值对并写入到context中。
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken()); // 设置当前单词
                context.write(word, one); // 写入键值对到context中
            }
        }
    }
}

Spark SQL实现wordcount

package com.ht.final.wordcount

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf

object WordCountSparkSQL {

  def main(args: Array[String]): Unit = {
    // 初始化Spark配置,并创建一个本地模式的SparkSession。
    // local[*]表示使用所有可用的处理器核心来运行任务。
    val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    
    // 设置日志级别为警告,减少控制台输出的日志信息量。
    spark.sparkContext.setLogLevel("WARN")
    
    import spark.implicits._ // 导入隐式转换,用于支持DataFrame/Dataset操作。

    try {
      // 读取文本文件内容到Dataset中,每行作为一个字符串元素。
      val fileDS: Dataset[String] = spark.read.textFile("D:\\Document\\temp\\wordcount\\input.txt")

      // 将每一行按照制表符('\t')分割成多个单词,形成一个新的包含单个单词的Dataset。
      val wordDS: Dataset[String] = fileDS.flatMap(_.split("\t"))

      // 注册临时视图(类似数据库表),以便能够通过SQL查询访问数据。
      wordDS.createOrReplaceTempView("word_count")

      // 定义SQL查询语句,计算每个单词出现的次数,并按出现次数降序排列。
      val sqlQuery = "SELECT value AS word, COUNT(*) AS counts FROM word_count GROUP BY word ORDER BY counts DESC"

      // 执行SQL查询并获取结果作为DataFrame。
      val resultDF: DataFrame = spark.sql(sqlQuery)

      // 展示查询结果的前20行,默认情况下。
      resultDF.show()

    } finally {
      // 确保在程序结束时关闭资源。
      spark.stop()
    }
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2265098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CNN和Transfomer介绍

文章目录 CNN和Transfomer介绍CNN和Transfomer的区别1. **基本概念**2. **数据处理方式**3. **模型结构差异**4. **应用场景区别** 自注意力机制1. **自注意力机制的概念**2. **自注意力机制的实现步骤**3. **自注意力机制的优势** Transformer结构组成1. **多头注意力层&#…

ArcGIS Pro 3.4新功能3:空间统计新特性,基于森林和增强分类与回归,过滤空间自相关

目录 应用 1&#xff1a;它是相关性还是托布勒第一定律&#xff1f; 应用 2&#xff1a;将空间带入非空间模型 结论 在 ArcGIS Pro 3.4 中&#xff0c;我们在新的空间组件实用程序&#xff08;Moran 特征向量&#xff09;工具集中发布了一个新工具 - 从字段过滤空间自相关。…

webserver log日志系统的实现

参考博客&#xff1a;https://blog.csdn.net/weixin_51322383/article/details/130474753 https://zhuanlan.zhihu.com/p/721880618 阻塞队列blockqueue 1、阻塞队列的设计流程是什么样的 它的底层是用deque进行管理的 阻塞队列主要是围绕着生产者消费者模式进行多线程的同步和…

深度学习实战之超分辨率算法(tensorflow)——ESPCN

espcn原理算法请参考上一篇论文&#xff0c;这里主要给实现。 数据集如下&#xff1a;尺寸相等即可 针对数据集&#xff0c;生成样本代码preeate_data.py import imageio from scipy import misc, ndimage import numpy as np import imghdr import shutil import os import…

Unity3d 基于UGUI和VideoPlayer 实现一个多功能视频播放器功能(含源码)

前言 随着Unity3d引擎在数字沙盘、智慧工厂、数字孪生等场景的广泛应用&#xff0c;视频已成为系统程序中展示时&#xff0c;不可或缺的一部分。在 Unity3d 中&#xff0c;我们可以通过强大的 VideoPlayer 组件和灵活的 UGUI 系统&#xff0c;将视频播放功能无缝集成到用户界面…

WebGAL 项目下载及安装教程

WebGAL 项目下载及安装教程 WebGAL A brand new web Visual Novel engine | 全新的网页端视觉小说引擎 [这里是图片001] 项目地址: https://gitcode.com/gh_mirrors/web/WebGAL 1、项目介绍 WebGAL 是一个全新的网页端视觉小说引擎&#xff0c;旨在提供美观、功能强大且易于…

虚幻引擎是什么?

Unreal Engine&#xff0c;是一款由Epic Games开发的游戏引擎。该引擎主要是为了开发第一人称射击游戏而设计&#xff0c;但现在已经被成功地应用于开发模拟游戏、恐怖游戏、角色扮演游戏等多种不同类型的游戏。虚幻引擎除了被用于开发游戏&#xff0c;现在也用于电影的虚拟制片…

Kubernetes 架构图和组件

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;历代文学&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编程&#xff0c;高并发设计&#xf…

GESP2024年12月认证C++五级( 第三部分编程题(2))

参考程序&#xff1a; #include<bits/stdc.h> using namespace std; #define ll long long int n, m; int cnt[1010]; vector<int> cs[1010]; ll calc(int aim) {int cur_cnt cnt[1];ll res 0;vector<int> tmp;for (int i 2; i<n; i){int buy max((…

基于DockerCompose搭建Redis主从哨兵模式

linux目录结构 内网配置 哨兵配置文件如下&#xff0c;创建3个哨兵配置文件 # sentinel26379.conf sentinel26380.conf sentinel26381.conf 内容如下 protected-mode no sentinel monitor mymaster redis-master 6379 2 sentinel down-after-milliseconds mymaster 60000 s…

npm error code ETIMEDOUT

参考:https://blog.csdn.net/qq_38572963/article/details/142052986 二、解决办法 1、清空缓存 npm cache clean --force 2、查看当前的npm镜像设置 npm config get registry 3、切换新镜像源 npm config set registry https://registry.npmmirror.com 4、查看新源是否设置成功…

【终端工具】FinalShell v4.5.12 官方版

1.下载地址 【终端工具】FinalShell v4.5.12 官方版 2.简介 FinalShell是一款免费的跨平台远程管理工具&#xff0c;专为开发者和运维人员设计。它支持通过 SSH、SFTP 等方式连接到 Linux 和 Windows 服务器&#xff0c;提供类似于终端的操作界面。除了常规的远程登录功能&a…

微前端qiankun的使用——实践

qiankun 创建主应用项目——vue2 main.js注册子应用 $ yarn add qiankun # 或者 npm i qiankun -Simport { registerMicroApps, start } from qiankun; import Vue from "vue"; import App from "./App.vue"; import router from "./router"; …

ansible play-book玩法

使用ansible-playbook实现安装nginx_ansible 安装nginx-CSDN博客文章浏览阅读1.5k次&#xff0c;点赞14次&#xff0c;收藏19次。本文详细介绍了如何在Linux环境中准备Ansible环境&#xff0c;包括配置主机、下载和安装Ansible&#xff0c;以及使用yum模块和tar包源码安装Nginx…

Require:离线部署 Sourcegraph

Sourcegraph 使读取、编写和修复代码变得容易——即使在庞大而复杂的代码库中。 代码搜索&#xff1a;搜索所有分支和所有代码主机的所有存储库。代码智能&#xff1a;导航代码、查找引用、查看代码所有者、跟踪历史记录等。修复和重构&#xff1a;一次对许多存储库进行大规模更…

大数据新视界 -- Hive 集群性能监控与故障排查(2 - 16 - 14)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…

Windows搭建域控服务器时本地administrator账户密码不符合要求解决办法

cmd命令行执行以下命令&#xff0c;再重试&#xff1a; net user administrator /passwordreq:yesWindows Server 2016 域控服务器搭建教程&#xff1a;https://blog.csdn.net/u010091664/article/details/122072506

【研究生必备|学术会议|高录用|见刊后1个月检索】第三届材料科学与智能制造国际学术会议(MSIM2025)

用处 1. 学术交流 参加学术会议是展示您研究成果和获取反馈的绝佳机会。在会议上&#xff0c;您可以与来自各地的研究者进行深入交流&#xff0c;讨论最新的研究动态与趋势&#xff0c;分享经验与观点。 2. 拓展人脉 学术会议汇聚了来自不同高校和研究机构的优秀学者和学生。…

ubuntu开机进入initramfs状态

虚拟机卡死成功起后进入了initramfs状态&#xff0c;可能是跟文件系统有问题或者检索不到根文件系统&#xff0c;或者是配置错误&#xff0c;系统磁盘等硬件问题导致 开机后进入如下图的界面&#xff0c; 文中有一条提示 要手动fsck 命令修复 /dev/sda1 命令如下 fsck /de…

C# 线程安全集合

文章目录 引言一、ConcurrentBag<T>二、ConcurrentQueue<T>三、ConcurrentStack<T>四、ConcurrentDictionary<TKey, TValue>五、总结引言 在多线程编程环境中,多个线程可能同时访问和操作集合数据。如果使用普通集合,很容易引发数据不一致、错误结果…