大数据技术(入门篇) --- 使用 Spring Boot 操作 CDH6.2.0 Hadoop

news2024/12/23 17:53:20

前言

本人是web后端研发,习惯使用spring boot 相关框架,因此技术选型直接使用的是spring boot,目前并未使用 spring-data-hadoop 依赖,因为这个依赖已经在 2019 年终止了,可以点击查看 ,所以我这里使用的是自己找的依赖,
声明:此依赖可能和你使用的不兼容,我这个适用于我自己的CDH配套环境,如果遇到不兼容情况,自行修改相关版本即可

代码库地址:https://github.com/lcy19930619/cdh-demo

认识Hadoop

Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。Hadoop 中的HDFS 是CDH数据系统中的核心存储单元,也是学习其他组件的基础

组成

NameNode

  • NameNode 是一个通常在 HDFS 实例中的单独机器上运行的软件。它负责管理文件系统名称空间控制外部客户机的访问
  • NameNode 决定是否将文件映射到 DataNode 上的复制块上。对于最常见的 3 个复制块,第一个复制块存储在同一机架的不同节点上,最后一个复制块存储在不同机架的某个节点上
  • 实际的 I/O事务并没有经过 NameNode,只有表示 DataNode 和块的文件映射的元数据经过 NameNode。当外部客户机发送请求要求创建文件时,NameNode 会以块标识和该块的第一个副本的 DataNode IP 地址作为响应。这个 NameNode 还会通知其他将要接收该块的副本的 DataNode
  • NameNode 在一个称为 FsImage 的文件中存储所有关于文件系统名称空间的信息。这个文件和一个包含所有事务的记录文件( EditLog)将存储在 NameNode 的本地文件系统上。FsImage 和 EditLog 文件也需要复制副本,以防文件损坏或 NameNode 系统丢失
  • NameNode本身不可避免地具有单点失效的风险,主备模式并不能解决这个问题,通过Hadoop Non-stop namenode才能实现100% uptime可用时间

DataNode

  • DataNode 也是一个通常在 HDFS实例中的单独机器上运行的软件。
  • Hadoop 集群包含一个 NameNode 和大量 DataNode。DataNode 通常以机架的形式组织,机架通过一个交换机将所有系统连接起来。Hadoop 的一个假设是:机架内部节点之间的传输速度快于机架间节点的传输速度
  • DataNode 响应来自 HDFS 客户机的读写请求。它们还响应来自 NameNode 的创建、删除和复制块的命令。NameNode 依赖来自每个 DataNode 的定期心跳(heartbeat)消息。每条消息都包含一个块报告,NameNode 可以根据这个报告验证块映射和其他文件系统元数据。如果 DataNode 不能发送心跳消息,NameNode 将采取修复措施,重新复制在该节点上丢失的块文件操作

核心部分

HDFS

  • HDFS 是Apache Hadoop Core项目的一部分。它存储 Hadoop 集群中所有存储节点上的文件
  • HDFS 是指被设计成适合运行在通用硬件上的分布式文件系统。
  • HDFS 是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。
  • 存储在 HDFS 中的文件被分成块,然后将这些块复制到多个计算机中(DataNode)。这与传统的 RAID 架构大不相同。块的大小(1.x版本默认为 64MB,2.x版本默认为128MB)和复制的块数量在创建文件时由客户机决定。NameNode 可以控制所有文件操作。HDFS 内部的所有通信都基于标准的 TCP/IP 协议。
  • HDFS 并不是一个万能的文件系统。它的主要目的是支持以流的形式访问写入的大型文件

MapReduce 计算引擎

  • 该引擎由 JobTrackers 和 TaskTrackers 组成
  • 该引擎位于HDFS上层
  • MapReduce是一个基于集群的高性能并行计算平台。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
  • MapReduce是一个并行计算与运行软件框架。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
  • MapReduce是一个并行程序设计模型与方法。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理

使用CDH 创建 HDFS 集群

添加服务

在这里插入图片描述
在这里插入图片描述

角色分配

在这里插入图片描述

修改HDFS配置,允许外部访问

搜索关键值 ‘绑定到通配符地址’
在这里插入图片描述

添加完成

在这里插入图片描述
点击红款部分,访问 NameNode 检查集群情况

检查集群

在这里插入图片描述
可以看到 hadoop 正常启动,且集群版本为 ``

