广告数仓:全流程调度

news2024/11/9 5:17:36

系列文章目录

广告数仓:采集通道创建
广告数仓:数仓搭建
广告数仓:数仓搭建(二)
广告数仓:全流程调度


文章目录

  • 系列文章目录
  • 前言
  • 一、ClickHouse安装
    • 1.修改环境
    • 2.安装依赖
    • 3.单机安装
    • 4.修改配置文件
    • 5.启动clickhouse
    • 6.创建需要的数据库和表
    • 7.Hive数据导出至Clickhouse
      • 1.新建IDEA工程
      • 2.pom依赖
      • 3.HiveToClickhouse.java
      • 4.上传hive-site.xml
      • 5.打包编译
  • 二、DolphinScheduler安装
    • 1.软件下载
    • 2.编写部署脚本
    • 3.创建元数据库及用户
    • 4.运行脚本
  • 三、全流程调度
    • 1.dolphinscheduler启动
    • 2.环境准备
      • 1.创建租户
      • 2.创建用户
      • 3.创建环境
      • 4.worker分组
      • 5.上传脚本
      • 6.创建项目
      • 7.定义工作流
    • 3.数据准备
      • 1.配置数据日期
      • 2.启动环境
      • 3.生成数据
      • 4.全流程调度
  • 总结


前言

这次我们为广告仓库进行一些收尾工作,主要是最终的数据导出和全流程调度以及数据的可视化,不知道一次能不能写完


一、ClickHouse安装

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。

1.修改环境

sudo vim /etc/security/limits.conf
#####
* soft nofile 65536 
* hard nofile 65536 
* soft nproc 131072 
* hard nproc 131072

在这里插入图片描述

sudo vim /etc/security/limits.d/20-nproc.conf
#####
* soft nofile 65536 
* hard nofile 65536 
* soft nproc 131072 
* hard nproc 131072
sudo vim /etc/selinux/config 
#####
SELINUX=disabled

在这里插入图片描述

分发配置文件

sudo xsync /etc/security/limits.conf
sudo xsync /etc/security/limits.d/20-nproc.conf
sudo xsync /etc/selinux/config

2.安装依赖

三台电脑建议都安装

sudo yum install -y libtool
sudo yum install -y *unixODBC*	

3.单机安装

企业实战中,根据自己的需求选择安装的节点数。
这里为了考虑到电脑性能,就是用单机安装。
在software目录下创建clickhouse目录,然后上传提供的安装包
在这里插入图片描述
在当前目录下执行所有rpm文件

 sudo rpm -ivh *.rpm

4.修改配置文件

 sudo vim /etc/clickhouse-server/config.xml

把这一行注释去掉
在这里插入图片描述

5.启动clickhouse

sudo systemctl start clickhouse-server
链接测试
clickhouse-client -m
在这里插入图片描述

6.创建需要的数据库和表

create database ad_report;

use ad_report;

create table if not exists dwd_ad_event_inc
(
    event_time             Int64 comment '事件时间',
    event_type             String comment '事件类型',
    ad_id                  String comment '广告id',
    ad_name                String comment '广告名称',
    ad_product_id          String comment '广告产品id',
    ad_product_name        String comment '广告产品名称',
    ad_product_price       Decimal(16, 2) comment '广告产品价格',
    ad_material_id         String comment '广告素材id',
    ad_material_url        String comment '广告素材url',
    ad_group_id            String comment '广告组id',
    platform_id            String comment '推广平台id',
    platform_name_en       String comment '推广平台名称(英文)',
    platform_name_zh       String comment '推广平台名称(中文)',
    client_country         String comment '客户端所处国家',
    client_area            String comment '客户端所处地区',
    client_province        String comment '客户端所处省份',
    client_city            String comment '客户端所处城市',
    client_ip              String comment '客户端ip地址',
    client_device_id       String comment '客户端设备id',
    client_os_type         String comment '客户端操作系统类型',
    client_os_version      String comment '客户端操作系统版本',
    client_browser_type    String comment '客户端浏览器类型',
    client_browser_version String comment '客户端浏览器版本',
    client_user_agent      String comment '客户端UA',
    is_invalid_traffic     UInt8 comment '是否是异常流量'
) ENGINE = MergeTree()
      ORDER BY (event_time, ad_name, event_type, client_province, client_city, client_os_type,
                client_browser_type, is_invalid_traffic);

