大数据框架之Hadoop:MapReduce(二)Hadoop序列化

news2024/9/27 19:22:30

2.1序列化概述

1、什么是序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2、为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

3、为什么不用java的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

Hadoop序列化特点:

  • 紧凑:高效使用存储空间。
  • 快速:读写数据的额外开销小。
  • 可扩展:随着通信协议的升级而可升级。
  • 互操作:支持多语言的交互

2.2自定义bean对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。

具体实现bean对象序列化步骤如下7步。

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
	super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
	out.writeLong(upFlow);
	out.writeLong(downFlow);
	out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
	upFlow = in.readLong();
	downFlow = in.readLong();
	sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

@Override
public int compareTo(FlowBean o) {
	// 倒序排列,从大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

2.3序列化案例实操

1、需求

统计每一个手机号耗费的总上行流量、下行流量、总流量

(1)输入数据phone_data.txt

1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2	13846544121	192.196.100.2			264	0	200
3 	13956435636	192.196.100.3			132	1512	200
4 	13966251146	192.168.100.1			240	0	404
5 	18271575951	192.168.100.2	www.atguigu.com	1527	2106	200
6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
7 	13590439668	192.168.100.4			1116	954	200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			240	0	200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.atguigu.com	1938	180	500
13 	13560439638	192.168.100.10			918	4938	200
14 	13470253144	192.168.100.11			180	180	200
15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			120	120	200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			1116	954	200

(2)输入数据格式:

7 	13560436666	120.196.100.99		1116		 954			200
id	手机号码		网络ip			上行流量  下行流量     网络状态码

(3)期望输出数据格式

13560436666 		1116		      954 			2070
手机号码		    上行流量        下行流量		总流量

2、需求分析

Untitled

3、编写MapReduce程序

(1)编写流量统计的Bean对象

package com.cuiyf41.flowsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 1 实现writable接口
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    //3  写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow  = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6 编写toString方法,方便后续打印到文本
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
}

(2)编写Mapper类

package com.cuiyf41.flowsum;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();

        // 2 切割字段
        String[] fields = line.split("\t");

        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];

        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
//        FlowBean v = new FlowBean(upFlow, downFlow);
        v.set(upFlow, downFlow);

        // 4 写出
        context.write(k, v);
    }
}

(3)编写Reducer类

package com.cuiyf41.flowsum;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    FlowBean v = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 遍历所用bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 封装对象
//        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
        v.set(sum_upFlow, sum_downFlow);

        // 3 写出
        context.write(key, v);
    }
}

(4)编写Driver驱动类

package com.cuiyf41.flowsum;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "e:/input/phone_data.txt", "e:/output1" };

        // 1 获取配置信息,或者job对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/354290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP 的演化史-fast retransmit/recovery

工作原因要对一个 newreno 实现增加 sack 支持。尝试写了 3 天 C&#xff0c;同时一遍又一遍梳理 sack 标准演进。这些东西我早就了解&#xff0c;但涉及落地写实现&#xff0c;就得不断抠细节&#xff0c;试图写一个完备的实现。 这事有更简单的方法。根本没必要完全实现 RFC…

大型信息系统

一、大型信息系统二、信息系统的规划方法三、信息系统的规划工具 一、大型信息系统 信息系统规划&#xff08;也称为信息系统战略规划&#xff09;是一个组织有关信息系统建设与应用的全局性谋划&#xff0c;主要包括战略目标、策略和部署能内容。 信息化规划是企业信息化建设…

安全—08day

ApabilitiesapabilitiesLinux Capabilities线程的 capabilitiesPermitted 允许Effective 有效InheritableBoundingAmbient文件的 capabilitiesPermittedInheritableEffective运行 execve() 后 capabilities 的变化案例分析方法一、依次执行如下命令方法二、iptables端口转发方案…

SAP ABAP GUI_DOWNLOAD中下载乱码的问题

1 GUI_DOWNLOAD 1.1 问题表现 GUI_DOWNLOAD在应用当中有时会导致输出的文件在某些电脑正常显示&#xff0c;在某些电脑乱码显示。这个固然是由于各个电脑系统配置有差异&#xff0c;但是我们可以在应用该函数时就排除该差异来保证任意台电脑正常显示输出的文件。 如下…

英语基础-定语从句的特殊用法及写作应用

1. 定语从句的引导词省略的情况 1. that 引导定语从句&#xff0c;从句中缺宾语/表语&#xff0c;that可省略&#xff1b; This is the book that he likes. I like the shirt that you gave me. We do not agree on the plan that you make. China is not the country th…

论文浅尝 | SpCQL: 一个自然语言转换Cypher的语义解析数据集

笔记整理&#xff1a;郭爱博&#xff0c;国防科技大学博士论文发表会议&#xff1a;The 31th ACM International Conference on Information and Knowledge Management&#xff0c;CIKM 2022动机随着社交、电子商务、金融等行业的快速发展&#xff0c;现实世界编织出一张庞大而…

测试人员为什么也要学习Linux操作系统

我相信能够看到这篇文章的你&#xff0c;一定是对计算机感兴趣、想要增加技能从而为以后加薪打基础。今天&#xff0c;我就和大家谈谈我对为什么要学习 Linux 系统的看法。我将从如下这三个方面谈我的看法。 巩固基础知识 做一个合格的软件工程师 学以致用 1. 巩固基础知识 …