使用 Spring Boot 操作Hadoop

新建项目

在这里插入图片描述
在这里插入图片描述

一定要添加 Cloudera仓库

<repositories>
	<repository>
    	<id>cloudera.repo</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

pom文件内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>cdh-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cdh-demo</name>
    <description>cdh-demo</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
        <hadoop.version>3.0.0-cdh6.2.0</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <repositories>
        <repository>
            <id>cloudera.repo</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.cdh.CdhDemoApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

YML 文件

hadoop:
  # 我的 hdfs namenode 在 slave-1这台机器上 
  url: hdfs://cdh-slave-1:8020
  replication: 3
  blockSize: 2097152
  user: root

Hadoop 属性配置类

package com.example.cdh.properties;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author chunyang.leng
 * @date 2023-04-17 10:31
 */
@Configuration
@ConfigurationProperties(prefix = "hadoop")
public class HadoopProperties {
    /**
     * namenode 地址,示例:hdfs://cdh-master:8020
     */
    private String url;

    /**
     * 分片数量
     */
    private String replication;
    /**
     * 块文件大小
     */
    private String blockSize;
    /**
     * 操作的用户
     */
    private String user;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getReplication() {
        return replication;
    }

    public void setReplication(String replication) {
        this.replication = replication;
    }

    public String getBlockSize() {
        return blockSize;
    }

    public void setBlockSize(String blockSize) {
        this.blockSize = blockSize;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

}

Hadoop 自动装配类

package com.example.cdh.configuration;

import com.example.cdh.properties.HadoopProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.context.annotation.Bean;

/**
 * @author chunyang.leng
 * @date 2023-04-17 10:40
 */
@org.springframework.context.annotation.Configuration
public class HadoopAutoConfiguration {

    @Bean
    public FileSystem fileSystem(
        HadoopProperties hadoopProperties) throws URISyntaxException, IOException, InterruptedException {
        // 获取连接集群的地址
        URI uri = new URI(hadoopProperties.getUrl());
        // 创建一个配置文件
        Configuration configuration = new Configuration();
        // 设置配置文件中副本的数量
        configuration.set("dfs.replication", hadoopProperties.getReplication());
        // 设置配置文件块大小
        configuration.set("dfs.blocksize", hadoopProperties.getBlockSize());
        // 获取到了客户端对象
        return FileSystem.get(uri, configuration, hadoopProperties.getUser());
    }
}

操作HDFS

HDFS 操作类

package com.example.cdh.service;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author chunyang.leng
 * @date 2023-04-17 11:06
 */
@Component
public class HdfsService {
    @Autowired
    private FileSystem fileSystem;
    /**
     * 上传文件到 HDFS
     * @param data 文件数据
     * @param url 文件名称和路径
     * @param overwrite  是否允许覆盖文件
     */
    public void uploadFile(byte[] data, String url,boolean overwrite) throws IOException {
        try (FSDataOutputStream stream = fileSystem.create(new Path(url), overwrite)){
            IOUtils.write(data, stream);
        }
    }

    /**
     * 下载文件到本地
     * @param url
     * @return
     */
    public void download(String url, OutputStream outputStream) throws IOException {
        Path path = new Path(url);
        try (FSDataInputStream open = fileSystem.open(path)){
            IOUtils.copy(open, outputStream);
        }
    }

    /**
     * 遍历全部文件,并返回所有文件路径
     * @param url
     * @param recursive 是否为递归遍历
     * @return
     * @throws IOException
     */
    public List<Path> listFiles(String url,boolean recursive) throws IOException {
        Path path = new Path(url);
        RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(path, true);
        List<Path> list= new ArrayList<Path>();
        while (iterator.hasNext()){
            LocatedFileStatus file = iterator.next();
            Path filePath = file.getPath();
            list.add(filePath);
        }
        return list;
    }

    /**
     * 删除文件
     * @param path 文件路径
     * @param recursive 是否为递归删除
     * @throws IOException
     */
    public void delete(String path,boolean recursive) throws IOException{
        fileSystem.delete(new Path(path),recursive);
    }
}

HDFS 单元测试

使用单元测试操作hdfs

