浏览器用户行为集群建设-数仓建模-数据计算

news2024/9/28 17:27:48

项目介绍

该项目旨在将集群构建--数仓建模--数据计算通路进行模拟,以达到熟悉整个数据流程的效果。

该项目模拟浏览器后台数据集群身份,收集用户浏览器访问数据传入数据集群,并进行数仓建模,以此基础进行相关计算和看数。

该项目的主要目的是体验整个数据开发流程,故而深度一般,但是可以按照相同的方式自行拓展。

大致流程为:

  1. 使用python随机生成用户浏览器访问行为日志数据
  2. 使用flume监控日志文件夹,在拦截器筛选下将数据传入HDFS大数据集群
  3. 使用Hive进行数仓建模,主要为ODS、DWD、DWS、ADS
  4. 使用Spark进行简单的数据计算

前期准备

这里由于是个人项目,没有集群,解决方案是在linux上搭建三个虚拟机模拟集群

Hadoop搭建(linux):

Hadoop:HDFS--分布式文件存储系统_利用hadoop实现文件存储-CSDN博客

Hive:

Hadoop:YARN、MapReduce、Hive操作_hive yarn mapreduce-CSDN博客

Flume搭建:

Flume-CSDN博客

spark搭建:

PySpark(一)Spark原理介绍、PySpark初体验及原理_pyspark读取hdfs数据的原理-CSDN博客

数据生成

使用python随机生成用户及其浏览器访问行为日志


import random

def is_private_ip(ip):
    # 判断是否为私有IP地址
    if ip.startswith("10.") or \
       ip.startswith("172.") and 16 <= int(ip.split('.')[1]) < 32 or \
       ip.startswith("192.168."):
        return True
    return False

def is_excluded_ip(ip, excluded_ips):
    return ip in excluded_ips or is_private_ip(ip)

def generate_random_public_ip(excluded_ips):
    while True:
        ip_parts = [random.randint(1, 223) for _ in range(3)] + [random.randint(1, 254)]
        ip_address = ".".join(map(str, ip_parts))

        # 检查生成的IP是否为非私有且不在排除列表中
        if not is_excluded_ip(ip_address, excluded_ips):
            return ip_address

# 需要排除的DNS服务器地址列表
excluded_ip_address = ["8.8.8.8","1.1.1.1",'114.114.114.114','2.2.2.2']


import random
import time
import datetime

import random
import string

from faker import Faker
fa=Faker('zh_CN')
def generate_random_url(length=10):
    # 定义可能的协议
    protocols = ['http://', 'https://']
    # 定义可能的顶级域名
    top_level_domains = ['com', 'org', 'net', 'edu', 'gov', 'int']
    # 定义可能的二级域名
    second_level_domains = [f'{l1}.{l2}' for l1 in string.ascii_lowercase for l2 in top_level_domains]

    # 随机选择协议
    protocol = random.choice(protocols)
    # 随机选择顶级域名
    tld = random.choice(top_level_domains)
    # 随机选择二级域名
    sld = random.choice(second_level_domains)
    # 随机生成路径和文件名
    path = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

    # 拼接URL
    url = f"{protocol}{sld}.{tld}/{path}"
    return url


# 生成一个随机URL

def generate_random_string(length):
    # 生成包含字母和数字的字符串
    characters = string.ascii_letters + string.digits
    # 使用随机选择函数,从字符集中选择一个字符,重复length次,并拼接成字符串
    return ''.join(random.choice(characters) for _ in range(length))

import json
import logging

#随机生成用户
def creat_user(num):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    location = 'D:/all_user.log'
    file_handler = logging.FileHandler(location,encoding="utf-8")
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    for i in range(num):
        id = i+1
        sex = random.choice(["男","女"])
        if sex == "男":
            name = fa.name_male()
        else:
            name = fa.name_female()
        email = fa.free_email()
        number = fa.phone_number()
        user_name = fa.user_name()
        password = fa.password(length = random.randint(8,18),special_chars=random.choice([False,True]),upper_case=random.choice([False,True]),lower_case = random.choice([False,True]))
        data = {
            "user_id": id,
            "name": name,
            "sex": sex,
            "email": email,
            "number": number,
            "user_name": user_name,
            "password":password
        }
        json_data = json.dumps(data)
        logger.info(json_data)
    logger.handlers.clear()

