hadoop 学习:mapreduce 入门案例三:顾客信息与订单信息相关联(联表)

news2025/1/25 9:01:03

这里的知识点在于如何合并两张表,事实上这种业务场景我们很熟悉了,这就是我们在学习 MySQL 的时候接触到的内连接,左连接,而现在我们要学习 mapreduce 中的做法

这里我们可以选择在 map 阶段和reduce阶段去做

数据:

链接: https://pan.baidu.com/s/1PH1J8SIEJA5UX0muvN-vuQ?pwd=idwx 提取码: idwx

顾客信息

 订单信息

 

编写实体类 CustomerOrder

这里我们除了顾客与订单的属性外,额外定义了一个状态,用来区分当前类是顾客信息还是订单信息

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrders implements WritableComparable<CustomerOrders> {
    private Integer customerId;
    private String customerName;
    private Integer orderId;
    private String orderStatus;

    // 标签
    private String flag;

    @Override
    public String toString() {
        return "CustomerOrders{" +
                "customerId=" + customerId +
                ", customerName='" + customerName + '\'' +
                ", orderId=" + orderId +
                ", orderStatus='" + orderStatus + '\'' +
                ", flag='" + flag + '\'' +
                '}';
    }

    public CustomerOrders() {
    }

    public CustomerOrders(Integer customerId, String customerName, Integer orderId, String orderStatus, String flag) {
        this.customerId = customerId;
        this.customerName = customerName;
        this.orderId = orderId;
        this.orderStatus = orderStatus;
        this.flag = flag;
    }

    public Integer getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Integer customerId) {
        this.customerId = customerId;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public int compareTo(CustomerOrders o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(customerId);
        dataOutput.writeUTF(customerName);
        dataOutput.writeInt(orderId);
        dataOutput.writeUTF(orderStatus);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customerId = dataInput.readInt();
        this.customerName = dataInput.readUTF();
        this.orderId = dataInput.readInt();
        this.orderStatus = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}

 

1. 在 reduce 阶段合并

传入两个文件

(1)map 阶段

setup方法在 map 方法前运行,找到当前数据所在文件的名称,用来区分当前这条数据是顾客信息还是订单信息

map 方法将传进来的数据包装成对象,最后已键值对的形式传给下一阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text,Text,CustomerOrders> {
    String fileName = "";
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        System.out.println("setup method: "+ fileSplit.getPath().toString());
        fileName = fileSplit.getPath().getName();
        System.out.println("fileName : "+fileName);

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        System.out.println("map stage:");
//        System.out.println("key : "+key+"\tvalue : "+value);
        String[]field = value.toString().split(",");
        CustomerOrders customerOrders = new CustomerOrders();
        if (fileName.startsWith("orders")){         //订单内容
            customerOrders.setCustomerId(Integer.parseInt(field[2]));
            customerOrders.setCustomerName("");
            customerOrders.setOrderId(Integer.parseInt(field[0]));
            customerOrders.setFlag("1");
            customerOrders.setOrderStatus(field[3]);
        }else {                         //用户信息
            customerOrders.setCustomerId(Integer.parseInt(field[0]));
            customerOrders.setCustomerName(field[1]);
            customerOrders.setOrderId(0);
            customerOrders.setFlag("0");
            customerOrders.setOrderStatus("");
        }
        Text text = new Text(customerOrders.getCustomerId().toString());
        context.write(text, customerOrders);
    }
}

(2)reduce 阶段

这里的 reduce 方法则是,先遍历找到唯一的一个顾客信息,然后将顾客信息填充到订单信息中,再合并为一个 Text 输出

当然也可以有不同的写法,比如将每一条订单信息处理完后就写入 context 之后输出

