Project Overview
This project simulates the full path from cluster setup to data warehouse modeling to data computation, in order to get familiar with the end-to-end data flow.
It plays the role of a browser backend data cluster: user browser-access data is collected and shipped into the cluster, modeled into a data warehouse, and then used as the basis for computation and reporting.
The main goal is to experience the whole data development workflow, so the depth is deliberately modest, but every part can be extended further in the same way.
The overall flow:
- Randomly generate user browser-access behavior logs with Python
- Use Flume to monitor the log directory and, after filtering through interceptors, deliver the data to the HDFS cluster
- Use Hive to model the data warehouse, mainly the ODS, DWD, DWS, and ADS layers
- Use Spark for some simple computations on the data
Prerequisites
Since this is a personal project with no real cluster available, the workaround is to set up three Linux virtual machines to simulate one.
Hadoop setup (Linux):
Hadoop:HDFS--分布式文件存储系统_利用hadoop实现文件存储-CSDN博客
Hive:
Hadoop:YARN、MapReduce、Hive操作_hive yarn mapreduce-CSDN博客
Flume setup:
Flume-CSDN博客
Spark setup:
PySpark(一)Spark原理介绍、PySpark初体验及原理_pyspark读取hdfs数据的原理-CSDN博客
Data Generation
Randomly generate users and their browser-access behavior logs with Python:
import random

def is_private_ip(ip):
    # Check whether the IP address is private
    if ip.startswith("10.") or \
            (ip.startswith("172.") and 16 <= int(ip.split('.')[1]) < 32) or \
            ip.startswith("192.168."):
        return True
    return False

def is_excluded_ip(ip, excluded_ips):
    return ip in excluded_ips or is_private_ip(ip)

def generate_random_public_ip(excluded_ips):
    while True:
        ip_parts = [random.randint(1, 223) for _ in range(3)] + [random.randint(1, 254)]
        ip_address = ".".join(map(str, ip_parts))
        # Keep only IPs that are neither private nor in the exclusion list
        if not is_excluded_ip(ip_address, excluded_ips):
            return ip_address

# Well-known DNS server addresses to exclude
excluded_ip_address = ["8.8.8.8", "1.1.1.1", "114.114.114.114", "2.2.2.2"]
import random
import time
import datetime
import string
import json
import logging
from faker import Faker

fa = Faker('zh_CN')

def generate_random_url(length=10):
    # Possible protocols
    protocols = ['http://', 'https://']
    # Possible top-level domains
    top_level_domains = ['com', 'org', 'net', 'edu', 'gov', 'int']
    # Possible second-level domains
    second_level_domains = [f'{l1}.{l2}' for l1 in string.ascii_lowercase for l2 in top_level_domains]
    # Randomly pick a protocol
    protocol = random.choice(protocols)
    # Randomly pick a top-level domain
    tld = random.choice(top_level_domains)
    # Randomly pick a second-level domain
    sld = random.choice(second_level_domains)
    # Randomly generate the path part
    path = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
    # Assemble the URL
    url = f"{protocol}{sld}.{tld}/{path}"
    return url

def generate_random_string(length):
    # Build a random string of letters and digits
    characters = string.ascii_letters + string.digits
    # Pick one random character from the character set, repeat `length` times, and join
    return ''.join(random.choice(characters) for _ in range(length))

# Randomly generate users
def creat_user(num):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    location = 'D:/all_user.log'
    file_handler = logging.FileHandler(location, encoding="utf-8")
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    for i in range(num):
        id = i + 1
        sex = random.choice(["男", "女"])
        if sex == "男":
            name = fa.name_male()
        else:
            name = fa.name_female()
        email = fa.free_email()
        number = fa.phone_number()
        user_name = fa.user_name()
        password = fa.password(length=random.randint(8, 18),
                               special_chars=random.choice([False, True]),
                               upper_case=random.choice([False, True]),
                               lower_case=random.choice([False, True]))
        data = {
            "user_id": id,
            "name": name,
            "sex": sex,
            "email": email,
            "number": number,
            "user_name": user_name,
            "password": password
        }
        json_data = json.dumps(data)
        logger.info(json_data)
    logger.handlers.clear()

# creat_user(100000)

# Generate ten days of behavior logs, drawing users from the file created above
with open('D:/all_user.log', "r") as file:
    logs = file.readlines()

log_id = 0
for i in range(10):
    print(i)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    if i + 1 < 10:
        location = 'D:/behavior2024-04-0' + str(i + 1) + '.log'
        start_time = '2024-04-0' + str(i + 1) + ' 00:00:00'
        end_time = '2024-04-0' + str(i + 1) + ' 23:59:59'
    else:
        location = 'D:/behavior2024-04-' + str(i + 1) + '.log'
        start_time = '2024-04-' + str(i + 1) + ' 00:00:00'
        end_time = '2024-04-' + str(i + 1) + ' 23:59:59'
    file_handler = logging.FileHandler(location)
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    timestamp_start = time.mktime(time.strptime(start_time, "%Y-%m-%d %H:%M:%S"))
    timestamp_end = time.mktime(time.strptime(end_time, "%Y-%m-%d %H:%M:%S"))
    for j in range(100000):
        log_id += 1
        user_id = random.randint(1, 100000) - 1
        user = json.loads(logs[user_id])
        name = user["name"]
        sex = user["sex"]
        email = user["email"]
        number = user["number"]
        user_name = user["user_name"]
        password = user["password"]
        public_ip = fa.ipv4()
        province = fa.province()
        city = fa.city()
        type = random.choice(["4G", "5G", "Wifi"])
        random_url = fa.url()
        device = generate_random_string(32)
        user_agent = fa.chrome()
        # Millisecond timestamp somewhere inside the current day
        timestamp = str(random.randint(int(timestamp_start), int(timestamp_end))) + '000'
        data = {
            "log_id": log_id,
            "user_id": user_id + 1,
            "name": name,
            "sex": sex,
            "email": email,
            "number": number,
            "user_name": user_name,
            "password": password,
            "province": province,
            "city": city,
            "public_ip": public_ip,
            "url": random_url,
            "type": type,
            "device": device,
            "user_agent": user_agent,
            "timestamp": timestamp
        }
        json_data = json.dumps(data)
        logger.info(json_data)
    logger.handlers.clear()
Both the user file and the behavior log files are plain .log files, with one JSON record per line.
User record format:
{
    "user_id": 1,
    "name": "郑红霞",
    "sex": "女",
    "email": "junpeng@gmail.com",
    "number": "18884775689",
    "user_name": "gzou",
    "password": "FC93qrX1E3dHSOesX"
}
Behavior log record format:
{
    "log_id": 1,
    "user_id": 65500,
    "name": "李云",
    "sex": "女",
    "email": "jingqiao@yahoo.com",
    "number": "13680104279",
    "user_name": "mingqiao",
    "password": "cb33svpoT7JL",
    "province": "山东省",
    "city": "宜都市",
    "public_ip": "136.155.128.188",
    "url": "http://www.xp.cn/",
    "type": "4G",
    "device": "OdOyfRHX4UBnZVwbPRRzuTjcwTABZbTC",
    "user_agent": "Mozilla/5.0 (Linux; Android 2.1) AppleWebKit/533.2 (KHTML, like Gecko) Chrome/26.0.878.0 Safari/533.2",
    "timestamp": "1711984700000"
}
In total, 100,000 users were generated, and around them 10 days of browser-access log files were produced, with 100,000 records per day, for 1,000,000 records overall.
Flume Monitoring
For Flume basics, see:
Flume-CSDN博客
The data above is generated by Python on the Windows D: drive. For convenience, a folder on D: is shared with the Linux virtual machine as /mnt/hgfs/linux_share1, and Flume then monitors that folder.
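If the shared folder does not show up automatically under /mnt/hgfs, a mount along these lines usually works on a VMware guest with open-vm-tools installed (the exact command and options are an assumption about the local setup, not part of the original write-up):
# Mount all VMware shared folders under /mnt/hgfs (requires open-vm-tools / vmhgfs-fuse)
sudo mkdir -p /mnt/hgfs
sudo vmhgfs-fuse .host:/ /mnt/hgfs -o allow_other
ls /mnt/hgfs/linux_share1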
Interceptors
First, configure the interceptors. This project uses two of them.
1. lmx.interceptor
- Checks whether the input is valid JSON: JSONUtils.isJSONValidate
- Checks whether the JSON contains the required fields: JSONUtils.isJSONcorrect
package lmx;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class interceptor implements Interceptor {
    private static final Logger LOG = LoggerFactory.getLogger(interceptor.class);

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        if (JSONUtils.isJSONValidate(log)) {
            if (JSONUtils.isJSONcorrect(log)) {
                return event;
            } else {
                return null;
            }
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            // LOG.info(new String(next.getBody()));
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new interceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
JSONUtils:
package lmx;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;

public class JSONUtils {
    public static boolean isJSONValidate(String log) {
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            return false;
        }
    }

    public static boolean isJSONcorrect(String log) {
        JSONObject log_json = JSONObject.parseObject(log);
        String[] keys = new String[]{"log_id", "user_id", "name", "sex", "device", "email", "number", "user_name", "password", "province", "city", "public_ip", "url", "type", "user_agent", "timestamp"};
        // The field list should not be hard-coded here; it could be kept in a table or
        // config file that the Java code reads, which would be easier to maintain.
        if (log_json == null) {
            return false;
        }
        if (log_json.size() != keys.length) {
            return false;
        } else {
            Object value;
            for (int i = 0; i < keys.length; i++) {
                value = log_json.get(keys[i]);
                if (value == null) {
                    return false;
                }
            }
            return true;
        }
    }
}
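To make these classes visible to the Flume agent, the project has to be packaged and the resulting jar (plus the fastjson2 dependency) placed on Flume's classpath. A rough sketch of that step, assuming a Maven project; the artifact name and paths below are placeholders, not taken from the original post:
# Build the interceptor jar and drop it into Flume's lib directory
mvn clean package
cp target/flume-interceptor-1.0.jar $FLUME_HOME/lib/
# fastjson2 is used by the interceptors at runtime, so its jar must be on the classpath too
cp ~/.m2/repository/com/alibaba/fastjson2/fastjson2/*/fastjson2-*.jar $FLUME_HOME/lib/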
2. lmx.TimeStampInterceptor:
Stamps each event with the timestamp taken from the JSON body, so that when Flume writes the data to HDFS the sink can derive the partition directory from the timestamp in the event header.
package lmx;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;

public class TimeStampInterceptor implements Interceptor {
    private ArrayList<Event> my_events = new ArrayList<>();

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("timestamp");
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        my_events.clear();
        for (Event event : events) {
            my_events.add(intercept(event));
        }
        return my_events;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Configuration File
The configuration mainly sets up the interceptors and the input/output locations:
- Monitored directory: /mnt/hgfs/linux_share1
- Interceptors: lmx.interceptor, lmx.TimeStampInterceptor
- Output path: hdfs://node1:8020/Project/%Y-%m-%d (the date is resolved from the timestamp the interceptor put into the event header)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir =/mnt/hgfs/linux_share1
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.fileHeader = true
# Ignore (do not upload) any file ending in .tmp
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
# Configure the interceptors
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = lmx.interceptor$Builder
a1.sources.r1.interceptors.i2.type = lmx.TimeStampInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://node1:8020/Project/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
In the Windows folder, files that have been fully transferred are renamed with the .COMPLETED suffix.
Data then shows up in HDFS under per-day directories like the following (screenshot omitted).
Opening a file shows its blocks, stored with a replication factor of 3.
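A quick command-line check of the ingested output (paths follow the sink configuration above):
# List the per-day directories created by the HDFS sink
hdfs dfs -ls /Project/
# Peek at a few records of one day
hdfs dfs -cat /Project/2024-04-01/logs-* | head -n 3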
Data Warehouse Construction
This section does not go into theory; it simply follows standard big data modeling practice.
ODS Layer
Create a Hive external table partitioned by dt. The ODS table is designed to mirror the source files: a single column holding the raw JSON line.
create external table ods_Browser_behavior_log
(
    line STRING
)
partitioned by (dt string)
location '/Browser_behavior/ods/ods_Browser_behavior_log';
Then load the files from HDFS into the Hive table:
load data inpath '/Project/2024-04-13' into table ods_Browser_behavior_log partition (dt='2024-04-13');
Since there are many daily directories, loading them one at a time is tedious, so a small shell script does it over a date range:
- Script name: origin_to_ods_init_behavior_log.sh
- Inputs: start_date, end_date
#!/bin/bash
if [ $# -ne 2 ]; then
    echo "Usage: origin_to_ods_init_behavior_log.sh start_date end_date"
    exit 1
fi
EXPORT_START_DATE=$1
EXPORT_END_DATE=$2
i=$EXPORT_START_DATE
# Loop over every date from start_date to end_date (inclusive)
while [[ $i < `date -d "+1 day $EXPORT_END_DATE" +%Y-%m-%d` ]]
do
    SQL="load data inpath '/Project/$i' into table Browser_behavior.ods_Browser_behavior_log partition(dt='$i');"
    bin/hive -e "$SQL"
    i=`date -d "+1 day $i" +%Y-%m-%d`
done
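Invocation might look like this (run from the Hive installation directory, since the script calls bin/hive with a relative path):
bash origin_to_ods_init_behavior_log.sh 2024-04-01 2024-04-10
# Confirm that the partitions were created
bin/hive -e "show partitions Browser_behavior.ods_Browser_behavior_log;"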
Data preview and the resulting files in HDFS: (screenshots omitted)
DWD Layer
The DWD layer does a first pass over the ODS data. The granularity stays the same as ODS, but the records are flattened into a regular columnar form:
- The JSON fields are expanded by key
- For user_agent, we mainly care about the operating system and the browser version
- For email, later analysis may need the email provider for user profiling, so the provider is extracted here
Example user_agent: "Mozilla/5.0 (Linux; Android 2.1) AppleWebKit/533.2 (KHTML, like Gecko) Chrome/26.0.878.0 Safari/533.2"
First, the table structure:
CREATE EXTERNAL TABLE dwd_Browser_behavior_log
(
    `log_id` bigint COMMENT 'log id',
    `user_id` bigint COMMENT 'user id',
    `name` STRING COMMENT 'name',
    `sex` STRING COMMENT 'sex',
    `email` STRING COMMENT 'registration email',
    `email_categroy` STRING COMMENT 'email provider',
    `number` STRING COMMENT 'registration phone number',
    `user_name` STRING COMMENT 'username',
    `password` STRING COMMENT 'password',
    `province` STRING COMMENT 'province',
    `city` STRING COMMENT 'city',
    `url` STRING COMMENT 'requested URL',
    `public_ip` STRING COMMENT 'public ip',
    `type` STRING COMMENT 'access type',
    `device` STRING COMMENT 'device id',
    `user_agent` STRING COMMENT 'user agent',
    `ts` bigint COMMENT 'timestamp',
    `system` STRING COMMENT 'operating system',
    `version` STRING COMMENT 'browser version'
) COMMENT 'browser behavior log table'
PARTITIONED BY (`dt` STRING)
STORED AS ORC
LOCATION '/Browser_behavior/dwd/dwd_Browser_behavior_log'
TBLPROPERTIES ("orc.compress" = "snappy");
A UDF to extract the operating system and browser version from user_agent:
package lmx;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class User_agent_transfer extends GenericUDF {
    // Required GenericUDF override: validate the single argument and declare the return type
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("user_agent_transfer takes exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if (deferredObjects[0].get() == null) {
            return new Text("");
        }
        String system;
        String safari_version;
        // The operating system is the text between '(' and the first ';'
        Pattern pattern = Pattern.compile("(?<=\\().*?(?=;)");
        String user_agent = deferredObjects[0].get().toString();
        Matcher matcher = pattern.matcher(user_agent);
        if (matcher.find()) {
            system = matcher.group();
        } else {
            system = "unknown_system";
        }
        // The browser version is taken as the last 5 characters of the user agent string
        safari_version = user_agent.substring(user_agent.length() - 5);
        return new Text((system + "_" + safari_version).getBytes(StandardCharsets.UTF_8));
    }

    // Required GenericUDF override: shown in EXPLAIN output
    @Override
    public String getDisplayString(String[] children) {
        return "user_agent_transfer(" + children[0] + ")";
    }
}
A UDF to extract the email provider:
package lmx;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class emal_category extends GenericUDF {
    // Required GenericUDF override: validate the single argument and declare the return type
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("email_category takes exactly one argument");
        }
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if (deferredObjects[0].get() == null) {
            return new Text("");
        }
        // The provider is the text between '@' and the next '.'
        Pattern pattern = Pattern.compile("(?<=@).*?(?=\\.)");
        String email = deferredObjects[0].get().toString();
        Matcher matcher = pattern.matcher(email);
        String email_category;
        if (matcher.find()) {
            email_category = matcher.group();
        } else {
            email_category = "unknown_email_category";
        }
        return new Text(email_category.getBytes(StandardCharsets.UTF_8));
    }

    // Required GenericUDF override: shown in EXPLAIN output
    @Override
    public String getDisplayString(String[] children) {
        return "email_category(" + children[0] + ")";
    }
}
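Before the insert below can call them, the UDFs need to be packaged into a jar and registered in Hive under the names used in the SQL. A sketch of that registration (the jar path is a placeholder):
ADD JAR /opt/udf/lmx-udf.jar;
CREATE TEMPORARY FUNCTION user_agent_transfer AS 'lmx.User_agent_transfer';
CREATE TEMPORARY FUNCTION email_category AS 'lmx.emal_category';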
Note: as a rule, logic that can be expressed in SQL should be written in SQL, since in big data scenarios SQL usually runs faster and is easier for the engine to optimize; packaging logic as a UDF is mainly worthwhile when it is highly reusable.
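For instance, the two extractions above could also be written with Hive's built-in regexp_extract and substr, without any custom UDF (a sketch of that alternative, not what this project actually uses):
SELECT
    regexp_extract(user_agent, '\\(([^;]+);', 1) AS system,         -- text between '(' and the first ';'
    substr(user_agent, -5)                       AS version,        -- last 5 characters, e.g. '533.2'
    regexp_extract(email, '@([^.]+)\\.', 1)      AS email_categroy  -- provider between '@' and the first '.'
FROM dwd_Browser_behavior_log
LIMIT 10;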
Load the data into DWD:
insert overwrite table dwd_Browser_behavior_log partition (dt)
select get_json_object(line, '$.log_id'),
get_json_object(line, '$.user_id'),
get_json_object(line, '$.name'),
get_json_object(line, '$.sex'),
get_json_object(line, '$.email'),
email_category(get_json_object(line, '$.email')),
get_json_object(line, '$.number'),
get_json_object(line, '$.user_name'),
get_json_object(line, '$.password'),
get_json_object(line, '$.province'),
get_json_object(line, '$.city'),
get_json_object(line, '$.url'),
get_json_object(line, '$.public_ip'),
get_json_object(line, '$.type'),
get_json_object(line, '$.device'),
get_json_object(line, '$.user_agent'),
get_json_object(line, '$.timestamp'),
split(user_agent_transfer(get_json_object(line, '$.user_agent')),"_")[0],
split(user_agent_transfer(get_json_object(line, '$.user_agent')),"_")[1],
dt
from ods_Browser_behavior_log;
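Because the statement above writes partitions dynamically from the dt column, Hive's dynamic partitioning typically has to be enabled first:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;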
Data preview and the file layout in HDFS: (screenshots omitted)
DWS Layer
DWS holds lightly aggregated tables; here the aggregation is at user granularity and the result is designed as a wide table.
For the browsing domain, assume the metric of main interest is the number of visits, and that later analysis may need to slice it by user, region, operating system, browser version, and so on.
Table structure:
CREATE EXTERNAL TABLE dws_Browser_behavior_user_behavior_cnt
(
    `user_id` bigint COMMENT 'user id',
    `sex` STRING COMMENT 'sex',
    `email_categroy` STRING COMMENT 'email provider',
    `province` STRING COMMENT 'province',
    `city` STRING COMMENT 'city',
    `type` STRING COMMENT 'access type',
    `system` STRING COMMENT 'operating system',
    `version` STRING COMMENT 'browser version',
    `cnt` BIGINT COMMENT 'visit count'
) COMMENT 'DWS per-user behavior counts'
PARTITIONED BY (`dt` STRING)
STORED AS ORC
LOCATION '/Browser_behavior/dws/dws_Browser_behavior_user_behavior_cnt'
TBLPROPERTIES ("orc.compress" = "snappy");
Insert the data:
insert overwrite table dws_Browser_behavior_user_behavior_cnt partition (dt)
select user_id
,max(sex) as sex
,email_categroy
,province
,city
,type
,system
,version
,count(1) as cnt
,dt
from dwd_browser_behavior_log
group by user_id
,email_categroy
,province
,city
,type
,system
,version
,dt
;
Data preview: (screenshot omitted)
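Without the original screenshot, the result can be sanity-checked with a query such as:
select * from dws_Browser_behavior_user_behavior_cnt where dt = '2024-04-01' limit 10;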
ADS Layer
The ADS layer generally feeds reports, so the dimensions are expanded into rollup levels: each dimension value is prefixed with 'all;' and then exploded with a lateral view, so every record contributes both to its concrete dimension value and to an overall 'all' bucket, similar to a CUBE-style rollup. The visit count and distinct-user count are then aggregated for every resulting combination.
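A possible DDL for the target table, inferred from the columns of the insert statement below (a sketch under that assumption, not taken verbatim from the project):
CREATE EXTERNAL TABLE ads_Browser_behavior_user_behavior_cnt
(
    `sex` STRING COMMENT 'sex, or all',
    `email_categroy` STRING COMMENT 'email provider, or all',
    `province` STRING COMMENT 'province, or all',
    `city` STRING COMMENT 'city, or all',
    `type` STRING COMMENT 'access type, or all',
    `system` STRING COMMENT 'operating system, or all',
    `version` STRING COMMENT 'browser version bucket, or all',
    `cnt` BIGINT COMMENT 'total visits',
    `uv` BIGINT COMMENT 'distinct users'
) COMMENT 'ADS behavior counts by expanded dimensions'
PARTITIONED BY (`dt` STRING)
STORED AS ORC
LOCATION '/Browser_behavior/ads/ads_Browser_behavior_user_behavior_cnt'
TBLPROPERTIES ("orc.compress" = "snappy");
The insert statement itself: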
insert overwrite table ads_Browser_behavior_user_behavior_cnt partition (dt)
select
sex_explode.sex
,email_categroy_explode.email_categroy
,province_explode.province
,city_explode.city
,type_explode.type
,system_explode.system
,version_explode.version
,sum(a.cnt) as cnt
,count(distinct a.user_id) as uv
,'2024-04-01' as dt
from (
select user_id
, concat('all;', sex) as sex
, concat('all;', email_categroy) as email_categroy
, concat('all;', province) as province
, concat('all;', city) as city
, concat('all;', type) as type
, concat('all;', system) as system
, concat('all;', case
when cast(version as double) < 532 then "低级版本"
when cast(version as double) < 534 THEN "中级版本"
else "高级版本" end) as version
, cnt
from dws_Browser_behavior_user_behavior_cnt
where dt = '2024-04-01'
)a
lateral view explode(split(sex,';')) sex_explode as sex
lateral view explode(split(email_categroy,';')) email_categroy_explode as email_categroy
lateral view explode(split(province,';')) province_explode as province
lateral view explode(split(city,';')) city_explode as city
lateral view explode(split(type,';')) type_explode as type
lateral view explode(split(system,';')) system_explode as system
lateral view explode(split(version,';')) version_explode as version
group by
sex_explode.sex
,email_categroy_explode.email_categroy
,province_explode.province
,city_explode.city
,type_explode.type
,system_explode.system
,version_explode.version
;
Spark Computation
PySpark(一)Spark原理介绍、PySpark初体验及原理_pyspark读取hdfs数据的原理-CSDN博客
PySpark(二)RDD基础、RDD常见算子_pyspark rdd算子-CSDN博客
PySpark(三)RDD持久化、共享变量、Spark内核制度,Spark Shuffle、Spark执行流程_pyspark storagelevel-CSDN博客
PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程、Spark新特性_pyspark catalyst-CSDN博客
Spark can be used for some computations directly on the raw data.
For example, count, for each day, the number of users with more than one visit:
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
sc = spark.sparkContext

res = []
for i in range(10):
    dt = '2024-04-' + str(i + 1).zfill(2)
    location = 'hdfs://node1:8020/Browser_behavior/ods/ods_Browser_behavior_log/dt=' + dt
    # Extract user_id from every raw JSON line of that day's partition
    rdd_set = sc.textFile(location).map(lambda x: json.loads(x)['user_id'])
    # Count visits per user and keep only users with more than one visit
    rdd_user = rdd_set.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b).filter(lambda x: x[1] > 1)
    count = rdd_user.count()
    res.append([dt, count])

rdd_res = sc.parallelize(res)
df = spark.createDataFrame(rdd_res, schema=['dt', 'cnt'])
df.printSchema()  # print the schema
df.show()         # print the table
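The same metric can also be computed directly against the Hive tables with Spark SQL, which avoids re-parsing the raw JSON. A sketch, assuming the session is created with enableHiveSupport() and that the Hive configuration (hive-site.xml) is visible to Spark; neither of these is shown in the original setup:
from pyspark.sql import SparkSession

# Hive support lets Spark read the warehouse tables registered in the metastore
spark = (SparkSession.builder
         .appName('lmx_sql')
         .master('local[*]')
         .enableHiveSupport()
         .getOrCreate())

# Users with more than one visit per day, computed from the DWD table
spark.sql("""
    SELECT dt, COUNT(*) AS cnt
    FROM (
        SELECT dt, user_id
        FROM dwd_Browser_behavior_log
        GROUP BY dt, user_id
        HAVING COUNT(1) > 1
    ) t
    GROUP BY dt
    ORDER BY dt
""").show()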