# creat_user(100000)
with open('D:/all_user.log',"r") as file:
    logs = file.readlines()

log_id = 0

for i in range(10):
    print(i)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    if i+1<10:
        location = 'D:/behavior2024-04-0'+str(i+1)+'.log'
        start_time = '2024-04-0' + str(i + 1) + ' 00:00:00'
        end_time = '2024-04-0' + str(i + 1) + ' 23:59:59'
    else:
        location = 'D:/behavior2024-04-' + str(i + 1) + '.log'
        start_time = '2024-04-' + str(i + 1) + ' 00:00:00'
        end_time = '2024-04-' + str(i + 1) + ' 23:59:59'
    file_handler = logging.FileHandler(location)
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)


    timestamp_start = time.mktime(time.strptime(start_time, "%Y-%m-%d %H:%M:%S"))
    timestamp_end = time.mktime(time.strptime(end_time, "%Y-%m-%d %H:%M:%S"))


    for j in range(100000):
        log_id +=1
        user_id = random.randint(1, 100000)-1
        name = json.loads(logs[user_id])["name"]
        sex = json.loads(logs[user_id])["sex"]
        email = json.loads(logs[user_id])["email"]
        number = json.loads(logs[user_id])["number"]
        user_name = json.loads(logs[user_id])["user_name"]
        password = json.loads(logs[user_id])["password"]

        public_ip = fa.ipv4()
        provice = fa.province()
        city = fa.city()


        type = random.choice(["4G","5G","Wifi"])
        random_url = fa.url()
        device = generate_random_string(32)
        user_agent = fa.chrome()
        timestamp = str(random.randint(int(timestamp_start), int(timestamp_end))) + '000'
        data = {
            "log_id":log_id,
            "user_id": user_id+1,
            "name": name,
            "sex": sex,
            "email": email,
            "number": number,
            "user_name": user_name,
            "password":password,
            "province":provice,
            "city":city,
            "public_ip":public_ip,
            "url":random_url,
            "type":type,
            "device":device,
            "user_agent":user_agent,
            "timestamp":timestamp
        }
        json_data = json.dumps(data)
        logger.info(json_data)
    logger.handlers.clear()

用户文件和行为日志文件的格式皆为.log,存储形式为json

用户文件格式:

user_id : 1
name : "郑红霞"
sex : "女"
email : "junpeng@gmail.com"
number : "18884775689"
user_name : "gzou"
password : "FC93qrX1E3dHSOesX"

行为日志文件格式:

log_id : 1
user_id : 65500
name : "李云"
sex : "女"
email : "jingqiao@yahoo.com"
number : "13680104279"
user_name : "mingqiao"
password : "cb33svpoT7JL"
province : "山东省"
city : "宜都市"
public_ip : "136.155.128.188"
url : "http://www.xp.cn/"
type : "4G"
device : "OdOyfRHX4UBnZVwbPRRzuTjcwTABZbTC"
user_agent : "Mozilla/5.0 (Linux; Android 2.1) AppleWebKit/533.2 (KHTML, like Gecko) Chrome/26.0.878.0 Safari/533.2"
timestamp : "1711984700000"

共生成1万名用户,并围绕着1万个用户形成了10天的浏览器访问日志文件,每天的访问记录为10万行,共 100万条记录

flume监控

flume基础参照:

Flume-CSDN博客

上面使用python生成数据,位置在windows的d盘,为了方便,这里将d盘下的一个文件夹与linux虚拟机共享,共享文件夹为/mnt/hgfs/linux_share1,然后使用flume监控该文件夹。

拦截器

先配置Interceptor拦截器,在这个项目中配置了两个拦截器

1、lmx.interceptor

  • 检查输入是否符合json格式:JSONUtils.isJSONValidate
  • 检查json的字段是否符合需求:JSONUtils.isJSONcorrect
