Spark 安装及WordCount编写(Spark、Scala、java三种方法)

news2024/11/15 10:34:43

Spark 官网:Apache Spark™ - Unified Engine for large-scale data analytics

Spark RDD介绍官网:https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD

下载好spark解压mv到软件目录

linux>mv spark-xxx-xxx /opt/install/spark

修改配置文件进入spark/conf

linux>vi spark-env.sh
#添加如下配置
export JAVA_HOME=/opt/install/jdk
export SPARK_MASTER_IP=192.168.58.200
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

linux>vi slaves
192.168.58.201
192.168.58.202

启动spark服务

linux>sbin/start-all.sh
linux>jps    #查看服务进程
192.168.58.200 启动spark jps 后有Master
192.168.58.201 启动spark jps 后有Worker
192.168.58.202 启动spark jps 后有Worker

访问Spark Web UI 用浏览器输入IP:8080 即可

spark Shell 用于测试

Linux>bin/spark-shell 用于测试
或bin/spark-shell --master spark://IP:7070 (会启用stanlang集器 )

测试数据:(将数据上传到hdfs上 例:hdfs dfs -put testdata.txt  /mycluster/tmp_data/sparkdata)

linux>testdata.txt
hello zhangsan hello lisi hello wangwu hello xiaohong
hello lisi2 hello xiaoming hello zhangsan
hello zhangsan2 hello lisi3 hello wangwu2
zhang san li si

Spark Shell WordCount

spark(scala)>val lines=sc.textFile("/root/tmp_data/sparkdata/testdata.txt")	(创建RDD,指定hdfs上的文件)
spark(scala)>val lines=sc.textFile("file:///root/tmp_data/sparkdata/testdata.txt")	(创建RDD,指定本地上的文件)
spark(scala)>lines
spark(scala)>lines.take(5)    (查看指定范围)
spark(scala)>lines.collect		(收集查看全部)
spark(scala)>lines.flatMap(x=>x.split(" ")).take(10)
spark(scala)>lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).take(10)
spark(scala)>lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).take(10)

spark(scala)>lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).sortBy(_._2,false).take(5)    (查看指定范围的WordCount)

spark(scala)>lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).sortBy(_._2,false).saveAsTextFile("file:///root/tmp_data/sparkdata/output")    (WordCount 后将结果保存到本地)

spark(scala)>lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).sortBy(_._2,false).saveAsTextFile("/root/tmp_data/sparkdata/output")    (WordCount 后将结果保存到hdfs)

linux>hdfs dfs -cat /xxx/xxx 查看数据

IDEA pom.xml配置:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

SparkWordCount

package SparkTest

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
/*  1.创建Sparkcontext
  2.创建RDD
  3.调用transformation算子
  4.调用action算子
  5.释放资源*/
  def main(args:Array[String]):Unit={
/*    val conf =new SparkConf().setAppName("SparkWrodCount")
    val sc = new SparkContext(conf)
    val lines: RDD[String]=sc.textFile("/mycluster/tmp_data/sparkdata")
    val wordes: RDD[String]=lines.flatMap(_.split(" "))
    val wordAndOne: RDD[(String,Int)] = wordes.map((_,1))
    val reduced: RDD[(String,Int)]=wordAndOne.reduceByKey(_+_)
    val result: RDD[(String,Int)]=reduced.sortBy(_._2,false)
    result.saveAsTextFile("/mycluster/tmp_data/output")
    sc.stop()*/

    val conf =new SparkConf().setAppName("SparkWrodCount")
    val sc = new SparkContext(conf)
    val lines:RDD[String]=sc.textFile(args(0))
    val wordes:RDD[String]=lines.flatMap(_.split(" "))
    val wordAndOne:RDD[(String,Int)] = wordes.map((_,1))
    val reduced:RDD[(String,Int)]=wordAndOne.reduceByKey(_+_)
    val result:RDD[(String,Int)]=reduced.sortBy(_._2,false)
    result.saveAsTextFile(args(1))
    sc.stop()
  }
}

打好包后运行

linux>bin/spark-submit --master spark://192.168.58.200:7077 --class SparkTest.SparkWordCount  /root/tmp_data/sparkdata/SparkDemo-1.0-SNAPSHOT.jar  /mycluster/tmp_data/sparkdata  /mycluster/tmp_data/output

JavaLamdaWordCount

package WordCount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLamdaWordCount {
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setAppName("JavaLamdaWordCount");
        final JavaSparkContext jsc = new JavaSparkContext(conf);
        final JavaRDD<String> lines = jsc.textFile(args[0]);
        final JavaRDD<String> words=lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        final JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(Word -> new Tuple2<>(Word,1));
        final JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((x,y) -> x + y);
        final JavaPairRDD<Integer,String> swaped = reduced.mapToPair(tuple -> tuple.swap());
        final JavaPairRDD<Integer,String> sorted = swaped.sortByKey(false);
        final JavaPairRDD<String,Integer> result = sorted.mapToPair(tuple -> tuple.swap());
        result.saveAsTextFile(args[1]);

        jsc.stop();

    }
}

