广告数仓:采集通道创建

news2025/1/3 3:50:39

系列文章目录

广告数仓:采集通道创建


文章目录

  • 系列文章目录
  • 前言
  • 一、环境和模拟数据准备
    • 1.hadoop集群
    • 2.mysql安装
    • 3.生成曝光测试数据
  • 二、广告管理平台数据采集
    • 1.安装DataX
    • 2.上传脚本生成器
    • 3.生成传输脚本
    • 4.编写全量传输脚本
  • 三、曝光点击检测数据采集
    • 1.安装Zookeeper
    • 2.安装Kafka
    • 3.安装Flume
    • 4.Local Files->Flume->Kafka
      • 1.编写Flume配置文件
      • 2.功能测试
      • 3.编写启动脚本
    • 5.Kafka->Flume->HDFS
      • 1.编写拦截器
      • 2.编写配置文件
      • 3.测试功能
      • 4.编写启动脚本
  • 总结


前言

常用的大数据技术,基本都学完,啃个项目玩玩,这个项目来源于尚桂谷最新的广告数仓


一、环境和模拟数据准备

1.hadoop集群

集群搭建可以看我的hadoop专栏

这里不在演示了。
建议吧hadoop把hadoop版本换成3.3.4不然后期可能会有依赖错误(血的教训)
在这里插入图片描述

2.mysql安装

此次项目使用的是Mysql8,所有实验需要的软件可以在B站尚桂谷平台找到。
上传jar包
在这里插入图片描述
卸载可能影响的依赖

 sudo yum remove mysql-libs

安装需要的依赖

sudo yum install libaio autoconf

用安装脚本一键安装

sudo bash install_mysql.sh

安装完成后后远程连接工具测试一下。
由于mysql8新增了一个公钥验证,所以我们要修改一个默认参数
在这里插入图片描述

在这里插入图片描述
之后新建数据库,将我们的广告平台管理数据导入
在这里插入图片描述

在这里插入图片描述

3.生成曝光测试数据

上传数据模拟器文件
在这里插入图片描述
这里稍微修改一个配置文件,把刚刚的公钥检测加上
在这里插入图片描述

java -jar NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述
我门用hadoop102和hadoop103来生成曝光数据

scp -r ad_mock/ 192.168.10.103:/opt/module/ad_mock

创建一个生成脚本
在这里插入图片描述

#!/bin/bash

for i in hadoop102 hadoop103
do
echo "========== $i =========="
        ssh $i "cd /opt/module/ad_mock ; java -jar /opt/module/ad_mock/NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null  2>&1 &"
done

二、广告管理平台数据采集

1.安装DataX

datax是阿里的一个开源工具,作用和sqoop差不多,主要是用来在不同数据库之间传输数据
github官方下载链接
也可以用资料里给的
在这里插入图片描述
直接解压即可
然后执行自带的一个测试脚本
在这里插入图片描述

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

在这里插入图片描述

2.上传脚本生成器

在这里插入图片描述
按需要修改一下脚本
在这里插入图片描述

mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
hdfs.uri=hdfs://hadoop102:8020
is.seperated.tables=0

mysql.database.import=ad
mysql.tables.import=
import_out_dir=/opt/module/datax/job/import

3.生成传输脚本

java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

4.编写全量传输脚本

vim ~/bin/ad_mysql_to_hdfs_full.sh
#!/bin/bash

DATAX_HOME=/opt/module/datax

# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}

#数据同步
#参数:arg1-datax 配置文件路径;arg2-源数据所在路径
import_data() {
  handle_targetdir $2
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$2" $1
}

case $1 in
"product")
  import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
  ;;
"ads")
  import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
  ;;
"server_host")
  import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
  ;;
"ads_platform")
  import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
  ;;
"platform_info")
  import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
  ;;
"all")
  import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
  import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
  import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
  import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
  import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
  ;;
esac

赋予执行权限
然后启动hadoop集群
在这里插入图片描述
执行脚本