package lmx;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class interceptor implements Interceptor {
    private static final Logger LOG = LoggerFactory.getLogger(interceptor.class);

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        if (JSONUtils.isJSONValidate(log)) {
            if (JSONUtils.isJSONcorrect(log)) {
                return event;
            } else {
                return null;
            }
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()) {
            Event next = iterator.next();
//            LOG.info(new String(next.getBody()));
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {

            return new interceptor();
        }
        @Override
        public void configure(Context context) {

        }
    }
}

JSONUtils:

package lmx;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;

public class JSONUtils {
    public static boolean isJSONValidate(String log){
        try {
            JSON.parse(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }

    public static boolean isJSONcorrect(String log){
        JSONObject log_json = JSONObject.parseObject(log);
        String[] keys = new String[]{"log_id","user_id","name","sex","device","email","number","user_name","password","province","city","public_ip","url","type","user_agent","timestamp"};
//这个地方不应该写死在代码里,可以搞一个表,再用java读取那个表进行维护
        if (log_json == null){
            return false;
        }
        if (log_json.size()!=keys.length){
            return false;
        }else{
            Object value;
            for(int i=0;i< keys.length;i++){
                value = log_json.get(keys[i]);
                if (value == null){
                    return false;
                }
            }
            return true;
        }
    }
}

2、lmx.TimeStampInterceptor:

按照json中的时间戳给传入的文件打上时间戳,这样在flume读取文件到hdfs时可以根据头文件(head)中的时间戳设置分区

package lmx;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;

public class TimeStampInterceptor implements Interceptor {
    private ArrayList<Event> my_events = new ArrayList<>();
    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("timestamp");
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        my_events.clear();
        for (Event event : events) {
            my_events.add(intercept(event));
        }
        return my_events;
    }

    @Override
    public void close() {}
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

 配置文件

主要就是设置了拦截器以及输入传出的地址

  • 监控位置:/mnt/hgfs/linux_share1
  • 拦截器:lmx.interceptor、lmx.TimeStampInterceptor
  • 输出位置:hdfs://node1:8020/Project/%Y-%m-%d(时间戳来自于拦截器给予头文件中的时间戳)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir =/mnt/hgfs/linux_share1
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.fileHeader = true
# #忽略所有以.tmp 结尾的文件,不上传
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)

#配置拦截器
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = lmx.interceptor$Builder
a1.sources.r1.interceptors.i2.type = lmx.TimeStampInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://node1:8020/Project/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logs-

a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

开启flume

bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

在windows文件夹下,传输完成的文件会被打上.completed标签 

此时在hdfs中就会产生类似下面这样的数据

点进去就是文件的block块,副本数为3

数仓建设

这一章节不涉及理论讲解,按照标准大数据建模方式进行

ODS层

建立hive外部表,以dt分区,这里设计ODS表跟源文件一样,只有一个字段,放置原始的json

create external table ods_Browser_behavior_log
(
    line STRING
)partitioned by (dt string)
location '/Browser_behavior/ods/ods_Browser_behavior_log';

 然后需要将hdfs中的文件载入到hive中:

load data inpath '/Project/2024-04-13' into table ods_Browser_behavior_log partition (dt='2024-04-13');

此处有多个文件,一个一个导入太麻烦,可以写一个linux脚本:

  • 脚本名字:origin_to_ods_init_behavior_log.sh
  • 输入:start_date 、end_date
#!/bin/bash

if [ $# -ne 2 ]; then
	echo "useage origin_to_ods_init_behavior_log.sh start_date end_date"
	exit
fi
EXPORT_START_DATE=$1
EXPORT_END_DATE=$2
i=$EXPORT_START_DATE
while [[ $i < `date -d "+1 day $EXPORT_END_DATE" +%Y-%m-%d` ]]
do
SQL="load data inpath '/Project/$i' into table Browser_behavior.ods_Browser_behavior_log partition(dt='$i');"
bin/hive -e "$SQL"
i=`date -d "+1 day $i" +%Y-%m-%d`
done

数据预览: 

在hdfs中的文件:

DWD层

对ODS数据做初步处理,粒度与ODS层一样,但是将数据展开为一般形式

  • 对于json数据,按照key展开即可
  • 对于user_agent数据,我们主要关注用户使用的系统和浏览器版本
  • 对与email数据,后期业务可能需要邮箱类型进行用户画像,这里将邮箱种类进行提取
user_agent : "Mozilla/5.0 (Linux; Android 2.1) AppleWebKit/533.2 (KHTML, like Gecko) Chrome/26.0.878.0 Safari/533.2"

首先设计表结构: 

CREATE EXTERNAL TABLE dwd_Browser_behavior_log
(
    `log_id`   bigint COMMENT 'log_id',
    `user_id` bigint comment 'user_id',
    `name` STRING COMMENT '姓名',
    `sex`        STRING COMMENT '性别',
    `email`      STRING COMMENT '注册邮箱',
    `email_categroy`      STRING COMMENT '注册邮箱',
    `number`      STRING COMMENT '注册电话',
    `user_name`      STRING COMMENT '用户名',
    `password`      STRING COMMENT '密码',
    `province`      STRING COMMENT '省份',
    `city`      STRING COMMENT '城市',
    `url`         STRING COMMENT '访问的资源路径',
    `public_ip`  STRING COMMENT 'ip',
    `type`        STRING COMMENT '访问类型',
    `device`        STRING COMMENT '设备id',

    `user_agent`         String comment "user_agent",
    `ts`          bigint comment "时间戳",
    `system`         String comment "设备系统",
    `version`      String comment "浏览器版本"
) COMMENT '页面启动日志表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/Browser_behavior/dwd/dwd_Browser_behavior_log'
    TBLPROPERTIES ("orc.compress" = "snappy");

设计UDF函数提取user_agent中的系统和浏览器版本:

package lmx;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class User_agent_transfer extends GenericUDF {

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if(deferredObjects[0].get() == null){
            return "" ;
        }
        String system;
        String safari_version;
        Pattern pattern = Pattern.compile("(?<=\\().*?(?=;)");
        String user_agent = deferredObjects[0].get().toString();
        Matcher matcher = pattern.matcher(user_agent);
        if( matcher.find() ){
            system = matcher.group();
        }
        else{
            system = "unknow_system";
        }
//        拿到浏览器版本
        safari_version = user_agent.substring(user_agent.length()-5);
        return new Text((system+"_"+safari_version).getBytes(StandardCharsets.UTF_8));
    }
}

使用UDF获取邮箱类型:

package lmx;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class emal_category extends GenericUDF {
    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        if(deferredObjects[0].get() == null){
            return "" ;
        }
        Pattern pattern = Pattern.compile("(?<=@).*?(?=\\.)");
        String emial = deferredObjects[0].get().toString();
        Matcher matcher = pattern.matcher(emial);
        String email_category;
        if( matcher.find() ){
            email_category = matcher.group();
        }
        else{
            email_category = "unknow_email_category";
        }
        return new Text(email_category.getBytes(StandardCharsets.UTF_8));
    }
}

注意,一般来说,如果能使用sql解决的代码,尽量使用sql解决,因为在大数据场景下sql一般是更快的,当然,如果该代码逻辑的复用性很强也可以考虑沉淀为UDF 

将数据导入dwd:

insert overwrite table dwd_Browser_behavior_log partition (dt)
select get_json_object(line, '$.log_id'),
       get_json_object(line, '$.user_id'),
       get_json_object(line, '$.name'),
       get_json_object(line, '$.sex'),
       get_json_object(line, '$.email'),
       email_category(get_json_object(line, '$.email')),
       get_json_object(line, '$.number'),
       get_json_object(line, '$.user_name'),
       get_json_object(line, '$.password'),
       get_json_object(line, '$.province'),
       get_json_object(line, '$.city'),
       get_json_object(line, '$.url'),
       get_json_object(line, '$.public_ip'),
       get_json_object(line, '$.type'),
       get_json_object(line, '$.device'),
       get_json_object(line, '$.user_agent'),
        get_json_object(line, '$.timestamp'),
       split(user_agent_transfer(get_json_object(line, '$.user_agent')),"_")[0],
       split(user_agent_transfer(get_json_object(line, '$.user_agent')),"_")[1],
       dt