打好包后运行

linux>bin/spark-submit --master spark://192.168.58.200:7077 --class WordCount.JavaLamdaWordCount  /root/tmp_data/sparkdata/SparkDemo-1.0-SNAPSHOT.jar  /mycluster/tmp_data/sparkdata  /mycluster/tmp_data/output

JavaWordCount

package WordCount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        final JavaSparkContext jsc = new JavaSparkContext(conf);
      //  final JavaRDD<String> lines=jsc.textFile("/mycluster/tmp_data/sparkdata");
        final JavaRDD<String> lines=jsc.textFile(args[0]);
        final JavaRDD<String> words=lines.flatMap(new FlatMapFunction<String,String>() {
            @Override
            public Iterator<String> call(String line) throws Exception{
//              final  String[] words=line.split(" ");
//              final List<String> lists= Arrays.asList(words);
//              return  lists.iterator();
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        final JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(new PairFunction<String,String,Integer>(){
           @Override
           public Tuple2<String,Integer> call(String word) throws Exception{
               return new Tuple2<>(word,1);
           }
        });

        final JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        final JavaPairRDD<Integer,String> swaped=reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> Tuple2) throws Exception {
                return Tuple2.swap();
            }
        });
        final JavaPairRDD<Integer,String> sorted = swaped.sortByKey(false);

        final JavaPairRDD<String,Integer> reuslt = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> Tuple2) throws Exception {
                return Tuple2.swap();
            }
        });
//integerString
//        reuslt.saveAsTextFile("/mycluster/tmp_data/output");
        reuslt.saveAsTextFile(args[1]);
            jsc.stop();

    }
}

打好后包后运行

linux>bin/spark-submit --master spark://192.168.58.200:7077 --class WordCount.JavaWordCount  /root/tmp_data/sparkdata/SparkDemo-1.0-SNAPSHOT.jar  /mycluster/tmp_data/sparkdata  /mycluster/tmp_data/output

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/416667.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

统计套利策略

统计套利策略套利策略跨品种套利标的择时风控套利策略 套利是&#xff0c;某种商品在&#xff08;在同一市场或不同市场&#xff09;拥有两个价格的情况下&#xff0c;以较低的价格买进&#xff0c;较高的价格卖出&#xff0c;从而实现获利的交易方式。 比如咖啡店里有小杯、…

【jvm系列-04】精通运行时数据区共享区域---堆

JVM系列整体栏目 内容链接地址【一】初识虚拟机与java虚拟机https://blog.csdn.net/zhenghuishengq/article/details/129544460【二】jvm的类加载子系统以及jclasslib的基本使用https://blog.csdn.net/zhenghuishengq/article/details/129610963【三】运行时私有区域之虚拟机栈…

chapter-6数据库设计原则

以下课程来源于MOOC学习—原课程请见&#xff1a;数据库原理与应用 考研复习 数据库设计 数据库设计是基于应用系统需求分析中对数据的需求&#xff0c;解决数据的抽象、数据的表达和数据的存储等问题&#xff0c;其目标是设计出一个满足应用要求&#xff0c;简洁、高效、规范…

【c语言】二维数组

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 给大家跳段街舞感谢支持&#xff01;ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ ኈ ቼ ዽ ጿ…

「计算机控制系统」3. 计算机控制系统的数学描述

差分方程 Z变换 脉冲传递函数 计算机控制系统的响应 文章目录差分方程基础知识差分方程的解Z变换定义与性质求Z变换Z变换表求Z反变换用Z变换解差分方程脉冲传递函数脉冲传递函数与差分方程的相互转化开环脉冲传递函数闭环脉冲传递函数计算机控制系统的响应差分方程 基础知识 …

Photoshop CS6安装包下载及安装教程(Photoshop 2016)

下载链接&#xff1a; https://pan.quark.cn/s/f961759b36cc “Adobe Photoshop是一款集图像扫描、编辑修改、图像制作、广告创意、图像输入输出于一体的图形图像处理软件,简称ps,对于这款软件大家应该并不陌生,而今天小编带来的是Photoshop2023中文版,也是该系列的最新版本,不…

WAF攻防-菜刀冰蝎哥斯拉流量通讯特征绕过检测反制感知

文章目录菜刀-流量&绕过&特征&检测特征绕过检测冰蝎3-流量&绕过&特征&检测特征通讯过程检测绕过哥斯拉-流量&绕过&特征&检测特征Other使用Proxifier进行流量转发至Burp抓包分析(使用Wireshake也可以) 菜刀-流量&绕过&特征&检…

Java阶段一Day21

Java阶段一Day21 文章目录Java阶段一Day21多线程并发原理使用场景创建并启动线程创建线程的方法进程线程的生命周期获取线程信息的方法教师总结新单词多线程概念线程:一个顺序的单一的程序执行流程就是一个线程。代码一句一句的有先后顺序的执行。多线程:多个单一顺序执行的流程…

