Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作

news2025/1/11 12:44:28

章节内容

上一节我们完成了:

  • MapReduce的介绍
  • Hadoop序列化介绍
  • Mapper编写规范
  • Reducer编写规范
  • Driver编写规范
  • WordCount功能开发
  • WordCount本地测试

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。

注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!
请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!

但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:

  • 2C4G 编号 h121
  • 2C4G 编号 h122
  • 2C2G 编号 h123

在这里插入图片描述

业务需求

平常我们在业务上,有很多时候表都是分开的,通过一些 id 或者 code 来进行关联。
在大数据的情况下,也有很多这种情况,我们需要进行联表操作。

表1

项目编码projectCode 项目名projectName

表2

项目编码projectCode 项目类型projectType 项目分类projectFrom

SQL 中,可以通过 LEFT JOIN 来实现字段补齐。大数据下,也需要进行这样的操作,我们需要借助 MapReduce

表1测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"	"社区项目1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"	"社区项目2"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"	"智慧社区"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"	"公交站点"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"	"街道布建"
"2e556d83-bb56-45b1-8d6e-00510902c464"	"街道公交站点"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"	"未命名项目1"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"	"未命名项目2"

表2测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"	"重要类型"	"种类1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"	"重要类型"	"种类1"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"	"重要类型"	"种类1"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"	"普通类型"	"种类1"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"	"普通类型"	"种类2"
"2e556d83-bb56-45b1-8d6e-00510902c464"	"普通类型"	"种类2"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"	"一般类型"	"种类2"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"	"一般类型"	"种类2"

SQL连表

假设我们使用SQL的方式联表:

SELECT
  *
FROM
  t_project
LEFT JOIN
  t_project_info
ON
  t_project.projectCode=t_project_info.projectCode

Reduce JOIN

有时候,表可能过大,无法支持我们使用 SQL 进行连表查询。
这里我们编写一个程序来完成操作。

ProjectBean

这里是最终的Bean类,里边是两个表把字段补齐的结果,一会儿我们将使用这个类进行表的连接。

package icu.wzk.demo03;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ProjectBean implements Writable {

    private String projectCode;

    private String projectName;

    private String projectType;

    private String projectFrom;

    private String flag;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(projectCode);
        dataOutput.writeUTF(projectName);
        dataOutput.writeUTF(projectType);
        dataOutput.writeUTF(projectFrom);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.projectCode = dataInput.readUTF();
        this.projectName = dataInput.readUTF();
        this.projectType = dataInput.readUTF();
        this.projectFrom = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    public ProjectBean(String projectCode, String projectName, String projectType, String projectFrom, String flag) {
        this.projectCode = projectCode;
        this.projectName = projectName;
        this.projectType = projectType;
        this.projectFrom = projectFrom;
        this.flag = flag;
    }

    public ProjectBean() {

    }

    @Override
    public String toString() {
        return "ProjectBean{" +
                "projectCode='" + projectCode + '\'' +
                ", projectName='" + projectName + '\'' +
                ", projectType='" + projectType + '\'' +
                ", projectFrom='" + projectFrom + '\'' +
                ", flag=" + flag + '\'' +
                '}';
    }

    public String getProjectCode() {
        return projectCode;
    }

    public void setProjectCode(String projectCode) {
        this.projectCode = projectCode;
    }

    public String getProjectName() {
        return projectName;
    }

    public void setProjectName(String projectName) {
        this.projectName = projectName;
    }

    public String getProjectType() {
        return projectType;
    }

    public void setProjectType(String projectType) {
        this.projectType = projectType;
    }

    public String getProjectFrom() {
        return projectFrom;
    }

    public void setProjectFrom(String projectFrom) {
        this.projectFrom = projectFrom;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }
}

Reduce Driver