  • 使用UUID,生成简短测试文件内容
  • 清理掉HDFS 测试目录内容,防止出现错误目录
  • 将测试文件通过HDFS操作类,上传到HDFS中
  • 使用遍历封装的接口,确认数据上传成功
  • 使用下载接口,下载刚刚上传的文件内容
  • 将初始文件内容、下载后的文件内容分别生成MD5摘要
  • 计算两个MD5应该相同
  • 使用删除接口,清理HDFS测试环境
  • 使用删除功能,删除本地测试文件
package com.example.cdh;

import com.example.cdh.service.HdfsService;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.DigestUtils;

/**
 * @author chunyang.leng
 * @date 2023-04-17 11:26
 */
@SpringBootTest
public class HdfsServiceTest {
    private static final Logger logger = LoggerFactory.getLogger(HdfsServiceTest.class);
    String fileContent = UUID.randomUUID().toString();

    @Autowired
    private HdfsService hdfsService;

    @Test
    public void hdfsTest() throws IOException {
        File testFile = new File("./test", "hdfs-test.txt");
        FileUtils.writeStringToFile(testFile,fileContent,"utf-8");
        logger.info("生成测试文件完毕");
        byte[] before = FileUtils.readFileToByteArray(testFile);

        String testPath = "/test/" +UUID.randomUUID().toString();
        hdfsService.delete(testPath,true);
        logger.info("清理测试目录:{}",testPath);

        String hdfsFilePath = testPath +"/test.txt";
        hdfsService.uploadFile(before,hdfsFilePath,true);
        logger.info("上传流程测试完毕");

        List<Path> paths = hdfsService.listFiles(testPath, true);
        Assert.isTrue(!CollectionUtils.isEmpty(paths),"测试目录不应该为空");
        logger.info("遍历流程测试完毕");

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        hdfsService.download(hdfsFilePath,outputStream);
        byte[] after = outputStream.toByteArray();

        String beforeMd5 = DigestUtils.md5DigestAsHex(before);
        String afterMd5 = DigestUtils.md5DigestAsHex(after);

        Assert.isTrue(beforeMd5.equals(afterMd5),"上传与下载的文件内容应该一致");
        logger.info("下载流程测试完毕");
        
  		hdfsService.delete(testPath,true);
        testFile.delete();
        logger.info("测试环境清理完毕");
    }
}

HDFS 测试结果

在这里插入图片描述

MapReduce

执行流程

  1. 切分输入数据:MapReduce会将输入数据切分成若干个小块,让不同的Map任务来处理这些小块。

  2. 执行Map任务:对于每一个Map任务,MapReduce框架会调用Map函数来处理该任务所负责的输入数据块。Map函数可以根据输入数据生成若干个键值对,这些键值对可以是简单的数据类型(如整数、字符串等),也可以是自定义的数据类型。Map函数执行完毕后,会将生成的键值对按照键的哈希值分发给不同的Reduce任务。

  3. 执行Shuffle过程:MapReduce框架会将所有Map任务生成的键值对按照键的哈希值发送到不同的Reduce任务。这个过程被称为Shuffle过程。Shuffle过程是MapReduce框架中最耗时的操作之一。

  4. 执行Reduce任务:每一个Reduce任务会收到多个Map任务发来的键值对,并根据键将这些键值对进行合并,并执行Reduce函数来生成最终的输出结果。Reduce函数的输入和输出可以是简单的数据类型(如整数、字符串等),也可以是自定义的数据类型。

  5. 输出结果:所有Reduce任务执行完毕后,MapReduce框架会将最终的输出结果写入输出文件或输出数据库中,然后输出结果。

编程模型

​ 用户编程的程序分成三个部分:MapperReducerDriver

  • Mapper:
    • 用户自定义的Mapper要继承自己的父类
    • Mapper的输入数据是KV对的形式(KV的类型可自定义)
    • Mapper中的业务逻辑写在map()方法中
    • Mapper的输出数据是KV对的形式(KV的类型可自定义)
    • map()方法(MapTask进程)对每一个<K,V>调用一次
  • Reduce
    • 用户自定义的Reduce要继承自己的父类
    • Reduce的输入数据类型对应Mapper的输出数据类型,也是KV
    • Reducer的业务逻辑写在reduce()方法中
    • Reduce Task进程对每一组相同的<K,V>组调用一次reduce()方法
  • Driver : Driver是对本次Job相关参数内容的一层封装

JAVA 类型 与Hadoop writable 类型 映射

Java类型Hadoop writable类型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
StringText
mapMapWritable
arrayArrayWritable

测试用例