还有就是这里的对象的赋值写的不太好,但是不能直接用=去赋值,可以使用 BeanUtils 的 copyproperties()方法去赋值

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class ReduceJoinReducer extends Reducer<Text,CustomerOrders,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce stage: key:"+key+"  values:"+values);
        String customerName = "";
        String text = "";
        List<CustomerOrders> list = new ArrayList<>();

        for (CustomerOrders co : values){
            if (co.getFlag().equals("0")){
                customerName = co.getCustomerName();
            }
            CustomerOrders customerOrders = new CustomerOrders();

            customerOrders.setCustomerName(co.getCustomerName());
            customerOrders.setFlag(co.getFlag());
            customerOrders.setCustomerId(co.getCustomerId());
            customerOrders.setOrderStatus(co.getOrderStatus());
            customerOrders.setOrderId(co.getOrderId());
            list.add(customerOrders);
        }
        System.out.println(list);
        System.out.println();
        for (CustomerOrders co : list){

            if (co.getFlag().equals("1")){
                CustomerOrders customerOrders = new CustomerOrders();
                customerOrders = co;

                customerOrders.setCustomerName(customerName);
                customerOrders.setFlag("2");

                System.out.println(customerOrders.toString());
                text += customerOrders.toString()+"\t";
            }
        }
        System.out.println(text);
        System.out.println("customerName:"+customerName);
        context.write(key, new Text(text));
    }
}

(3)driver 启动

基本操作,设置好各个参数

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ReduceJoinDriver.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job ,new Path[]{new Path(args[0]),new Path(args[1])});


        Path path = new Path(args[2]);
        FileSystem fs = FileSystem.get(path.toUri(),conf);
        if (fs.exists(path)){
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        fs.close();
        job.waitForCompletion(true);
}}

2. 在 map 阶段合并

传入一个文件,另一个文件以缓存文件cachefile的形式传入,这种方法要注意,cachefile的大小不能太大,可以形象的打个比方,你去朋友家做客,晚上朋友家没有被子,你捎带个被子过去,这是可以的,但是如果说你朋友缺个房子,你不能捎带个房子过去对吧。

(1)map 阶段

setup方法使用 io 流的方法将顾客信息读取进来,使用 List<CustomerOrders>去存储

map 方法对于每个订单信息都遍历一次列表,通过顾客编号这一关联属性去找到对应的顾客信息并填充进去

