RDD:
// 导入SparkConf和SparkContext类,用于配置和创建Spark上下文
import org.apache.spark.{SparkConf, SparkContext}
// 定义一个名为TopN的对象
object TopN {
def main(args: Array[String]): Unit = {
// 创建一个新的SparkConf对象,并设置应用程序名称为"TopN",主节点为"local"
val conf = new SparkConf().setAppName("TopN").setMaster("local")
val sc = new SparkContext(conf)
// 设置日志级别为ERROR,以减少输出的信息量
sc.setLogLevel("ERROR")
// 从HDFS读取数据,使用2个分区
val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples", 2)
// 初始化计数器
var num = 0
// 对读取的数据进行一系列转换操作:
// 1. 过滤掉空行或长度为0的行
// 2. 过滤掉不能被逗号分割成4部分的行
// 3. 将每一行按逗号分割成两部分
// 4. 将分割后的第二部分转换为整数
// 5. 按照第二部分(整数)降序排序
// 6. 取排序后的前5个元素
// 7. 遍历这5个元素,打印其索引和值
val result = lines.filter(line => (line.trim.length > 0) && (line.split(",").length == 4))
.map(_.split(",")(2))
.map(x => (x.toInt,""))
.sortByKey(false)
.map(x => x._1)
.take(5)
.foreach(x => {
num = num + 1
println(num + "\t" + x)
})
}
}
import org.apache.spark.{SparkConf, SparkContext}
object MaxAndMin {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MaxAndMin").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/spark/chapter5", 2)
// 对读取的数据进行一系列转换操作:
// 1. 过滤掉空行或长度为0的行
// 2. 将每一行按逗号分割成两部分:键(key)和值(value)
// 3. 按键分组
// 4. 计算每个键对应的最大值和最小值
// 5. 收集结果并打印
val result = lines.filter(_.trim.length > 0)
.map(line => ("key", line.trim.toInt))
.groupByKey()
.map(x => {
var min = Integer.MAX_VALUE
var max = Integer.MIN_VALUE
for (num <- x._2) {
if (num > max) {
max = num
}
if (num < min) {
min = num
}
}
(max, min)
})
.collect()
.foreach(x => {
println("max\t" + x._1)
println("min\t" + x._2)
})
}
}
案例3:文件排序
任务描述:
有多个输入文件,每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数,进行排序后,输出到一个新的文件中,输出的内容个数为每行两个整数,第一个整数为第二个整数的排序位次,第二个整数为原待排序的整数。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitioner
object FileSort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FileSort")
val sc = new SparkContext(conf)
// 设置数据文件路径
val dataFile = "file:///usr/local/spark/mycode/rdd/data"
// 从指定路径读取数据文件,使用3个分区
val lines = sc.textFile(dataFile, 3)
// 初始化索引变量
var index = 0
// 对读取的数据进行一系列转换操作:
// 1. 过滤掉空行或长度为0的行
// 2. 将每一行按逗号分割成两部分:键(key)和值(value)
// 3. 使用HashPartitioner进行分区
// 4. 按键排序
// 5. 添加索引并重新组合结果
val result = lines.filter(_.trim.length > 0)
.map(n => (n.trim.toInt, ""))
.partitionBy(new HashPartitioner(1))
.sortByKey()
.map(t => {
index += 1
(index, t._1)
})
// 将处理后的结果保存到指定路径
result.saveAsTextFile("file:///usr/local/spark/mycode/rdd/examples/result")
}
}
案例4:二次排序
任务要求:
对于一个给定的文件(数据如file1.txt所示),请对数据进行排序,首先根据第1列数据降序排序,如果第1列数据相等,则根据第2列数据降序排序。
// 定义一个SecondarySortKey类,用于实现自定义排序逻辑
package cn.edu.xmu.spark
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
// 实现compare方法,用于比较两个SecondarySortKey对象
def compare(other: SecondarySortKey): Int = {
if (this.first - other.first != 0) {
this.first - other.first
} else {
this.second - other.second
}
}
}
// 定义一个SecondarySortApp对象,用于执行主程序
object SecondarySortApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
// 创建SparkContext上下文
val sc = new SparkContext(conf)
// 读取文件数据
val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/examples/file1.txt", 1)
// 将每行数据转换为SecondarySortKey对象
val pairWithSortKey = lines.map(line => new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt))
// 按照SecondarySortKey进行排序
val sorted = pairWithSortKey.sortByKey(false)
// 提取排序后的结果
val sortedResult = sorted.map(sortedLine => sortedLine._2)
// 打印排序结果
sortedResult.collect().foreach(println)
}
}
案例五:连接操作
任务描述:在推荐领域有一个著名的开放测试集,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt。请编程实现:通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m
import org.apache.spark._
import org.apache.spark.SparkContext._
object SparkJoin {
def main(args: Array[String]): Unit = {
// 检查命令行参数的数量是否正确,确保提供三个参数:评分文件路径、电影文件路径、输出路径
if (args.length != 3) {
println("usage is SparkJoin <rating> <movie> <output>")
return
}
val conf = new SparkConf().setAppName("SparkJoin").setMaster("local")
// 创建Spark上下文对象
val sc = new SparkContext(conf)
try {
// 从HDFS文件系统读取评分数据
val textFile = sc.textFile(args(0))
// 提取(movieId, rating)键值对
val ratings = textFile.map(line => {
val fields = line.split("::")
(fields(1).toInt, fields(2).toDouble)
})
// 计算每个电影的平均评分
val movieScores = ratings
.groupByKey() // 将相同电影ID的评分组合在一起
.map(data => { // 对每个电影ID的评分组计算平均值
val avg = data._2.sum / data._2.size
(data._1, avg)
})
// 从HDFS文件系统读取电影数据
val movies = sc.textFile(args(1))
// 提取(MovieID, MovieName)键值对,并基于MovieID创建键值对
val moviesKey = movies.map(line => {
val fields = line.split("::")
(fields(0).toInt, fields(1)) // (MovieID, MovieName)
}).keyBy(tup => tup._1)
// 通过join操作合并电影评分和电影信息,过滤出平均评分大于4.0的电影,并格式化输出
val result = movieScores
.keyBy(tup => tup._1) // 基于电影ID创建键值对
.join(moviesKey) // 将评分与电影信息进行连接
.filter(f => f._2._1._2 > 4.0) // 过滤出平均评分大于4.0的电影
.map(f => (f._1, f._2._1._2, f._2._2._2)) // 格式化为 (MovieID, AverageRating, MovieName)
// 将结果保存到指定的输出路径
result.saveAsTextFile(args(2))
} finally {
// 确保在程序结束时停止Spark上下文
sc.stop()
}
}
}
wordcount两道:
MapReduce实现wordcount
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
// 默认构造函数,无参数。
public WordCount() {
}
public static void main(String[] args) throws Exception {
// 创建一个配置对象,用于读取命令行参数和配置文件。
Configuration conf = new Configuration();
// 解析命令行参数,并将非Hadoop通用选项的参数分离出来。
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
// 检查输入参数是否正确。至少需要两个参数:一个或多个输入路径和一个输出路径。
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
// 初始化一个新的MapReduce作业,并设置其名称为"word count"。
Job job = Job.getInstance(conf, "word count");
// 设置包含main方法的类作为作业的主类,以便找到相关的Mapper、Reducer和其他资源。
job.setJarByClass(WordCount.class);
// 设置Mapper类,它负责处理输入数据并生成中间键值对。
job.setMapperClass(TokenizerMapper.class);
// 设置Combiner类(可选),它在映射阶段后立即对中间结果进行局部聚合,以减少传输的数据量。
job.setCombinerClass(IntSumReducer.class);
// 设置Reducer类,它负责接收来自Mapper的中间键值对,并执行最终的聚合操作。
job.setReducerClass(IntSumReducer.class);
// 定义作业的输出格式,指定键和值的类型。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 添加所有提供的输入路径到作业中。最后一个参数总是作为输出路径。
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// 设置输出目录,该目录必须不存在。
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
// 提交作业并等待完成,成功返回0,失败返回1。
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// Reducer类用于汇总每个单词出现的次数。
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
// 遍历所有的IntWritable值,累加它们以计算总和。
for (IntWritable val : values) {
sum += val.get(); // 将IntWritable转换为原始int类型
}
// 设置sum到result中,以便可以序列化。
result.set(sum);
// 输出<key, result>对到context,即单词及其出现的次数。
context.write(key, result);
}
}
// Mapper类负责将输入文本拆分为单词,并为每个单词生成一个计数为1的键值对。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1); // 代表每个单词出现一次
private Text word = new Text(); // 用于存储当前处理的单词
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 使用StringTokenizer来分割文本行中的单词。
StringTokenizer itr = new StringTokenizer(value.toString());
// 对于每一个单词,创建一个<单词, 1>键值对并写入到context中。
while (itr.hasMoreTokens()) {
word.set(itr.nextToken()); // 设置当前单词
context.write(word, one); // 写入键值对到context中
}
}
}
}
Spark SQL实现wordcount
package com.ht.final.wordcount
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf
object WordCountSparkSQL {
def main(args: Array[String]): Unit = {
// 初始化Spark配置,并创建一个本地模式的SparkSession。
// local[*]表示使用所有可用的处理器核心来运行任务。
val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
// 设置日志级别为警告,减少控制台输出的日志信息量。
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._ // 导入隐式转换,用于支持DataFrame/Dataset操作。
try {
// 读取文本文件内容到Dataset中,每行作为一个字符串元素。
val fileDS: Dataset[String] = spark.read.textFile("D:\\Document\\temp\\wordcount\\input.txt")
// 将每一行按照制表符('\t')分割成多个单词,形成一个新的包含单个单词的Dataset。
val wordDS: Dataset[String] = fileDS.flatMap(_.split("\t"))
// 注册临时视图(类似数据库表),以便能够通过SQL查询访问数据。
wordDS.createOrReplaceTempView("word_count")
// 定义SQL查询语句,计算每个单词出现的次数,并按出现次数降序排列。
val sqlQuery = "SELECT value AS word, COUNT(*) AS counts FROM word_count GROUP BY word ORDER BY counts DESC"
// 执行SQL查询并获取结果作为DataFrame。
val resultDF: DataFrame = spark.sql(sqlQuery)
// 展示查询结果的前20行,默认情况下。
resultDF.show()
} finally {
// 确保在程序结束时关闭资源。
spark.stop()
}
}
}