from ods_Browser_behavior_log;

数据预览:

在hdfs中的文件分布:

DWS层

DWS为初步汇总表,在这主要以用户为粒度进行初步汇总,可设计为宽表

面向浏览器浏览域,可以假定在某一场景下主要关注浏览次数指标,后期维度可能需要考虑用户、地区、系统、浏览器版本

首先设计表结构: 


CREATE EXTERNAL TABLE dws_Browser_behavior_user_behavior_cnt
(
    `user_id` bigint comment 'user_id',
    `sex`         STRING COMMENT '性别',
    `email_categroy`  STRING COMMENT '邮箱类型',
    `province`        STRING COMMENT '省份',
    `city`          STRING comment "城市",
    `type`        STRING COMMENT '访问类型',
    `system`        STRING COMMENT '访问系统',
    `version`        STRING COMMENT '浏览器版本',
    `cnt`        STRING COMMENT '访问次数'

) COMMENT 'dws用户行为次数记录'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/Browser_behavior/dws/dws_Browser_behavior_user_behavior_cnt'
    TBLPROPERTIES ("orc.compress" = "snappy");

插入数据

insert overwrite table dws_Browser_behavior_user_behavior_cnt partition (dt)
select user_id
        ,max(sex) as sex
        ,email_categroy
        ,province
        ,city
        ,type
        ,system
        ,version
        ,count(1) as cnt
        ,dt
from dwd_browser_behavior_log
group by user_id
        ,email_categroy
        ,province
        ,city
        ,type
        ,system
        ,version
        ,dt
;

 数据预览:

ADS层

ADS层在开发中一般面向报表,需要对维度进行拓展分层

insert overwrite table ads_Browser_behavior_user_behavior_cnt partition (dt)
select
    sex_explode.sex
    ,email_categroy_explode.email_categroy
    ,province_explode.province
    ,city_explode.city
    ,type_explode.type
    ,system_explode.system
    ,version_explode.version
    ,sum(a.cnt) as cnt
    ,count(distinct a.user_id) as uv
    ,'2024-04-01' as dt
from (
         select user_id
              , concat('all;', sex)                 as sex
              , concat('all;', email_categroy)      as email_categroy
              , concat('all;', province)            as province
              , concat('all;', city)                as city
              , concat('all;', type)                as type
              , concat('all;', system)              as system
              , concat('all;', case
                                   when cast(version as double) < 532 then "低级版本"
                                   when cast(version as double) < 534 THEN "中级版本"
                                   else "高级版本" end) as version
              , cnt
         from dws_Browser_behavior_user_behavior_cnt
         where dt = '2024-04-01'
     )a
lateral view explode(split(sex,';')) sex_explode as sex
lateral view explode(split(email_categroy,';')) email_categroy_explode as email_categroy
lateral view explode(split(province,';')) province_explode as province
lateral view explode(split(city,';')) city_explode as city
lateral view explode(split(type,';')) type_explode as type
lateral view explode(split(system,';')) system_explode as system
lateral view explode(split(version,';')) version_explode as version
group by
    sex_explode.sex
    ,email_categroy_explode.email_categroy
    ,province_explode.province
    ,city_explode.city
    ,type_explode.type
    ,system_explode.system
    ,version_explode.version
;

Spark计算

PySpark(一)Spark原理介绍、PySpark初体验及原理_pyspark读取hdfs数据的原理-CSDN博客

PySpark(二)RDD基础、RDD常见算子_pyspark rdd算子-CSDN博客

PySpark(三)RDD持久化、共享变量、Spark内核制度,Spark Shuffle、Spark执行流程_pyspark storagelevel-CSDN博客

PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程、Spark新特性_pyspark catalyst-CSDN博客

可以使用spark对原始数据进行一些数据计算: 