ad_mysql_to_hdfs_full.sh all 2023-01-07

在这里插入图片描述

三、曝光点击检测数据采集

1.安装Zookeeper

Zookeeper安装

2.安装Kafka

Kafka安装
kafka安装大部分都要把专栏方法相同,不同点是要添加一个配置参数。
在这里插入图片描述
每个节点都要做对应的修改

3.安装Flume

Flume安装
我们还需要修改一下日志文件
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
修改完成后分发即可。

4.Local Files->Flume->Kafka

1.编写Flume配置文件

在这里插入图片描述

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/ad_mock/log/.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092
a1.channels.c1.kafka.topic = ad_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

2.功能测试

我们先启动zookeeper和kafka
在这里插入图片描述
然后启动Flume进行日志采集

bin/flume-ng agent -n a1 -c conf/ -f job/ad_file_to_kafka.conf -Dflume.root.logger=info,console

在这里插入图片描述
然后在kafka启动一个ad_log消费者,如果没有他会自动创建

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ad_log

然后用ad_mock.sh生成数据,观察消费者是否能消费到数据。
在这里插入图片描述

3.编写启动脚本

因为ad_mock.sh在102和103同时生成数据,所以我们需要把102的配置文件同步过去。

scp -r job/ hadoop103:/opt/module/flume/

为flume编写一个启动脚本

vim ~/bin/ad_f1.sh
#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/ad_file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep ad_file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

编写之后启动它
在这里插入图片描述
再次启动kafka消费者,然后生成数据,如果仍然能出现数据,说明脚本没问题。

5.Kafka->Flume->HDFS

1.编写拦截器

用idea创建一个Maven工程,名字随意。
添加Maven依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.11.0</version>
        </dependency>
</dependencies>

创建拦截器类
在这里插入图片描述
TimestampInterceptor.java

