MapReduce实战案例(3)

news2024/11/25 4:42:00

案例三: MR实战之TOPN(自定义GroupingComparator)

项目准备

  1. 需求+测试数据

有如下订单数据

订单id商品id成交金额
Order_0000001Pdt_01222.8
Order_0000001Pdt_0525.8
Order_0000002Pdt_03522.8
Order_0000002Pdt_04122.4
Order_0000002Pdt_05722.4
Order_0000003Pdt_01222.8

现在需要求出每一个订单中成交金额最大的一笔交易

  1. 分析

    a) 利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

    b) 在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

项目实现

a)自定义groupingcomparator

/**
 * @Author 千锋大数据教学团队
 * @Company 千锋好程序员大数据
 * @Description 用于控制shuffle过程中reduce端对kv对的聚合逻辑
 */
public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {

        super(OrderBean.class, true);
    }


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;

        //将item_id相同的bean都视为相同,从而聚合为一组
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
复制代码

文末扫码领取福利! 

b)定义订单信息bean

/**
 * @Author 千锋大数据教学团队
 * @Company 千锋好程序员大数据
 * @Description 订单信息bean,实现hadoop的序列化机制
 */
public class OrderBean implements WritableComparable<OrderBean>{
    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }
    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {

        this.itemid = itemid;
        this.amount = amount;

    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {

            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();

        this.itemid = new Text(readUTF);
        this.amount= new DoubleWritable(readDouble);
    }


    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
复制代码

c) 编写MapReduce处理流程

/**
 * @Author 千锋大数据教学团队
 * @Company 千锋好程序员大数据
 * @Description 利用secondarysort机制输出每种item订单金额最大的记录
 */

public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");

            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));

            context.write(bean, NullWritable.get());

        }

    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{


        //在设置了groupingcomparator以后,这里收到的kv数据 就是:  <1001 87.6>,null  <1001 76.5>,null  .... 
        //此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>
        //要输出同一个item的所有订单中最大金额的那一个,就只要输出这个key
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);


        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //指定shuffle所使用的GroupingComparator类
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        //指定shuffle所使用的partitioner类
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(3);

        job.waitForCompletion(true);

    }

}

 

 也可以观看视频:

千锋大数据Hadoop全新增强版-先导片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/587951.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

6 具有 OCR 功能的顶级 PDF 图像转 Word 转换器

如果您在 PDF 图像中找到一些有用的信息并想转换为 Word 格式以供进一步使用&#xff0c;您将需要一个具有OCR 功能的 PDF 图像转 Word 转换器&#xff0c;该转换器旨在识别 PDF 图像中的文本并将其制作出来可编辑。 将 PDF 图像转换为 Word 并不容易&#xff0c;因为我们需要…

高压放大器工作原理(高压放大器怎么用的)

高压放大器是一种能够将低电平信号放大到足够高的电平&#xff0c;以便用于驱动大功率负载或处理高电压信号的电子设备。它广泛应用于各种电子设备中&#xff0c;例如音频放大器、射频放大器、电力电子设备等。下面我们将详细介绍高压放大器的工作原理以及使用方法。 高压放大器…

一分钟:GTP鼓谱导出转换MIDI格式教程

const loadPromise self.osmd.load("/resource/test");loadPromise.then(function () {self.osmd.render();});作为一名鼓手&#xff0c;我深知鼓谱转换MIDI格式的重要性&#xff0c;但是找了好久&#xff0c;一直没有找到一个好用的工具。 直到我发现了GTP鼓谱转换…

下载YouTube视频的一种方法

文章目录 工具名称下载方法使用方法1.只下载音频2.下载音频转换成mp3&#xff08;加上-x –audio-format参数&#xff09;3.下载视频&#xff08;带音频&#xff09;ID&#xff1a;22 | EXT&#xff1a;mp4 | 1280*720 下载的数据集&#xff1a;YouCook2 工具名称 yt-dlp 下载…

doxygen使用: 跨平台方式让markdown文件包含另一个文件

文章目录 1. 目的和问题2. 解决思路2.1 FILTER_PATTERNS 选项2.2 基于 Python 的 FILTER_PATTERNS 选项2.3 sledcpp.py 脚本 3. 完整工程3.1 目录结构3.2 hello.h 文件内容3.3 CHANGELOG.md 文件内容3.4 generate_doxyfile.py 文件内容3.5 docs/root.md3.6 docs/changelog.md3.…

Redis 事务详细介绍

事务 注意&#xff1a;Redis单条命令是保证原子性的&#xff1b;但是事务不保证原子性&#xff01; Redis事务没有隔离级别的概念&#xff0c;所有的命令在事务中&#xff0c;并没有直接被执行&#xff0c;只有发起执行命令时才执行 Redis事务本质&#xff1a;一组命令的集合&…

