Hadoop3:MapReduce中的Reduce Join和Map Join

news2024/11/8 3:06:40

一、概念说明

学过MySQL的都知道,join和left join
这里的join含义和MySQL的join含义一样
就是对两张表的数据,进行关联查询

Hadoop的MapReduce阶段,分为2个阶段
一个Map,一个Reduce
那么,join逻辑,就可以在这两个阶段实现。

两者有什么区别了?
我们都知道,一般情况下,MapTaskReduceTask线程数更多。
所以,当两张表,有一个表数据量非常大,一个表非常小的时候
我们建议放在Map阶段进行join,这样可以提高性能。

二、需求说明

有两张表数据
在这里插入图片描述
将商品信息表中数据根据商品pid合并到订单数据表中
在这里插入图片描述

三、代码实现

1、Reduce Join

TableBean

package com.atguigu.mapreduce.reduceJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id; // 订单id
    private String pid; // 商品id
    private int amount; // 商品数量
    private String pname;// 商品名称
    private String flag; // 标记是什么表 order pd

    // 空参构造
    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {

        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        // id	pname	amount
        return  id + "\t" +  pname + "\t" + amount ;
    }
}

TableMapper
源数据,是多个文件的时候,我们要在setup方法里,获取文件信息
这样才能在map方法里知道,当前读取的是哪个文件,从而实现区别处理。

package com.atguigu.mapreduce.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;
    private Text outK  = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 初始化  order  pd
        FileSplit split = (FileSplit) context.getInputSplit();

        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();

        // 2 判断是哪个文件的
        if (fileName.contains("order")){// 处理的是订单表

            String[] split = line.split("\t");

            // 封装k  v
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");

        }else {// 处理的是商品表
            String[] split = line.split("\t");

            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        // 写出
        context.write(outK, outV);
    }
}

TableReducer

这里要注意
for循环处理bean list的时候,我们要在循环里面,new一个bean,存入list中
因为,Hadoop中,Iterable里存放的是地址,所以,不在循环内new一个bean来存放
会导致数据覆盖,最终只是存了一个bean

package com.atguigu.mapreduce.reduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean,TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//        01 	1001	1   order
//        01 	1004	4   order
//        01	小米   	     pd
        // 准备初始化集合
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        // 循环遍历
        for (TableBean value : values) {

            if ("order".equals(value.getFlag())){// 订单表

                TableBean tmptableBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tmptableBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                orderBeans.add(tmptableBean);
            }else {// 商品表

                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // 循环遍历orderBeans,赋值 pdname
        for (TableBean orderBean : orderBeans) {

            orderBean.setPname(pdBean.getPname());

            context.write(orderBean,NullWritable.get());
        }
    }
}

TableDriver

package com.atguigu.mapreduce.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\workspace\\data\\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\workspace\\data\\join1"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}

测试

在这里插入图片描述在这里插入图片描述

数据变化

1、源数据

在这里插入图片描述

2、Map方法中,按行读取数据

在这里插入图片描述

3、Shuffle阶段排序

因为,map方法中,用pid作为key,所以,这里对pid进行排序
在这里插入图片描述

4、Reduce方法,按key读取数据

这里的key只有3个,所以,reduce被调用了3次
每封装好一条数据,就write一次
reduce方法执行完毕后,进行归并排序,得到最终数据文件,输出到磁盘
在这里插入图片描述

2、Map Join

关键技术:
采用DistributedCache,在map阶段缓存小表数据
并且,取消reduce阶段

MapJoinDriver
关键代码:

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        
        //缓存普通文件到Task运行节点。
		//job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
		//如果是集群运行,需要设置HDFS路径
		//job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
		
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);
package com.atguigu.mapreduce.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置Map输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputtable2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output8888"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}

MapJoinMapper
setup方法中,使用driver中配置的小表文件路径,创建流,并将数据缓存起来,供map方法使用。

package com.atguigu.mapreduce.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 获取缓存的文件,并把文件内容封装到集合 pd.txt
        URI[] cacheFiles = context.getCacheFiles();

        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

        // 从流中读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // 切割
            String[] fields = line.split("\t");

            // 赋值
            pdMap.put(fields[0], fields[1]);
        }

        // 关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 处理 order.txt
        String line = value.toString();

        String[] fields = line.split("\t");

        // 获取pid
        String pname = pdMap.get(fields[1]);

        // 获取订单id 和订单数量
        // 封装
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        context.write(outK, NullWritable.get());
    }
}