package com.atguigu.ad.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimestampInterceptor implements Interceptor {
    private Pattern pattern;

    @Override
    public void initialize() {
        pattern = Pattern.compile(".*t=(\\d{13}).*");

    }

    @Override
    public Event intercept(Event event) {
        // 提取数据中的时间戳,补充到header中
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        //去除日志中的双引号
        String sublog = log.substring(1, log.length() - 1);
        String result = sublog.replaceAll("\"\u0001\"", "\u0001");
        event.setBody(result.getBytes(StandardCharsets.UTF_8));

        // 正则匹配获取时间戳
        Matcher matcher = pattern.matcher(result);
        if (matcher.matches()) {
            String ts = matcher.group(1);
            headers.put("timestamp", ts);
        } else {
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            Event intercept = intercept(next);
            if (intercept == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

直接打包上传到104 flume/lib目录
在这里插入图片描述
在这里插入图片描述

2.编写配置文件

 vim job/ad_kafka_to_hdfs.conf 
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = ad_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.ad.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/ad/log/ad_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.测试功能

将hadoop,zookeeper,kafka和之前配置的flume脚本全部启动
在这里插入图片描述
在104开启Flume采集

bin/flume-ng agent -n a1 -c conf/ -f job/ad_kafka_to_hdfs.conf -Dflume.root.logger=info,console

用ad_mock.sh生成数据。
在这里插入图片描述

4.编写启动脚本

vim ~/bin/ad_f2.sh

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/ad_kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep ad_kafka_to_hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

最后启动脚本在测试一下
在这里插入图片描述


总结

采集通道的创建就到这里了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/630398.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CentOS6.10上离线安装ClickHouse19.9.5.36并修改默认数据存储目录

背景 在一台装有 CentOS6.10 操作系统的主机上安装 ClickHouse &#xff08;其实本来计划是先安装 Docker &#xff0c;然后在 Docker 中快速启动 ClickHouse 的&#xff0c;但是由于 CentOS6 对 Docker 支持不好&#xff0c;就直接在系统上装 ClickHouse 吧&#xff09;&…

jvm 命令和工具, jvm 堆 内存泄露 fullgc

目录 堆太大? 堆内存分析工具 MAT JProfiler ZProfiler - 线上的mat - 已进化为Grace EagleEye-MProf - 命令行 jhat jvisual 问题 w使用JProfiler和MAT打开内存超大的hprof文件时报错的解决方案_hprof太大_CoderBruis的博客-CSDN博客 很简单,把jvm参数调整下,设置小…

vue3---模板引用 nextTick

目录 模板引用--ref 访问模板引用 v-for 中的模板引用 函数模板引用 组件上的 ref 简单理解Vue中的nextTick 示例 二、应用场景 三、nextTick源码浅析 实战 --- vue3实现编辑与查看功能 模板引用--ref 虽然 Vue 的声明性渲染模型为你抽象了大部分对 DOM 的直接操作&…

TOF激光雷达告别“技术路线之争”

交流群 | 进“传感器群/滑板底盘群/汽车基础软件群/域控制器群”请扫描文末二维码&#xff0c;添加九章小助手&#xff0c;务必备注交流群名称 真实姓名 公司 职位&#xff08;不备注无法通过好友验证&#xff09; 编辑 | 苏清涛 两三年前&#xff0c;在提起激光雷达时&…

067:cesium flyto一个具体的实体位置

第067个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中设置飞行定位功能,飞行到一个实体的区域。viewer.flyTo 函数接受实体、EntityCollection、DataSource、Cesium3DTilset 等。 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果…

8.Nginx Rewrite

文章目录 Nginx Rewrite常用Nginx的正则表达式locationlocation大致可以分为三类location常用的匹配规则location优先级location示例说明实际网站使用中&#xff0c;至少有三个匹配规则定义 Rewriterewrite跳转实现rewrite执行顺序如下rewrite示例基于域名的跳转基于客户端IP访…

Git常用命令及基础操作

⭐作者介绍&#xff1a;大二本科网络工程专业在读&#xff0c;持续学习Java&#xff0c;努力输出优质文章 ⭐作者主页&#xff1a;逐梦苍穹 ⭐所属专栏&#xff1a;Git ⭐如果觉得文章写的不错&#xff0c;欢迎点个关注一键三连&#x1f609;有写的不好的地方也欢迎指正&#x…

Linux内核安全技术——磁盘加密技术概述和eCryptfs详解

一、概述 加密是最常见的数据安全保护技术&#xff0c;在数据生命周期各阶段均有应用。从应用场景和技术实现上&#xff0c;按加密对象、用户是否感知、加密算法等维度&#xff0c;有多种分类及对应方案&#xff0c;并在主流操作系统如Windows、Linux、Android中有广泛应用。 本…

【数据湖架构】Azure 数据湖分析(Azure Data Lake Analytics )概述

在本文中&#xff0c;我们将探索 Azure 数据湖分析并使用 U-SQL 查询数据。 Azure 数据湖分析 (ADLA) 简介 Microsoft Azure 平台支持 Hadoop、HDInsight、数据湖等大数据。通常&#xff0c;传统数据仓库存储来自各种数据源的数据&#xff0c;将数据转换为单一格式并进行分析以…

Ae 入门系列之十三:运动跟踪与稳定

运动跟踪&#xff0c;通过跟踪对象的运动&#xff0c;然后将跟踪数据应用到另一个对象&#xff0c;从而可创建图层或效果在其中跟随运动的合成。 稳定运动&#xff0c;同样须先跟踪&#xff0c;之后将跟踪数据反向运用到图层自身&#xff0c;从而达到稳定画面的效果。 跟踪与稳…

web应用常见7大安全漏洞,浅析产生的原因!

今天整理了关于web前端的干货知识&#xff0c;web应用常见的有哪些安全漏洞呢&#xff0c;这些漏洞产生的原因又是什么呢&#xff1f;这些问题你想过吗&#xff1f; 1.SQL 注入 SQL 注入就是通过给 web 应用接口传入一些特殊字符&#xff0c;达到欺骗服务器执行恶意的 SQL 命…

基数排序详解(Radix sort)

本文已收录于专栏 《算法合集》 目录 一、简单释义1、算法概念2、算法目的3、算法思想4、算法由来 二、核心思想三、图形展示1、宏观展示2、微观展示 四、算法实现1、实现思路2、代码实现3、运行结果 五、算法描述1、问题描述2、算法过程3、算法总结 六、算法分析1、时间复杂度…

创新指南|如何优化创新ROI? 亟需从双模创新衡量着手

不确定性和风险是创新投资的常态&#xff0c;这让企业领导者和创新团队面临着一个共同的挑战&#xff1a;如何衡量创新ROI&#xff1f;本文将探讨如何在高风险创新中实现回报&#xff0c;需要采用探索和开发的双模机制。在这个快速变化的市场中&#xff0c;企业创新为了实现可持…

rk3568 SD卡启动

rk3568 SD卡启动 SD卡启动系统&#xff0c;它可以让rk3568在没有硬盘或其他存储设备的情况下启动和运行操作系统。这使得rk3568变得与树梅派一样灵活切换系统&#xff0c;与此同时进行故障排查和修复&#xff0c;而不需要拆卸设备或者使用专业的烧录工具。SD卡启动还可以方便地…

Git 安装并初始化 + 官网下载速度太慢的问题

目录 1. 快速下载 2. 初始化 1. 快速下载 当你兴致勃勃地去官网下载 git 的时候&#xff0c;突然发现&#xff0c;嗯&#xff1f;&#xff1f;下载完成还需 9 个小时&#xff1f; 快速下载地址&#xff0c;请点这里&#xff01; 打开之后是这个样子&#xff1a; 我们可以自…

Rocketmq 一文带你搞懂rocketmq基础

1.集群架构 从上图可以看出来一共有4个部分&#xff0c;分别为Producer,Consumer,NameServer,Broker 1.1 NameServer集群 虽然说NameServer是一个集群&#xff0c;但是每一个NameServer是独立的&#xff0c;不会相互同步数据&#xff0c;因为每个节点都会保存完整的数据&#…

音质好的骨传导蓝牙耳机有哪些,十大公认音质好的骨传导耳机

​骨传导耳机是将声音转化为不同频率的机械振动&#xff0c;通过人的颅骨、骨迷路、内耳淋巴液、螺旋器、听觉中枢来传递声波。由于不需要像入耳式或入耳式耳机一样堵住耳朵来避免听力受损&#xff0c;也不会因为在听音乐的时候塞住耳朵而影响到旁边人的交流&#xff0c;所以骨…

LeetCode_Day5 | 有效的字母异位词、两个数组的交集、快乐数!

LeetCode_哈希表 242.有效的字母异位词1.题目描述2.题解 349.两个数组的交集1.题目描述2.题解 202.快乐数1.题目描述2.题解思路(官方题解啊&#xff01;看了好几遍真难) 算法代码实现复杂度分析 242.有效的字母异位词 1.题目描述 给定两个字符串 s 和 t &#xff0c;编写一个…

滑块验证码------啥?你居然还在手动滑动,你不来试试自动滑动吗

测试网站 测试网站:https://www.geetest.com/demo/slide-float.html 我的giteer:秦老大大 (qin-laoda) - Gitee.com里面有我写的代码 作者备注:由于我个人原因,文章写得感觉太长,后面我会把一个知识分成多部文章,这样可以简单明了的看到了 验证码的思路有两种:一种是通过se…

港科夜闻|香港科大取得重大科研突破,首度利用人工智能为阿尔兹海默症作早期风险预测...

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科大取得重大科研突破&#xff0c;首度利用人工智能为阿尔兹海默症作早期风险预测。香港科大校长叶玉如教授及香港科大陈雷教授带领的研究团队&#xff0c;最近开发了一套人工智能模型&#xff0c;利用遗传信息&…