IDEA编写各种WordCount运行

news2025/1/20 3:54:28

目录

一、编写WordCount(Spark_scala)提交到spark高可用集群

1.项目结构

2.导入依赖

3.编写scala版的WordCount

4.maven打包

5.运行jar包

​6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果


搞了一个晚上加一个白天,总算搞出来了,呼~~

本地编写IDEA之前需要在windows下安装scala、hadoop和spark环境,参考文章如下:

《Scala安装》 《Windows环境部署Hadoop-3.3.2和Spark3.3.2》

一、编写WordCount(Spark_scala)提交到spark高可用集群

首先安装好scala,然后在IDEA创建一个maven项目,开始编写代码

1.项目结构

2.导入依赖

<name>spark-in-action</name>
    <url>http://maven.apache.org</url>

    <!-- 定义的一些常量 -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spark.version>3.3.2</spark.version>
        <scala.version>2.12.15</scala.version>
    </properties>

    <dependencies>
        <!-- scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <!-- 配置Maven的镜像库 -->
    <!-- 依赖下载国内镜像库 -->
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </repository>
    </repositories>

    <!-- maven插件下载国内镜像库 -->
    <pluginRepositories>
        <pluginRepository>
            <id>ali-plugin</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3.编写scala版的WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    // 1.创建SparkContext
    val sc = new SparkContext(conf)

    // 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3.对RDD进行操作,调用RDD的方法
    // -------------Transformation (转换算子开始) --------------
    // 切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 将单词和1组合放入到元组中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    // 将key相同的数据进行分组聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 按照次数排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序
    // -------------Transformation (转换算子结束) --------------

    // 4.调用Action
    // 将数据写入到外部的存储系统中
    sorted.saveAsTextFile(args(1))

    // 5.释放资源
    sc.stop()
  }
}

4.maven打包

这个是胖包,除了项目本身的依赖,还有其他依赖,上面的original是瘦包,只有项目本身的依赖

5.运行jar包

首先启动zk、hdfs和spark高可用集群,这里我搭建的是standalone模式的高可用集群,不是on Yarn的

创建/opt/soft/spark-3.2.3/submit目录,将jar包上传到该目录下

提交命令

[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1

--master spark://node141:7077     spark的master节点

--class cn.doitedu.day01.WordCount   运行的类名

--executor-memory 1g   占用的内存

--total-executor-cores 4  占用的核数

./spark-in-action-1.0.jar  运行的jar包地址

hdfs://node141:9000/words.txt   代码中args[0]对应的参数

hdfs://node141:9000/out-1  代码中args[1]对应的参数

6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

package cn.doitedu.day01

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用本地模式运行Spark程序(开发调试的时候使用)
 */
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    // 指定当前用户为root
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") // 本地模式,*表示根据当前机器的核数开多个线程

    // 1.创建SparkContext
    val sc = new SparkContext(conf)

    // 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3.对RDD进行操作,调用RDD的方法
    // -------------Transformation (转换算子开始) --------------
    // 切分压平
    val words: RDD[String] = lines.flatMap(line => {
      val words = line.split(" ")
      println(words)  // debug
      words
    })

    // 将单词和1组合放入到元祖中
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    // 将key相同的数据进行分组聚合
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    // 按照次数排序
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序
    // -------------Transformation (转换算子结束) --------------

    // 4.调用Action
    // 将数据写入到外部的存储系统中
    sorted.saveAsTextFile(args(1))

    // 5.释放资源
    sc.stop()
  }
}

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

查看运行结果

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String lines) throws Exception {
                String[] words = lines.split("\\s+");
                return Arrays.asList(words).iterator();
            }
        });

        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 将原来的kv顺序颠倒  (flink,3)  ----> (3,flink)
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap(); // 交换
            }
        });

        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);

        JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });

        result.saveAsTextFile(args[1]);

        jsc.stop();
    }
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaLambdaWordCount")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> lines = jsc.textFile(args[0]);
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
//        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);
        // 排序
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // 调回来
        JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);
        result.saveAsTextFile(args[1]);
        jsc.stop();
    }
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

