1 sqoop原理
1.1 sqoop介绍#
Sqoop是Apache旗下的一款“hadoop和关系型数据库服务器之间传送数据”的工具。
导入数据:MySQL、Oracle导入数据到hadoop的hdfs、hive、hbase等数据存储系统。
导出数据:从hadoop的文件系统中导出数据到关系型数据库中。
1.2 sqoop架构#
- 导入流程
- 首先通过jdbc读取关系型数据库元数据信息,获取到表结构。
- 根据元数据信息生成Java类。
- 启动import程序,通过jdbc读取关系型数据库数据,并通过上一步的Java类进行序列化。
- MapReduce并行写数据到Hadoop中,并使用Java类进行反序列化。
- 导出流程
- sqoop通过jdbc读取关系型数据库元数据,获取到表结构信息,生成Java类。
- MapReduce并行读取hdfs数据,并且通过Java类进行序列化。
- export程序启动,通过Java类反序列化,同时启动多个map,通过jdbc将数据写入到关系型数据库中。
2 安装sqoop
选择已经安装好hive镜像,里面以及安装过mysql,hadoop ,zookeeper,hive。根据自己资源情况调整cpu和内存的资源。
镜像地址:http://cloud.hainiubl.com/#/privateImageDetail?id=2953&imageType=private。选择添加到实验配置
1 将sqoop安装包解压到/usr/local目录下
tar -zxvf /public/software/bigdata/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /usr/local/
2 创建软链接
ln -s sqoop-1.4.7.bin__hadoop-2.6.0/ sqoop
3 修改sqoop安装目录的所有者和属组
chown -R hadoop:hadoop sqoop-1.4.7.bin__hadoop-2.6.0/
4 修改sqoop环境变量
#添加到/etc/profile文件中
export SQOOP_HOME=/usr/local/sqoop
export PATH=$PATH:$SQOOP_HOME/bin
#让环境变量立即生效
source /etc/profile
5 测试sqoop
# shell 里执行 sqoop 命令
sqoop help
6 导入mysql驱动包到sqooplib目录下
cp /usr/local/hive/lib/mysql-connector-java-5.1.49.jar /usr/local/sqoop/lib/
3 sqoop常用参数
- 常用命令
命令名称 | 对应类 | 命令说明 |
---|---|---|
import | ImportTool | 将关系型数据库数据导入到HDFS、HIVE、HBASE |
export | ExportTool | 将HDFS上的数据导出到关系型数据库 |
codegen | CodeGenTool | 获取数据库中某张表数据生成Java并打成Jar包 |
create-hive-table | CreateHiveTableTool | 创建hive的表 |
eval | EvalSqlTool | 查看SQL的执行结果 |
list-databases | ListDatabasesTool | 列出所有数据库 |
list-tables | ListTablesTool | 列出某个数据库下的所有表 |
help | HelpTool | 打印sqoop帮助信息 |
version | VersionTool | 打印sqoop版本信息 |
- 连接参数列表
Argument | Description |
---|---|
--connect <jdbc-uri> | Specify JDBC connect string 指定JDBC连接字符串 |
--connection-manager <class-name> | Specify connection manager class to use 指定要使用的连接管理器类 |
--driver <class-name> | Manually specify JDBC driver class to use 指定要使用的JDBC驱动类 |
--hadoop-mapred-home <dir> | Override $HADOOP_MAPRED_HOME 指定$HADOOP_MAPRED_HOME路径 |
--help | Print usage instructions 帮助信息 |
--password-file | Set path for a file containing the authentication password 设置用于存放认证的密码信息文件的路径 |
-P | Read password from console 从控制台读取输入的密码 |
--password <password> | Set authentication password 设置认证密码 |
--username <username> | Set authentication username 设置认证用户名 |
--verbose | Print more information while working 打印运行信息 |
--connection-param-file <filename> | Optional properties file that provides connection parameters 指定存储数据库连接参数的属性文件 |
- 连接MySQL示例
# 查询数据库列表 对标show databases
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 12345678
# 查询指定库下面所有表 对标show tables in cm
sqoop list-tables --connect jdbc:mysql://localhost:3306/hive_meta --username root --password 12345678
4 sqoop应用
4.1 准备测试数据#
应用场景:
使用sqoop上传字典表数据到hive中与我们的数据进行关联查询。
以 商品表 为例:
-- 创建sqoop_db 数据库
create database sqoop_db default charset utf8 collate utf8_general_ci;
-- 导入SQL文件
mysql -uroot -p sqoop_db < /public/data/goods_table.sql
4.2 eval 查看 sql 查询结果#
# 没有where条件
sqoop eval \
--connect jdbc:mysql://nn1:3306/sqoop_db \
--username root \
--password 12345678 \
--query "select * from goods_table limit 10"
4.3 create-hive-table创建hive表#
先启动hadoop集群
# 基于MySQL表创建hive表
sqoop create-hive-table \
--connect jdbc:mysql://nn1:3306/sqoop_db \
--username root \
--password 12345678 \
--table goods_table \
--hive-table hainiu.goods_table
报错:
修改sqoop配置文件
mv sqoop-env-template.sh sqoop-env.sh
添加hadoop,hive,hbase等环境信息
export ZOOKEEPER_HOME=/usr/local/zookeeper
export HADOOP_HOME=/usr/local/hadoop
export HIVE_HOME=/usr/local/hive
export HIVE_CONF_DIR=/usr/local/hive/conf
将hive-common-3.1.3.jar拷贝到sqoop的lib目录下
cp /usr/local/hive/lib/hive-common-3.1.3.jar /usr/local/sqoop/lib
测试:
查看hive表是否创建成功
4.4 多map条件查询导入HDFS#
语法 :
sqoop import \
--connect 数据库连接字符串 \
--username 数据库用户名 \
--password 数据库密码 \
--target-dir HDFS位置 \
--delete-target-dir \
--fields-terminated-by "\t" \
--num-mappers 3 \
--split-by 切分数据依据 \
--query 'select SQL where 查询条件 and $CONDITIONS'
参数解释 :
--query或--e 将查询结果的数据导入,使用时必须伴随参--target-dir,--hive-table,如果查询中有where条件,则条件后必须加上$CONDITIONS关键字
当sqoop使用--query+sql执行多个maptask并行运行导入数据时,每个maptask将执行一部分数据的导入,原始数据需要使用'--split-by 某个字段'来切分数据,不同的数据交给不同的maptask去处理。maptask执行sql副本时,需要在where条件中添加$CONDITIONS条件,这个是linux系统的变量,可以根据sqoop对边界条件的判断,来替换成不同的值,这就是说若split-by id,则sqoop会判断id的最小值和最大值判断id的整体区间,然后根据maptask的个数来进行区间拆分,每个maptask执行一定id区间范围的数值导入任务,如下为示意图。
4.3.1 导入文本文件#
#用hainiu认证
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /user/hainiu/sqoop/data/goods_1 \
--delete-target-dir \
--fields-terminated-by "\001" \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'
# 注意:
# --split-by: 一般都是数值型。
# -Dorg.apache.sqoop.splitter.allow_text_splitter=true: --split-by的是字符串也可以
查询结果 :
4.3.2 导入其他格式文件#
# 导入不同格式,支持格式as-avrodatafile、as-parquetfile、as-sequencefile、as-textfile(默认格式)
# 多次导入时会报jar包已存在错误,请忽略,原因为sqoop读取源数据的schema文件创建的jar在前几次任务中已经创建了。
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--target-dir /user/hainiu/sqoop/data/goods_2_parquet \
--delete-target-dir \
--as-parquetfile \
--num-mappers 4 \
--split-by id \
--query 'select * from goods_table where id < 10 and $CONDITIONS'
结果:
4.5 全量导入hive表#
4.5.1 导入文本表#
# 导入命令
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hainiu.goods_table
上面过程分为两步:
1)第一步将数据导入到HDFS,默认的临时目录是/user/当前操作用户/mysql表名;
2)第二步将导入到HDFS的数据迁移到Hive表,如果hive表不存在,sqoop会自动创建内部表;(我们的是在/user/hainiu/goods_table,通过查看job的configuration的outputdir属性得知)
结果:
查询数据:
4.6 增量数据导入#
现在我们已经实现了 hive的数据导入方式,那么我们怎么实现hive的增量数据导入呢?
1、append方式
2、lastmodified方式,必须要加--append(追加)或者--merge-key(合并,一般填主键)
我们先建新表进行增量数据的演示
CREATE TABLE `goods_update_table` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`goods_sn` varchar(50) COLLATE utf8_bin NOT NULL COMMENT '商品的唯一编号、货号',
`goods_cname` varchar(100) COLLATE utf8_bin NOT NULL COMMENT '商品名称(中文)',
`goods_ename` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '商品名称(英文)',
`goods_price` double NOT NULL COMMENT '商品价格',
`last_update_time` datetime NOT NULL DEFAULT now()COMMENT '最近一次更新商品配置的时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- 添加数据
INSERT INTO `goods_update_table` (goods_sn,goods_cname,goods_ename,goods_price,last_update_time)
VALUES ('111111', '漂亮的高跟鞋1', '', 888, '2020-10-10 11:00:00');
4.6.1 全量导入#
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hainiu.goods_update_table
由于 --hive import 与 incremental 冲突, 所以增量导入不能直接导入到hive表中,但可以导入到hive表对应的hdfs目录里
4.6.2 按照id增量导入#
incremental append 用法
-- MySQL添加一条新的数据
INSERT INTO `goods_update_table` (goods_sn,goods_cname,goods_ename,goods_price,last_update_time) VALUES
('222222', '漂亮的长筒靴1', '', 999, '2020-10-10 12:00:00');
-- 按照id增量导入
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--target-dir /hive/warehouse/hainiu.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental append \
--check-column id \
--last-value 1
-- 参数解释:
-- 1)incremental <mode> : append或lastmodified,使用lastmodified方式导入数据要指定增量数据是要--append(追加)还是要--merge-key(合并)
-- 2)check-column <字段> : 作为增量导入判断的列名
-- 3)last-value val : 指定某一个值,用于标记增量导入的位置,这个值的数据不会被导入到表中,只用于标记当前表中最后的值。
4.6.3 按照时间增量导入#
--incremental lastmodified --append 用法
如果按照时间增量进行数据导入可以使用 --incremental lastmodified --append 这种方式进行数据导入,lastmodified 用于更新的日期列
INSERT INTO `goods_update_table` (goods_sn,goods_cname,goods_ename,goods_price,last_update_time) VALUES
('333333', '漂亮的长筒靴2', '', 999, '2020-10-10 13:00:00');
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--target-dir /hive/warehouse/hainiu.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2020-10-10 13:00:00' \
--append
-- 注意:last-value 的设置是把包括 2020-10-10 13:00:00 时间的数据做增量导入。
结果:id=3的数据成功导入
4.6.3 按照时间增量并按照主键合并导入#
--incremental lastmodified --merge-key 用法
如果之前的数据有修改的话可以使用--incremental lastmodified --merge-key进行数据合并执行修改的SQL
-- 更改商品价格
update goods_update_table set goods_price=666 where id=3;
进行合并导入
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_update_table \
--num-mappers 1 \
--target-dir /hive/warehouse/hainiu.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2020-10-10 13:00:00' \
--merge-key id
-- --incremental lastmodified --merge-key的作用:修改过的数据和新增的数据(前提是满足last-value的条件)都会导入进来,并且重复的数据(不需要满足last-value的条件)都会进行合并
4.7 import to hbase#
在nn1上安装hbase组件
解压hbase安装包到/usr/local目录下
tar -zxvf /public/software/bigdata/hbase-2.4.13-bin.tar.gz -C /usr/local/
创建软连接
ln -s hbase-2.4.13/ hbase
修改hbase安装目录的所有者和属组为hadoop用户hadoop用户组
chown -R hadoop:hadoop /usr/local/hbase-2.4.13
修改conf目录下的hbase-env.sh配置文件
#添加以下内容
export JAVA_HOME=/usr/java/default
export HBASE_MANAGES_ZK=false
修改conf目录下的hbase-site.xml配置文件
<property>
<name>hbase.rootdir</name>
<value>hdfs://ns1/hbase</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>/usr/local/hbase/tmp</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<!-- zookeeper的端口号 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>nn1</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
将准备好的hbasejar包导入到sqoop的lib目录下
tar -zxvf /public/software/other/hbasejars.tar.gz /usr/local/sqoop/lib
进入hbase客户端并创建hainiu名称空间
#连接hbase客户端
hbase shell
#创建hainiu的名称空间
create_namespace 'hainiu'
# sqoop导入hbase
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table goods_table \
--hbase-create-table \
--hbase-table hainiu:goods_table \
--column-family cf \
--hbase-row-key id
# --hbase-row-key: 要求MySQL表必须有主键,将主键作为rowkey,标识一行
导入后,查看:
scan 'hainiu:goods_table'
4.8 export 数据导出#
4.8.1 hdfs数据导出到MySQL中#
hdfs准备如下数据,放到/data/xinniu目录下
101|bob|manager|50000|yanfa
102|jerry|java |40000|yanfa
103|rose|php|30000|yanfa
104|jim|php|30000|yanfa
105|tom|bigdata|50000|yanfa
hadoop fs -put student /data/xinniu/
sqoop将hdfs数据导入到mysql表中,不会自动创建表,所以需要我们在mysql中,根据hdfs文件中的数据,创建对应的表
CREATE TABLE emp (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(20),
deg VARCHAR(20),
salary INT,
dept VARCHAR(10));
以下命令用于hdfs数据(位于HDFS上的/data/xinniu/的文件)导出到mysql中sqoop_db库下的emp表
sqoop export \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--export-dir /data/xinniu/data.txt \
--table emp \
--num-mappers 1 \
--input-fields-terminated-by '|'
验证
select * from emp;
4.8.2 hive表数据导出到mysql中#
sqoop的export命令支持 insert、update到关系型数据库,但是不支持merge;
hive表导入mysql数据库insert案例
查看hive中hainiu.student表数据
将数据导出到mysql中
CREATE TABLE student (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(20),
age INT);
sqoop export \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table student \
--export-dir /hive/warehouse/hainiu.db/student \
--num-mappers 1 \
--fields-terminated-by '\t'
结果:
hive表导入mysql数据库update案例
更新hive的数据
重新将hive表中的数据导入到mysql中并按照id进行更新
sqoop export \
--connect jdbc:mysql://nn1:3306/sqoop_db"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 12345678 \
--table student \
--export-dir /hive/warehouse/hainiu.db/student \
--update-key id \
--num-mappers 1 \
--fields-terminated-by '\t'
查看结果:
6 应用实例
需求:
编写一个脚本
将sqoop_db中的goods_table表每天抽取所有数据并导入到hdfs:/user/hainiu/goods_table目录下。并按照每天的日期生成对应的目录保存表数据。以shell脚本的方式运行每天定时运行。
vim goods_op.sh
batch_date=$1
sqoop import \
--connect jdbc:mysql://nn1:3306/sqoop_db \
--username root \
--password 12345678 \
--target-dir /user/hainiu/goods_table/${batch_date}/ \
--delete-target-dir \
--fields-terminated-by "\t" \
--split-by Id \
--query 'select * from goods_table where $CONDITIONS'
res=$?
if [ ${res} != 0 ];then
echo 'extract goods_table error! '`date` >> /data/hainiu/extract/goods_table.log
exit 1
else
echo 'extract goods_table successful '`date` >> /data/hainiu/extract/goods_table.log
fi
执行时,需要从外界将日期传递过来
# 给脚本添加执行权
chmod a+x goods_op.sh
# 执行脚本
sh -x goods_op.sh 20230305