API接口对接的流程和注意的事项

API接口对接是将两个应用程序或系统连接并进行数据交换的过程。在进行API接口对接时&#xff0c;需要确保两个系统具有相同的协议和格式&#xff0c;并且数据传输过程中不会出现错误或数据丢失。下面是API接口对接的流程和注意事项&#xff1a; 流程&#xff1a; 1.确认数据格…

【多目标优化算法】多目标蚱蜢优化算法(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

Linux——进程退出

目录 一.进程退出时有三种选择&#xff1a; 1.1 echo $?命令&#xff1a; 功能&#xff1a; 打印距离现在最近一次执行某进程的退出码 例2代码&#xff1a; 例3&#xff1a; 例4代码&#xff1a; 1.3 进程运行过程中可能会出现的错误种类&#xff1a; 二.总结&#xff…

神经网络学习小记录73——Pytorch CA(Coordinate attention)注意力机制的解析与代码详解

神经网络学习小记录73——Pytorch CA&#xff08;Coordinate attention&#xff09;注意力机制的解析与代码详解 学习前言代码下载CA注意力机制的概念与实现注意力机制的应用 学习前言 CA注意力机制是最近提出的一种注意力机制&#xff0c;全面关注特征层的空间信息和通道信息…

Unity随手问题记录(持续更新~~)

目录 1.将摄像机定位到模型实际中心点前边&#xff08;防止有些模型中心点和实际模型中心位置偏移很大的情况&#xff09; 2.获取当鼠标在RawImage上时&#xff0c;鼠标位置对应的图像坐标(简单粗暴方式) 3.设置脚本运行顺序 4.当plugins底下出现dll文件识别不到的情况&#xf…

LeetCode 1110. Delete Nodes And Return Forest【二叉树,DFS,哈希表】中等

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

SKD240

SKD240 系列智能电力仪表 SKD240 系列智能电力仪表是陕西斯科德智能科技有限公司自主研发、生产的。 产品概述 - 点击了解详情 SKD240采用先进的微处理器和数字信号处理技术&#xff08;内置主芯片采用32位单片机, 采用32位浮点型真有效值处理数据&#xff09;&#xff0c;测量…

图像采集卡的基本原理、应用领域和发展趋势

图像采集卡是一种硬件设备&#xff0c;用于将模拟视频信号转换为数字信号&#xff0c;并将其传输到计算机中进行处理和存储。它通常用于监控、视频会议、医学图像等领域。本文将介绍图像采集卡的基本原理、应用领域和发展趋势。 一、图像采集卡的基本原理 图像采集卡的基本原…

视频里的声音怎么转换成音频?

视频里的声音怎么转换成音频&#xff1f;这样我们就能把视频里的想要的声音在其他音频平台播放或是用于其他视频。其实视频提取音频是一种将视频文件中的音频数据分离出来的技术。该技术可以将视频中的音频转换为不同的格式&#xff0c;让我们可以在无需视频的情况下使用音频文…

Web 应用程序防火墙 (WAF) 相关知识介绍

Web应用程序防火墙 (WAF) 如何工作&#xff1f; Web应用防护系统&#xff08;也称为&#xff1a;网站应用级入侵防御系统。英文&#xff1a;Web Application Firewall&#xff0c;简称&#xff1a;WAF&#xff09;。利用国际上公认的一种说法&#xff1a;Web应用防火墙是通过执…

【Zero to One系列】在WSL linux系统上,使用docker运行Mysql与Nacos,以及如何启动与停止WSL

前期回顾&#xff1a; 【Zero to One系列】window系统安装Linux、docker 1、下载docker-compose 1.下载&#xff1a; curl -SL https://github.com/docker/compose/releases/download/v2.17.2/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose 2.授予权限&a…

大数据治理入门系列:元数据管理

在介绍数据治理一文中&#xff0c;我们曾用在图书馆找书的例子解释为什么需要进行数据治理。数据治理在某种程度上类似于图书管理。元数据管理作为数据治理的重要一环&#xff0c;也可以进行这种类比。 在图书管理过程中&#xff0c;需要根据相应的制度购买、记录、存放、借还…

【LeetCode: 486. 预测赢家 | 暴力递归=>记忆化搜索=>动态规划 】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

【T+】畅捷通T+设置收入成本配比结转

【问题需求】 收入成本配比原则是指&#xff1a; 取得的销售收入应与为取得该收入所发生的成本相匹配&#xff0c; 即先出库后销货时需要等收到销售发票才能确认成本&#xff0c; 先销货后出库时要先确认虚拟成本。 【解决方案】 重点&#xff1a;业务流程选择【单据立账】的情…