实验原理
Spark的核心就是RDD,所有在RDD上的操作会被运行在Cluster上,Driver程序启动很多Workers,Workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),然后对RDD在内存中进行缓存和计算。
而RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(返回一个新的RDD)。
一、数据展示与前置准备
某电商网站记录了大量用户对商品的收藏数据,并将数据存储在名为buyer_favorite1的文件中,数据格式以及数据内容如下
在进行后续操作前,请先开启hadoop和spark服务。可以通过jps命令查看进程是否开启完整。
二、创建scala工程项目
1、开发环境:eclipse
打开已安装完Scala插件的Eclipse,新建一个Scala项目,命名为spark4。
在spark4项目下新建包名,命名为my.scala。将scala object命名为ScalaWordCount。
2、导入运行所需要的jar包。
右键项目,创建一个文件夹,名为lib。
将jar包导入进来,再右键jar包,点击Build Path=>Add to Build Path。(可以去我的资源里面下载spark1.x hadoop2.x)
3、编写Scala语句,并统计用户收藏数据中,每个用户收藏商品数量。
package my.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ScalaWordCount {
def main(args: Array[String]) {
//创建Spark的配置对象sparkConf,设置Spark程序运行时的配置信息;
val conf = new SparkConf()
conf.setMaster("local") .setAppName("scalawordcount")
//创建SparkContext对象,SparkContext是Spark程序所有功能的唯一入口,无论采用Scala、Java还是Python都必须有一个SparkContext;
val sc = new SparkContext(conf)
val rdd = sc.textFile("hdfs://localhost:9000/myspark/buyer_favorite1") //根据具体的数据来源,通过SparkContext来创建RDD;
//对初始的RDD进行Transformation级别的处理。(首先将每一行的字符串拆分成单个的单词,然后在单词拆分的基础上对每个单词实例计数为1;
//最后,在每个单词实例计数为1的基础上统计每个单词在文件出现的总次数)。
rdd.map(line => (line.split("\t")(0), 1))
.reduceByKey(_ + _)
.collect()
.foreach(println)
sc.stop()
}
}
在控制界面console中查看的输出结果。
三、创建Java工程项目
再次右键点击项目名,新建package,将包命名为my.java 。
右键点击包my.java,新建Class,命名为JavaWordCount。
1、编写Java代码,统计用户收藏数据中,每个用户收藏商品数量。
package my.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile("\t");
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/myspark/buyer_favorite1");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
String word[]=s.split("\t",2);
return Arrays.asList(word[0]);
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
System.out.println(counts.collect());
counts.saveAsTextFile("hdfs://localhost:9000/myspark/out");
ctx.stop();
}
}
2、在linux终端查看输出结果
执行如下命令查看结果,前提是已启动集群
hadoop fs -cat /myspark/out/part-00000
写在最后
由此可以看出,scala语言在编写spark程序时的优越性,简短精炼。