大数据培训课程Reduce Join案例实操

news2025/1/13 13:35:46

Reduce Join案例实操

1.需求

表4-4 订单数据表t_order

idpidamount
1001011
1002022
1003033
1004014
1005025
1006036

表4-5 商品信息表t_product

pidpname
01小米
02华为
03格力

       将商品信息表中数据根据商品pid合并到订单数据表中。

表4-6 最终数据形式

idpnameamount
1001小米1
1004小米4
1002华为2
1005华为5
1003格力3
1006格力6

2.需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联,如图4-20所示。

图4-20 Reduce端表合并

3.代码实现

1)创建商品和订合并后的Bean类

package com.atguigu.mapreduce.table; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable;   public class TableBean implements Writable {      private String order_id; // 订单id    private String p_id;      // 产品id    private int amount;       // 产品数量    private String pname;     // 产品名称    private String flag;      // 表的标记      public TableBean() {       super();    }      public TableBean(String order_id, String p_id, int amount, String pname, String flag) {         super();         this.order_id = order_id;       this.p_id = p_id;       this.amount = amount;       this.pname = pname;       this.flag = flag;    }      public String getFlag() {       return flag;    }      public void setFlag(String flag) {       this.flag = flag;    }      public String getOrder_id() {       return order_id;    }      public void setOrder_id(String order_id) {       this.order_id = order_id;    }      public String getP_id() {       return p_id;    }      public void setP_id(String p_id) {       this.p_id = p_id;    }      public int getAmount() {       return amount;    }      public void setAmount(int amount) {       this.amount = amount;    }      public String getPname() {       return pname;    }      public void setPname(String pname) {       this.pname = pname;    }      @Override    public void write(DataOutput out) throws IOException {       out.writeUTF(order_id);       out.writeUTF(p_id);       out.writeInt(amount);       out.writeUTF(pname);       out.writeUTF(flag);    }      @Override    public void readFields(DataInput in) throws IOException {       this.order_id = in.readUTF();       this.p_id = in.readUTF();       this.amount = in.readInt();       this.pname = in.readUTF();       this.flag = in.readUTF();    }      @Override    public String toString() {       return order_id + “\t” + pname + “\t” + amount + “\t” ;    } }

2)编写TableMapper类