测试

在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1861869.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vlog视频如何剪辑 Vlog视频剪辑逻辑 视频剪辑制作教程

剪出感觉、剪出情绪&#xff0c;给Vlog视频注入高级氛围感。不用购买昂贵的前期设备&#xff0c;正确地剪辑思维搭配一款好用的视频剪辑软件&#xff0c;你也能剪出令人惊艳的Vlog作品。请收藏本文并反复练习&#xff0c;相信在不久的将来&#xff0c;您的作品必会让人眼前一亮…

宣告 adsb.exposed - 基于 ClickHouse 的 ADS-B 航班数据交互式可视化和分析

本文字数&#xff1a;10340&#xff1b;估计阅读时间&#xff1a;26 分钟 审校&#xff1a;庄晓东&#xff08;魏庄&#xff09; 本文在公众号【ClickHouseInc】首发 Meetup活动 ClickHouse 上海首届 Meetup 讲师招募中&#xff0c;欢迎讲师在文末扫码报名&#xff01; 也许你已…

开发RpcProvider的网络服务

首先更改src的CMakeLists.txt的内容为&#xff1a; #当前目录的所有源文件放入SRC_LIST aux_source_directory(. SRC_LIST)#生成SHARED动态库 #add_library(mprpc SHARED ${SRC_LIST})#由于muduo是静态库&#xff0c;为了使用muduo&#xff0c;将mprpc也生成为静态库 add_libr…

【Docker】镜像

目录 1. 镜像拉取 2. 镜像查询 3. 镜像导出 4. 镜像上传 5. 镜像打标签 6. 镜像上推 7. 镜像删除 8. 镜像运行及修改 8.1 在registry 节点运行 mariadb 镜像&#xff0c;将宿主机 13306 端口作为容器3306 端口映射 8.2 查看容器ID 8.3 进入容器 8.4 创建数据库xd_d…

第三方软件测评中心▏软件系统测试详情介绍

软件系统测试是指对软件系统的功能、性能、安全等方面进行全面测试和评估的过程。在软件开发生命周期的不同阶段&#xff0c;通过各种测试手段和工具&#xff0c;对软件系统进行验证和确认&#xff0c;以确保软件系统的质量和可靠性。 软件系统测试的内容包括以下几个方面&…

基于4G工业路由器的连锁品牌店铺组网监测

基于4G工业路由器的连锁品牌店铺组网监测是智慧城市建设中至关重要的任务&#xff0c;它涉及到营运管理等多方面&#xff0c;应用物联网技术可确保店铺运营的高效、安全和可靠。 连锁品牌店铺遍布城市各领域&#xff0c;甚至跨城市部署&#xff0c;分布广泛。这对集团总部的管…

用定时器T1中断控制两个LED以不同周期闪烁

#include<reg51.h> // 包含51单片机寄存器定义的头文件 sbit D1P2^0; //将D1位定义为P2.0引脚 sbit D2P2^1; //将D2位定义为P2.1引脚 unsigned char Countor1; //设置全局变量&#xff0c;储存定时器T1中断次数 unsigned char Countor2; //设置全局变量&#xff0c;储…

Python中的并发编程(5)PyQt 多线程

PyQt 多线程 1 卡住的计时器 我们定义了一个计时器&#xff0c;每秒钟更新一次显示的数字。此外我们定义了一个耗时5秒的任务oh_no&#xff0c;和按钮“危险”绑定。 当我们点击“危险”按钮时&#xff0c;程序去执行oh_no&#xff0c;导致显示停止更新了。 import sys im…

AI早班车2024.6.25

全球AI新闻速递 1.高通&#xff1a;开放 AI 模型&#xff0c;帮助开发者打造骁龙 X Elite 平台智能应用。 2.OpenAI&#xff1a;收购数据库分析公司Rockset。 3.大众海外版车型支持 ChatGPT。 4.乐聚夸父人形机器人&#xff0c;搭载华为云盘古具身智能大模型。 5.微软正努力…

【吊打面试官系列-Mysql面试题】你可以用什么来确保表格里的字段只接受特定范围里的值?