2023年美国大学生数学建模C题:预测Wordle结果建模详解+模型代码

目录 前言 一、题目理解 背景 解析 字段含义&#xff1a; 建模要求 二、建模思路 灰色预测&#xff1a; ​编辑 二次指数平滑法&#xff1a; person相关性 只希望各位以后遇到建模比赛可以艾特认识一下我&#xff0c;我可以提供免费的思路和部分源码&#xff0c;以后…

字符设备驱动基础(一)

目录 一、Linux内核对设备的分类 linux的文件种类&#xff1a; Linux内核按驱动程序实现模型框架的不同&#xff0c;将设备分为三类&#xff1a; 总体框架图&#xff1a; 二、设备号------内核中同类设备的区分 三、申请和注销设备号 四、函数指针复习 4.1、 内存四区 …

ACM数论 裴蜀定理(贝祖定理)

一.内容定义 「裴蜀定理」&#xff0c;又称贝祖定理&#xff08;Bzouts lemma&#xff09;。是一个关于最大公约数的定理。其内容定义为&#xff1a;对于不全为零的任意整数 a 和 b&#xff0c;记二者的最大公约数为 g 即 gcd(a,b) g&#xff0c;则对于任意整数 x 和 y 都一定…

ASEMI高压MOS管4N65SE,4N65SE参数,4N65SE特征

编辑-Z ASEMI高压MOS管4N65SE参数&#xff1a; 型号&#xff1a;4N65SE 漏极-源极电压&#xff08;VDS&#xff09;&#xff1a;650V 栅源电压&#xff08;VGS&#xff09;&#xff1a;30V 漏极电流&#xff08;ID&#xff09;&#xff1a;4A 功耗&#xff08;PD&#xf…

分析内核自带的LCD驱动程序_基于IMX6ULL

分析内核自带的LCD驱动程序_基于IMX6ULL 文章目录分析内核自带的LCD驱动程序_基于IMX6ULL参考资料&#xff1a;一、驱动程序框架1.1 入口函数注册platform_driver1.2 设备树有对应节点1.3 probe函数分析二、 编写硬件相关的代码2.1 GPIO设置2.2 时钟设置2.3 LCD控制器的配置致谢…

[软件工程导论(第六版)]第1章 软件工程学概述(课后习题详解)

文章目录1. 什么是软件危机&#xff1f;它有哪些典型表现&#xff1f;为什么会出现软件危机&#xff1f;2. 假设自己是一家软件公司的总工程师&#xff0c;当把图1.1给手下的软件工程师们观看&#xff0c;告诉他们及早发现并改正错误的重要性时&#xff0c;有人不同意这个观点&…

C#按边框切检验仪器图

最近碰到一个检验设备是生成PDF文件报告的。imedicallis监听程序把PDF解析出来之后发现PDF里面图不是多个小图&#xff0c;而是一张大图。但用户又要传到检验系统的是小图&#xff0c;而且小图位置和数量不固定&#xff0c;也不能用固定位置截取实现。为此开启一段“高端设备局…

Linux生产者消费模型

1.生产者消费者模型 1.1 为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯&#xff0c;而通过阻塞队列来进行通讯&#xff0c;所以生产者生产完数据之后不用等待消费者处理&#xff0c;直接…

【淄博正大光明】收藏|三分钟带你全面了解这个神奇的镜片

对于孩子的东西 家长总是谨慎再谨慎 而对于夜间戴在眼睛里的镜片 家长更是存在很多顾虑 安全吗&#xff1f;有效吗&#xff1f; 影响孩子睡觉吗&#xff1f; 别着急淄博正大光明眼科医院 带你深度了解角膜塑形镜 01 角膜塑形镜究竟是什么&#xff1f; 角膜塑形镜是一种使用高分…

一文搞懂Linux内核进程CPU调度基本原理

为什么需要调度 进程调度的概念比较简单&#xff0c;我们假设在一个单核处理器的系统中&#xff0c;同一时刻只有一个进程可以拥有处理器资源&#xff0c;那么其他的进程只能在就绪队列中等待&#xff0c;等到处理器空闲之后才有计划获得处理器资源来运行。在这种场景下&#…

k8s快速入门

文章目录一、Kubernetes&#xff08;K8S&#xff09;简介1、概念1.1 Kubernetes (K8S) 是什么1.2 核心特性1.3 部署方案2、Kubernetes 集群架构2.1 架构2.2 重要概念 Pod2.3 Kubernetes 组件二、Kubernetes集群安装1、安装方式介绍2、minikubute安装3、裸机搭建&#xff08;Bar…

python实用脚本(六)—— pandas库的使用(生成、读取表格)

本期主题&#xff1a; python的pandas使用 往期链接&#xff1a; python实用脚本&#xff08;一&#xff09;—— 批量修改目标文件夹下的文件名python实用脚本&#xff08;二&#xff09;—— 使用xlrd读取excelpython实用脚本&#xff08;三&#xff09;—— 通过有道智云AP…

Linux 日志查找常用命令

1.1 cat、zcat cat -n app.log | grep "error"&#xff1a;查询日志中含有某个关键字error的信息&#xff0c;显示行号。 cat -n app.log | grep "error" --color&#xff1a;查询日志中含有某个关键字error的信息&#xff0c;显示行号&#xff0c;带颜色…