package com.atguigu.mapreduce.table; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit;   public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{   String name;    TableBean bean = new TableBean();    Text k = new Text();       @Override    protected void setup(Context context) throws IOException, InterruptedException {         // 1 获取输入文件切片       FileSplit split = (FileSplit) context.getInputSplit();         // 2 获取输入文件名称       name = split.getPath().getName();    }      @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             // 1 获取输入数据       String line = value.toString();             // 2 不同文件分别处理       if (name.startsWith(“order”)) {// 订单表处理             // 2.1 切割           String[] fields = line.split(“\t”);                     // 2.2 封装bean对象           bean.setOrder_id(fields[0]);           bean.setP_id(fields[1]);           bean.setAmount(Integer.parseInt(fields[2]));           bean.setPname(“”);           bean.setFlag(“order”);                     k.set(fields[1]);       }else {// 产品表处理             // 2.3 切割           String[] fields = line.split(“\t”);                     // 2.4 封装bean对象           bean.setP_id(fields[0]);           bean.setPname(fields[1]);           bean.setFlag(“pd”);           bean.setAmount(0);           bean.setOrder_id(“”);                     k.set(fields[0]);       }         // 3 写出       context.write(k, bean);    } }

3)编写TableReducer类

package com.atguigu.mapreduce.table; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;   public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {      @Override    protected void reduce(Text key, Iterable<TableBean> values, Context context)   throws IOException, InterruptedException {         // 1准备存储订单的集合       ArrayList<TableBean> orderBeans = new ArrayList<>();       // 2 准备bean对象       TableBean pdBean = new TableBean();         for (TableBean bean : values) {             if (“order”.equals(bean.getFlag())) {// 订单表                // 拷贝传递过来的每条订单数据到集合中              TableBean orderBean = new TableBean();                try {                 BeanUtils.copyProperties(orderBean, bean);              } catch (Exception e) {                 e.printStackTrace();              }                orderBeans.add(orderBean);           } else {// 产品表                try {                 // 拷贝传递过来的产品表到内存中                 BeanUtils.copyProperties(pdBean, bean);              } catch (Exception e) {                 e.printStackTrace();              }           }       }         // 3 表的拼接       for(TableBean bean:orderBeans){             bean.setPname (pdBean.getPname());                     // 4 数据写出去           context.write(bean, NullWritable.get());       }    } }

4)编写TableDriver类

package com.atguigu.mapreduce.table; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   public class TableDriver {      public static void main(String[] args) throws Exception {       // 0 根据自己电脑路径重新配置 args = new String[]{“e:/input/inputtable”,”e:/output1″};   // 1 获取配置信息,或者job对象实例       Configuration configuration = new Configuration();       Job job = Job.getInstance(configuration);         // 2 指定本程序的jar包所在的本地路径       job.setJarByClass(TableDriver.class);         // 3 指定本业务job要使用的Mapper/Reducer业务类       job.setMapperClass(TableMapper.class);       job.setReducerClass(TableReducer.class);         // 4 指定Mapper输出数据的kv类型       job.setMapOutputKeyClass(Text.class);       job.setMapOutputValueClass(TableBean.class);         // 5 指定最终输出的数据的kv类型       job.setOutputKeyClass(TableBean.class);       job.setOutputValueClass(NullWritable.class);         // 6 指定job的输入原始文件所在目录       FileInputFormat.setInputPaths(job, new Path(args[0]));       FileOutputFormat.setOutputPath(job, new Path(args[1]));         // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行       boolean result = job.waitForCompletion(true);       System.exit(result ? 0 : 1);    } }

4.测试

运行程序查看结果

1001   小米   1  1001   小米   1  1002   华为   2  1002   华为   2  1003   格力   3  1003   格力   3    

5.总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/24509.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2022我的前端面试总结

Webpack Proxy工作原理&#xff1f;为什么能解决跨域 1. 是什么 webpack proxy&#xff0c;即webpack提供的代理服务 基本行为就是接收客户端发送的请求后转发给其他服务器 其目的是为了便于开发者在开发模式下解决跨域问题&#xff08;浏览器安全策略限制&#xff09; 想…

盘点 | 跨平台桌面应用开发的5大主流框架

受益于开源技术的发展&#xff0c;以及响应快速开发的实际业务需求&#xff0c;跨平台开发不仅限于移动端跨平台&#xff0c;桌面端虽然在市场应用方面场景不像移动端那么丰富&#xff0c;但也有市场的需求。 相对于个人开发者而言&#xff0c;跨平台框架的使用&#xff0c;主…

Vue开发 提交后台,二维码,自定义

1. 修改title和图标 资源可以放在static下面&#xff0c;给一个小的&#xff1a; 直接再index里面改&#xff1a; 不生效&#xff0c;需要在 vue.config.js 中增加&#xff1a; module.exports {pwa: {iconPaths: {favicon32: logo.png,favicon16: logo.png,appleTouchIcon:…

阿里巴巴全新SpringCloud实战笔记(全彩版)GitHub狂揽70000标星

最近小编淘到一份宝贝&#xff01; 先看看目录&#xff1a; 这份手册真的非常全面&#xff0c;涵盖了所有SpringCloud所有的内容&#xff0c;限于文章篇幅原因&#xff0c;只能以截图的形式展示出来&#xff0c;有需要的小伙伴可以文末获取↓↓↓ 直接展示内容&#xff1a; …

react redux 状态管理

1.store store是一个状态管理容器&#xff0c;它通过createStore创建&#xff0c;createStore接收initialState和reducer两个参数。它暴露了4个api分别是&#xff1a; getState() dispatch(action) subscribe(listener) replaceReducer 前三个是比较常用的api&#xff0c;之…

葡萄糖-聚乙二醇-二茂铁Ferrocene-PEG-Glucose

葡萄糖-聚乙二醇-二茂铁Ferrocene-PEG-Glucose&#xff0c;二茂铁&#xff0c;是一种具有芳香族性质的有机过渡金属化合物&#xff0c;化学式为Fe(C5H5)2&#xff0c;常温下为橙黄色粉末&#xff0c;有樟脑气味。熔点172℃-174℃&#xff0c;沸点249℃&#xff0c;100℃以上能升…

腾讯云服务器+宝塔+后端+前端发布

1、申请云服务器。登陆。 https://cloud.tencent.com/ 创建实例 最好重置密码&#xff0c;并记住。 配置安全组&#xff0c;当我们是学习的时候&#xff0c;全部开放好了。 有些版本是去“防火墙”那里配置。 轻量应用服务器&#xff08;试用的&#xff09; 2、安装Docker。在…

Oracle LiveLabs实验:Load and Analyze Your Data with Autonomous Database

概述 本研讨会中的实验将引导您完成开始使用 Oracle 自治数据库的所有步骤。 首先&#xff0c;您将创建一个 Oracle 自治数据库实例。 然后&#xff0c;您将练习使用自治数据库工具和 API 从不同位置以不同格式加载数据的几种方法。 您将使用 SQL 分析数据并使用 Oracle Analy…

ShardingSphere笔记(二):自定义分片算法 — 按月分表

ShardingSphere笔记&#xff08;二&#xff09;&#xff1a;自定义分片算法 — 按月分表 文章目录ShardingSphere笔记&#xff08;二&#xff09;&#xff1a;自定义分片算法 — 按月分表一、准备二、分表逻辑三、自定义分片算法步骤&#xff08;以按月分表为例&#xff09;1. …

【AI工程】08-MLOps工具-在Charmed Kubeflow上运行MindSpore

作者&#xff1a;王磊 更多精彩分享&#xff0c;欢迎访问和关注&#xff1a;https://www.zhihu.com/people/wldandan 在【AI工程】02-AI工程&#xff08;AI Engineering&#xff09;面面观中&#xff0c;提到Gartner把AI工程化作为未来重要战略技术趋势&#xff0c;Gartner认为…

关于webpack(v5.74.0)的模块联邦原理

在webpack中模块联邦的实现主要依赖于两个插件ContainerReferencePlugin和ContainerPlugin&#xff0c;ContainerPlugin是用来添加入口依赖并给当前依赖添加异步依赖&#xff0c;ContainerReferencePlugin用来添加解析用户的请求并分析是否是远程模块&#xff0c;然后加载远程模…

使用 JPA、Hibernate 和 Spring Data JPA 进行审计

1. 概述 在ORM的上下文中&#xff0c;数据库审计意味着跟踪和记录与持久实体相关的事件&#xff0c;或者只是实体版本控制。受 SQL 触发器的启发&#xff0c;这些事件是对实体的插入、更新和删除操作。数据库审核的好处类似于源版本控制提供的好处。 在本教程中&#xff0c;我…

Shelby American 汽车 NFT 系列来袭!

我们在 The Sandbox 上推出 Shelby NFT 作品集&#xff0c;加入我们吧&#xff01;该系列包含 Carroll Shelby 制造的一些最稀有和最抢手的汽车&#xff0c;也是现实生活中最具收藏价值的汽车。这些汽车构成了最伟大的汽车历史&#xff0c;也是传奇人物 Carroll Shelby 的伟大代…

为什么开源在线表单工具能做好数据管理?

在数字化时代&#xff0c;数据的有效应用和管理可以说是企业的无形资产&#xff0c;做好数据管理既能提升办公效率&#xff0c;又能帮助企业从规律的数字化管理中获取高效的管理策略。那么&#xff0c;什么样的开源在线表单工具可以实现这一目的&#xff1f;对于企业而言&#…

Axure药企内部管理平台+企业内部管理系统平台

这是一款根据药企的需求设计的内部管理系统&#xff0c;此系统主要是针对市场部和销售部的管理&#xff0c;此作品选择了管理员和地区经理两个角色进行了设计&#xff0c; 设计软件&#xff1a;Axure8.1&#xff08;兼容9和10&#xff09; 作品类型&#xff1a;实战原型 其主要…

抓包神器之Charles(绕过代理屏蔽)以及证书校验绕过

简介 大多数进行渗透测试的时候都可以使用burp抓包,但有的app的部分功能会使用okhttp框架,这种框架会使App不使用默认的系统代理,解决方法就是通过proxy的方式走charles,下面是具体使用方法; Charles 是常用的网络封包截取工具, 通过将自己设置成系统的网络访问代{过}{…

11.21SSM-spring 第一天学习总结

1 Spring 是什么&#xff1f; 针对Bean 生命周期进行管理的轻量级容器 IOC : 浅谈IOC--说清楚IOC是什么_ivan820819的博客-CSDN博客_ioc 软件设计六大原则 : 设计模式六大原则 六大设计原则 1.开闭原则 定义&#xff1a;一个软件实体如类、模块和函数应该对扩展开放&a…

JavaScript/uni-app对接海康ISC openapi

JavaScript/uni-app对接海康ISC openapiJavaScript实现HMAC SHA256下载安装使用crypto-js使用签名生成工具参考JavaScript实现HMAC SHA256 Run the code online with this jsfiddle. Dependent upon an open source js library calledhttp://code.google.com/p/crypto-js/.<…

如何将驱动编译为kernel 模块

前言&#xff1a; 本文章目标平台是PC Linux,不包含其他平台。 执行下面的步骤之前&#xff0c;请先编译kernel通过。 linux KO编译 将驱动程序源码集成到Linux内核中&#xff1a; 将驱动源码文件放到drivers/net/wireless并命名 自己简单创建的几个没有任何关联的源文件&…

力扣(LeetCode)30. 串联所有单词的子串(C++)

滑动窗口哈希表 哈希表 tottottot 存 wordswordswords 所有单词的出现次数。 维护滑动窗口&#xff0c;窗口长度 mwm\times wmw &#xff0c; mmm 是单词数量 www是单词长度 &#xff0c; 窗口长度对应可行解的长度。哈希表 wdwdwd 维护滑动窗口内每个单词的出现次数。 维护…