import md.kb23.demo03.CustomerOrders;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.codehaus.jackson.map.util.BeanUtil;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class MapJoinMapper extends Mapper<LongWritable, Text, CustomerOrders, NullWritable> {
    private List<CustomerOrders> list = new ArrayList<CustomerOrders>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cashFiles = context.getCacheFiles();
        for (URI uri : cashFiles){
            System.out.println(uri.getPath());
            String currentFileName = new Path(uri).getName();
            if (currentFileName.startsWith("customers")){
                String path = uri.getPath();
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
                String line;
                while ((line = br.readLine())!=null){
                    System.out.println(line);
                    String[] field = line.split(",");
                    CustomerOrders customerOrders = new CustomerOrders(Integer.parseInt(field[0]),field[1]+" "+field[2],0,"","");
                    list.add(customerOrders);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[]orderField = value.toString().split(",");
        int customerId = Integer.parseInt(orderField[2]);
        CustomerOrders customerOrders = null;
        for (CustomerOrders customer : list){
            if (customer.getCustomerId()==customerId){
                customerOrders=customer;
            }
        }
        CustomerOrders order = new CustomerOrders();
        if (customerOrders!=null){
            order.setCustomerName(customerOrders.getCustomerName());
        }else {
            order.setCustomerName("");
        }
        order.setCustomerId(customerId);
        order.setOrderStatus(orderField[3]);
        order.setFlag("1");
        order.setOrderId(Integer.parseInt(orderField[0]));
        context.write(order, null);
    }
}

 

(2)driver 启动

这里我们在 map 阶段已经将事情都做完了,就不用再额外写一个 reduce 了,另外就是注意一下 cachefile 的添加方法

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        long start = System.currentTimeMillis();

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapJoinDriver.class);

        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(CustomerOrders.class);
        job.setMapOutputValueClass(NullWritable.class);

        Path inpath = new Path("in/demo3/orders.csv");
        FileInputFormat.setInputPaths(job,inpath);

        Path outpath = new Path("out/out5");
        FileSystem fs = FileSystem.get(outpath.toUri(),conf);
        if(fs.exists(outpath)){
            fs.delete(outpath,true);
        }
        FileOutputFormat.setOutputPath(job,outpath);

        //设置 reduce 阶段任务数
        job.setNumReduceTasks(0);

        Path cashPath = new Path("in/demo3/customers.csv");
        job.addCacheFile(cashPath.toUri());

        job.waitForCompletion(true);
        long end = System.currentTimeMillis();

        System.out.println("程序运行时间:"+(end-start));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/946566.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

聚合支付-第3章-支付宝支付接入指南

惠民支付 第3章讲义-支付宝接入指南 支付宝接入步骤: 1、进入网址https://open.alipay.com/develop/manage 2、扫码登录支付宝账号&#xff0c;控制台&#xff0c;最下边有一个沙箱环境 3、在“支付宝开放平台开发助手”软件中生成密钥&#xff0c;点击生成密钥&#xff0c;保…

Axure RP 8.1.0.3400(原型设计工具)

Axure RP 8是一款原型设计工具&#xff0c;它提供了丰富的功能和工具&#xff0c;帮助用户创建高质量的网页、移动应用和桌面软件原型。以下是Axure RP 8的一些特色介绍&#xff1a; 强大的交互设计&#xff1a;Axure RP 8支持创建复杂的动画和过渡效果&#xff0c;让你的原型更…

一文彻底扒光 Handler

作者&#xff1a;HenAndroid 典型的生产者-消费者模式。 Android跨进程要掌握的是Binder, 而同一进程中最重要的应该就是Handler 消息通信机制了。我这么说&#xff0c;大家不知道是否认同&#xff0c;如果认同&#xff0c;还希望能给一个关注哈。 Handler 是什么&#xff1f;…

AI智能语音识别模块(一)——离线模组介绍

文章目录 离线语音控制模块简介引脚定义开发平台总结 离线语音控制模块 简介 这是一款低成本&#xff0c;低功耗&#xff0c;小体积的高性价比离线语音识别开发板。能快速学习、验证离线语音控制各种外设&#xff0c;如继电器、LED灯&#xff0c;PWM调光等。 板载了Micro USB接…

MySQL DATE_SUB的实践

函数简介DATE_SUB()函数从DATE或DATETIME值中减去时间值(或间隔)。 下面说明了DATE_SUB()函数的语法&#xff1a; DATE_SUB(start_date,INTERVAL expr unit); DATE_SUB()函数接受两个参数&#xff1a; start_date是DATE或DATETIME的起始值。 expr是一个字符串&#xff0c;用于确…

哪个牌子的电视盒子好用?小编盘点复购率最高电视盒子排行榜

复购率可以体现出产品评价如何&#xff0c;电视盒子是我们经常要购买的数码产品&#xff0c;那么电视盒子哪些品牌的复购率最高&#xff1f;用户忠实度最高呢&#xff1f;想了解哪个牌子的电视盒子好用&#xff0c;可以看看小编根据复购情况整理的电视盒子排行榜&#xff1a; ●…

Weblogic漏洞(三)之 Weblogic 弱口令、任意文件读取漏洞

Weblogic 弱口令、任意文件读取漏洞 环境安装 此次我们实验的靶场&#xff0c;是vnlhub中的Weblogic漏洞中的weak_password靶场&#xff0c;我们 cd 到weak_password&#xff0c;然后输入以下命令启动靶场环境&#xff1a; docker-compose up -d输入以下的命令可以查看当前启…

问道管理:仙人指路最佳买入形态?

仙人指路是一种基于技能剖析的股票交易目标。许多投资者运用该目标来预测股票价格的上涨或下跌趋势。在买入股票时&#xff0c;仙人指路能够为投资者供给有用的信息&#xff0c;协助他们找到最佳的买入形状。本文将从多个视点剖析仙人指路的最佳买入形状。 一、仙人指路的基本原…

适合本地运营的同城团购优质商家圈子小程序开发演示

很火的一款适合本地同城运营的同城团购商家圈子小程序。有很多城市都有在用这个小程序做同城资源&#xff0c;实现完美变现。 小程序功能就是将本地商家邀请入驻&#xff0c;以团购的形式出售商家产品或服务套餐。借助微信的社交属性配合同城推广员可以迅速推广起来。 对于商…

网络安全法+网络安全等级保护

网络安全法 网络安全法21条 网络安全法31条 网络安全等级保护 网络安全等级保护分为几级? 一个中心&#xff0c;三重防护 等级保护2.0网络拓扑图 安全区域边界 安全计算环境 等保安全产品 物理机房安全设计

Autofac中多个类继承同一个接口,如何注入?与抽象工厂模式相结合

多个类继承同一个接口,如何注入&#xff1f;与抽象工厂模式相结合 需求: 原来是抽象工厂模式,多个类继承同一个接口。 现在需要使用Autofac进行选择性注入。 Autofac默认常识: Autofac中多个类继承同一个接口,默认是最后一个接口注入的类。 解决方案&#xff1a;(约定大于配…

nodepad++ 插件的安装

nodepad 插件的安装 一、插件安装二、安装插件&#xff1a;Json Viewer nodepad 有 插件管理功能&#xff0c;其中有格式化json以及可以将json作为树查看的插件&#xff1a; Json Viewer 一、插件安装 1、首先下载最新的notepad 64位【https://notepad-plus.en.softonic.com…

Java——一个简单的计算器程序

该代码是一个简单的计算器程序&#xff0c;使用了Java的图形化界面库Swing。具体分析如下&#xff1a; 导入必要的类和包&#xff1a; import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.util.Objects; import javax.…

Java 中数据结构HashMap的用法

Java HashMap HashMap 是一个散列表&#xff0c;它存储的内容是键值对(key-value)映射。 HashMap 实现了 Map 接口&#xff0c;根据键的 HashCode 值存储数据&#xff0c;具有很快的访问速度&#xff0c;最多允许一条记录的键为 null&#xff0c;不支持线程同步。 HashMap 是…

打通数字化供需“堵点”,828 B2B企业节推出企业应用一站购平台

当前&#xff0c;数字技术与实体经济深度融合&#xff0c;为千行百业注入新动力、拓展新空间。数据显示&#xff0c;2022年中国数字经济规模超过50万亿&#xff0c;占GDP比重超过40%&#xff0c;继续保持在10%的高位增长速度&#xff0c;成为稳定经济增长的关键动力。 为加速企…

智慧校园用电安全解决方案

随着科技的不断发展&#xff0c;智慧校园建设逐渐成为了教育行业的一大趋势。在这个过程中&#xff0c;电力系统作为校园基础设施的重要组成部分&#xff0c;其安全、稳定、高效的运行显得尤为重要。下面小编来为大家介绍下智慧校园用电安全解决方案吧! 一、智慧校园电力系统现…

腾讯云学生服务器优惠价格申请教程

腾讯云学生服务器优惠价格申请教程&#xff0c;腾讯云学生服务器活动&#xff1a;轻量应用服务器2核2G学生价30元3个月、58元6个月、112元一年&#xff0c;轻量应用服务器4核8G配置191.1元3个月、352.8元6个月、646.8元一年&#xff0c;CVM云服务器2核4G配置842.4元一年&#x…

SimpleMind Pro(电脑版思维导图软件)中文版

Simplemind pro是一款极具创意和高效的可视化思维导图工具&#xff0c;它的设计和功能让它在众多思维导图软件中脱颖而出。本文将向您介绍Simplemind pro的特点、使用方法、适用场景以及与其他思维导图软件的比较&#xff0c;帮助您更好地了解Simplemind pro的优势和使用价值。…

文旅虚拟人主播智能讲解员能与人实时对话

元宇宙作为虚拟世界和现实社会交互的重要平台&#xff0c;是数字经济的表现形态之一&#xff0c;在文化和旅游领域拥有广阔的应用空间&#xff0c;也是当下该领域的热门赛道。 众多文旅行业从业者纷纷以“文化科技旅游”的方式&#xff0c;努力探索合适形态的应用场景和商业机会…

EMQX启用双向SSL/TLS安全连接以及java连接

作为基于现代密码学公钥算法的安全协议&#xff0c;TLS/SSL 能在计算机通讯网络上保证传输安全&#xff0c;EMQX 内置对 TLS/SSL 的支持&#xff0c;包括支持单/双向认证、X.509 证书、负载均衡 SSL 等多种安全认证。你可以为 EMQX 支持的所有协议启用 SSL/TLS&#xff0c;也可…