  • 编写测试 Mapper、Reduce、Driver
  • 统计 HDFS 一段数据中,每个非空白字符的使用数量
  • 将结果写入到HDFS中,并将结果打印到控制台

测试Mapper

package com.example.cdh.service.mapreduce.wordcount;

import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author chunyang.leng
 * @date 2023-04-17 13:26
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text outK = new Text();
    private final IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key,
        Text value,
        Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        char[] chars = line.toCharArray();
        for (char aChar : chars) {
            String str = Character.toString(aChar);
            if (StringUtils.isBlank(str)){
                continue;
            }
            outK.set(str);
            context.write(outK, outV);

        }
    }
}

测试Reduce

package com.example.cdh.service.mapreduce.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author chunyang.leng
 * @date 2023-04-17 13:27
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}

测试Driver

package com.example.cdh.service.mapreduce.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author chunyang.leng
 * @date 2023-04-17 13:27
 */
public class WordCountDriver {

    private final Job instance;

    public WordCountDriver(String inputPath, String outputPath) throws IOException {

        JobConf jobConf = new JobConf();
        // 设置要计算的文件读取路径
        jobConf.set(FileInputFormat.INPUT_DIR,inputPath);
        // 设置计算结果存储路径
        jobConf.set(FileOutputFormat.OUTDIR,outputPath);

        // 1.创建job实例
        instance = Job.getInstance(jobConf);
        // 2.设置jar
        instance.setJarByClass(WordCountDriver.class);
        // 3.设置Mapper和Reducer
        instance.setMapperClass(WordCountMapper.class);
        instance.setReducerClass(WordCountReducer.class);
        // 4.设置map输出的kv类型
        instance.setMapOutputKeyClass(Text.class);
        instance.setMapOutputValueClass(IntWritable.class);
        // 5.设置最终输出的kv类型
        instance.setOutputKeyClass(Text.class);
        instance.setOutputValueClass(IntWritable.class);
    }

    /**
     * 提交 job 运行
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public void run() throws IOException, InterruptedException, ClassNotFoundException {
        instance.waitForCompletion(true);
    }
}

测试类

package com.example.cdh;

import com.example.cdh.service.HdfsService;
import com.example.cdh.service.mapreduce.WordCountJob;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author chunyang.leng
 * @date 2023-04-17 15:28
 */
@SpringBootTest
public class MapReduceTest {

    private static final Logger logger = LoggerFactory.getLogger(MapReduceTest.class);
    String context = "Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can \"just run\". " +
        "We take an opinionated view of the Spring platform and third-party libraries so you can get started with minimum fuss. Most Spring Boot applications need minimal Spring configuration. " +
        "If you’re looking for information about a specific version, or instructions about how to upgrade from an earlier release, check out the project release notes section on our wiki.";
    @Autowired
    private HdfsService hdfsService;
    @Autowired
    private WordCountJob wordCountJob;
    @Autowired
    private FileSystem fileSystem;

