文章目录
- Pig:数据分析引擎
- 9.1 什么是Pig
- 9.1.1 简介
- 9.1.2 与 Hive 的对比
- 9.2 Pig的体系结构和数据模型
- 9.3 Pig的安装和工作模式
- 9.3.1 Pig安装
- 9.3.2 Pig工作模式
- 9.4 Pig的内置函数
- 9.5 使用PigLatin语句分析数据
- 9.6 Pig的自定义函数
- 9.6.1 自定义过滤和运算函数
- 9.6.2 自定义加载函数
Pig:数据分析引擎
9.1 什么是Pig
9.1.1 简介
Pig是一个基于Apache Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫PigLatin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口,使用者可以透过Python或者JavaScript编写Java,之后再重新转写。
Pig的特点
-
Pig是一个用来处理大规模数据集的平台,由Yahoo!贡献给Apache
-
Pig可以简化MapReduce任务的开发
-
Pig可以看做hadoop的客户端软件,可以连接到hadoop集群迕行数据分析工作
-
Pig方便不熟悉java的用户,使用一种较为简便的类似二SQL的面向数据流的语言pig Latin迕行数据处理
-
PigLatin可以迕行排序、过滤、求和、分组、关联等常用操作,迓可以自定义凼数,返是一种面向数据分析处理的轻量级脚本语言
-
Pig可以看做是PigLatin到MapReduce的映射器
-
Pig可以自动对集群进行分配和回收,自动地对MapReduce程序进行优化
9.1.2 与 Hive 的对比
Pig与Hive作为一种高级数据语言,均运行于HDFS之上,是hadoop上层的衍生架构,用于简化hadoop任务,并对MapReduce进行一个更高层次的封装。Pig与Hive的区别如下:
- Pig是一种面向过程的数据流语言;Hive是一种数据仓库语言,并提供了完整的sql查询功能。
- Pig更轻量级,执行效率更快,适用于实时分析;Hive适用于离线数据分析。
- Hive查询语言为Hql,支持分区;Pig查询语言为Pig Latin,不支持分区。
- Hive支持JDBC/ODBC;Pig不支持JDBC/ODBC。
- Pig适用于半结构化数据(如:日志文件);Hive适用于结构化数据。
9.2 Pig的体系结构和数据模型
为了使用Pig执行特定任务,程序员需要使用Pig Latin语言编写Pig脚本,并使用任何执行机制(Grunt Shell,UDF,嵌入式)执行它们。执行后,这些脚本将经历Pig框架应用的一系列转换,以产生所需的输出。
在内部,Apache Pig将这些脚本转换为一系列MapReduce作业,因此使程序员的工作变得容易。Apache Pig的体系结构如下所示。
数据模型
9.3 Pig的安装和工作模式
9.3.1 Pig安装
首先将Pig的包上传到我的Linux目录当中,然后对文件进行解压安装
解压命令
tar -zxvf pig-0.17.0.tar.gz -C ~/training/
设置环境变量 vi ~/.bash_profile
PIG_HOME=/root/training/pig-0.17.0
export PIG_HOME
PATH=$PIG_HOME/bin:$PATH
export PATH
生效环境变量 source ~/.bash_profile
Pig的常用命令
- ls、cd、cat、mkdir、pwd
- copyFromLocal、copyToLocal
- sh
- register、define
9.3.2 Pig工作模式
本地模式:操作Linux的文件
启动: pig -x local
日志:Connecting to hadoop file system at: file:///
集群模式:链接到HDFS
设置环境变量PIG_CLASSPATH指向Hadoop配置文件所在的目录
注意:通过Pig操作HDFS的速度要快
设置环境变量 vi ~/.bash_profile
PIG_CLASSPATH=/root/training/hadoop-2.7.3/etc/hadoop
export PIG_CLASSPATH
生效环境变量 source ~/.bash_profile
启动: pig
日志: Connecting to hadoop file system at: hdfs://bigdata111:9000
9.4 Pig的内置函数
计算函数
函数名称 | 函数作用 |
---|---|
avg | 计算包中项的平均值 |
concat | 把两个字节数组或者字符数组连接成一个 |
count | 计算包中非空值的个数 |
count_star | 计算包中项的个数,包括空值 |
diff | 计算两个包的差 |
max | 计算包中项的最大值 |
min | 计算包中项的最小值 |
size | 计算一个类型的大小,数值型的大小为1; 对于字符数组,返回字符的个数; 对于字节数组,返回字节的个数; 对于元组,包,映射,返回其中项的个数。 |
sum | 计算一个包中项的值的总和 |
TOKENIZE | 对一个字符数组进行标记解析,并把结果词放入一个包 |
过滤函数
函数名称 | 函数作用 |
---|---|
isempty | 判断一个包或映射是否为空 |
加载存储函数
函数名称 | 函数作用 |
---|---|
PigStorage | 用字段分隔文本格式加载或存储关系,这是默认的存储函数 |
BinStorage | 从二进制文件加载一个关系或者把关系存储到二进制文件 |
BinaryStorage | 从二进制文件加载只是包含一个类型为bytearray的字段的元组到关系,或以这种格式存储一个关系 |
TextLoader | 从纯文本格式加载一个关系 |
PigDump | 用元组的tostring()形式存储关系 |
9.5 使用PigLatin语句分析数据
注意:类似Spark RDD的算子(方法、函数)
Spark的算子有两种类型:
- Transformation:不会触发计算
- Action:会触发计算
注意:启动Yarn的history server
-
首先需要使用Hadoop的HistoryServer,前提要启动hadoop环境
-
命令:mr-jobhistory-daemon.sh start historyserver
-
网页:http://192.168.157.111:19888/jobhistory(包含所有的HDFS操作)
-
-
常用的PigLatin语句:有的会触发计算,有的不会
名称 作用 Spark对应语句 load 加载数据到bag(表) foreach 相当于循环,对bag每一条数据tuple进行处理 RDD.foeach filter 过滤:相当于where RDD.filter group by 分组 也有 join 连接操作 也有 generate 提取列 union/intersect 集合运算 dump 输出:直接打印的屏幕上 store 输出: 输出到文件
-
举例:数据(员工表、部门表)
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
(1) 加载员工数据到表
emp = load '/scott/emp.csv';
查询表的结构
describe emp; ---> Schema for emp unknown.
(2) 加载员工数据到表,指定每个tuple的结构和类型
emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);
默认的数据类型:bytearray
默认分隔符:制表符
emp = load '/scott/emp.csv' as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
创建一个部门表
dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);
(3) 查询员工信息:员工号 姓名 薪水
SQL: select empno,ename,sal from emp;
PL: emp3 = foreach emp generate empno,ename,sal;
(4) 查询员工信息:按照月薪排序
SQL: select * from emp order by sal;
PL: emp4 = order emp by sal;
(5) 分组:求每个部门的工资的最大值
SQL: select deptno,max(sal) from emp group by deptno;
PL: 第一步:分组
emp51 = group emp by deptno;
表结构:
emp51: {group: int,
emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}
数据:
(10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,,10),
(7839,KING,PRESIDENT,,1981/11/17,5000,,10),
(7782,CLARK,MANAGER,7839,1981/6/9,2450,,10)})
(20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,,20),
(7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20),
(7369,SMITH,CLERK,7902,1980/12/17,800,,20),
(7566,JONES,MANAGER,7839,1981/4/2,2975,,20),
(7902,FORD,ANALYST,7566,1981/12/3,3000,,20)})
(30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
(7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
(7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30),
(7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
(7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
(7900,JAMES,CLERK,7698,1981/12/3,950,,30)})
第二步:求每个部门的工资最大值
emp52 = foreach emp51 generate group,MAX(emp.sal)
(6) 查询10号部门的员工
SQL: select * from emp where deptno=10;
PL: emp6 = filter emp by deptno==10; 注意:两个等号
(7) 多表查询
查询员工信息: 员工姓名 部门名称
SQL: select e.ename,d.dname from emp e,dept d where e.deptno=d.deptno;
PL: emp71 = join dept by deptno,emp by deptno;
emp72 = foreach emp71 generate dept::dname,emp::ename;
(8) 集合运算:关系型数据库Oracle:参与集合运算的各个集合必须列数相同且类型一致
10和20号部门的员工
SQL: select * from emp where deptno=10
union
select * from emp where deptno=20;
PL: emp10 = filter emp by deptno==10;
emp20 = filter emp by deptno==20;
emp10_20 = union emp10,emp20;
(9) 使用PL实现WordCount
① 加载数据
mydata = load '/data/data.txt' as (line:chararray);
② 将字符串分割成单词
words = foreach mydata generate flatten(TOKENIZE(line)) as word;
③ 对单词进行分组
grpd = group words by word;
④ 统计每组中单词数量
cntd = foreach grpd generate group,COUNT(words);
⑤ 打印结果
dump cntd;
注意:PigLatin中,bag具有依赖关系
Spark中,RDD也具有依赖关系(宽依赖、窄依赖)
9.6 Pig的自定义函数
Pig的自定义函数有三种:
-
自定义过滤函数:相当于where条件
-
自定义运算函数:
-
自定义加载函数:使用load语句加载数据,生成一个bag
默认:load语句加载数据,一行解析成一个Tuple
需要MR的jar包
9.6.1 自定义过滤和运算函数
依赖的jar包
1、/root/training/pig-0.17.0/pig-0.17.0-core-h2.jar
2、/root/training/pig-0.17.0/lib
3、/root/training/pig-0.17.0/lib/h2
4、$HADOOP_HOME/share/hadoop/common
5、$HADOOP_HOME/share/hadoop/common/lib
自定义过滤函数IsSalaryTooHigh
//实现自定义的过滤函数:实现:查询过滤薪水大于2000的员工
public class IsSalaryTooHigh extends FilterFunc{
@Override
public Boolean exec(Tuple tuple) throws IOException {
/*参数tuple,调用的时候 传递的参数
*
* 在PigLatin调用:
* myresult1 = filter emp by demo.pig.IsSalaryTooHigh(sal)
*/
//取出薪水
int sal = (int) tuple.get(0);
return sal>2000?true:false;
}
}
自定义过滤函数CheckSalaryGrade
//根据员工的薪水,判断级别
//泛型:经过运算后,结果的类型
public class CheckSalaryGrade extends EvalFunc<String>{
@Override
public String exec(Tuple tuple) throws IOException {
// 调用: myresult2 = foreach emp generate ename,sal,demo.pig.CheckSalaryGrade(sal);
int sal = (int)tuple.get(0);
if(sal<1000) return "Grade A";
else if(sal>=1000 && sal<3000) return "Grade B";
else return "Grade C";
}
}
然后将部署的java代码打包成jar包,上传到Linux服务器上,进行注册执行
注册jar包 register /root/temp/pig.jar
为自定义函数起别名(可选):define myfilter demo.pig.MyFilterFunction;
用法1:result1 = filter emp by demo.pig.IsSalaryTooHigh(sal)
用法2:result2 = foreach emp generate ename,demo.pig.CheckSalaryGrade(sal);
9.6.2 自定义加载函数
-
默认情况下,load语句加载数据,一行解析成一个Tuple
比如:员工信息
- 特殊情况:单词统计的时候。这时候:希望每个单词能被解析成一个Tuple,从而便于处理
- 需要MapReduce的 jar 包(参考之前下载MapReduce的jar选择)
自定义加载函数MyLoadFunc
/*
* 自定义的加载函数:
* 实现:WordCount钱加载数据,每一个单词作为一个Tuple
*/
public class MyLoadFunc extends LoadFunc{
//定义一个变量保存输入流
private RecordReader reader ;
@Override
public InputFormat getInputFormat() throws IOException {
// 输入数据的格式:字符串
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// 从输入流中读取一行,如何解析生成返回的tuple
//数据: I love Beijing
Tuple result = null;
try{
//判断是否读入了数据
if(!this.reader.nextKeyValue()){
//没有数据
return result; //----> 是null值
}
//数据数据: I love Beijing
String data = this.reader.getCurrentValue().toString();
//生成返回的结果:Tuple
result = TupleFactory.getInstance().newTuple();
//分词
String[] words = data.split(" ");
//每一个单词单独生成一个tuple(s),再把这些tuple放入一个bag中。
//在把这个bag放入result中
//创建一个表
DataBag bag = BagFactory.getInstance().newDefaultBag();
for(String w:words){
//为每个单词生成tuple
Tuple aTuple = TupleFactory.getInstance().newTuple();
aTuple.append(w); //将单词放入tuple
//再把这些tuple放入一个bag中
bag.add(aTuple);
}
//在把这个bag放入result中
result.append(bag);
}catch(Exception ex){
ex.printStackTrace();
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
// 使用该方法,初始化输入流
// RecordReader reader: 代表HDFS的输入流
this.reader = reader;
}
@Override
public void setLocation(String path, Job job) throws IOException {
// 从HDFS输入的路径(目录)
FileInputFormat.setInputPaths(job, new Path(path));
}
}
然后将部署的java代码打包成jar包,上传到Linux服务器上,进行注册执行
注册jar包: register /root/temp/p1.jar
myresult3 = load ‘/input/data.txt’ using demo.pig.MyLoadFunc();
定义别名(可选):define myload demo.pig.MyLoadFunc;