计算每天访问量大于1的用户的人数:

    spark = SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    res = []
    for i in range(9):
        location = 'hdfs://node1:8020/Browser_behavior/ods/ods_Browser_behavior_log/dt=2024-04-0' + str(i+1)
        rdd_set = sc.textFile(location).map(lambda x:json.loads(x)['user_id'])
        rdd_user = rdd_set.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).filter(lambda x:x[1]>1)
        count = rdd_user.map(lambda x:(x,1)).count()
        res.append(['2024-04-0'+str(i+1),count])
    rdd_res = sc.parallelize(res)

    df = spark.createDataFrame(rdd_res,schema=['dt','cnt'])
    df.printSchema()  # 打印表结构
    df.show()  # 打印表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2174226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅谈域攻防渗透之道-凭据获取

静时修止动修观&#xff0c;历历情人挂眼前&#xff1b;若把此心以学道&#xff0c;即身成佛有何难&#xff1f; 前言 通过提权得到了⼀个⾼权限的⽤户身份&#xff0c;例如获取到了 SYSYEM 权限后&#xff0c;就可以抓当前机器上各类密码&#xff1a;机器密码、浏览器密码、…

asynDriver-2

操作理论 初始化 在初始化中&#xff0c;端口驱动注册每个通信端口以及所有支持的接口。 用户代码创建一个asynUser, 它是访问asynDriver功能的"句柄"&#xff0c;通过调用&#xff1a; pasynManager->createAsynUser(processCallback,timeoutCallback); 一个…

基于单片机语音智能导盲仪仿真设计

文章目录 前言资料获取设计介绍设计程序具体实现截图设计获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师&#xff0c;一名热衷于单片机技术探索与分享的博主、专注于 精通51/STM32/MSP430/AVR等单片机设计 主要对象是咱们…

VulnHub-SickOs1.1靶机笔记

SickOs1.1靶机笔记 概述 Vulnhub的靶机sickos1.1 主要练习从互联网上搜取信息的能力&#xff0c;还考察了对代理使用&#xff0c;目录爆破的能力&#xff0c;很不错的靶机 靶机地址&#xff1a; 链接: https://pan.baidu.com/s/1JOTvKbfT-IpcgypcxaCEyQ?pwdytad 提取码: yt…

AFSim仿真系统 --- 系统简解_02(向导模块)

向导 向导是AFSIM的集成开发环境。它提供了视觉和基于文本的工具&#xff0c;以简化场景的开发和执行。 向导支持嵌入式执行基于文本的WSF应用程序&#xff0c;例如任务和传感器图&#xff0c;并提供快捷方式以方便启动其他WSF视觉应用程序&#xff0c;如Warlock和Mystic。 核…

图解IRF

FW1 配置思路 ① 配置IRF优先级 确认设备的主次 ② 设置批量操作的接口方便后续操作 interface range name fw-irf interface GigabitEthernet1/0/2 to GigabitEthernet1/0/3 ③ 接口 showdown 关闭接口 ④ 创建的IRF 1/1 成员的对应的接口的是 GE1/0/2 GE/1/0/3 ⑤ 开放IRF对…

Mathematica线性优化-单纯形/改善单纯形/内点法

引言 Mathematica提供了多种工具和函数来实现线性优化&#xff0c;这些工具可以处理从简单的线性规划问题到复杂的多变量优化问题&#xff0c;最近运筹学作业要熟悉线性优化的编程方法&#xff0c;我们就使用mathematica进行&#xff1a;所有运行代码都在文章上面的资源中&…

Python | Leetcode Python题解之第435题无重叠区间

题目&#xff1a; 题解&#xff1a; class Solution:def eraseOverlapIntervals(self, intervals: List[List[int]]) -> int:if not intervals:return 0intervals.sort(keylambda x: x[1])n len(intervals)right intervals[0][1]ans 1for i in range(1, n):if intervals…

c++速成 01 数据类型与基本运算符

文章目录 前言整型整型短整型长整型无符号整型 浮点型单精度双精度长双精度 变量命名规则&#xff1a;局部变量 全局变量基本运算符算术运算符&#xff1a;赋值运算符比较运算符逻辑运算符位运算符杂项运算符运算符间的优先级 前言 写在前面&#xff1a;本笔记参考b站视频【《…

从零开始手写STL库:Stack