最新!AI第一次有了国家标准,北大、华为、百度等单位共同编制

最近&#xff0c;国家标准全文公开系统网站正式发布了国家标准《神经网络表示与模型压缩 第一部分&#xff1a;卷积神经网络》&#xff08;GB/T 42382.1-2023&#xff09;&#xff0c;此标准由北京大学、鹏城实验室、华为、百度等16家单位共同编制。 &#xff08;来源&#xff…

考试前临时抱佛脚有用吗?这篇复习攻略会告诉你答案

夏天来了&#xff0c;大家的期末考试也将不远了。不知平时大家是如何准备考试的&#xff0c;是平时学习计划有序进行复习&#xff0c;还是考试前临时抱佛脚呢&#xff1f;今天就来跟大家讲一讲&#xff0c;学习中很重要的一个环节&#xff0c;如何复习。所以敲黑板&#xff0c;…

基于 AT89C51 单片机的数字时钟设计

目录 1.设计目的、作用 2.设计要求 3.设计的具体实现 3.1 设计原理 3.2 硬件系统设计 3.2.1 AT89C51 单片机原理 3.2.2 晶振电路设计 3.2.3 复位电路设计 3.2.4 LED 数码管显示 3.3 系统实现 3.3.1 系统仿真与调试 3.3.2 演示结果 4.总结 附录 附录 1 附录 2 1.…

【JVM】常量池

常量池&#xff08;Runtime Constant Poo&#xff09; 常量池Java中可以分为三种&#xff1a;字符串常量池&#xff08;堆&#xff09;、Class文件常量池、运行时常量池&#xff08;堆&#xff09;。 1.字符串常量池&#xff08;String Pool&#xff09; 为了提升性能和减少…

C++变量限定

C的变量限定指可以在变量类型的基础上加上特殊的限定条件&#xff0c;主要包括&#xff1a;是不是const&#xff0c;是不是volatile&#xff0c;是左值还是右值&#xff0c;是不是引用&#xff0c;是左值引用还是右值引用&#xff0c;等等。 1. 为什么要研究这个东西 主要是c…

练习之烦人的递归

文章目录1.删除公共字符2.读入一串以?结束的字符串&#xff0c;逆序输出。法一&#xff1a;常规递归法二&#xff1a;投机取巧3.递归将整数输出为字符串4.递归输出1--n的平方和5.递归计算222222...6.递归求最大公约数7.递归输出x的n次方8. 递归计算下列式子的值1.删除公共字符…

从FPGA说起的深度学习(六)-任务并行性

这是新的系列教程&#xff0c;在本教程中&#xff0c;我们将介绍使用 FPGA 实现深度学习的技术&#xff0c;深度学习是近年来人工智能领域的热门话题。在本教程中&#xff0c;旨在加深对深度学习和 FPGA 的理解。用 C/C 编写深度学习推理代码高级综合 (HLS) 将 C/C 代码转换为硬…

还在回想Midjourney的参数,Claude来帮你

本来是想整理一份Midjourne使用的参数表&#xff0c;看来是不用了&#xff0c;Claude很方便的就能帮到我们。 问下Claude: 你知道 Midjourne 的参数吗 Claude 回答如下&#xff1a; 是回答出来了&#xff0c;但是并不是非常的好&#xff0c;我们继续引导他一下&#xff1a; 你…

堆的实际应用(topk问题以及堆排序)

目录 前言&#xff1a; 一:解决topk问题 二:堆排序 【1】第一种方法(很少用) 【2】第二种方法(很实用) 前言&#xff1a; 上一次我们进行了二叉树的初步介绍并实现了堆的基本功能&#xff0c;但堆的作用并不是存储数据&#xff0c;它可以用来解决topk问题(求一组数据较大或…

【C语言】9000字长文操作符详解

简单不先于复杂&#xff0c;而是在复杂之后。 目录 1. 操作符分类 2. 算数操作符 3. 移位操作符 3.1 左移操作符 3.2 右移操作符 4. 位操作符 4.1 按位与 & 4.2 按位或 | 4.3 按位异或 ^ 4.4 一道变态的面试题 4.5 练习 5. 赋值操作符 5.1 复合赋值…

主流接口测试框架对比,究竟哪个更好用

公司计划系统的开展接口自动化测试&#xff0c;需要我这边调研一下主流的接口测试框架给后端测试&#xff08;主要测试接口&#xff09;的同事介绍一下每个框架的特定和使用方式。后端同事根据他们接口的特点提出一下需求&#xff0c;看哪个框架更适合我们。 需求 1、接口编写…

项目工作分解工具WBS

WBS工作分解结构&#xff08;Work Breakdown Structure&#xff09;&#xff0c;是一个描述思路的规划和设计工具&#xff0c;它可以清晰地表示各项目之间相互联系的结构&#xff0c;详细说明为完成项目所必须完成的各项工作&#xff0c;也可以向高层管理者和客户报告项目完成的…