Ad Data Warehouse: Building the Warehouse (Part 2)

news2025/1/12 2:58:47

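Series contents

Ad Data Warehouse: Creating the Collection Pipeline
Ad Data Warehouse: Building the Warehouse
Ad Data Warehouse: Building the Warehouse (Part 2)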


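Table of contents

  • Series contents
  • Preface
  • Creating the DWD layer
    • 1. Create the table
      • Ad event fact table
    • 2. Load the data
      • Initial log parsing
      • Parsing IP and UA
      • Flagging invalid traffic
      • Writing the script
  • Summary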


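Preface

This time we finish the remaining part of the warehouse: the DWD layer.


Creating the DWD layer

1. Create the table

Ad event fact table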

drop table if exists dwd_ad_event_inc;
create external table if not exists dwd_ad_event_inc
(
    event_time             bigint comment 'event time',
    event_type             string comment 'event type',
    ad_id                  string comment 'ad id',
    ad_name                string comment 'ad name',
    ad_product_id          string comment 'advertised product id',
    ad_product_name        string comment 'advertised product name',
    ad_product_price       decimal(16, 2) comment 'advertised product price',
    ad_material_id         string comment 'ad creative id',
    ad_material_url        string comment 'ad creative URL',
    ad_group_id            string comment 'ad group id',
    platform_id            string comment 'promotion platform id',
    platform_name_en       string comment 'promotion platform name (English)',
    platform_name_zh       string comment 'promotion platform name (Chinese)',
    client_country         string comment 'client country',
    client_area            string comment 'client area',
    client_province        string comment 'client province',
    client_city            string comment 'client city',
    client_ip              string comment 'client IP address',
    client_device_id       string comment 'client device id',
    client_os_type         string comment 'client OS type',
    client_os_version      string comment 'client OS version',
    client_browser_type    string comment 'client browser type',
    client_browser_version string comment 'client browser version',
    client_user_agent      string comment 'client user agent',
    is_invalid_traffic     boolean comment 'whether the traffic is invalid'
) PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/ad/dwd/dwd_ad_event_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');

2. Load the data

Initial log parsing

create temporary table coarse_parsed_log
as
select
    parse_url('http://www.example.com' || request_uri, 'QUERY', 't') event_time,
    split(parse_url('http://www.example.com' || request_uri, 'PATH'), '/')[3] event_type,
    parse_url('http://www.example.com' || request_uri, 'QUERY', 'id') ad_id,
    split(parse_url('http://www.example.com' || request_uri, 'PATH'), '/')[2] platform,
    parse_url('http://www.example.com' || request_uri, 'QUERY', 'ip') client_ip,
    reflect('java.net.URLDecoder', 'decode', parse_url('http://www.example.com'||request_uri,'QUERY','ua'), 'utf-8') client_ua,
    parse_url('http://www.example.com'||request_uri,'QUERY','os_type') client_os_type,
    parse_url('http://www.example.com'||request_uri,'QUERY','device_id') client_device_id
from ods_ad_log_inc
where dt='2023-01-07';
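
To make the indexing in the query above easier to follow, here is a minimal Java sketch of the same decomposition. It assumes a request URI shaped like /ad/baidu/click?id=7&ua=Mozilla%2F5.0; that sample, the class name RequestUriSketch and its method names are illustrative assumptions, not part of the project. It shows why indexes 2 and 3 of the split path pick out the platform and the event type (index 0 is the empty string before the leading slash), and why the query URL-decodes the ua value in a separate step.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mirrors the parse_url / split calls of the query.
final class RequestUriSketch {
    // Same dummy host the query prepends so that the URI becomes a full URL
    private static final String DUMMY_HOST = "http://www.example.com";

    /** Splits the path on "/"; element 0 is the empty string before the leading slash. */
    static String[] pathSegments(String requestUri) {
        return URI.create(DUMMY_HOST + requestUri).getPath().split("/");
    }

    /** Returns the query parameters without URL-decoding their values. */
    static Map<String, String> rawQueryParams(String requestUri) {
        Map<String, String> params = new HashMap<>();
        String query = URI.create(DUMMY_HOST + requestUri).getRawQuery();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Same decoding the query performs through reflect('java.net.URLDecoder', 'decode', ...). */
    static String urlDecode(String raw) {
        try {
            return URLDecoder.decode(raw, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

With the assumed sample URI, pathSegments(...)[2] is the platform, pathSegments(...)[3] is the event type, and urlDecode(rawQueryParams(...).get("ua")) gives the readable UA string.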

Parsing IP and UA

Here we use IDEA to write custom Hive UDF classes.
Add the following dependencies and build configuration to pom.xml:

<dependencies>
        <!-- hive-exec is provided by the cluster and is not bundled into the jar, hence scope provided -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>

        <!-- IP address database -->
        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-http</artifactId>
            <version>5.8.18</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <!-- Bundle the dependencies into the jar -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!-- Configure the execution -->
                    <execution>
                        <id>make-assembly</id>
                        <!-- Bind to the package phase -->
                        <phase>package</phase>
                        <goals>
                            <!-- Run the assembly plugin's single goal -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

com/atguigu/ad/hive/udf/ParseIP.java

package com.atguigu.ad.hive.udf;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IOUtils;
import org.lionsoul.ip2region.xdb.Searcher;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;


public class ParseIP extends GenericUDF {
    Searcher searcher = null;

    /**
     * Validates the number and types of the arguments and declares the return type.
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the number of arguments
        if (arguments.length != 2) {
            throw new UDFArgumentException("parseIP requires exactly 2 arguments");
        }
        // Check the argument types
        ObjectInspector hdfsPathOI = arguments[0];
        if (hdfsPathOI.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("parseIP: the first argument must be a primitive type");
        }

        PrimitiveObjectInspector hdfsPathOI1 = (PrimitiveObjectInspector) hdfsPathOI;
        if (hdfsPathOI1.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("parseIP: the first argument must be a STRING");
        }

        ObjectInspector ipOI = arguments[1];
        if (ipOI.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("parseIP: the second argument must be a primitive type");
        }

        PrimitiveObjectInspector ipOI1 = (PrimitiveObjectInspector) ipOI;
        if (ipOI1.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("parseIP: the second argument must be a STRING");
        }

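        // Load the ip2region database into memory.
        // The HDFS path can only be read here when it is passed as a constant.
        if (hdfsPathOI instanceof ConstantObjectInspector) {
            String hdfsPath = ((ConstantObjectInspector) hdfsPathOI).getWritableConstantValue().toString();

            // Read the database file from HDFS
            Path path = new Path(hdfsPath);
            try {
                FileSystem fileSystem = FileSystem.get(new Configuration());
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                // try-with-resources closes the HDFS stream once the copy is done
                try (FSDataInputStream inputStream = fileSystem.open(path)) {
                    IOUtils.copyBytes(inputStream, byteArrayOutputStream, 1024);
                }
                byte[] bytes = byteArrayOutputStream.toByteArray();

                // Build the in-memory searcher used to resolve IPs
                searcher = Searcher.newWithBuffer(bytes);

            } catch (Exception e) {
                // Fail fast instead of silently leaving searcher null
                throw new UDFArgumentException("parseIP could not load the IP database from " + hdfsPath + ": " + e.getMessage());
            }
        }
        // Declare the return type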
        ArrayList<String> structFieldNames = new ArrayList<>();
        structFieldNames.add("country");
        structFieldNames.add("area");
        structFieldNames.add("province");
        structFieldNames.add("city");
        structFieldNames.add("isp");

        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

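    /**
     * Resolves one IP address. Always returns five fields so that the result
     * matches the struct declared in initialize; unresolved fields are null.
     */
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        ArrayList<Object> result = new ArrayList<>();
        Object ipObj = deferredObjects[1].get();

        if (searcher != null && ipObj != null) {
            try {
                // ip2region returns "country|area|province|city|isp"
                String[] split = searcher.search(ipObj.toString()).split("\\|");
                for (int i = 0; i < 5; i++) {
                    result.add(i < split.length ? split[i] : null);
                }
                return result;
            } catch (Exception e) {
                e.printStackTrace();
                result.clear();
            }
        }

        // Null IP, missing database, or failed lookup: return five null fields
        for (int i = 0; i < 5; i++) {
            result.add(null);
        }
        return result;
    }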

    /**
     * Describes the function (used in EXPLAIN output).
     */
    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("parseIP", children);
    }
}

com/atguigu/ad/hive/udf/ParseUA.java

package com.atguigu.ad.hive.udf;

import cn.hutool.http.useragent.UserAgent;
import cn.hutool.http.useragent.UserAgentUtil;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class ParseUA extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the number of arguments
        if (arguments.length != 1) {
            throw new UDFArgumentException("parseUA requires exactly 1 argument");
        }
        // Check the argument type
        ObjectInspector uaOI = arguments[0];
        if (uaOI.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("parseUA: the first argument must be a primitive type");
        }

        PrimitiveObjectInspector uaOI1 = (PrimitiveObjectInspector) uaOI;
        if (uaOI1.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("parseUA: the first argument must be a STRING");
        }

        // Declare the return type
        ArrayList<String> structFieldNames = new ArrayList<>();
        structFieldNames.add("browser");
        structFieldNames.add("browserVersion");
        structFieldNames.add("engine");
        structFieldNames.add("engineVersion");
        structFieldNames.add("os");
        structFieldNames.add("osVersion");
        structFieldNames.add("platform");
        structFieldNames.add("isMobile");

        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // A null UA yields a null struct instead of a NullPointerException
        Object uaObj = deferredObjects[0].get();
        if (uaObj == null) {
            return null;
        }
        UserAgent parse = UserAgentUtil.parse(uaObj.toString());
        if (parse == null) {
            return null;
        }
        ArrayList<Object> result = new ArrayList<>();
        result.add(parse.getBrowser().getName());
        result.add(parse.getVersion());
        result.add(parse.getEngine().getName());
        result.add(parse.getEngineVersion());
        result.add(parse.getOs().getName());
        result.add(parse.getOsVersion());
        result.add(parse.getPlatform().getName());
        // isMobile is declared as a string field, so convert the boolean
        result.add(String.valueOf(parse.isMobile()));
        return result;
    }

    @Override
    public String getDisplayString(String[] strings) {
        return getStandardDisplayString("parseUA", strings);
    }
}

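Package the project and upload the jar to the Hadoop cluster.
Put it in the HDFS directory /user/hive/jars; create the directory if it does not exist.
Upload ip2region.xdb to the HDFS directory /ip2region/.
You can generate this file yourself or use the one provided.
Register the custom functions in Hive: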

create function parse_ip
    as 'com.atguigu.ad.hive.udf.ParseIP'
    using jar 'hdfs://hadoop102:8020/user/hive/jars/ad_hive_udf-1.0-SNAPSHOT-jar-with-dependencies.jar';

create function parse_ua
    as 'com.atguigu.ad.hive.udf.ParseUA'
    using jar 'hdfs://hadoop102:8020/user/hive/jars/ad_hive_udf-1.0-SNAPSHOT-jar-with-dependencies.jar';

Test the two functions:

select parse_ip("hdfs://hadoop102:8020/ip2region/ip2region.xdb","120.245.112.30");

select parse_ua("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36");

Create the temporary table:

set hive.vectorized.execution.enabled=false;
create temporary table fine_parsed_log
as
select
    event_time,
    event_type,
    ad_id,
    platform,
    client_ip,
    client_ua,
    client_os_type,
    client_device_id,
    parse_ip('hdfs://hadoop102:8020/ip2region/ip2region.xdb',client_ip) region_struct,
    if(client_ua != '',parse_ua(client_ua),null) ua_struct
from coarse_parsed_log;

Flagging invalid traffic

1. Match against a list of known crawlers
Create the table:

drop table if exists dim_crawler_user_agent;
create external table if not exists dim_crawler_user_agent
(
    pattern       STRING comment 'regular expression',
    addition_date STRING comment 'date added',
    url           STRING comment 'official URL of the crawler',
    instances     ARRAY<STRING> comment 'sample UA strings'
)
    STORED AS ORC
    LOCATION '/warehouse/ad/dim/dim_crawler_user_agent'
    TBLPROPERTIES ('orc.compress' = 'snappy');

Create a staging table:

create temporary table if not exists tmp_crawler_user_agent
(
    pattern       STRING comment 'regular expression',
    addition_date STRING comment 'date added',
    url           STRING comment 'official URL of the crawler',
    instances     ARRAY<STRING> comment 'sample UA strings'
)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    STORED AS TEXTFILE
    LOCATION '/warehouse/ad/tmp/tmp_crawler_user_agent';

Upload the crawler data file to the staging table's HDFS location.

Load the data into the dimension table:

insert overwrite table dim_crawler_user_agent select * from tmp_crawler_user_agent;
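
The final load joins each event to this table with client_ua regexp crawler.pattern. As far as I know, Hive's regexp is true when the pattern is found anywhere in the string, so it behaves like a substring search rather than a full match. A minimal Java sketch of that check follows; the three patterns and the class name CrawlerMatchSketch are made up for illustration, the real patterns come from dim_crawler_user_agent.pattern.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; not part of the project code.
final class CrawlerMatchSketch {
    // Illustrative patterns only, standing in for dim_crawler_user_agent.pattern
    static final List<String> SAMPLE_PATTERNS = Arrays.asList("Googlebot\\/", "bingbot", "[Ss]pider");

    /** True if any pattern is found anywhere in the UA (find, not a full match). */
    static boolean isCrawler(String userAgent, List<String> patterns) {
        if (userAgent == null) {
            return false;
        }
        for (String pattern : patterns) {
            if (Pattern.compile(pattern).matcher(userAgent).find()) {
                return true;
            }
        }
        return false;
    }
}
```

A UA that matches several patterns produces several joined rows in the SQL version, whereas this sketch stops at the first hit.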

2. The same IP accesses too fast
More than 100 events from one IP on the same ad within 5 minutes. The SQL logic is as follows:

create temporary table high_speed_ip
as
select
    distinct client_ip
from
(
    select
        event_time,
        client_ip,
        ad_id,
        count(1) over(partition by client_ip,ad_id order by cast(event_time as bigint) range between 300000 preceding and current row) event_count_last_5min
    from coarse_parsed_log
)t1
where event_count_last_5min>100;
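
The window frame in this query counts, for every event, how many events of the same (client_ip, ad_id) pair fall in the 300000 ms before it, including the event itself. The Java sketch below reproduces that count for a single partition with two pointers over sorted timestamps. The class HighSpeedSketch and its methods are hypothetical names used only for illustration; event times are assumed to be epoch milliseconds, as the 300000 constant suggests.

```java
import java.util.Arrays;

// Hypothetical helper; mirrors the RANGE window for one (ip, ad) partition.
final class HighSpeedSketch {
    static final long WINDOW_MS = 300_000L; // 5 minutes, as in the SQL frame
    static final int THRESHOLD = 100;       // flagged when the count is greater than 100

    /** Largest number of events in any window [t - windowMs, t] that ends at an event time t. */
    static int maxCountInWindow(long[] eventTimes, long windowMs) {
        long[] ts = eventTimes.clone();
        Arrays.sort(ts);
        int max = 0;
        int left = 0;
        int right = 0;
        for (int i = 0; i < ts.length; i++) {
            // Drop events that are older than the window start
            while (ts[left] < ts[i] - windowMs) {
                left++;
            }
            // A RANGE frame also includes peers: rows sharing the current timestamp
            if (right < i) {
                right = i;
            }
            while (right + 1 < ts.length && ts[right + 1] == ts[i]) {
                right++;
            }
            max = Math.max(max, right - left + 1);
        }
        return max;
    }

    static boolean isHighSpeed(long[] eventTimes) {
        return maxCountInWindow(eventTimes, WINDOW_MS) > THRESHOLD;
    }

    /** Test data helper: n events separated by gapMs, starting at 0. */
    static long[] evenlySpaced(int n, long gapMs) {
        long[] ts = new long[n];
        for (int i = 0; i < n; i++) {
            ts[i] = i * gapMs;
        }
        return ts;
    }
}
```

For example, 101 events one second apart exceed the threshold, while exactly 100 do not, because the SQL condition is strictly greater than 100.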

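3. The same IP accesses at a fixed interval
More than 5 accesses at the same interval, that is, at least 5 consecutive equal gaps between events of one IP on the same ad. The SQL logic is as follows: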

create temporary table cycle_ip
as
select
    distinct client_ip
from
(
    select
        client_ip,
        ad_id,
        s
    from
    (
        select
            event_time,
            client_ip,
            ad_id,
            sum(num) over(partition by client_ip,ad_id order by event_time) s
        from
        (
            select
                event_time,
                client_ip,
                ad_id,
                time_diff,
                if(lag(time_diff,1,0) over(partition by client_ip,ad_id order by event_time)!=time_diff,1,0) num
            from
            (
                select
                    event_time,
                    client_ip,
                    ad_id,
                    lead(event_time,1,0) over(partition by client_ip,ad_id order by event_time)-event_time time_diff
                from coarse_parsed_log
            )t1
        )t2
    )t3
    group by client_ip,ad_id,s
    having count(*)>=5
)t4;
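
The nested query is easier to read once its three steps are named: compute the gap to the next event, start a new group whenever the gap changes, then keep groups with at least 5 rows. The Java sketch below does roughly the same for one (client_ip, ad_id) partition by measuring the longest run of identical consecutive gaps. CycleSketch and its method names are hypothetical, and the sketch ignores the last row of the partition, whose gap in the SQL is computed against the default value 0 and so never matches a real gap for positive timestamps.

```java
import java.util.Arrays;

// Hypothetical helper; mirrors the lead / lag / running-sum grouping for one partition.
final class CycleSketch {
    static final int MIN_EQUAL_GAPS = 5; // mirrors "having count(*) >= 5"

    /** Length of the longest run of identical consecutive gaps between sorted event times. */
    static int longestEqualGapRun(long[] eventTimes) {
        long[] ts = eventTimes.clone();
        Arrays.sort(ts);
        int longest = 0;
        int run = 0;
        for (int i = 0; i + 1 < ts.length; i++) {
            long gap = ts[i + 1] - ts[i];
            if (i > 0 && gap == ts[i] - ts[i - 1]) {
                run++;   // same gap as the previous row: stay in the same group
            } else {
                run = 1; // gap changed: a new group starts
            }
            longest = Math.max(longest, run);
        }
        return longest;
    }

    static boolean isCyclic(long[] eventTimes) {
        return longestEqualGapRun(eventTimes) >= MIN_EQUAL_GAPS;
    }
}
```

Six events spaced exactly 10 seconds apart give five equal gaps and are flagged; five such events give only four gaps and are not.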

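4. The same device accesses too fast
More than 100 events from one device on the same ad within 5 minutes. The SQL logic is as follows: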

create temporary table high_speed_device
as
select
    distinct client_device_id
from
(
    select
        event_time,
        client_device_id,
        ad_id,
        count(1) over(partition by client_device_id,ad_id order by cast(event_time as bigint) range between 300000 preceding and current row) event_count_last_5min
    from coarse_parsed_log
    where client_device_id != ''
)t1
where event_count_last_5min>100;

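5. The same device accesses at a fixed interval
More than 5 accesses at the same interval, that is, at least 5 consecutive equal gaps between events of one device on the same ad.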

create temporary table cycle_device
as
select
    distinct client_device_id
from
(
    select
        client_device_id,
        ad_id,
        s
    from
    (
        select
            event_time,
            client_device_id,
            ad_id,
            sum(num) over(partition by client_device_id,ad_id order by event_time) s
        from
        (
            select
                event_time,
                client_device_id,
                ad_id,
                time_diff,
                if(lag(time_diff,1,0) over(partition by client_device_id,ad_id order by event_time)!=time_diff,1,0) num
            from
            (
                select
                    event_time,
                    client_device_id,
                    ad_id,
                    lead(event_time,1,0) over(partition by client_device_id,ad_id order by event_time)-event_time time_diff
                from coarse_parsed_log
                where client_device_id != ''
            )t1
        )t2
    )t3
    group by client_device_id,ad_id,s
    having count(*)>=5
)t4;

6. Flag invalid traffic and denormalize the dimensions

insert overwrite table dwd_ad_event_inc partition (dt='2023-01-07')
select
    event_time,
    event_type,
    event.ad_id,
    ad_name,
    product_id,
    product_name,
    product_price,
    material_id,
    material_url,
    group_id,
    plt.id,
    platform_name_en,
    platform_name_zh,
    region_struct.country,
    region_struct.area,
    region_struct.province,
    region_struct.city,
    event.client_ip,
    event.client_device_id,
    if(event.client_os_type!='',event.client_os_type,ua_struct.os),
    nvl(ua_struct.osVersion,''),
    nvl(ua_struct.browser,''),
    nvl(ua_struct.browserVersion,''),
    event.client_ua,
    if(coalesce(pattern,hsi.client_ip,ci.client_ip,hsd.client_device_id,cd.client_device_id) is not null,true,false)
from fine_parsed_log event
left join dim_crawler_user_agent crawler on event.client_ua regexp crawler.pattern
left join high_speed_ip hsi on event.client_ip = hsi.client_ip
left join cycle_ip ci on event.client_ip = ci.client_ip
left join high_speed_device hsd on event.client_device_id = hsd.client_device_id
left join cycle_device cd on event.client_device_id = cd.client_device_id
left join
(
    select
        ad_id,
        ad_name,
        product_id,
        product_name,
        product_price,
        material_id,
        material_url,
        group_id
    from dim_ads_info_full
    where dt='2023-01-07'
)ad
on event.ad_id=ad.ad_id
left join
(
    select
        id,
        platform_name_en,
        platform_name_zh
    from dim_platform_info_full
    where dt='2023-01-07'
)plt
on event.platform=plt.platform_name_en;

Writing the script

#!/bin/bash

APP=ad

# Use the date passed as the second argument; default to yesterday if none is given
if [ -n "$2" ] ;then
    do_date=$2
else 
    do_date=`date -d "-1 day" +%F`
fi

dwd_ad_event_inc="
set hive.vectorized.execution.enabled=false;
--Initial parsing
create temporary table coarse_parsed_log
as
select
    parse_url('http://www.example.com' || request_uri, 'QUERY', 't') event_time,
    split(parse_url('http://www.example.com' || request_uri, 'PATH'), '/')[3] event_type,
    parse_url('http://www.example.com' || request_uri, 'QUERY', 'id') ad_id,
    split(parse_url('http://www.example.com' || request_uri, 'PATH'), '/')[2] platform,
    parse_url('http://www.example.com' || request_uri, 'QUERY', 'ip') client_ip,
    reflect('java.net.URLDecoder', 'decode', parse_url('http://www.example.com'||request_uri,'QUERY','ua'), 'utf-8') client_ua,
    parse_url('http://www.example.com'||request_uri,'QUERY','os_type') client_os_type,
    parse_url('http://www.example.com'||request_uri,'QUERY','device_id') client_device_id
from ${APP}.ods_ad_log_inc
where dt='$do_date';
--Parse IP and UA
create temporary table fine_parsed_log
as
select
    event_time,
    event_type,
    ad_id,
    platform,
    client_ip,
    client_ua,
    client_os_type,
    client_device_id,
    ${APP}.parse_ip('hdfs://hadoop102:8020/ip2region/ip2region.xdb',client_ip) region_struct,
    if(client_ua != '',${APP}.parse_ua(client_ua),null) ua_struct
from coarse_parsed_log;
--High-speed IPs
create temporary table high_speed_ip
as
select
    distinct client_ip
from
(
    select
        event_time,
        client_ip,
        ad_id,
        count(1) over(partition by client_ip,ad_id order by cast(event_time as bigint) range between 300000 preceding and current row) event_count_last_5min
    from coarse_parsed_log
)t1
where event_count_last_5min>100;
--Fixed-interval IPs
create temporary table cycle_ip
as
select
    distinct client_ip
from
(
    select
        client_ip,
        ad_id,
        s
    from
    (
        select
            event_time,
            client_ip,
            ad_id,
            sum(num) over(partition by client_ip,ad_id order by event_time) s
        from
        (
            select
                event_time,
                client_ip,
                ad_id,
                time_diff,
                if(lag(time_diff,1,0) over(partition by client_ip,ad_id order by event_time)!=time_diff,1,0) num
            from
            (
                select
                    event_time,
                    client_ip,
                    ad_id,
                    lead(event_time,1,0) over(partition by client_ip,ad_id order by event_time)-event_time time_diff
                from coarse_parsed_log
            )t1
        )t2
    )t3
    group by client_ip,ad_id,s
    having count(*)>=5
)t4;
--High-speed devices
create temporary table high_speed_device
as
select
    distinct client_device_id
from
(
    select
        event_time,
        client_device_id,
        ad_id,
        count(1) over(partition by client_device_id,ad_id order by cast(event_time as bigint) range between 300000 preceding and current row) event_count_last_5min
    from coarse_parsed_log
    where client_device_id != ''
)t1
where event_count_last_5min>100;
--Fixed-interval devices
create temporary table cycle_device
as
select
    distinct client_device_id
from
(
    select
        client_device_id,
        ad_id,
        s
    from
    (
        select
            event_time,
            client_device_id,
            ad_id,
            sum(num) over(partition by client_device_id,ad_id order by event_time) s
        from
        (
            select
                event_time,
                client_device_id,
                ad_id,
                time_diff,
                if(lag(time_diff,1,0) over(partition by client_device_id,ad_id order by event_time)!=time_diff,1,0) num
            from
            (
                select
                    event_time,
                    client_device_id,
                    ad_id,
                    lead(event_time,1,0) over(partition by client_device_id,ad_id order by event_time)-event_time time_diff
                from coarse_parsed_log
                where client_device_id != ''
            )t1
        )t2
    )t3
    group by client_device_id,ad_id,s
    having count(*)>=5
)t4;
--Dimension denormalization
insert overwrite table ${APP}.dwd_ad_event_inc partition (dt='$do_date')
select
    event_time,
    event_type,
    event.ad_id,
    ad_name,
    product_id,
    product_name,
    product_price,
    material_id,
    material_url,
    group_id,
    plt.id,
    platform_name_en,
    platform_name_zh,
    region_struct.country,
    region_struct.area,
    region_struct.province,
    region_struct.city,
    event.client_ip,
    event.client_device_id,
    if(event.client_os_type!='',event.client_os_type,ua_struct.os),
    nvl(ua_struct.osVersion,''),
    nvl(ua_struct.browser,''),
    nvl(ua_struct.browserVersion,''),
    event.client_ua,
    if(coalesce(pattern,hsi.client_ip,ci.client_ip,hsd.client_device_id,cd.client_device_id) is not null,true,false)
from fine_parsed_log event
left join ${APP}.dim_crawler_user_agent crawler on event.client_ua regexp crawler.pattern
left join high_speed_ip hsi on event.client_ip = hsi.client_ip
left join cycle_ip ci on event.client_ip = ci.client_ip
left join high_speed_device hsd on event.client_device_id = hsd.client_device_id
left join cycle_device cd on event.client_device_id = cd.client_device_id
left join
(
    select
        ad_id,
        ad_name,
        product_id,
        product_name,
        product_price,
        material_id,
        material_url,
        group_id
    from ${APP}.dim_ads_info_full
    where dt='$do_date'
)ad
on event.ad_id=ad.ad_id
left join
(
    select
        id,
        platform_name_en,
        platform_name_zh
    from ${APP}.dim_platform_info_full
    where dt='$do_date'
)plt
on event.platform=plt.platform_name_en;
"

case $1 in
"dwd_ad_event_inc")
    hive -e "$dwd_ad_event_inc"
;;
"all")
    hive -e "$dwd_ad_event_inc"
;;
esac

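Make the script executable and test it.
Before testing, you may want to close DataGrip to save some memory and restart the hiveserver2 service to free the memory it used earlier.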

chmod +x ~/bin/ad_ods_to_dwd.sh
ad_ods_to_dwd.sh all 2023-01-07

Because each run creates several temporary tables, it takes a little longer, around a few minutes.


Summary

With this, the data warehouse build is complete.