大家好&#xff0c;我是锋哥。今天分享关于 【你可以用什么来确保表格里的字段只接受特定范围里的值?】面试题&#xff0c;希望对大家有帮助&#xff1b; 你可以用什么来确保表格里的字段只接受特定范围里的值? 答&#xff1a;Check 限制&#xff0c;它在数据库表格里被定义&…

策略模式-通过枚举newInstance替代工厂

策略模式-使用枚举newInstance 前言一、枚举类&#xff1a;MarkCheckDataTypeEnum二、抽象类&#xff1a;AbstractMarkChecker三、检查类&#xff1a;MarkPeopleChecker四、demo演示总结 前言 很久没写文章了~~ 吐槽下&#xff1a;入职新公司后&#xff0c;基本在搬砖&#xf…

好的精益管理咨询公司是什么样

在竞争激烈的商业环境中&#xff0c;企业的成功不仅取决于其产品或服务的质量&#xff0c;更在于其内部管理的精细化和高效性。精益管理作为一种追求最大价值、消除浪费的管理哲学&#xff0c;已经越来越受到企业的重视。那么&#xff0c;一家好的精益管理咨询公司究竟是什么样…

【SQL Server点滴积累】Setup SQL Server 2008 Database Mirror (二)

【SQL Server点滴积累】Setup SQL Server 2008 Database Mirror (一)-CSDN博客今天分享SQL Server 2008 R2搭建数据库镜像(Database Mirror)https://blog.csdn.net/ncutyb123/article/details/139749117?spm1001.2014.3001.5501本篇Blog基于以上Blog步骤进行SQL Server 2008 R…

excel表格加密:电脑文件加密的5个方法介绍【新手篇】

为了防止数据泄露&#xff0c;编辑好表格文件后一般都会加上密码。敏感数据的泄露会导致严重的商业损失和声誉损害。Excel表格加密方法有很多&#xff0c;包括金舟文件夹加密大师、金舟ZIP解压缩、工作簿密码设置等方法。 下面分享5个excel表格加密方法&#xff0c;希望能够帮到…

51单片机STC89C52RC——8.2 8*8 LED点阵模块(动态图像)

目的/效果 在《51单片机STC89C52RC——8.1 8*8 LED点阵模块&#xff08;点亮一个LED&#xff09;》我们点亮一个LED&#xff0c;接下来我们将在8*8的矩阵中展示动态的图像。 1&#xff1a;单列展示&#xff1a; 2&#xff1a;单行展示 3&#xff1a;笑脸 4&#xff1a;右移…

【D3.js in Action 3 精译】第一部分 D3.js 基础知识

第一部分 D3.js 基础知识 欢迎来到 D3.js 的世界&#xff01;可能您已经迫不及待想要构建令人惊叹的数据可视化项目了。我们保证&#xff0c;这一目标很快就能达成&#xff01;但首先&#xff0c;我们必须确保您已经掌握了 D3.js 的基础知识。这一部分提到的概念将会在您后续的…

优化系统小工具

一款利用VB6编写的系统优化小工具&#xff0c;系统优化、桌面优化、清理垃圾、查找文件等功能。 下载:https://download.csdn.net/download/ty5858/89432367

Vue3 国际化i18n

国际化i18n方案 1. 什么是i18n2. i18n安装、配置及使用2.1 安装2.2 配置2.3 挂载到实例2.4 组件中使用2.5 语言切换 1. 什么是i18n i18n 是“国际化”的简称。在资讯领域&#xff0c;国际化(i18n)指让产品&#xff08;出版物&#xff0c;软件&#xff0c;硬件等&#xff09;无…

udp Socket组播 服务器

什么是组播 组播也可以称之为多播这也是 UDP 的特性之一。组播是主机间一对多的通讯模式&#xff0c;是一种允许一个或多个组播源发送同一报文到多个接收者的技术。组播源将一份报文发送到特定的组播地址&#xff0c;组播地址不同于单播地址&#xff0c;它并不属于特定某个主机…

美业门店收银系统Java源码分享私、美业系统中哪种状态订单可以操作退款?

美业门店收银系统是一种专为美容美发、美甲、SPA等美业门店设计的全面性结账解决方案&#xff0c;美业门店收银系统的重要性在于它为门店提供了全面的业务管理解决方案。 美业收银管理系统可以处理销售、预约管理、库存追踪和员工绩效等多项任务&#xff0c;不仅能够简化交易流…