package icu.wzk.demo03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReducerJoinDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // String inputPath = args[0];
        // String outputPath = args[1];

        // === 测试环境 ===
        String inputPath = "project_test";
        String outputPath = "project_test_output";
        // === ===

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "ReducerJoinDriver");
        job.setJarByClass(ReducerJoinDriver.class);

        job.setMapperClass(ReducerJoinMapper.class);
        job.setReducerClass(ReducerJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ProjectBean.class);

        job.setOutputKeyClass(ProjectBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

ReduceMapper

package icu.wzk.demo03;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {

    String name;
    ProjectBean projectBean = new ProjectBean();
    Text k = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {
        // 获取路径信息
        name = context.getInputSplit().toString();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (name.contains("layout_project")) {
            // layout_project
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName(fields[1]);
            projectBean.setProjectType("");
            projectBean.setProjectFrom("");
            projectBean.setFlag("layout_project");
            // projectCode 关联
            k.set(fields[0]);
        } else {
            // project_info
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName("");
            projectBean.setProjectType(fields[1]);
            projectBean.setProjectFrom(fields[2]);
            projectBean.setFlag("project_info");
            // projectCode 关联
            k.set(fields[0]);
        }
        context.write(k, projectBean);
    }
}

ReduceReducer

package icu.wzk.demo03;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values, Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context) throws IOException, InterruptedException {
        List<ProjectBean> dataList = new ArrayList<>();
        ProjectBean deviceProjectBean = new ProjectBean();
        for (ProjectBean pb : values) {
            if ("layout_project".equals(pb.getFlag())) {
                // layout_project
                ProjectBean projectProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
                dataList.add(projectProjectBean);
            } else {
                // project_info
                deviceProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
            }
        }

        for (ProjectBean pb : dataList) {
            pb.setProjectType(deviceProjectBean.getProjectType());
            pb.setProjectFrom(deviceProjectBean.getProjectFrom());
            context.write(pb, NullWritable.get());
        }
    }
}

运行结果

ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"社区项目2"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"智慧社区"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"街道公交站点"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"未命名项目1"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"未命名项目2"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"社区项目1"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"公交站点"', projectType='"普通类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"街道布建"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}

在这里插入图片描述

方案缺点

JOIN 操作是在 reduce 阶段完成的,reduce端处理压力过大map节点的运算负载很低,资源利用不高

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1891978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【10年有效】阿里云域名,出阿里云私人子域名

出&#xff1a;阿里云私人子域名&#xff0c;主要是帮助没域名的&#xff0c;又需要使用域名绑定程序的人。 有效期十年&#xff0c;就只要几块&#xff0c;简直是薅羊毛薅到家了~~ 本域名已经备案了。 目标&#xff1a;https://h5.m.goofish.com/item?id811115711415 ---…

【楚怡杯】职业院校技能大赛 “Python程序开发”赛项样题二

Python程序开发实训 &#xff08;时量&#xff1a;240分钟&#xff09; 中国XX 实训说明 注意事项 1. 请根据提供的实训环境&#xff0c;检查所列的硬件设备、软件清单、材料清单是否齐全&#xff0c;计算机设备是否能正常使用。 2. 实训结束后&#xff0c;将各试题代码整合…

QQ录屏文件保存在哪里?一键教你快速查询

无论是记录重要的工作内容&#xff0c;还是分享生活中的点滴&#xff0c;屏幕录制都发挥着至关重要的作用。在众多屏幕录制工具中&#xff0c;qq录屏以其简单易用、功能丰富的特点&#xff0c;受到了广大用户的喜爱。本文将为您揭示qq录屏文件保存在哪里&#xff0c;帮助大家更…

DAY18-力扣刷题

1.从前序与中序遍历序列构造二叉树 105. 从前序与中序遍历序列构造二叉树 - 力扣&#xff08;LeetCode&#xff09; 给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵树的中序遍历&#xff0c;请构造二叉树…

C# 实现位比较操作

1、目标 对两个字节进行比较&#xff0c;统计变化位数、一位发生变化的位数、二位发生变化的位数、多位发生变化的位数。 2、代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Lin…

大模型时代数据库技术创新

本文整理自 2024 年 6 月 ArchSummit&#xff08;深圳站&#xff09; Data4AI 和 AI4Data 方面的探索和实践案例专题的同名主题分享。 大家好&#xff0c;我今天讲的内容总共分为三部分&#xff0c;先是数据库和大模型的演变历程&#xff0c;尤其是两者的结合的过程。然后在分别…

高浓度锡回收的工艺流程

高浓度锡回收的工艺流程是一个复杂而精细的过程&#xff0c;它旨在从废旧锡制品或含锡废料中高效、环保地提取出高纯度的锡。以下是对该工艺流程的详细阐述&#xff1a; 一、收集与预处理 收集&#xff1a;高浓度锡回收的第一步是收集废旧锡制品或含锡废料&#xff0c;这些材料…

【分布式系统】监控平台Zabbix自定义模版配置

目录 一.添加Zabbix客户端主机 1.服务端跟客户端配置时间同步 2.安装 zabbix-agent2 3.修改 agent2 配置文件 4.服务端安装 zabbix-get验证客户端数据的连通性 5.Web 页面中添加 agent 主机 6.监控模板 二.自定义监控内容 1.客户端创建自定义key 1.1.明确需要执行的 …

Http 实现请求body体和响应body体的双向压缩方案

