基于Mahout实现K-Means聚类

news2024/9/22 13:32:04

需求分析

需要对数据集进行预处理,选择合适的特征进行聚类分析,确定聚类的数量和初始中心点,调用Mahout提供的K-Means算法进行聚类计算,评估聚类结果的准确性和稳定性。同时,需要对Mahout的使用和参数调优进行深入学习和实践,以保证聚类结果的有效性和可靠性。

系统实现

    1.对实验整体的理解:

    本次实验,我们的目的是理解聚类的原理,并且掌握常见聚类的算法,以及掌握使用Mahout实现K-Means聚类分析算法的过程。

     2.实验整体流程分析:

  • 创建项目,导入开发依赖包
  • 编写工具类
  • 编写聚类分析的代码
  • 将聚类结果输出
  • 评估聚类的效果

     3.准备工作:

  • 使用IDEA创建一个Maven项目:mahout_kmeans_demo

 

  • 修改pom.xml文件,导入开发MapReduce所需的Jar包
 <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-mr</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-math</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-hdfs</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-integration</artifactId>
            <version>0.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mahout</groupId>
            <artifactId>mahout-examples</artifactId>
            <version>0.13.0</version>
        </dependency>
</dependencies>

下载相关依赖包

等待pom.xml文件不再出现错误即可 

  • 准备实验数据并下载

  • 启动Hadoop集群。

终端输入start-all.sh

可以使用jps命令查看集群启动情况。

     4.执行聚类过程:

  • 编写工具类HdfsUtil,对HDFS的基本操作进行封装
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;
import java.net.URI;

public class HdfsUtil {
    private static final String HDFS = "hdfs://master:9000/";
    private String hdfsPath;
    private Configuration conf;

    public HdfsUtil(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsUtil(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    public static JobConf config() {
        JobConf conf = new JobConf(HdfsUtil.class);
        conf.setJobName("HdfsDAO");
        return conf;
    }

    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from" + remote + " to " + local);
        fs.close();
    }

    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
    }
}
  • 编写KMeansMahout类,执行聚类过程
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.ClusterDumper;

public class KMeansMahout {
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "/home/data/iris.dat";
        //  mahout输出至HDFS的目录
        String outputPath = HDFS + "/user/hdfs/kmeans/output";
        //  mahout的输入目录
        String inputPath = HDFS + "/user/hdfs/kmeans/input/";

        //  canopy算法的t1和t2
        double t1 = 2;
        double t2 = 1;
        //  收敛阀值
        double convergenceDelta = 0.5;
        //  最大迭代次数
        int maxIterations = 10;

        Path output = new Path(outputPath);
        Path input = new Path(inputPath);
        Configuration conf = new Configuration();

        HdfsUtil hdfs = new HdfsUtil(HDFS, conf);
        hdfs.rmr(inputPath);
        hdfs.mkdirs(inputPath);
        hdfs.copyFile(localFile, inputPath);
        hdfs.ls(inputPath);

        //  每次执行聚类前,删除掉上一次的输出目录
        HadoopUtil.delete(conf, output);
        //  执行聚类
        run(conf, input, output, new EuclideanDistanceMeasure(), t1, t2, convergenceDelta, maxIterations);
    }

    private static void run(Configuration conf, Path input, Path output,
                            EuclideanDistanceMeasure euclideanDistanceMeasure, double t1, double t2,
                            double convergenceDelta, int maxIterations) throws Exception {

        Path directoryContainingConvertedInput = new Path(output, "data");

        System.out.println("Preparing  Input");
        //  将输入文件序列化,并选取RandomAccessSparseVector作为保存向量的数据结构
        InputDriver.runJob(input, directoryContainingConvertedInput,
                "org.apache.mahout.math.RandomAccessSparseVector");

        System.out.println("Running  Canopy  to  get  initial  clusters");
        //  保存canopy的目录
        Path canopyOutput = new Path(output, "canopies");

        //  执行Canopy聚类
        CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput,
                euclideanDistanceMeasure, t1, t2, false, 0.0, false);

        System.out.println("Running  KMeans");
        //  执行k-means聚类,并使用canopy目录
        KMeansDriver.run(conf, directoryContainingConvertedInput,
                new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR + "-final"),
                output, convergenceDelta, maxIterations, true, 0.0, false);

        System.out.println("run  clusterdumper");
        //  将聚类的结果输出至HDFS
        ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"),
                new Path(output, "clusteredPoints"));
        clusterDumper.printClusters(null);
    }
}