    @Test
    public void testMapReduce() throws Exception {
        String fileName = "mapreduce.txt";
        String path = "/test/" + UUID.randomUUID().toString();

        String inputHdfsFilePath = path + "/" + fileName;

        String outPutHdfsFile = path + "/result/";
        hdfsService.delete(inputHdfsFilePath, true);
        logger.info("测试环境数据清理完毕");

        hdfsService.uploadFile(context.getBytes(StandardCharsets.UTF_8), inputHdfsFilePath, true);
        logger.info("MapReduce 测试文本上传完毕,开始执行 word count job");

        wordCountJob.runJob("hdfs://cdh-slave-1:8020" + inputHdfsFilePath, "hdfs://cdh-slave-1:8020" + outPutHdfsFile);
        logger.info("MapReduce 测试job执行完毕");


        List<Path> paths = hdfsService.listFiles(outPutHdfsFile, true);
        for (Path resultPath : paths) {
            FileStatus status = fileSystem.getFileStatus(resultPath);
            if (status.isDirectory()){
                continue;
            }
            if (status.isFile() && !resultPath.getName().startsWith("_SUCCESS")){
                // 是文件,并且不是成功标识文件

                try (FSDataInputStream open = fileSystem.open(resultPath);
                     ByteArrayOutputStream outputStream = new ByteArrayOutputStream()){
                    IOUtils.copy(open, outputStream);
                    byte[] bytes = outputStream.toByteArray();
                    logger.info("任务执行完毕,获取结果:{}", new String(bytes, StandardCharsets.UTF_8));
                }

            }
        }

        hdfsService.delete(path, true);
        logger.info("测试结束,清理空间完毕");

    }
}

测试结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/424946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

防火墙的IPSECVPN点到点实验 dsvpn多层分支实验

目录 防火墙的IPSECVPN点到点实验 dsvpn多层分支实验 ​编辑 防火墙的IPSECVPN点到点实验 配置路由器接口IP 配置接口防火墙IP 写放通的策略 ping对端防火墙的接口看是否能ping通 ipsec进行配置 配置往返流量 dsvpn多层分支实验 先配置IP 2&#xff0c;配置静态IP 3&#xf…

拦截导弹 导弹防御系统

拦截导弹 & 导弹防御系统拦截导弹导弹防御系统拦截导弹 题目链接:acwing1010. 拦截导弹 题目描述&#xff1a; 输入输出: 分析&#xff1a; 第一个问题为输出最长递减子序列&#xff0c;由于导弹数在1000以内所以采用时间复杂度为O(n2)O(n^2)O(n2)或者O(nlogn)O(nlogn)O…

介绍一款idea神级插件【Bito-ChatGPT】

什么是Bito&#xff1f; Bito是一款在IntelliJ IDEA编辑器中的插件&#xff0c;Bito插件是由ChatGPT团队开发的&#xff0c;它是ChatGPT团队为了提高开发效率而开发的一款工具。ChatGPT团队是一支专注于自然语言处理技术的团队&#xff0c;他们开发了一款基于GPT的自然语言处理…

[oeasy]python0133_[趣味拓展]好玩的unicode字符_另类字符_上下颠倒英文字符

另类字符 回忆上次内容 上次再次输出了大红心♥ 找到了红心对应的编码黑红梅方都对应有编码 原来的编码叫做 ascii️ \u这种新的编码方式叫unicode包括了 中日韩字符集等 各书写系统的字符集 除了这些常规字符之外 还有什么好玩的东西呢&#xff1f; 颠倒字符 这个网站可以…

DQN基本概念和算法流程(附Pytorch代码)

❀DQN算法原理 DQN&#xff0c;Deep Q Network本质上还是Q learning算法&#xff0c;它的算法精髓还是让Q估计Q_{估计}Q估计​尽可能接近Q现实Q_{现实}Q现实​&#xff0c;或者说是让当前状态下预测的Q值跟基于过去经验的Q值尽可能接近。在后面的介绍中Q现实Q_{现实}Q现实​也…

提高工作效率必备,5款实用的Windows系统工具推荐

每次分享实用的软件,都会给人一种踏实和喜悦的感觉,这也是我热衷于搜集和推荐高效工具软件的原因。 音量控制——EarTrumpet EarTrumpet是一款音量控制工具&#xff0c;可以让你更方便地调节Windows系统中不同应用程序的音量。你可以使用EarTrumpet来替代系统自带的音量混合器…

表单设计器开源的定义和应用场景布局介绍

为了实现提质增效的办公自动化&#xff0c;表单设计器开源工具的应用变得广泛起来。在低代码开发市场昌盛发展的今天&#xff0c;不少企业期望通过快速、现成的快速配置表单工具实现高效率表单制作&#xff0c;那么&#xff0c;现在给大家介绍的这款开发易用性强、组件丰富、高…

设计模式 -- 门面模式

前言 月是一轮明镜,晶莹剔透,代表着一张白纸(啥也不懂) 央是一片海洋,海乃百川,代表着一块海绵(吸纳万物) 泽是一柄利剑,千锤百炼,代表着千百锤炼(输入输出) 月央泽,学习的一种过程,从白纸->吸收各种知识->不断输入输出变成自己的内容 希望大家一起坚持这个过程,也同…

stable-diffusion真的好用吗?

hi&#xff0c;各位大佬&#xff0c;今天尝试下diffusion大模型&#xff0c;也是CV领域的GPT&#xff0c;但需要prompt&#xff0c;我给了prompt结果并不咋滴&#xff0c;如下示例&#xff0c;并附代码及参考link 1、img2img 代码实现&#xff1a; import torch from PIL im…

PageHelper的使用

这个分页插件是在Mybatis的环境中使用的&#xff0c;所以项目需要导入Mybatis依赖 更加详细的用法看官方文档&#xff1a;PageHelper官网 在Mybatis中使用 前提条件 引入依赖 <dependency><groupId>com.github.pagehelper</groupId><artifactId>pa…

GANs和Generative Adversarial Nets和Vox2Vox: 3D-GAN for Brain Tumour Segmentation

参考&#xff1a; 各种生成模型&#xff1a;VAE、GAN、flow、DDPM、autoregressive models https://blog.csdn.net/zephyr_wang/article/details/126588478李沐GAN精度 x.1 生成模型家族 DGMs&#xff08;Deep Generatitve Models&#xff09;家族主要有&#xff1a;GAN&…

数据分析的目的和意义是什么?_光点科技

数据分析是一个越来越受到关注的领域&#xff0c;因为它可以帮助企业和组织利用数据来制定更明智的决策。数据分析的目的和意义是多方面的&#xff0c;例如&#xff1a; 1.了解客户需求 数据分析可以帮助企业更好地了解客户需求&#xff0c;从而制定更准确的市场营销策略。通过…

原生JS + HTML + CSS 实现快递物流信息 API 的数据链式展示

引言 全国快递物流查询 API 是一种提供实时、准确、可靠的快递物流信息查询服务的接口。它基于现有的物流信息系统&#xff0c;通过API接口的方式&#xff0c;向用户提供快递物流信息的查询、跟踪、统计等功能。使用全国快递物流查询 API&#xff0c;用户可以在自己的应用程序…

[2021 东华杯]bg3

Index介绍漏洞利用过程一.泄露Libc二.Tcache Bin Attack三.完整EXP介绍 [2021 东华杯]bg3 本题是C写的一道经典菜单堆题&#xff0c;拥有增删改查全部功能。 Bug DataBase - V3.0 - I think i am UnBeatAble 1. Upload A Bug 2. Change A Uploaded Bug 3. Get Uploaded Bug D…

企业大数据湖总体规划及大数据湖 一体化运营管理建设方案

背景&#xff1a;数据快速入湖&#xff0c;分析更加智能&#xff0c;应用更加多样&#xff0c;服务更加开放更多企业数据将进入数据湖&#xff0c;来自传统系统的数据和传感器等新型数据资源不断融合&#xff0c;数据孤岛将继续被打破。随着大数据分析能力的不断提高&#xff0…

借助Nacos配置中心实现一个动态线程池

目录 一、实现思路 二、实现说明概览 三、代码实现 DynamicThreadPool RejectedProxyInvocationHandler DynamicThreadPoolRegister DynamicThreadPoolRefresher 测试动态线程池 平常我们系统中定义的一些线程池如果要想修改的话&#xff0c;需要修改配置重启服务才能生…

『pyqt5 从0基础开始项目实战』05. 按钮点击事件之添加新数据 (保姆级图文)

目录导包和框架代码给按钮绑定一个点击事件获取输入框的数据多线程与界面更新&#xff08;新线程与UI更新的数据交互&#xff09;代码结构完整代码main文件Threads.py总结欢迎关注 『pyqt5 从0基础开始项目实战』 专栏&#xff0c;持续更新中 欢迎关注 『pyqt5 从0基础开始项目…

上海亚商投顾:沪指创年内新高 大金融、中字头集体走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 市场情绪 沪指今日低开高走&#xff0c;午后涨超1%&#xff0c;创出近10个月以来新高&#xff0c;创业板指走势较弱&#xf…

不走弯路,AI真的能提高生产效率

AI应用虽然取得了令人瞩目的成果&#xff0c;但是在实际应用中仍存在不少困境。市面上不乏有AI绘画、AI写作、AI聊天的相关产品&#xff0c;即使Chatgpt可以写代码、写论文&#xff0c;但由于技术的有限性&#xff0c;还需要不断地优化完善才能给出更精准的答复&#xff0c;也少…

契约锁与多家软件行业伙伴达成战略合作,携手助力组织数字化转型

近日&#xff0c;契约锁电子签章与天翼云、神州数码、同望科技、宏灿软件、甄零科技、正量科技等多家软件行业伙伴达成战略合作&#xff0c;充分发挥各自专业与资源优势&#xff0c;从产品、市场、销售、技术等多方面展开深度合作&#xff0c;共同为客户提供全程数字化解决方案…