好啦~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1518429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mupdf渲染过程(一):颜色

mupdf除了解析PDF功能之外&#xff0c;还有一个强大的功能就是渲染文字和图像&#xff0c;本文介绍mupdf渲染过程中涉及到的颜色问题&#xff1a;包括颜色空间&#xff0c;颜色转换&#xff0c;lcms的使用。 1.初始化 mupdf初始化第一步是实例化fz_context *ctx&#xff0c;fz…

2W10-ASEMI适配器专用2W10

编辑&#xff1a;ll 2W10-ASEMI适配器专用2W10 型号&#xff1a;2W10 品牌&#xff1a;ASEMI 封装&#xff1a;WOB-4 最大重复峰值反向电压&#xff1a;1000V 最大正向平均整流电流(Vdss)&#xff1a;2A 功率(Pd)&#xff1a;中小功率 芯片个数&#xff1a;4 引脚数量…

钡铼技术有限公司R40路由器工业4G让养殖环境监控更高效

钡铼技术有限公司的R40路由器是一款专为养殖环境监控而设计的工业级4G路由器。该路由器的出现极大地提高了养殖行业的监控效率&#xff0c;为养殖场主和管理者提供了更可靠、高效的解决方案。本文将从功能特点、优势以及应用案例等方面介绍钡铼技术有限公司的R40路由器在养殖环…

【SpringBoot】自定义工具类实现Excel数据新建表存入MySQL数据库

&#x1f3e1;浩泽学编程&#xff1a;个人主页 &#x1f525; 推荐专栏&#xff1a;《深入浅出SpringBoot》《java对AI的调用开发》 《RabbitMQ》《Spring》《SpringMVC》《项目实战》 &#x1f6f8;学无止境&#xff0c;不骄不躁&#xff0c;知行合一 文章目录 …

hololens2发布unity设置

生成vs工程再向hololens发布时&#xff0c; Architecture选X64或ARM64都可以成功发布

python-0002-linux安装pycharm

下载软件包 下载地址&#xff1a;https://download.csdn.net/download/qq_41833259/88944791 安装 # 解压 tar -zxvf 你的软件包 # 进入软件解压后的路径&#xff0c;如解压到了/home/soft/pycharm cd /home/soft/pycharm cd bin # 执行启动命令 sh pycharm.sh # 等待软件启…

京东云主机+京美建站SaaS版

京美建站SaaS版 京美建站搭建企业网站、小程序、3000精美模板 链接:https://daili.jd.com/s?linkNo57UBX34BZMWGNFYTOCPVUE7SN36CCIPKLTFLPCUCPYBKSYYBIPS2BJ57GP7RACLDHU66X526ZOULMIXL2VN7DT7IHU 京东云主机&#xff0c;安全稳定&#xff0c;性能强劲&#xff0c;新客下单…

VMware安装Ubuntu 18.04.2

下载Ubuntu映像 下载地址&#xff1a;http://old-releases.ubuntu.com/releases/18.04/ 下载名称&#xff1a; ubuntu-18.04.2-desktop-amd64.iso 清华镜像站&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/ubuntu-releases/ 阿里云镜像站&#xff1a;https://mirrors.ali…

ASP.NET区域检验云LIS平台源码 标本全生命周期管理

目录 一、云LIS系统功能亮点 二、收费项目管理 三、检验项目管理 系统功能 云LIS系统源码是一款全面的实验室信息管理系统源码&#xff0c;其主要功能包括样本管理、检测项目管理、质控管理、报告管理、数据分析、两癌筛查等多个方面。具有独立的配套SaaS模式运维管理系统&…

科研学习|论文解读——词汇量及其对主题表示的影响 (IPM, 2017)

原文标题 Vocabulary size and its effect on topic representation 摘要 这项研究调查了如何通过选择性地从被建模的文本语料库的词汇中删除术语来减少主题模型训练的计算开销。我们使用三个数据集&#xff0c;比较了删除单独出现的术语、前0.5%、1%和 5% 最频繁出现的术语以及…