目录 一、前言 二、方案一(和http header不进行关联) 二、方案二(和http header进行关联) 三、 客户端支持Accept-Encoding压缩方式,服务器就一定会进行压缩吗? 四、参考 一、前言 有时请求和响应的body体比较大,需要进行压缩,以减少传输的带宽。 二、方案一(和…

14-18 2024 年影响企业 GenAI 的关键技术趋势

现在&#xff0c;大多数 .com 公司已于 2023 年更名为 .ai&#xff0c;那么价值万亿美元的问题是&#xff1a;接下来会发生什么&#xff1f;哪些关键障碍、工具、技术和方法将重塑格局 企业 AI 的不同之处在于&#xff0c;它专注于可衡量、可管理的输出&#xff0c;企业可以控…

配置并调试后端程序(sql)

1.环境准备 安装VS Code和Node.js插件&#xff1a;确保你已经安装了VS Code和Node.js插件。创建launch.json文件&#xff1a;在你的项目中创建一个.vscode文件夹&#xff0c;并在其中创建launch.json文件。添加以下内容&#xff1a; {"version": "0.2.0"…

如何快速申请免费SSL证书,实现网站HTTPS安全传输

随着互联网技术的飞速发展&#xff0c;网络安全已成为不可忽视的重要议题。HTTPS协议&#xff0c;作为HTTP协议的安全版本&#xff0c;通过SSL协议加密客户端与服务器之间的数据传输&#xff0c;从而保障信息在传输过程中的安全性。对于网站运营者而言&#xff0c;为网站部署SS…

C# OpenCvSharp 实现Reinhard颜色迁移算法

C# OpenCvSharp 实现Reinhard颜色迁移算法 目录 效果 项目 代码 下载 效果 项目 Reinhard颜色迁移算法的步骤&#xff1a; 1、将参考图片和目标图片转换到LAB空间下 2、得到参考图片和目标图片的均值和标准差 3、对目标图片的每一个像素值&#xff0c;减去目标图像均值然后…

Unity动画系统(2)

6.1 动画系统基础2-3_哔哩哔哩_bilibili p316 模型添加Animator组件 动画控制器 AnimatorController AnimatorController 可以通过代码控制动画速度 建立动画间的联系 bool值的设定 trigger p318 trigger点击的时候触发&#xff0c;如喊叫&#xff0c;开枪及换子弹等&#x…

【中项第三版】系统集成项目管理工程师 | 第 2 章 信息技术发展

前言 第2章对应的内容大概率仅考察选择题&#xff0c;通读教程&#xff0c;速战速决。选择题分值预计在2-5分&#xff0c;属于必考的知识点。 目录 2.1 信息技术及其发展 2.1.1 计算机软硬件 2.1.2 计算机网络 2.1.3 存储和数据库 2.1.4 信息安全 2.1.5 信息技术的发展 …

表单试卷零代码搭建平台正式上线,支持源码部署

hi, 大家好, 我是徐小夕. 之前一直在社区分享零代码&低代码的技术实践&#xff0c;也陆陆续续设计并开发了多款可视化搭建产品&#xff0c;比如&#xff1a; H5-Dooring&#xff08;页面可视化搭建平台&#xff09;V6.Dooring&#xff08;可视化大屏搭建平台&#xff09;橙…

Facebook:数字社交的引领者与创新者

自2004年诞生以来&#xff0c;Facebook从一个校园网络项目迅速成长为全球最大的社交媒体平台&#xff0c;彻底改变了我们与世界互动的方式。作为数字社交的引领者和创新者&#xff0c;Facebook不仅在技术层面上不断突破&#xff0c;也在社会和文化领域留下了深刻的印记。本文将…

【代码随想录——图论——图论理论基础】

1. 图论理论基础 1.1 图的基本概念 二维坐标中&#xff0c;两点可以连成线&#xff0c;多个点连成的线就构成了图。 当然图也可以就一个节点&#xff0c;甚至没有节点&#xff08;空图&#xff09; 1.1.1 图的种类 有向图 加权有向图无权有向图 无向图 加权无向图无权无向…

LLM调优,大模型怎么学

背景 LLM Transparency Tool 是一个用于深入分析和理解大型语言模型&#xff08;LLM&#xff09;工作原理的工具&#xff0c;旨在增加这些复杂系统的透明度。它提供了一个交互式界面&#xff0c;用户可以通过它观察、分析模型对特定输入&#xff08;prompts&#xff09;的反应…

K8S学习教程(二):在 PetaExpress KubeSphere容器平台部署高可用 Redis 集群

前言 Redis 是在开发过程中经常用到的缓存中间件&#xff0c;为了考虑在生产环境中稳定性和高可用&#xff0c;Redis通常采用集群模式的部署方式。 在制定Redis集群的部署策略时&#xff0c;常规部署在虚拟机上的方式配置繁琐并且需要手动重启节点&#xff0c;相较之下&#…