Map Reduce高级篇:Join-Reduce

news2024/11/15 22:28:58

Join关联操作

背景

在实际的数据库应用中,我们经常需要从多个数据表中读取数据,这时就可以使用SQL语句中的连接(JOIN),在两个或者多个数据表中查询数据。在使用MapReduce框架进行数据查询的过程中,也会涉及到从多个数据集中读取数据,进行Join关联操作,只不过此时需要使用Java代码并根据MapReduce的编程规范实现这个业务。

image-20230419213547374

由于MapReduce的分布式设计理念,对于MapReduce实现Join操作具备了一定的特殊性。特殊性主要体现于:==究竟在MapReduce的什么阶段进行数据集的关联操作?==是mapper阶段还是reduce阶段,之间的区别又是什么?

基于此,整个MapReduce的join可以分为两个阶段:Map阶段进行合并(Map Side Join ),Reduce阶段进行合并(Reduce Side Join)

Reduce阶段进行关联操作 Reduce Side Join

reduce side join ,顾名思义,就是在Reduce阶段进行关联操作,这是最容易想到和实现的join方式,因为通过shuffle过程就可以将相关的数据分到相同的分组中,这将为后面的join操作提供了便捷。

image-20230419214152613

弊端

reduce端join的最大问题就是整个join操作都是在reduce阶段完成的,但是通常情况下,reduce的并行度是极小的(默认是1),这就使得所有的数据都挤压到reduce阶段处理,压力颇大,虽然说可以设置reduce的并行度,但是优惠导致最终结果被分散到多个不同的文件中。并且数据从mapper到reduce的过程中,shuffle阶段十分繁琐,数据集打的时候成本极高

image-20230419214806445

在这里进行一个实验

合并订单数据

数据集阿里云盘分享 (aliyundrive.com)

思路

在这里合并的是两个文本文件,两个文本一个是商品信息,一个是订单信息,二者通过商品编号进行关联,

此外两个文件在同一个文件夹下,那么读取的时候就需要解决一个问题: 如何去问两个文件?两个文件读取后交给Reduce如何解决?

  • 读取文件可以通过setup方法来获取文件名,通过文件名进行区分
  • 在这里数据量比较小,可以直接使用字符串来进行,在使用字符串的时候可以在前缀添加一个能够区分的字符,在Reduce中先将字符给取出来
Mapper
package MapReduceTest.join.reduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @author wxk
 * @date 2023/04/20/8:09
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    String fileName;
    Text outKey = new Text();
    Text outValue = new Text();
    StringBuilder sb = new StringBuilder();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        //获取文件名
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //清空内容
        sb.setLength(0);

        String[] split = value.toString().split("\\|");
        //订单
        if ("orders.txt".equals(fileName)) {
            outKey.set(split[1]);
            sb.append(split[0]).append("\t").append(split[2]);
            //为了更好的分辨,在前缀上加入文件名
            outValue.set(sb.insert(0, "orders@").toString());
        } else {
            outKey.set(split[0]);
            sb.append(split[1]).append("\t").append(split[2]);
            outValue.set(sb.insert(0, "goods@").toString());
        }
            context.write(outKey,outValue);
        //数据写出
    }
}

Reduce
package MapReduceTest.join.reduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author wxk
 * @date 2023/04/20/16:37
 */
public class ReduceJoinReduce extends Reducer<Text, Text, Text, Text> {
    List<String> goodsList = new ArrayList<>();
    List<String> ordersList = new ArrayList<>();
    Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            String[] s = value.toString().split("@");
            System.out.println("==="+s[1]);
            if ("orders".equals(s[0])) {
                //是以orders@开头,表明是orders数据
                // System.out.println("==="+s[1]);
                ordersList.add(s[1]);
            } else {
                goodsList.add(s[1]);
            }
        }
        int goodS = goodsList.size();
        int orderS= ordersList.size();
        for( int i = 0;i< goodS;i++){
            for( int j = 0;j < orderS;j++){
                outValue.set(ordersList.get(j)+ "\t"+goodsList.get(i) );
                context.write(key,outValue);
            }
        }
        goodsList.clear();
        ordersList.clear();


    }
}
Driver
package MapReduceTest.join.reduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author wxk
 * @date 2023/04/20/16:50
 */
public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, ReduceJoinDriver.class.getSimpleName());
        //设置Mapper驱动
        job.setMapperClass(ReduceJoinMapper.class);
        //设置驱动
        job.setJarByClass(ReduceJoinDriver.class);
        //设置Mapper输出Key的类型
        job.setMapOutputKeyClass(Text.class);
        //设置Mapper输出Value的类型
        job.setMapOutputValueClass(Text.class);
        //设置Reduce
        job.setReducerClass(ReduceJoinReduce.class);
        //设置Reduce输出的Key的类型
        job.setOutputKeyClass(Text.class);
        //设置Reduce输出Value的类型
        job.setOutputValueClass(Text.class);
        //设置输入路径
        FileInputFormat.setInputPaths(job,new Path("E:/MapReduceTest/JoinTest"));
        //设置输出格式
        FileOutputFormat.setOutputPath(job,new Path("E:/MapReduceTest/JoinTestOut"));
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0: 1);
    }

}
运行结果如下:

image-20230422101904814

他这个是根据商品ID进行排序的,但是我们像根据订单编号进行排序

排序

package MapReduceTest.join.reduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author wxk
 * @date 2023/04/22/8:51
 */
public class ReduceJoinSort {
    public static class ReduceJoinSortMapper extends Mapper<LongWritable,Text,Text, Text>{
        Text outKey= new Text();
        Text outValue=new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] item = value.toString().split("\t");
            //设置订单编号为key
            outKey.set(item[1]);
            //订单编号 商品ID 商品编码 商品名称
            outValue.set(item[1]+ "\t" + item[0]+ "\t"+ item[3] +"\t"+item[4] + "\t" + item[2]);
            context.write(outKey,outValue);
        }
    }
    public static class ReduceJoinSortReduce extends Reducer<Text,Text, NullWritable,Text>{

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for(Text value :values){
                context.write(NullWritable.get(),value);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, ReduceJoinDriver.class.getSimpleName());
        //设置Mapper驱动
        job.setMapperClass(ReduceJoinSortMapper.class);
        //设置驱动
        job.setJarByClass(ReduceJoinSort.class);
        //设置Mapper输出Key的类型
        job.setMapOutputKeyClass(Text.class);
        //设置Mapper输出Value的类型
        job.setMapOutputValueClass(Text.class);
        //设置Reduce
        job.setReducerClass(ReduceJoinSortReduce.class);
        //设置Reduce输出的Key的类型
        job.setOutputKeyClass(NullWritable.class);
        //设置Reduce输出Value的类型
        job.setOutputValueClass(Text.class);
        //设置输入路径
        FileInputFormat.setInputPaths(job,new Path("E:/MapReduceTest/JoinTestOut"));
        //设置输出格式
        FileOutputFormat.setOutputPath(job,new Path("E:/MapReduceTest/JoinTestSortOut"));
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0: 1);
    }
}

由于逻辑相对比较简单,就将三个合二为一,以下是输出结果,看起来相对有序点。 确信.jpg

image-20230422102446050

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/446869.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React Refs

React 支持一种非常特殊的属性 Ref &#xff0c;可以用来绑定到 render() 输出的任何组件上。 这个特殊的属性允许引用 render() 返回的相应的支撑实例 &#xff08; backing instance &#xff09;。这样就可以确保在任何时间总是拿到正确的实例。 使用&#xff1a; 绑定一…

Linux基础—DHCP原理与配置