在KmeansMahout类上点击右键并执行程序

 执行结果在HDFS目录中

     5.解析聚类结果:

  • 从Mahout的输出目录下提取出所要的信息

  • 编写ClusterOutput类,解析聚类后结果
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
import org.apache.mahout.math.Vector;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

public class ClusterOutput {
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) {
        try {
            //   需要被解析的mahout的输出文件
            String clusterOutputPath = "/user/hdfs/kmeans/output";
            //   解析后的聚类结果,将输出至本地磁盘
            String resultPath = "/home/data/result.txt";

            BufferedWriter bw;
            Configuration conf = new Configuration();
            conf.set("fs.default.name", HDFS);
            FileSystem fs = FileSystem.get(conf);

            SequenceFile.Reader reader = null;
            reader = new SequenceFile.Reader(fs, new Path(clusterOutputPath + "/clusteredPoints/part-m-00000"), conf);
            bw = new BufferedWriter(new FileWriter(new File(resultPath)));

            //   key为聚簇中心ID
            IntWritable key = new IntWritable();
            WeightedPropertyVectorWritable value = new WeightedPropertyVectorWritable();

            while (reader.next(key, value)) {
                //   得到向量
                Vector vector = value.getVector();
                String vectorValue = "";
                //   将向量各个维度拼接成一行,用\t分隔
                for (int i = 0; i < vector.size(); i++) {
                    if (i == vector.size() - 1) {
                        vectorValue += vector.get(i);
                    } else {
                        vectorValue += vector.get(i) + "\t";
                    }
                }
                bw.write(key.toString() + "\t" + vectorValue + "\n\n");
            }

            bw.flush();
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在ClusterOutput类上右键执行程序

 执行结果被保存在/home/data/result.txt文件中,打开终端执行以下命令

     6.评估聚类效果:

  • 编写InterClusterDistances类,计算平均簇间距离
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InterClusterDistances {
    private static final String HDFS = "hdfs://master:9000";

    public static void main(String[] args) throws Exception {
        String inputFile = HDFS + "/user/hdfs/kmeans/output";
        System.out.println("聚类结果文件地址:" + inputFile);

        Configuration conf = new Configuration();
        Path path = new Path(inputFile + "/clusters-2-final/part-r-00000");
        System.out.println("Input Path:" + path);

        FileSystem fs = FileSystem.get(path.toUri(), conf);
        List<Cluster> clusters = new ArrayList<Cluster>();

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Writable key = (Writable) reader.getKeyClass().newInstance();
        ClusterWritable value = (ClusterWritable) reader.getValueClass().newInstance();

        while (reader.next(key, value)) {
            Cluster cluster = value.getValue();
            clusters.add(cluster);
            value = (ClusterWritable) reader.getValueClass().newInstance();
        }

        System.out.println("Cluster In Total:" + clusters.size());

        DistanceMeasure measure = new EuclideanDistanceMeasure();
        double max = 0;
        double min = Double.MAX_VALUE;
        double sum = 0;
        int count = 0;
        Set<Double> total = new HashSet<Double>();

        // 如果聚类的个数大于1才开始计算
        if (clusters.size() != 1 && clusters.size() != 0) {
            for (int i = 0; i < clusters.size(); i++) {
                for (int j = 0; j < clusters.size(); j++) {
                    double d = measure.distance(clusters.get(i).getCenter(), clusters.get(j).getCenter());
                    min = Math.min(d, min);
                    max = Math.max(d, max);
                    total.add(d);
                    sum += d;
                    count++;
                }
            }

            System.out.println("Maximum Intercluster Distance:" + max);
            System.out.println("Minimum Intercluster Distance:" + min);
            System.out.println("Average Intercluster Distance:" + sum / count);
            for (double d : total) {
                System.out.print("[" + d + "] ");
            }

        } else if (clusters.size() == 1) {
            System.out.println("只有一个类,无法判断聚类质量");
        } else if (clusters.size() == 0) {
            System.out.println("聚类失败");
        }
    }
}

同样右键执行程序,得到下图结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1488730.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++|内存管理

c|内存管理 C/C内存分布strlen 和 sizeof的区别 c语言动态内存管理方式malloccallocrealloc例题 c管理方式new/delete操作内置类型new/delete操作自定义类型证明 new 和 delete 的底层原理operator new与operator delete函数operator new 和 operator delete的 用法构造函数里面…

基于 Vue3打造前台+中台通用提效解决方案(下)

47、通用组件 - 倒计时组件 特惠部分存在一个倒计时的功能,所以我们需要先处理对应的倒计时模块,并把它处理成一个通用组件。 那么对于倒计时模块我们又应该如何进行处理呢? 所谓倒计时,其实更多的是一个时间的处理,那么对于时间的处理,此时我们就需要使用到一个第三方…

Socks5代理协议:原理、应用与优势

在计算机网络中&#xff0c;代理协议是一种用于转发客户端请求的机制。Socks5是其中一种广泛使用的代理协议。它主要工作在传输层和应用层之间&#xff0c;位于OSI参考模型的第五层&#xff08;会话层&#xff09;。其设计初衷是为了帮助授权用户突破防火墙限制&#xff0c;获取…

20240304-2-计算机网络

计算机网络 知识体系 Questions 1.计算机网络分层的优点和缺点 优点 各层之间是独立的&#xff1b;灵活性好&#xff1b;结构上可分割开&#xff1b;易于实现和维护&#xff1b;能促进标准化工作。 缺点&#xff1a; 降低效率&#xff1b;有些功能会在不同的层次中重复出现&…

2000-2021年全国各省市城乡平均受教育年限数据(分城镇和农村)(含原始数据+计算过程+计算结果)

2000-2021年全国各省市城乡平均受教育年限数据&#xff08;分城镇和农村&#xff09; 1、时间&#xff1a;2000-2021年 2、范围&#xff1a;全国及31省 3、来源&#xff1a;人口与就业统计年鉴 4、指标包括&#xff1a;城乡平均受教育年限 、6岁以上总人口 未上过学、…

自动化神器 Playwright 的 Web 自动化测试解决方案!

Playwright认识 3. Playwright环境搭建 Playwright简介&#xff1a; 2020年&#xff0c;微软&#xff08;Microsoft&#xff09;开源了一个名为Playwright的工具&#xff0c;与Selenium一样入门简单&#xff0c;支持多语言&#xff08;Python、Java、Node.js、.NET&#xff0…

Java8,函数式编程应用:

持续更新中&#xff1a; 函数式(Functional)接口 什么是函数式(Functional)接口 只包含一个抽象方法的接口&#xff0c;称为函数式接口。 你可以通过 Lambda 表达式来创建该接口的对象。&#xff08;若 Lambda 表达式 抛出一个受检异常(即&#xff1a;非运行时异常)&#xff0c…

Linux学习:初识Linux

目录 1. 引子&#xff1a;1.1 简述&#xff1a;操作系统1.2 学习工具 2. Linux操作系统中的一些基础概念与指令2.1 简单指令2.2 ls指令与文件2.3 cd指令与目录2.4 文件目录的新建与删除指令2.5 补充指令1&#xff1a;2.6 文件编辑与拷贝剪切2.7 文件的查看2.8 时间相关指令2.9 …

为什么TestNg会成为Java测试框架的首选?还犹豫什么,看它!

上一篇自动化测试我们大概了解了测试的目标、测试的技术选型以及搭建平台的目标及需求&#xff0c;也确定了自动化测试方案以testNg作为整个测试流程贯穿的基础支持框架&#xff0c;那么testNg究竟有什么特点&#xff1f;本篇开始我们来详细的学习testNg这个测试框架。 为什么要…

软件设计师8--输入输出技术

软件设计师8--输入输出技术 考点1&#xff1a;输入输出技术数据传输控制方式中断处理过程例题&#xff1a; 考点1&#xff1a;输入输出技术 数据传输控制方式 √ 程序控制&#xff08;查询&#xff09;方式&#xff1a;分为无条件传送和程序查询方式两种。方法简单&#xff0…

MySQL篇—执行计划之覆盖索引Using index和条件过滤Using where介绍(第三篇,总共三篇)

☘️博主介绍☘️&#xff1a; ✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux&#xff0c;也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章&#xff0c;并且也会默默的点赞收藏加关注❣…

python实现手机号归属地查询

手机上突然收到了某银行的短信提示&#xff0c;看了一下手机的位数&#xff0c;正好是11位。我一想&#xff0c;这不就是标准的手机号码吗&#xff1f;于是一个想法涌上心头——用python的库实现查询手机号码归属地查询自由。 那实现的效果如下&#xff1a; 注&#xff1a;电…

yolov7添加spd-conv注意力机制

一、spd-conv是什么&#xff1f; SPD-Conv&#xff08;Symmetric Positive Definite Convolution&#xff09;是一种新颖的卷积操作&#xff0c;它主要应用于处理对称正定矩阵&#xff08;SPD&#xff09;数据。在传统的卷积神经网络&#xff08;CNN&#xff09;中&#xff0c;…

【java数据结构】模拟二叉树的链式结构之孩子表示法,掌握背后的实现逻辑

&#x1f4e2;编程环境&#xff1a;idea &#x1f4e2;树结构&#xff0c;以及叶子&#xff0c;结点&#xff0c;度等一些名词是什么意思&#xff0c;本篇不再赘述。 【java数据结构】模拟二叉树的链式结构之孩子表示法&#xff0c;掌握背后的实现逻辑 1. 认识二叉树1.1 二叉树…

桂院校园导航 | 云上高校导航 云开发项目 二次开发教程 2.0

Gitee代码仓库&#xff1a;桂院校园导航小程序 GitHub代码仓库&#xff1a;GLU-Campus-Guide 演示视频 【校园导航小程序】2.0版本 静态/云开发项目 演示 云开发项目 2.0版本 升级日志 序号 板块 详情 1 首页 重做了首页&#xff0c;界面更加高效和美观 2 校园页 新增…

Python判断结构20个实例

基本理论基础 Python中的选择判断结构是一种编程中常用的控制结构&#xff0c;它用于根据条件的真假决定程序的执行路径。选择判断结构有多种类型&#xff0c;包括if语句、if-else语句、if-elif-else语句以及嵌套的选择结构。 首先&#xff0c;我们来介绍最常见的if语句。if语…

浅谈WPF之Binding数据校验和类型转换

在WPF开发中&#xff0c;Binding实现了数据在Source和Target之间的传递和流通&#xff0c;就像现实生活中的一条条道路&#xff0c;建立起了城镇与城镇之间的衔接&#xff0c;而数据校验和类型转换&#xff0c;就像高速公路之间的收费站和安检站。那在WPF开发中&#xff0c;如何…

引入本地图片报错:require is not defined

文章目录 问题分析1. 原始写法2. 最初的解决方案3. 尝试使用 require 引入4. 封装方法进行解析引入图片 问题 Vue3 Vite 使用本地图片报错&#xff1a;require is not defined 分析 1. 原始写法 刚开始我是这样写的&#xff0c;数据是这样定义的&#xff0c;但是数据没出…

Vue.js+SpringBoot开发高校实验室管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 实验室类型模块2.2 实验室模块2.3 实验管理模块2.4 实验设备模块2.5 实验订单模块 三、系统设计3.1 用例设计3.2 数据库设计 四、系统展示五、样例代码5.1 查询实验室设备5.2 实验放号5.3 实验预定 六、免责说明 一、摘…