7.Hive数据导出至Clickhouse

1.新建IDEA工程

在这里插入图片描述

2.pom依赖

<dependencies>

        <!-- 引入mysql驱动,目的是访问hive的metastore元数据-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>

        <!-- 引入spark-hive模块-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.1</version>
            <scope>provided</scope>
        </dependency>

        <!--引入clickhouse-jdbc驱动,为解决依赖冲突,需排除jackson的两个依赖-->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-core</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 引入commons-cli,目的是方便处理程序的输入参数 -->
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <!--将依赖编译到jar包中-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!--配置执行器-->
                    <execution>
                        <id>make-assembly</id>
                        <!--绑定到package执行周期上-->
                        <phase>package</phase>
                        <goals>
                            <!--只运行一次-->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3.HiveToClickhouse.java

package com.atguigu.ad.spark;

import org.apache.commons.cli.*;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HiveToClickhouse {
    public static void main(String[] args) {
        // 使用common-cli处理传入参数
        // 1 定义能够传入那些参数
        Options options = new Options();
        options.addOption(OptionBuilder.withLongOpt("hive_db").withDescription("hive数据库名称(required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("hive_table").withDescription("hive表名称(required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("hive_partition").withDescription("hive分区(required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("ck_url").withDescription("clickhouse的jdbc url(required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("ck_table").withDescription("clickhouse表名称(required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("batch_size").withDescription("数据写入clickhouse时的批次大小(required)").hasArg(true).isRequired(true).create());
        // 解析参数

        GnuParser gnuParser = new GnuParser();
        CommandLine cmd = null;
        try {
            cmd = gnuParser.parse(options, args);
        } catch (ParseException e) {
            e.printStackTrace();
            return;
        }


        // 创建sparksql环境
        SparkConf conf = new SparkConf().setAppName("HiveToClickhouse");

        SparkSession sparkSession = SparkSession.builder()
                .enableHiveSupport()
                .config(conf)
                .getOrCreate();

        // 读取hive中的数据
        sparkSession.sql("set spark.sql.parser.quotedRegexColumnNames=true");

        Dataset<Row> dataset = sparkSession
                .sql(""+"select `(dt)?+.+` from " + cmd.getOptionValue("hive_db") + "." + cmd.getOptionValue("hive_table") + " where dt= '" + cmd.getOptionValue("hive_partition")+"'");
        dataset.show();
        // 写入到clickHouse中
        dataset.write().mode("append")
                .format("jdbc")
                .option("url", cmd.getOptionValue("ck_url"))
                .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
                .option("dbtable", cmd.getOptionValue("ck_table"))
                .option("batch_size", cmd.getOptionValue("batch_size"))
                .save();

        sparkSession.close();
    }
}

4.上传hive-site.xml

在这里插入图片描述

在这里插入图片描述

5.打包编译

在这里插入图片描述
在这里插入图片描述

二、DolphinScheduler安装

1.软件下载

阿里源下载
版本号2.0.8
然后解压即可

2.编写部署脚本

在这里插入图片描述

vim conf/config/install_config.conf

这里说一下要修改的地方

ips="hadoop102,hadoop103,hadoop104" 
# 将要部署任一 DolphinScheduler 服务的服务器主机名或 ip 列表

masters="hadoop102" 
# master 所在主机名列表,必须是 ips 的子集

workers="hadoop102:default,hadoop103:default,hadoop104:default" 
# worker主机名及队列,此处的 ip 必须在 ips 列表中

alertServer="hadoop102"
# 告警服务所在服务器主机名

apiServers="hadoop102"
# api服务所在服务器主机名

# pythonGatewayServers="ds1" 
# 不需要的配置项,可以保留默认值,也可以用 # 注释

installPath="/opt/module/dolphinscheduler"
# DS 安装路径,如果不存在会创建

deployUser="atguigu"
# 部署用户,任务执行服务是以 sudo -u {linux-user} 切换不同 Linux 用户的方式来实现多租户运行作业,因此该用户必须有免密的 sudo 权限。

javaHome="/opt/module/jdk8u282-b08"
# JAVA_HOME 路径

# 数据库信息
DATABASE_TYPE=mysql

SPRING_DATASOURCE_URL=${SPRING_DATASOURCE_URL:-"jdbc:mysql://hadoop102:3306/dolphinscheduler?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=UTF-8"}

# Spring datasource username
SPRING_DATASOURCE_USERNAME=${SPRING_DATASOURCE_USERNAME:-"dolphinscheduler"}

# Spring datasource password
SPRING_DATASOURCE_PASSWORD=${SPRING_DATASOURCE_PASSWORD:-"dolphinscheduler"}

registryServers="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# 注册中心地址,即 Zookeeper 集群的地址

registryNamespace="dolphinscheduler"
# DS 在 Zookeeper 的结点名称

resourceStorageType="HDFS"	
# 资源存储类型

resourceUploadPath="/dolphinscheduler"
# 资源上传路径

defaultFS="hdfs://hadoop102:8020"
# 默认文件系统

yarnHaIps=
# Yarn RM 高可用 ip,若未启用 RM 高可用,则将该值置空

singleYarnIp="hadoop103"
# Yarn RM 主机名,若启用了 HA 或未启用 RM,保留默认值

hdfsRootUser="atguigu"
# 拥有 HDFS 根目录操作权限的用户

3.创建元数据库及用户

先登录Mysql

CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY 'dolphinscheduler';

GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';

flush privileges;

将连接Mysql的Jar拷入lib目录
在这里插入图片描述

4.运行脚本

由于dolphinscheduler依赖Hadoop和ZK所以要现将这两个应用都启动了。
在这里插入图片描述

 myhadoop.sh start
 zk.sh start
script/create-dolphinscheduler.sh
./install.sh 

在这里插入图片描述
初始化安装好后,会直接帮你启动。
用网页端查看以下
hadoop102:12345/dolphinscheduler
初始账号密码
admin
dolphinscheduler123
在这里插入图片描述
常用命令

./bin/start-all.sh
./bin/stop-all.sh
./bin/status-all.sh

三、全流程调度

1.dolphinscheduler启动

考虑到虚拟机的资源问题,我们选择测试使用的单机模式,会省一点资源

./bin/dolphinscheduler-daemon.sh start standalone-server

在这里插入图片描述
由于我们用的是同一个数据库,所以效果适合集群一样的。

2.环境准备

1.创建租户

在这里插入图片描述
注意要和Linux用户相同

2.创建用户

在这里插入图片描述
这里要选择我们刚刚创建的租户

3.创建环境

具体参数根据自己电脑修改

export HADOOP_HOME=/opt/module/hadoop
export HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
export SPARK_HOME=/opt/module/spark
export SPARK_HOME1=/opt/module/spark
export SPARK_HOME2=/opt/module/spark
export JAVA_HOME=/opt/module/jdk8u282-b
export HIVE_HOME=/opt/module/hive
export DATAX_HOME=/opt/module/datax

export PATH=$PATH:$HADOOP_HOME/bin:$SPARK_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$DATAX_HOME/bin

在这里插入图片描述

4.worker分组

在这里插入图片描述
因为咱们就一台几点跑DS,所以就写一个worker地址就行。

5.上传脚本

用普通用户上传,否则普通用户无法调用管理员用户上传的文件
在这里插入图片描述
创建两个文件夹,分别上传需要的脚本。
在这里插入图片描述
在这里插入图片描述

6.创建项目

在这里插入图片描述

7.定义工作流

在这里插入图片描述
名称随意

ad_mysql_to_hdfs_full
在这里插入图片描述
hdfs_to_ods在这里插入图片描述

ods_to_dim
在这里插入图片描述
ods_to_dwd
在这里插入图片描述

HiveToClickhouse
在这里插入图片描述
在这里插入图片描述

com.atguigu.ad.spark.HiveToClickhouse

--hive_db ad   \
--hive_table dwd_ad_event_inc \
--hive_partition ${dt}   \
--ck_url  jdbc:clickhouse://hadoop102:8123/ad_report   \
--ck_table dwd_ad_event_inc   \
--batch_size 1000

最后全局保存的时候定义全局变量
在这里插入图片描述

3.数据准备

准备数据可以先把DS关掉
启动需要的环境

1.配置数据日期

hadoop102和103修改

vim /opt/module/ad_mock/nginxLogGen.setting

在这里插入图片描述

2.启动环境

hadoop.sh start
zk.sh start
kf.sh start
ad_f1.sh start
ad_f2.sh start

3.生成数据

ad_mock.sh
等待一段时间后,查看HDFS
在这里插入图片描述
等待数据生成完成即可。

4.全流程调度

现在我们进行全流程调度,可以把两个Flume进程和Kafka进程关掉,然后启动hiveserver2来启动hive
由于我们是本地单节点运行,所以最后把日志一起监控一下,除了问题方便排错。

tail -f logs/dolphinscheduler-standalone-server-hadoop102.out

上线工作流
在这里插入图片描述
运行工作流
在这里插入图片描述
在这里插入图片描述
出错了可以检查日志
在这里插入图片描述
在这里插入图片描述
我么可以用dbeaver连接clickhouse查看一下


总结

全流程调度就到这里吧,可视化还要再来一次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/677810.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

012-从零搭建微服务-接口文档(二)

写在最前 如果这个项目让你有所收获&#xff0c;记得 Star 关注哦&#xff0c;这对我是非常不错的鼓励与支持。 源码地址&#xff08;后端&#xff09;&#xff1a;https://gitee.com/csps/mingyue 源码地址&#xff08;前端&#xff09;&#xff1a;https://gitee.com/csps…

统一拦截--过滤器Filter

1.过滤器Filter 1. 概述 概念: Filter过滤器&#xff0c;是JavaWeb三大组件(Servlet、Filter、Listener)之一。过滤器可以把对资源的请求拦截下来&#xff0c;从而实现一些特殊的功能。过滤器一般完成一些通用的操作&#xff0c;比如:登录校验、统一编码处理、敏感字符处理等…

Tcp协议的十大特性详解+示例

前言 之前我们简单了解了一下Tcp是什么及它的套接字如何使用:基于UDP和TCP套接字实现简单的回显客户端服务器程序_Crystal_bit的博客-CSDN博客 因为要给大家介绍Tcp的十大特性&#xff0c;所以这里给出Tcp报头结构&#xff1a; 目录 1. 确认应答 2. 超时重传 3. 连接管理 3…

【Android复习笔记】Parcelable 为什么速度优于 Serializable ?

Q:Parcelable 为什么速度优于 Serializable ? 首先,抛开应用场景谈技术方案都是在耍流氓,所以如果你遇到有面试官问这样的题目本身就是在给面试者挖坑。 序列化 将实例的状态转换为可以存储或传输的形式的过程。 Serializable 实现方式: Serializable 是属于 Java 自带的…

Solid Converter PDF v10 安装及使用教程

目录 一、软件介绍二、下载教程三、安装教程四、使用教程1.PDF转Word、Html等2.合并PDF文件 一、软件介绍 Solid Converter PDF是一套专门将PDF文件转换成Word的软件。 能够将PDF转换为Word、Excel、HTML、PowerPoint、纯文本文件从PDF文档中提取数据并以CSV等格式保存能够转…

数仓工程师理解复杂业务的思考方法论

模型设计框架&#xff08;业务过程驱动&#xff09;还是在经典的三层数据模型架构下去进行&#xff0c;概念模型、逻辑模型、物理模型 首先概念模型其实是业务过程&#xff08;流程图&#xff09;&#xff0c;其中需要考虑到几个方面&#xff1a; 1.数据 业务覆盖 业务感知、…

循坏队列CircularQueue

前言 一、CircularQueue 二、特点 三、设计思路 1&#xff09;判空与判满 2&#xff09;链表还是数组实现&#xff1f; 四、实现 1).IsEmpty() 2).IsFull() 3)CircularQueueCreate创建 4&#xff09;CircularQueueEnQueue插入 5&#xff09;CircularQueueDeQueue删除 6&#xf…

React Hook之useCallback 性能优化

上文 对比之前的组件优化说明React.memo的作用我们说了 React.memo的妙用 但是 它却并非万能 我们来看这个情况 我们子组件代码编写如下 import React from "react";const ChildComponent ({ dom1funt }) > {console.log("ChildComponent 被重新渲染"…

规则引擎--规则集:规则集合的组织和执行

目录 回顾easy-rules的rules执行如何想规则集合的构造 规则集合定义普通规则集和执行定义树形规则集 当弄清楚了一个规则的设计和执行逻辑后&#xff0c;接下来需要考虑的就是许多的规则如何组织了&#xff0c;即规则集的抽象设计。 来看一些例子 回顾easy-rules的rules执行 …

NFCEE Discovery and Mode Set

10.1 NFCEE ID NFCC 动态为 NFCEE 分配 ID&#xff08;称为“NFCEE ID”&#xff09;。 DH 通过执行 NFCEE Discovery 来了解 ID 值。 在配置状态为 0x01 的 NFCC 重置之前&#xff0c;NFCEE ID 一直有效。 值为 0x00 的 ID 在本规范中称为 DH-NFCEE ID&#xff0c;并且应代表…

五、Docker本地镜像发布到阿里云/发布到私有库

目录 前言一、本地镜像发布到阿里云1.1 流程图1.2 注册阿里云创建容器服务个人实例1.3 创建命名空间1.4 创建镜像仓库1.5 将镜像推送到阿里云本地仓库 二、从阿里云仓库拉去自己推送的镜像三、本地镜像发布到阿里云总结四、本地镜像发布到私有库4.1 流程图4.2 下载镜像Docker R…

Shell编程从入门到实践——实践篇

欢迎关注 「Android茶话会」 回 「学习之路」 取Android技术路线经典电子书回 「pdf」 取阿里&字节经典面试题、Android、算法、Java等系列武功秘籍。回 「天涯」 取天涯论坛200精彩博文,包括小说、玄学等 背景 之前在搞一些CI/CD,使用到了shell脚本&#xff0c;shell的开…

nvdiffrec在Windows上的配置及使用

nvdiffrec是NVIDIA研究院开源的项目&#xff0c;源代码地址&#xff1a;https://github.com/NVlabs/nvdiffrec &#xff0c;论文为《Extracting Triangular 3D Models, Materials, and Lighting From Images》&#xff0c;从图像中提取三角形三维(三角网格)模型、空间变化的材质…

uni-app微信小程序获取手机号授权登录(复制即用,js完成敏感数据对称解密,无需走服务端处理)

目录 一、示例 二、具体实现说明 一、示例 获取到的手机号 二、具体实现说明 属性说明 属性名说明生效时机getphonenumber获取用户手机号回调open-type"getPhoneNumber" 按钮写法 <template><view class"login"><view class"content…

为什么要写这个带点玄幻气息的英语单词记忆博客

&#x1f31f;博主&#xff1a;命运之光 ☀️专栏&#xff1a;英之剑法&#x1f5e1; ❤️‍&#x1f525;专栏&#xff1a;英之试炼&#x1f525; ☀️博主的其他文章&#xff1a;点击进入博主的主页 &#x1f433; 开篇想说的话&#xff1a;开学就大三了&#xff0c;命运之光…

DMA详解及应用(嵌入式学习)

DMA 0. 前言1. DMA作用2. DMA特性3. DMA寄存器4. DMA的增量或者循环模式5. 练习 0. 前言 DMA&#xff08;Direct Memory Access&#xff0c;直接内存访问&#xff09;是一种计算机系统中用于高效地实现数据传输的技术。它允许数据在外设和内存之间直接传输&#xff0c;而无需C…

GEE:为每个对象(斑块/超像素)添加属性

作者:CSDN @ _养乐多_ 本文将介绍为每个对象(斑块/超像素)添加属性的代码。并举例将最近距离作为属性添加到每个对象(斑块/超像素)特征中。 结果如下图所示, 文章目录 一、代码二、代码链接一、代码 这段代码的目的是对动态世界土地覆盖图像进行分析,并提取出其中的目…

贪婪算法简介-数据结构和算法教程

贪婪算法是一种算法范例&#xff0c;它遵循在每个阶段进行局部最优选择的问题求解启发式&#xff0c;希望找到全局最优值。换句话说&#xff0c;贪婪算法在每一步都选择最好的可能选项&#xff0c;而不考虑该选择对未来步骤的影响。 当一个问题可以被划分成更小的子问题&#…

1.GPIO的工作原理

1.stm32引脚说明&#xff1a; 对于stm32f103zet6&#xff1a; 一共有7组io口&#xff1b;每组io口有16个io&#xff1b;一共有16*7112个io&#xff1b;分组情况为&#xff1a;GPIOA&#xff0c;GPIOB~GPIOG&#xff1b; 2.GPIO的基本结构&#xff1a; 3.GPIO的工作模式&…

C++入门:类和对象(后)

目录 前言&#xff1a; 一&#xff1a;static成员 (1)概念 (2)特性 (3)例子 二&#xff1a;explicit关键字 三&#xff1a;内部类 (1)概念 (2)特性 (3)实例 四&#xff1a;匿名对象 (1)概念 (2)特性 (3)实例 五&#xff1a;拷贝对象时的一些编译器优化 (1)引入 …