从零开始手写STL库–Stack的实现 Gihub链接&#xff1a;miniSTL 文章目录 从零开始手写STL库–Stack的实现一、stack是什么&#xff1f;二、stack要包含什么函数总结 一、stack是什么&#xff1f; 栈是一种后进先出&#xff08;LIFO&#xff0c;Last In First Out&#xff09…

前端常用动画 直接可以用的代码加详细流程和案例 能应付90%的开发场景

前端项目&#xff0c;特别是Toc的项目&#xff0c;一定少不了各种动效和动画效果。 葫芦七兄弟&#xff1a; CSS 动画 优点&#xff1a;兼容性强&#xff1b;浏览器针对的流畅度优化&#xff1b;语法简单&#xff1b;某些属性&#xff08;如 transform 和 opacity&#xff09;…

CSS 的背景样式

1.1 背景颜色 1.2 背景图片 1.3 背景平铺 1.4 背景图片位置 1.4.1 方位名词 1.4.2 精确单位 1.4.3 混合单位 1.5 背景图像固定 1.6 背景复合写法 1.7 背景色半透明 1.8 总结

Json-Rpc框架(Muduo库快速上手)

阅读导航 引言一、Muduo库简介二、Muduo库常见接口1. TcpServer类基础介绍2. EventLoop类基础介绍3. TcpConnection类基础介绍4. TcpClient类基础介绍5. Buffer类基础介绍 三、Muduo库使用示例⭕英译汉服务器⭕英译汉客户端 引言 在上一篇文章中&#xff0c;我们简要介绍了在项…

业务资源管理模式语言19

相关模式&#xff1a; 如果你考虑类“Resource Maintenance”和“Part used in maintenance”&#xff0c;那么是“Transaction-Transaction Line Item”模式的一个特例[Coa 97]。如果你考虑类“Part”和“Part used in maintenance”&#xff0c;那么是“Item Line Item”模式…

力扣 简单 104.二叉树的最大深度

文章目录 题目介绍解法 题目介绍 解法 如果知道了左子树和右子树的最大深度 l 和 r&#xff0c;那么该二叉树的最大深度即为max(l,r)1&#xff0c;而左子树和右子树的最大深度又可以以同样的方式进行计算。因此我们可以用递归的方法来计算二叉树的最大深度。具体而言&#xff…

动态规划(有背包问题)

目录 1.动态规划的介绍 2.动态规划的例题 第1道题 数字三角形 (如果想看递归写法可以到我的记忆化递归里去看看记忆化递归_将递归程序记忆化-CSDN博客) 第2道题最长公共子序列(模板) 第3道题 最长上升子序列 第4道题最大子段和 背包系列问题 01背包 完全背包 1.动态规…

scrapy爬虫基础

一、初识 创建项目&#xff1a; scrapy startproject my_one_project # 创建项目命令 cd my_one_project # 先进去&#xff0c; 后面在里面运行 运行爬虫命令为&#xff1a;scrapy crawl tk spiders下创建test.py 其中name就是scrapy crawl tk &…

LeetCode讲解篇之5. 最长回文子串

文章目录 题目描述题解思路题解代码 题目描述 题目链接 题解思路 从中心点先寻找和中心点相等的左右端点&#xff0c;在基于左右端点进行往外扩散&#xff0c;直至左右端点不相等或者越界&#xff0c;然后左右端点这个范围内就是我们找寻的回文串&#xff0c;我们遍历中心点…

VS Code 配置 Anaconda Python 环境

VS Code 配置 Anaconda Python 环境 董同学是使用 PyCharm 进行 python 开发的老选手了&#xff0c;但同事们都在用 VS Code。为了更好的和大家交流&#xff0c;转身投入 VS Code 的怀抱&#xff08;当然我都要&#xff09;。当我兴致盎然打开 VS Code 软件&#xff0c;真丝滑…

自动化测试实例:Web登录功能性测试(无验证码)

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、什么是自动化测试 把人为驱动的测试行为转化为机器执行的一种过程称为自动化测试。(来自百度百科)本质上来说&#xff0c;自动化测试对比起手工测试除了需要…