Linux基础—DHCP原理与配置 一、DHCP工作原理1.了解DHCP服务使用DHCP的优势DHCP的分配方式 2.DHCP的IP地白动获取工作原理: 二、配置DHCP服务器三、DHCP场景应用实验 一、DHCP工作原理 1.了解DHCP服务 DHCP(Dynamic HostConfiguration Protocol&#xff0c;动态主机配置协议) …

网络基础入门

目录 网络存在的意义 网络发展 网络在哪里&#xff1f; 网络是分层的 理解分层 软件可以分层 网络协议栈也是层状结构 认识协议 协议是什么 协议分层 网络传输需要解决的四个问题 OSI七层模型&#xff08;了解为主&#xff09; TCP/IP五层&#xff08;或四层&…

LVS --一文精通

目录 dns解析 下一跳机制 LVS:NAT LVS: IP TUN隧道 LVS: DR DR> TUN > NAT > FULL NAT dns解析 DNS本地域名服务器&#xff0c;当用户访问一个网址&#xff0c;计算机就会提出域名解析请求&#xff0c;并发给本地域名服务器&#xff0c;本地域名服务器收到请求…

你真正了解低代码么?(国内低代码平台状况分析)

■ 写在前面■ 低代码产品如何分类&#xff0c;90% 的人都没有搞清楚■ 低代码平台如何比较&#xff1f;Point 在哪儿&#xff1f;一个比喻大家全听懂■ “拼”出来的低代码平台&#xff0c;真的好用吗&#xff1f;■ 推荐一款 C 端低代码产品 ■ 写在前面 都说技术是生产力&a…

每日一个小技巧:1分钟告诉你如何给黑白照片上色

你是否有过这样的经历&#xff1a;在家翻出爷爷奶奶的旧照片时&#xff0c;发现它们都是黑白色的&#xff0c;无法体现当时的真实色彩&#xff1f;由于一些老照片的拍摄时间较早&#xff0c;因此都是以黑白形式存在的&#xff0c;这样的照片不仅影响观感&#xff0c;还抑制了我…

SAS学习第11章:试验设计

试验设计必须遵循以下原则&#xff1a;重复、随机、局部控制。 重复即每个处理都要有至少2个试验单位&#xff0c;目的是估计试验误差&#xff0c;降低试验误差。若只有1个观测值&#xff0c;无法估计试验误差。平均数抽样误差估计值大小与重复次数的平方成反比&#xff0c;适…

Linux基础——DNS服务器原理及搭建

Linux基础——DNS服务器原理及搭建 一、DNS服务器原理1.DNS系统分布式数据结构2.DNS查询类型3.DNS服物器类型 二、搭建DNS域名解析服务器步骤1.安装bind软件包2. 查看需要修改的配置文件所在路径3. 修改主配置文件4. 修改区域配置文件&#xff0c;添加正向区域配置5.配置正向区…

量化择时——LSTM深度学习量化择时(第1部分—因子测算)

之前我们尝试使用SVM&#xff0c;将时序数据转为横截面的数据&#xff0c;使用机器学习的方法进行预测 量化择时——SVM机器学习量化择时&#xff08;第1部分—因子测算&#xff09;&#xff1a; https://blog.csdn.net/weixin_35757704/article/details/129909497 但是因为股…

轻松解决ChatGPT网络报错,畅享沟通

ChatGPT的确很不错&#xff0c;无论是在什么岗位&#xff0c;使用它都可以让工作的你提升效率&#xff0c;可是我们经常会遇到一个神奇的网络报错&#xff08;当我们一会不使用就来个这样的效果提示&#xff09;&#xff0c;是不是头大&#xff1f; 好了&#xff0c;开始进入正…

实验二:华为云ModelArts订阅口罩检测模型

华为云ModelArts订阅口罩检测模型 1、通过实验提示的网址进入口罩检测模型&#xff1b;拖动窗口找到并点击订阅按钮&#xff1b;勾选温馨提示中复选框内容&#xff1b; 2、点击红色“前往控制台”按钮&#xff0c;选择华北-北京四&#xff0c;并确定&#xff1b; 3、等待同步完…

Jakarta EE - Java EE的又一个名字

Jakarta EE并不是新技术&#xff0c;他的前身就是大家熟悉的Java EE&#xff0c;老一辈的程序员可能还记得J2EE&#xff0c;是的&#xff0c;他们都是同一个东西&#xff0c;至于为什么会改来改去&#xff0c;这里面就有很多故事了。 1998年12月&#xff0c;SUN公司发布了JDK1…

PySide6/PyQT多线程之 线程锁/线程安全

前言 PySide6/PyQT多线程同时访问同一个共享资源或对象&#xff0c;程序可能会出现预期之外的结果。所以需要考虑线程安全问题。 使用PySide6/PyQT开发GUI应用程序&#xff0c;在多个线程同时访问同一个共享对象时候&#xff0c;如果没有进行同步处理那就可能会导致数据不一致或…

HTML5 <q> 标签、HTML5 <rp> 标签

HTML5 <q> 标签 实例 HTML5 <q>标签用于定义一个短引用。请参考一下内容&#xff1a; 标记一个短的引用&#xff1a; <p>WWFs goal is to: <q>Build a future where people live in harmony with nature.</q> We hope they succeed.</p&g…

019:Mapbox GL加载天地图(影像瓦片图)

第019个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载天地图(影像瓦片图)。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共80行)相关API参考:专栏目标示例效果 配置方式 1)查看基础设置:h…

2023年定向增发研究报告

第一章 行业概况 定向增发是增发的一种&#xff0c;是指上市公司向符合条件的少数特定投资者非公开发行股份的行为&#xff0c;有时也称“定向募集”或“私募”。定向增发的发行价格由参与增发的投资者竞价决定&#xff0c;发行程序与公开增发相比较为灵活。一般认为&#xff…

中国地图标准坐标和投影参数

目录 一、地理坐标 二、投影坐标 三、ArcGIS投影变换 四、说明 一、地理坐标 GCS_Krasovsky_1940&#xff08;克拉索夫斯基_1940椭球体&#xff09; 具体参数如下图&#xff1a; 每个国家或地区都有各自的基准面&#xff0c;我们通常所说的北京54坐标系、西安80坐标系实际上…

天梯赛练习题集

L2-005 集合相似度 find函数&#xff0c;Nt用集合关系求 #include <bits/stdc.h> #define ios ios::sync_with_stdio(0),cin.tie(0) #define PII pair<int,int> typedef long long ll; const int N1e610; const int inf0x3f3f3f3f;using namespace std; int n,k;…

深度学习(9)之 easyOCR使用详解

easyOCR使用详解 本文在 OCR-easyocr初识 基础上进行修改EasyOCR 是一个python版的文字识别工具。目前支持80中语言的识别。其对应的 github 地址&#xff1a;EasyOCR可以在网站版测试 demo 测试效果&#xff1a;https://www.jaided.ai/easyocr/其在字符识别上的效果如下&…

学系统集成项目管理工程师(中项)系列07_信息(文档)管理

1. 信息系统相关信息&#xff08;文档&#xff09; 1.1. 是指某种数据媒体和其中所记录的数据 1.2. 永久性 1.3. 由人或机器阅读 1.4. 仅用于描述人工可读的东西 2. 分类 2.1. 开发文档 2.1.1. 可行性研究报告和项目任务书 2.1.2. 需求规格说明 2.1.3. 功能规格说明 …