数仓生态圈辅助工具之Sqoop导入导出数据和Oozie工作流调度

news2024/12/28 22:21:11

数仓生态圈辅助工具


知识点01:Apache Sqoop介绍、工作机制

  • Sqoop介绍

    sqoop是apache旗下一款“Hadoop和关系数据库服务器之间传送数据”的工具。
    导入数据:MySQL,Oracle导入数据到Hadoop的HDFS、HIVE、HBASE等数据存储系统;
    导出数据:从Hadoop的HDFS、HIVE中导出数据到关系数据库mysql等。
    

在这里插入图片描述

  • Sqoop工作机制

    Sqoop工作机制是将导入或导出命令翻译成mapreduce程序来实现。
    在翻译出的mapreduce中主要是对inputformat和outputformat进行定制。
    

在这里插入图片描述

  • sqoop安装

    #你的hive安装在哪一台主机,你的sqoop就要装在哪一台主机,该操作是以node1为例
    
    1、在node1中,上传sqoop的安装包到/export/software目录
    2、在node1中,解压sqoop安装包到/export/server目录
    
     tar -xvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz -C /export/server/
     
    3、在node1中,对解压后的sqoop目录进行重命名
     cd /export/server/
     mv sqoop-1.4.7.bin__hadoop-2.6.0 sqoop-1.4.7
     
    4、在node1中,修改sqoop的配置文件,
     cd /export/server/sqoop-1.4.7/conf
     mv sqoop-env-template.sh sqoop-env.sh 
     
     修改sqoop-env.sh 文件,设置以下内容
     export HADOOP_COMMON_HOME=/export/server/hadoop-3.3.0
     export HADOOP_MAPRED_HOME=/export/server/hadoop-3.3.0
     export HIVE_HOME=/export/server/hive-3.1.2 
     
     5、在node1中,加入mysql的jdbc驱动包和hive的执行包
     cp /export/server/hive-3.1.2/lib/mysql-connector-java-5.1.32-bin.jar   /export/server/sqoop-1.4.7/lib/
     
    cp /export/servers/hive-3.1.2/lib/hive-exec-3.1.2.jar /export/servers/sqoop-1.4.7/lib/
    
    cp /export/server/hive-3.1.2/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.2.jar /export/server/sqoop-1.4.7/lib/
     
     6、在node1,node2、node3中,配置环境变量
       vim /etc/profile
       添加以下内容
       
       export SQOOP_HOME=/export/server/sqoop-1.4.7
       export PATH=:$SQOOP_HOME/bin:$PATH
       
       # HCatelog
     export HCAT_HOME=/export/server/hive-3.1.2/hcatalog
    export hive_dependency=$HIVE_HOME/conf:$HIVE_HOME/lib/*:$HIVE_HOME/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.2.jar
    
       添加完之后一定要保存退出,执行以下命令
       source /etc/profile
    
     7、在node1中,测试sqoop
     sqoop list-databases \
     --connect jdbc:mysql://node1:3306/ \
     --username root --password 123456
     
    
  • sqoop测试

     #测试你的sqoop是否能查看MySQL中所有的数据库
     sqoop list-databases \
     --connect jdbc:mysql://hadoop01:3306/ \
     --username root \
     --password 123456
    

知识点02:增量数据、全量数据

  • 全量数据(Full data)

    就是全部数据,所有数据。如对于表来说,就是表中的所有数据。

  • 增量数据(Incremental data)

    就是上次操作之后至今产生的新数据。

  • 数据子集

    也叫做部分数据。整体当中的一部分。


知识点03:Sqoop数据导入至HDFS

  • 测试数据准备

在这里插入图片描述

  • 全量导入MySQL数据到HDFS

    sqoop import \
    --connect jdbc:mysql://hadoop01:3306/userdb \
    --table emp \
    --username root \
    --password 123456 \
    --target-dir /sqoop/result1 \
    --delete-target-dir \
    --m 1
    
    
    #  --table emp  要导入mysql中的userdb数据库的emp表数据
    #  --target-dir /sqoop/result1  表示要导入到hdfs的目标路径
    #  --delete-target-dir          如果目标目录存在,则删除
    #  --m 1                        使用一个Map线程来导入
    
    #导入HDFS之后,默认分隔符是逗号
    
    
    
  • 指定分隔符

    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --target-dir /sqoop/result2 \
    --delete-target-dir \
    --fields-terminated-by  '\t' \
    --table emp \
    --m 1
    
    
    # --fields-terminated-by '\t'  指定HDFS上文件的分隔符是'\t'
    
    
  • 指定任务并行度(maptask个数)

    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --target-dir /sqoop/result3 \
    --delete-target-dir \
    --fields-terminated-by '\t' \
    --split-by id \
    --table emp \
    --m 2
    
    
    #如果你要指定多个maptask来完成数据的导入,也就是--m参数的值不是1,则必须添加一个参数--split-by
    #该参数用来指定你原表的数据如何分配给多个线程来实现导入
    #--split-by id  内部原理是获取id的最小值和id的最大值,进行平均划分
     SELECT MIN(`id`), MAX(`id`) FROM `emp`
    
    #maptask1要导入的数据
    1201	gopal	manager	50000	TP
    1202	manisha	Proof reader	50000	TP
    
    
    #maptask2要导入的数据
    1203	khalil	php dev	30000	AC
    1204	prasanth	php dev	30000	AC
    1205	kranthi	admin	20000	TP
    
    
    
    sqoop import \
    -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --target-dir /sqoop/result3 \
    --delete-target-dir \
    --fields-terminated-by '\t' \
    --split-by name \
    --table emp \
    --m 2
    
    #小结:split-by后边的字段必须是数字类型,或者是数字字符串类型(‘123’)
          如果是数字类型,则不需要额外添加内容
          如果是数字字符串类型,则需要添加以下内容
    sqoop import \
    -Dorg.apache.sqoop.splitter.allow_text_splitter=true \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --target-dir /sqoop/result3 \
    --delete-target-dir \
    --fields-terminated-by '\t' \
    --split-by name \
    --table emp \
    --m 2
    

知识点04:Sqoop数据导入至Hive

  • 测试准备

    -- Hive中创建测试使用的数据库
    drop database if exists test cascade ;
    create database if not exists test;
    
  • 方式1-先复制mysql的表结构到Hive,然后再导入数据

    1、先复制表结构到hive中再导入数据,将关系型数据的表结构复制到hive中
    sqoop create-hive-table \
    --connect jdbc:mysql://hadoop01:3306/userdb \
    --table emp_add \
    --username root \
    --password 123456 \
    --hive-table test.emp_add_sp
    
    其中:
     --table emp_add为mysql中的数据库userdb中的表。   
     --hive-table emp_add_sp 为hive中新建的表名称。
     复制表结构默认分隔符是'\001'
    
    2、从关系数据库导入文件到hive中
    sqoop import \
    --connect jdbc:mysql://hadoop01:3306/userdb \
    --username root \
    --password 123456 \
    --table emp_add \
    --hive-table test.emp_add_sp \
    --hive-import \
    --m 1 
    
    
  • 方式2:直接导入数据(建表 + 导入数据)

    -- 1、使用hive默认分隔符 '\001'
    
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --table emp_conn \
    --username root \
    --password 123456 \
    --hive-import \
    --hive-database test \
    --m 1 
    
    
    #从MySQL的userdb数据库的emp_conn表导入到hive的test数据库的emp_conn表
    #如果多次导入,则会进行数据追加
    
    #如果要覆盖操作,需要加参数:
     --hive-overwrite
    
    -- 2、使用指定分隔符  '\t'
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --table emp_conn \
    --hive-import \
    --hive-database test \
    --fields-terminated-by '\t' \
    --m 1 
    
    
    帮助手册:
     sqoop help 
     
     sqoop help import
    

知识点06:Sqoop数据导入至Hive–HCatalog API

  • sqoop API 原生方式

    所谓sqoop原生的方式指的是sqoop自带的参数完成的数据导入。

    但是有什么不好的地方呢?请看下面案例

    -- 手动在hive中建一张表
    create table test.emp_hive
    (
        id     int,
        name   string,
        deg    string,
        salary int,
        dept   string
    )
    row format delimited fields terminated by '\t'
    stored as orc;
    
    --注意,这里指定了表的文件存储格式为ORC。
    --从存储效率来说,ORC格式胜于默认的textfile格式。
    
    sqoop import \
    --connect jdbc:mysql://hadoop01:3306/userdb \
    --table emp \
    --username root \
    --password 123456 \
    --fields-terminated-by '\t' \
    --hive-database test \
    --hive-table emp_hive \
    -m 1
    

    执行之后,可以发现虽然针对表emp_hive的sqoop任务成功,但是Hive表中却没有数据

在这里插入图片描述

在这里插入图片描述

  • HCatalog API方式

    Apache HCatalog是基于Apache Hadoop之上的数据表和存储管理服务。

    包括:

    • 提供一个共享的模式和数据类型的机制。
    • 抽象出表,使用户不必关心他们的数据怎么存储,底层什么格式。
    • 提供可操作的跨数据处理工具,如Pig,MapReduce,Streaming,和Hive。

    sqoop的官网也做了相关的描述说明,使用HCatalog支持ORC等数据格式。

在这里插入图片描述

#如果要将从MySQL导入到Hive的ORC格式的表,表虚手动在Hive中先创建表
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp \
--fields-terminated-by '\t' \
--hcatalog-database test \
--hcatalog-table emp_hive \
-m 1

可以发现数据导入成功,并且底层是使用ORC格式存储的。

  • sqoop原生API和 HCatalog区别

    #数据格式支持(这是实际中使用HCatalog的主要原因,否则还是原生的灵活一些)
    	Sqoop方式支持的数据格式较少;
    	HCatalog支持的数据格式多,包括RCFile, ORCFile, CSV, JSON和SequenceFile等格式。
    
    #数据覆盖
    	Sqoop方式允许数据覆盖,HCatalog不允许数据覆盖,每次都只是追加。
    
    #字段名匹配
    	Sqoop方式比较随意,不要求源表和目标表字段相同(字段名称和个数都可以不相同),它抽取的方式是将字段按顺序插入,比如目标表有3个字段,源表有一个字段,它会将数据插入到Hive表的第一个字段,其余字段为NULL。
    	但是HCatalog不同,源表和目标表字段名需要相同,字段个数可以不相等,如果字段名不同,抽取数据的时候会报NullPointerException错误。HCatalog抽取数据时,会将字段对应到相同字段名的字段上,哪怕字段个数不相等。
    	
    
    导入过程:
      1:在Hive中手动建表
      2:通过Sqoop导入
      3:检查数据是否到位
    

    行存储和列存储

1、行存储
  1)表数据在硬盘上以行为单位,一行的数据是连续存储在一起 ,select * from A  查询效率高。
  2)行存储代表:TextFile、SequenceFile
    create table test.emp_hive
    (
        id     int,
        name   string,
        deg    string,
        salary int,
        dept   string
    )
    row format delimited fields terminated by '\t'
    stored as textfile;
    
2、列存储
    1)表数据在硬盘上以列为单位,一列的数据是连续存储在一起 ,select 字段 from A  查询效率高。
    2)列存储代表:ORC、Parquet 
    create table test.emp_hive
    (
        id     int,
        name   string,
        deg    string,
        salary int,
        dept   string
    )
    row format delimited fields terminated by '\t'
    stored as orc;
    
    
 3、ORC格式插入数据的步骤
 
  1)准备数据:log.dat 18.2M
  2)创建普通表,存储格式是TEXTFILE
  create temporary table log_text (
        track_time string,
        url string,
        session_id string,
        referer string,
        ip string,
        end_user_id string,
        city_id string
   )
   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
   STORED AS TEXTFILE ;
  3)给普通表插入数据
  load data local inpath '/root/log.data' into table log_text;
  
  4)创建ORC存储的表
  create table log_orc(
    track_time string,
    url string,
    session_id string,
    referer string,
    ip string,
    end_user_id string,
    city_id string
   )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    STORED AS orc ;
  5)从普通表查询数据插入到orc表
  insert into table log_orc  select * from log_text;

知识点07:Sqoop数据导入-HDFS-条件部分导入

在实际开发中,有时候我们从RDBMS(MySQL)中导入数据时,不需要将数据全部导入,而只需要导入满足条件的数据,则需要进行条件导入
  • query查询

    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --target-dir /sqoop/result5 \
    --query 'select id,name,deg from emp WHERE  id>1203 and $CONDITIONS' \
    --delete-target-dir \
    --fields-terminated-by '\001' \
    --m 1
    

    使用sql query语句来进行查找时,不能加参数–table

    并且必须要添加where条件

    并且where条件后面必须带一个$CONDITIONS这个字符串;

    并且这个sql语句必须用单引号,不能用双引号。

在这里插入图片描述


知识点08:Sqoop数据导入-HDFS-增量导入

1、全量导入,表所有数据全部导入
2、条件导入,只要满足查询条件的就导入
3、增量导入,之前已经导入过一次,下一次只导入新增加或者修改的数据
    方式1-使用sqoop自带的参数实现增量导入
    方式2-使用用户自定义条件来实现增量导入(使用该方式比较多)
    增量导入的难点:
       因为你之前已经导入多一次,下一次导入时一定要判断哪些数据是已经导入过的,则不要导入,哪些数据是新增加的或者修改的,则需要导入
  • 方式一:sqoop自带参数实现

    设计思路:对某一列值进行判断,只要大于上一次的值就会导入。

    所谓的增量实现,肯定需要一个判断的依据,上次到哪里了,这次从哪里开始。

       --check-column <column>        Source column to check for incremental
                                      change
       --incremental <import-type>    Define an incremental import of type
                                      'append' or 'lastmodified'
       --last-value <value>           Last imported value in the incremental
                                      check column
       
    
    • append模式

      • 要求:必须有一列自增的值,按照自增的int值进行判断
      • 特点:只能导入增加的数据,无法导入更新的数据
      #第一次,全量导入-首先执行以下指令先将我们之前的数据导入
      sqoop import \
      --connect jdbc:mysql://192.168.88.80:3306/userdb \
      --table emp \
      --username root \
      --password 123456 \
      --target-dir /sqoop/appendresult \
      --m 1
      
      #查看生成的数据文件,发现数据已经导入到hdfs中.
      
      #模拟新增加数据,然后在mysql的emp中插入2条数据:
      insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1206', 'allen', 'admin', '30000', 'tp');
      insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1207', 'woon', 'admin', '40000', 'tp');
      
      #第二次导入,执行如下的指令,实现增量的导入:
      sqoop import \
      --connect jdbc:mysql://192.168.88.80:3306/userdb \
      --username root \
      --password 123456 \
      --table emp --m 1 \
      --target-dir /sqoop/appendresult \
      --incremental append \
      --check-column id \
      --last-value 1205
      
      #该方式最大的问题是需要记住上一次导入的last-value值
      
      ####如果想实现sqoop自动维护增量记录  可以使用sqoop job作业来实现
      21/10/09 15:03:37 INFO tool.ImportTool:  --incremental append
      21/10/09 15:03:37 INFO tool.ImportTool:   --check-column id
      21/10/09 15:03:37 INFO tool.ImportTool:   --last-value 1207
      21/10/09 15:03:37 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
      

在这里插入图片描述

> 并且还可以结合sqoop job作业,实现sqoop自动记录维护last-value值,详细可以参考课程资料。

在这里插入图片描述

  • lastmodifield模式

    • 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
    • 特点:既导入新增的数据也导入更新的数据
    # 首先我们要在mysql中创建一个customer表,指定一个时间戳字段
    create table userdb.customertest(
      id int,name varchar(20),
      last_mod timestamp default current_timestamp on update current_timestamp
    );
    #此处的时间戳设置为在数据的产生和更新时都会发生改变. 
    
    #插入如下记录:
    insert into userdb.customertest(id,name) values(1,'neil');
    insert into userdb.customertest(id,name) values(2,'jack');
    insert into userdb.customertest(id,name) values(3,'martin');
    insert into userdb.customertest(id,name) values(4,'tony');
    insert into userdb.customertest(id,name) values(5,'eric');
    
    #第一次全量导入:此时执行sqoop指令将数据导入hdfs:
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --target-dir /sqoop/lastmodifiedresult \
    --table customertest \
    --m 1
    
    #再次插入一条数据进入customertest表 - 模拟新增
    insert into userdb.customertest(id,name) values(6,'james');
    #更新一条已有的数据,这条数据的时间戳会更新为我们更新数据时的系统时间. - 模拟修改
    update userdb.customertest set name = 'NEIL' where id = 1;
    
    
    #第二次导入:执行如下指令,把id字段作为merge-key:
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --table customertest \
    --target-dir /sqoop/lastmodifiedresult \
    --incremental lastmodified \
    --check-column last_mod \
    --last-value '2022-10-29 15:13:18' \
    --merge-key id \
    --m 1 \
    
    
    #解释:
    	--incremental lastmodified 第二次导入数据和第一次导入的数据合并,重新执行MR,将MR执行的    覆盖原来的数据
     	--last-value "2022-06-10 10:59:36"  把大于等于这个日期的数据导入
     	--merge-key id  如果第一次导入的数据和第二次导入的数据id相同,则合并
     
    #由于merge-key这种模式是进行了一次完整的mapreduce操作,
    #因此最终我们在lastmodifiedresult文件夹下可以发现id=1的name已经得到修改,同时新增了id=6的数据
    
  • 方式二:(重点)用户条件过滤实现(不用敲)

    • 通过where对字段进行过滤
    
    -- 导入从某一个时间点的数据
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --query "select * from customertest where last_mod >'2022-06-10 10:59:36' and  \$CONDITIONS" \
    --fields-terminated-by '\001' \
    --hcatalog-database test \
    --hcatalog-table customertest \
    -m 1
    
    -- 导入上一天数据,指定时间区间
    
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --query "select * from customertest where last_mod  >= '2022-09-12 00:00:00' and last_mod <= '2022-09-12 23:59:59'   and  \$CONDITIONS" \
    --fields-terminated-by '\001' \
    --hcatalog-database test \
    --hcatalog-table customertest \
    -m 1
    
    
    
    
    
    sqoop import \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --username root \
    --password 123456 \
    --query "select * from emp where id>1203 and  \$CONDITIONS" \
    --fields-terminated-by '\001' \
    --hcatalog-database test \
    --hcatalog-table emp_hive \
    -m 1
    

知识点09:Sqoop数据导出

sqoop导出操作最大的特点是,目标表需要自己手动提前创建

1、在大数据中,数据的导出一般发生在大数据分析的最后一个阶段
2、将分析后的指标导入一些通用的数据库系统中,用于进一步使用
3、导入到MySQL时,需要在MySQL中提前创建表
  • (重点)全量数据导出

    #step1:MySQL中建表
     create table userdb.employee ( 
       id int  primary key, 
       name varchar(20), 
       deg varchar(20),
       salary int,
       dept varchar(10));
       
    #step2:从HDFS导出数据到MySQL
    sqoop export \
    --connect jdbc:mysql://192.168.88.80:3306/userdb \
    --table employee \
    --username root \
    --password 123456 \
    --export-dir /sqoop/result1/
    
    
    #解释:将HDFS的/sqoop/result1/目录下的文件内容,导出到mysql中userdb数据库下的employee表
    
    
    
    #step3:从Hive导出数据到MySQL
    #首先清空MySQL表数据
    truncate table employee;
    
    sqoop export \
    --connect "jdbc:mysql://192.168.88.80:3306/userdb? useUnicode=true&characterEncoding=utf-8" \
    --table employee \
    --username root \
    --password 123456 \
    --hcatalog-database test \
    --hcatalog-table emp_hive \
    --input-fields-terminated-by '\t' \
    -m 1 
    
    #解释: 将Hive中test数据库下的emp_hive表导出到MySQL的userdb数据库中employee表
    #--input-fields-terminated-by '\t' :表示sqoop去hive的表目录下读取文件时,使用'\t'对文件进行切割,如果hive文件分隔符是'\001',则该参数不用指定
    
    #注意,如果Hive中的表底层是使用ORC格式存储的,那么必须使用hcatalog API进行操作。
    
  • 增量数据导出

    • updateonly:只增量导出更新的数据
    • allowerinsert:既导出更新的数据,也导出新增的数据
    • updateonly模式

      #在HDFS文件系统中/sqoop/updateonly/目录的下创建一个文件updateonly_1.txt
      hadoop fs -mkdir -p /sqoop/updateonly/
      hadoop fs -put updateonly_1.txt /sqoop/updateonly/
      
      1201,gopal,manager,50000
      1202,manisha,preader,50000
      1203,kalil,php dev,30000
      
      
      #手动创建mysql中的目标表
      
       CREATE TABLE userdb.updateonly ( 
         id INT NOT NULL PRIMARY KEY, 
         name VARCHAR(20), 
         deg VARCHAR(20),
         salary INT
         );
      
      #先执行全部导出操作:
      sqoop export \
      --connect jdbc:mysql://192.168.88.80:3306/userdb \
      --table updateonly \
      --username root \
      --password 123456 \
      --export-dir /sqoop/updateonly/updateonly_1.txt
      
      
      
      
      #在HDFS新增一个文件updateonly_2.txt:修改了前三条数据并且新增了一条记录
      1201,gopal,manager,1212
      1202,manisha,preader,1313
      1203,kalil,php dev,1414
      1204,allen,java,1515
      
      
      hadoop fs -put updateonly_2.txt /sqoop/updateonly/
      
      #执行更新导出:
      sqoop export \
      --connect jdbc:mysql://192.168.88.80:3306/userdb \
      --username root \
      --password 123456 \
      --table updateonly \
      --export-dir /sqoop/updateonly/updateonly_2.txt \
      --update-key id \
      --update-mode updateonly
      
      #解释:
        --update-key id  根据id这列来判断id两次导出的数据是否是同一条数据,如果是则更新
        --update-mode updateonly 导出时,只导出第一次和第二次的id都有的数据,进行更新,不会导出HDFS中新增加的数据
      
    • allowinsert模式

      #手动创建mysql中的目标表
      CREATE TABLE userdb.allowinsert ( 
         id INT NOT NULL PRIMARY KEY, 
         name VARCHAR(20), 
         deg VARCHAR(20),
         salary INT
      );
         
      #先执行全部导出操作
      sqoop export \
      --connect jdbc:mysql://192.168.88.80:3306/userdb \
      --username root \
      --password 123456 \
      --table allowinsert \
      --export-dir /sqoop/updateonly/updateonly_1.txt
      
      
      #执行更新导出
      sqoop export \
      --connect jdbc:mysql://192.168.88.80:3306/userdb \
      --username root --password 123456 \
      --table allowinsert \
      --export-dir /sqoop/updateonly/updateonly_2.txt \
      --update-key id \
      --update-mode allowinsert
      

知识点10:工作流介绍

  • 工作流概念

    	工作流(Workflow),指“业务过程的部分或整体在计算机应用环境下的自动化”。是对工作流程及其各操作步骤之间业务规则的抽象、概括描述。
    	工作流解决的主要问题是:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息或者任务。
    	一个完整的数据分析系统通常都是由多个前后依赖的模块组合构成的:数据采集、数据预处理、数据分析、数据展示等。各个模块单元之间存在时间先后依赖关系,且存在着周期性重复。
    核心概念:依赖执行 周期重复执行
    
  • 工作流实现方式

    • 自己开发实现调度工具
    • 使用第三方调度软件

知识点11:Apache Oozie介绍、架构

  • 工作流调度

    1、在大数据的实际开发中,我们数据的整个处理和分析过程是周期型的工作,比如每隔一天都需要讲整个流程执行一遍
    2、所以我们需要一个工作流调度框架,帮我们在合适的时间重复执行这个分析任务
    
  • oozie介绍

    	Oozie是一个用来管理 Hadoop生态圈job的工作流调度系统。由Cloudera公司贡献给Apache。
    		Cloudra产品: Cloudera Manager、Cloudera Hadoop(CDH)、Oozie
    	Oozie是运行于Java servlet容器上的一个java web应用。
    	Oozie的目的是按照DAG(有向无环图)调度一系列的Map/Reduce或者Hive等任务。Oozie 工作流由hPDL(Hadoop Process Definition Language)定义(这是一种XML流程定义语言)。
    	适用场景包括:
    		需要按顺序进行一系列任务;
    		需要并行处理的任务;
    		需要定时、周期触发的任务;
    		可视化作业流运行过程;
    		运行结果或异常的通报。
    

在这里插入图片描述

  • oozie架构

在这里插入图片描述

#Oozie Client
	提供命令行、java api、rest等方式,对Oozie的工作流流程的提交、启动、运行等操作;

#Oozie WebApp
	即 Oozie Server,本质是一个java应用。可以使用内置的web容器,也可以使用外置的web容器;

#Hadoop Cluster 
	底层执行Oozie编排流程的各个hadoop生态圈组件;

在这里插入图片描述


在这里插入图片描述

知识点12:Oozie工作流类型

workflow 普通工作流 没有定时和条件触发功能。

coordinator 定时工作流 可以设置执行周期和频率

bundle 批处理工作流 一次可以提交执行多个coordinator

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


知识点13:Oozie使用案例

  • Oozie的操作步骤-WorkFlow

    • 1、开启Oozie服务

在这里插入图片描述

  • 2、打开Oozie页面

在这里插入图片描述

  • 3、创建WorkFlow工作流,并命名

在这里插入图片描述

  • 4、让oozie调度hive脚本

在这里插入图片描述

  • 5、找到你的hive脚本

在这里插入图片描述

+ 5.1 在Linux的/root目录,写一个SQL脚本:hive_test.sql

  ~~~shell
  create database myhive;
  use myhive;
  create  table if not exists stu(id int ,name string)
   row format delimited fields terminated by '\t';
  insert into stu values (1,"zhangsan");
  insert into stu values (2,"lisi");
  ~~~

  + 5.1 在本地设置脚本权限为为777

    ~~~shell
    chmod 777  /root/hive_test.sh
    ~~~

  + 5.2 将脚本上传到HDFS的/user/hue/oozie_test目录

    ~~~shell
    hadoop fs -put  /root/hive_test.sh /user/hue/oozie_test
    hadoop fs -chmod 777 /user/hue/oozie_test/hive_test.sh
    ~~~
  • 6、保存工作流

在这里插入图片描述

  • 7、点击执行

在这里插入图片描述

  • 8、查看进度

在这里插入图片描述

  • 9、以后如何找到该工作流?

在这里插入图片描述

在这里插入图片描述

  • Oozie的操作步骤-定时计划

    • 1、打开定时计划页面

在这里插入图片描述

  • 2、进行设置

在这里插入图片描述

  • 3、保存并执行

  • Oozie的操作步骤-多个定时计划批量执行(Bundle)

在这里插入图片描述

  • WorkFlow 、计划、Bundle之间的关系

    1、WorkFlow是一次性的工作流
    2、计划是让WorkFlow可以定时的多次执行
    3、Bundle是包含多个计划,可以让多个计划进行批量管理
    
    结论: Bundle包含计划, 计划包含WorkFlow, WorkFlow包含命令和脚本
    
    
  • 新增oozie程序

  • 提交任务

  • 修复异常

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/401490.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

案例14-课程推送页面逻辑整理--vue

目录一级目录二级目录三级目录一、背景介绍二、问题分析问题1&#xff1a;逻辑边界不清晰&#xff0c;封装意识缺乏问题问题2&#xff1a;展示效果上的问题三、解决过程问题一 代码结构混乱问题解决问题二 代码结构混乱问题解决问题三 展示效果上的细微问题四、总结一级目录 二…

JVM(学习原因,虚拟机,作用,位置,组成,各部分用途,执行流程,架构类型)

JVM概述 这里写目录标题JVM概述1.1为什么学习 JVM1.2虚拟机1.3JVM 作用1.4JVM的位置1.5JVM组成1.6各个组成部分的用途1.7JVM代码的执行流程1.8JVM架构类型1.1为什么学习 JVM 中高级程序员必备技能 项目管理,性能调优 1.2虚拟机  所谓虚拟机&#xff08;Virtual Machine&a…

Linux基础命令-ss显示socket信息

Linux基础命令-netstat显示网络状态 ss 一. 命令介绍 先使用手册查看命令介绍信息 NAME ss - another utility to investigate sockets DESCRIPTION ss is used to dump socket statistics. It allows showing information similar to netstat. It can display more TCP and …

Flink从入门到精通系列(一)

1、Flink概述 Apache Flink 是一个框架和分布式处理引擎&#xff0c;用于在&#xff0c; 无边界和有边界数据流上进行有状态的计算 &#xff0c;Flink 能在所有常见集群环境中运行&#xff0c;并能以内存速度和任意规模进行计算。 Apache Flink 功能强大&#xff0c;支持开发…

使用自定义数据绘制脑地形矩阵图

最近做数据处理,想画点自己想要的图,但是找遍了各种库,都没有一个函数可以实现我想要的效果,所以关机时刻还得靠自己啊,自己动手丰衣足食,记录一下实现过程,方便以后查阅。 使用自定义数据绘制脑地形矩阵图 对于处理后的数据我想实现下图所示的效果,以矩阵的形式排列脑…

PyQt5可视化 7 饼图和柱状图实操案例 ②建表建项目改布局

目录 一、数据库建表 1 建表 2 插入数据 3 查看表数据 二、建立项目 1 新建项目 2 appMain.py 3 myMainWindow.py 4 myChartView.py 2.4.1 提升的后果 2.4.2 QmyChartView类说明 2.4.3 添加代码 三、修改myMainWindow.py程序&#xff0c;添加功能 1 打开数据库 …

第十届省赛——8人物相关性分析(数组)

题目&#xff1a;试题 H: 人物相关性分析时间限制: 1.0s 内存限制: 512.0MB 本题总分&#xff1a;20 分【问题描述】小明正在分析一本小说中的人物相关性。他想知道在小说中 Alice 和 Bob有多少次同时出现。更准确的说&#xff0c;小明定义 Alice 和 Bob“同时出现”的意思是&a…

微小目标识别研究(2)——基于K近邻的白酒杂质检测算法实现

文章目录实现思路配置opencv位置剪裁实现代码自适应中值滤波实现代码动态范围增强实现代码形态学处理实现代码图片预处理效果计算帧差连续帧帧差法原理和实现代码实现代码K近邻实现基本介绍实现代码这部分是手动实现的&#xff0c;并没有直接调用相关的库完整的代码——调用ope…

千川投放50问(完)!如何跑出高投产?

第四十一问&#xff1a;计划初期成本很高&#xff0c;是否要关掉重新跑&#xff1f;首先看一下是不是初期回传延迟导致的成本偏高。如果成本没有高的&#xff0c;不建议暂停&#xff0c;先观察一段时间数据&#xff0c;给它一点学习时间。当系统积累过足够的模型之后&#xff0…

08-Oracle游标管理(定义,打开、获取数据及关闭游标)

目标 1.确定何时需要显示游标2.声明、打开和关闭显示游标3.从显示游标中提取数据4.了解与游标有关的属性5.使用游标FOR循环检索游标中的数据6.在游标FOR循环的子查询中声明游标7.评估使用逻辑运算符结合在一起的布尔条件游标 1、在使用一个PL/SQL块来执行DML语句或只返回一行结…

2月更新 | Visual Studio Code Python

我们很高兴地宣布&#xff0c;2023年2月版 Visual Studio Code Python 和 Jupyter 扩展现已推出&#xff01;此版本包括以下改进&#xff1a;从激活的终端启动 VS Code 时的自动选择环境 使用命令 Python: Create Environmen 时可选择需求文件或可选依赖项 预发布&#xff1a;改…

性能优化之HBase性能调优

HBase是Hadoop生态系统中的一个组件&#xff0c;是一个分布式、面向列存储的内存型开源数据库&#xff0c;可以支持数百万列&#xff08;MySQL4张表在HBase中对应1个表&#xff0c;4个列&#xff09;、超过10亿行的数据存储。可用作&#xff1a;冷热数据分离HBase适合作为冷数据…

坐标系、视窗体(裁剪区域),存储着色器

坐标系 两种常见的投影/坐标系&#xff0c;正交和透视&#xff0c;实际上只是特定的4x4变换矩阵。啥都不规定默认的就是-1.0~1.0的笛卡尔坐标系。 正交&#xff1a; 在opengl的核心框架下&#xff0c;没有提供任何内置渲染管线&#xff0c;所以在提交一个几何图形进行渲染之前&…

Ubuntu Protobuf 安装(测试有效)

安装流程 下载软件 下载自己要安装的版本&#xff1a;https://github.com/protocolbuffers/protobuf 下载源码编译&#xff1a; 系统环境&#xff1a;Ubuntu16&#xff08;其它版本亦可&#xff09;&#xff0c;Protobuf-3.6.1 编译源码 cd protobuf# 当使用 git clone 下来的…

【C语言】操作符详解总结(万字)

操作符详解1. 操作符分类2. 算术操作符3. 移位操作符3.1 整数的二进制是怎么形成的3.2 左移操作符3.3 右移操作符4. 位操作符5. 赋值操作符6. 单目操作符6.1 单目操作符介绍6.2 sizeof 和 数组7. 关系操作符8. 逻辑操作符9. 条件操作符9.1 练习19.2 练习210. 逗号表达式11. 下标…

【Vue】vue2导出页面内容为pdf文件,自定义选中页面内容导出为pdf文件,打印选中页面内容,预览打印内容

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、安装html2canvas和jspdf二、导出pdf使用步骤1.在utils文件夹下创建htmlToPdf.js2.在main.js中引入3.在页面中使用三、打印预览1. 引入print-js2.页面中impor…

PG数据库实现高可用方案(包括通用型方案Corosync+pacemaker协作)

实现高可用方案首先了解一下高可用集群高可用&#xff1a;透明切换&#xff0c;故障切换&#xff0c;连接管理器/集群管理器pgpool-Ⅱ&#xff1a;连接池、复制、负载均衡功能PatroniCorosyncpacemaker高可用解决方案Corosyncpacemakercorosyncpacemaker架构协作资源分配&#…

功耗降低99%,Panamorph超清VR光学架构解析

近期&#xff0c;投影仪变形镜头厂商Panamorph获得新型VR显示技术专利&#xff08;US11493773B2&#xff09;&#xff0c;该专利方案采用了紧凑的结构&#xff0c;结合了Pancake透镜和光波导显示模组&#xff0c;宣称比传统VR方案的功耗、发热减少99%以上&#xff0c;可显著提高…

通讯录(C++实现)

系统需求通讯录是一个可以记录亲人、好友信息的工具。本章主要利用C来实现一个通讯录管理系统系统中需要实现的功能如下:添加联系人:向通讯录中添加新人&#xff0c;信息包括&#xff08;姓名、性别、年龄、联系电话、家庭住址&#xff09;最多记录1000人显示联系人:显示通讯录…

【006】Redis主从/哨兵/分片集群Docker搭建

项目源码合集 https://gitee.com/qiuyusy/small-project-study 搭建过程疯狂踩坑,记录一下希望各位少走弯路 目录主从搭建配置文件redis.conf运行容器测试优化哨兵集群配置文件运行容器测试代码读写分离分片集群mkdir -p /opt/docker/redis_study/redis_0/conf mkdir -p /opt/…