Gitee配置SSH登录

一、背景 新入手的电脑&#xff0c;需要对Gitee上存放的项目进行更改上传&#xff0c;发现上传不了需要登录&#xff0c;便采用SSH密钥进行登录&#xff0c;防止远程管理工程中的信息泄露 二、前提 电脑已下载Git Bash工具&#xff0c;在项目下点击鼠标右键&#xff0c;进入…

多线程(线程池)

线程池 池的概念意味着可以复用, 减少创建, 销毁线程的消耗 即事先把需要使用的线程创建好, 放到 “池” 中, 需要的时候从 “池” 里取, 用完再放回 池里取 这样全程只创建和销毁线程一次(之说是一次哦, 没说一次创建和销毁多少个)即可 标准库线程池的使用 public class Main…

【解读】区块链和分布式记账技术标准体系建设指南

大家好&#xff0c;这里是苏泽。一个从业Java后端的区块链技术爱好者。 今天带大家来解读这份三部门印发的行业建设指南《区块链和分布式记账技术标准体系建设指南》 原文件可查看P020240112840724196854.pdf (www.gov.cn) 以下是个人解读&#xff0c;如有纰漏请指正&#xff…

学习Android的第二十八天

目录 Android Service (服务) 线程 Service (服务) Service 相关方法 Android 非绑定 Service startService() 启动 Service 验证 startService() 启动 Service 的调用顺序 Android 绑定 Service bindService() 启动 Service 验证 BindService 启动 Service 的顺序 …

《LeetCode热题100》笔记题解思路技巧优化_Part_3

《LeetCode热题100》笔记&题解&思路&技巧&优化_Part_3 &#x1f60d;&#x1f60d;&#x1f60d; 相知&#x1f64c;&#x1f64c;&#x1f64c; 相识&#x1f622;&#x1f622;&#x1f622; 开始刷题链表&#x1f7e2;1. 相交链表&#x1f7e2;2. 反转链表&…

html5cssjs代码 020 推荐网址

html5&css&js代码 020 推荐网址 一、代码二、解释 这段HTML代码定义了一个网页&#xff0c;它包含了一个标题、一些样式和一个表格。表格中列出了一些推荐的网址&#xff0c;包括序号、名称、网址、描述和备注。每个表格行都包含一个链接到相应网址的超链接。页面的样式…

一种基于宏和serde_json实现的rust web中统一返回类

本人rust萌新&#xff0c;写web碰到了这个&#xff0c;基于ChatGPT和文心一言学了宏&#xff0c;强行把这玩意实现出来了&#xff0c;做个学习记录&#xff0c;如果有更好的方法&#xff0c;勿喷。 先看效果&#xff0c;注意不支持嵌套&#xff0c;且kv映射要用>(因为它这个…

《JAVA与模式》之抽象工厂模式

系列文章目录 文章目录 系列文章目录前言一、使用简单工厂模式的解决方案二、引进抽象工厂模式三、抽象工厂模式结构四、抽象工厂模式的优缺点前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看…

MC78L05ACDR2G线性稳压器芯片中文资料规格书PDF数据手册引脚图参数图片价格

产品概述&#xff1a; MC78L00A系列线性稳压器价格便宜&#xff0c;易于使用&#xff0c;适用于各种需要最高100mA的调节电源的应用。与大功率MC7800和MC78M00系列一样&#xff0c;这款稳压器也提供内部电流限制和高温关断&#xff0c;因此非常坚固耐用。在很多应用中&#xf…

提高工作效率,这 10 款 AI 工具不能错过

作为一个职场打工人&#xff0c;我深知时间和效率的重要性。正是因为如此&#xff0c;我开始使用各种人工智能工具来帮助我提高工作效率。在这篇文章中&#xff0c;我将会分享 10 款必备的 AI 工具&#xff0c;这些工具可以让你的工作更加高效&#xff0c;从而为你节省更多的时…