【实时数仓】实现用户行为日志相关功能(源码)

news2025/1/15 22:39:28

文章目录

  • 一 准备用户行为日志-DWD层
    • 1 代码实现
      • (1)识别新老访客
      • (2)利用侧输出流实现数据拆分
      • (3)将不同流的数据推送到下游kafka的不同Topic(分流)
        • a 封装方法
        • b 程序中调用kafka工具类获取sink
        • c 测试
    • 2 dwd层日志数据采集总结

一 准备用户行为日志-DWD层

1 代码实现

(1)识别新老访客

保存每个mid的首次访问日期,每条进入该算子的访问记录,都会把mid对应的首次访问时间读取出来,跟当前日期进行比较,只有首次访问时间不为空,且首次访问时间早于当日的,则认为该访客是老访客,否则是新访客。

同时如果是新访客且没有访问记录的话,会写入首次访问时间。

is_new为0不用修复,已经访问过,状态是正确的;为1时才会出现状态不准确的情况。

日志格式如下:

在这里插入图片描述

        // TODO 5 修复新老访客状态
        // 按照设备id进行分组
        // 匿名内部类
//        jsonObjDS.keyBy(new KeySelector<JSONObject, String>() {
//            @Override
//            public String getKey(JSONObject jsonObj) throws Exception {
//                return jsonObj.getJSONObject("common").getString("mid");
//            }
//        });
        // 按照设备id进行分组
        // lambda表达式
        KeyedStream<JSONObject, String> keyedDS = jsonObjDS.keyBy(
                jsonObj -> jsonObj.getJSONObject("common").getString("mid")
        );
        // 修复,修改json中的某个属性值
        SingleOutputStreamOperator<JSONObject> jsonObjWithNew = keyedDS.map(
                new RichMapFunction<JSONObject, JSONObject>() {
                    // 不能在声明的时候初始化状态,这时还不能获取到RuntimeContext
                    // 富函数只有在调用到open方法时,才可以获取到RuntimeContext
                    private ValueState<String> lastVisitDateState;
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        lastVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastVisitDateState", Types.STRING));
                        sdf = new SimpleDateFormat("yyyyMMdd");
                    }

                    @Override
                    public JSONObject map(JSONObject jsonObj) throws Exception {
                        String isNew = jsonObj.getJSONObject("common").getString("is_new");
                        if (isNew.equals("1")) {
                            // 如果之前系统判断是新访客,可能出现错误
                            // 将之前的访问信息保存到状态中
                            String lastVisitDate = lastVisitDateState.value();
                            String curVisitDate = sdf.format(jsonObj.getLong("ts"));
                            // 判断状态中的上次访日期是否为空
                            if (lastVisitDate != null && lastVisitDate.length() > 0) {
                                // 访问过
                                // 判断是否在同一天访问
                                if (!lastVisitDate.equals(curVisitDate)) {
                                    isNew = "0";
                                    jsonObj.getJSONObject("common").put("is_new", isNew);
                                }
                            } else {
                                lastVisitDateState.update(curVisitDate);
                            }
                        }
                        return jsonObj;
                    }
                }
        );

(2)利用侧输出流实现数据拆分

根据日志数据内容,将日志数据分为3类,页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流。

// TODO 6 按照日志类型对日志进行分流
// 启动 -- 启动侧输出流
// 曝光 -- 曝光侧输出流
// 页面 -- 主流
// 声明侧输出流标签
OutputTag<String> startTag = new OutputTag<String>("start"){};
OutputTag<String> displayTag = new OutputTag<String>("display"){};
// 使用Flink侧输出流进行分流
SingleOutputStreamOperator<String> pageDS = jsonObjWithNew.process(
        new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject jsonObj, Context context, Collector<String> out) throws Exception {
                // 获取启动jsonObj
                JSONObject startJsonObj = jsonObj.getJSONObject("start");
                String jsonStr = jsonObj.toJSONString();
                Long ts = jsonObj.getLong("ts");
                // 判断是否为启动日志
                if (startJsonObj != null && startJsonObj.size() > 0) {
                    // 放到启动侧输出流
                    context.output(startTag, jsonStr);
                } else {
                    // 如果不是启动日志,那么其他日志都属于页面日志,放到主流中
                    out.collect(jsonStr);

                    String pageId = jsonObj.getJSONObject("page").getString("page_id");
                    // 判断是否是曝光日志,曝光日志是一个数组
                    JSONArray displaysArr = jsonObj.getJSONArray("displays");
                    if (displaysArr != null && displaysArr.size() > 0) {
                        // 遍历数据,获取每一条曝光数据
                        for (int i = 0; i < displaysArr.size(); i++) {
                            JSONObject displayJsonObj = displaysArr.getJSONObject(i);
                            displayJsonObj.put("ts", ts);
                            displayJsonObj.put("page_id", pageId);
                            // 将曝光日志输出到曝光侧输出流
                            context.output(displayTag, displayJsonObj.toJSONString());
                        }
                    }
                }
            }
        }
);
// 获取不同流数据,输出测试
DataStream<String> startDS = pageDS.getSideOutput(startTag);
DataStream<String> displayDs = pageDS.getSideOutput(displayTag);
pageDS.print("主流:");
startDS.print("启动流:");
displayDs.print("曝光流:");

开启需要的环境,运行程序,执行日志数据生成jar包,观察输出结果。

(3)将不同流的数据推送到下游kafka的不同Topic(分流)

a 封装方法

在MyKafkaUtil中封装获取kafka生产者的方法。

// 获取kafka的生产者
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
    return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
}

b 程序中调用kafka工具类获取sink

// TODO 7 将不同流的数据写到kafka的dwd不同主题中
pageDS.addSink(MyKafkaUtil.getKafkaSink("dwd_page_log"));
startDS.addSink(MyKafkaUtil.getKafkaSink("dwd_start_log"));
displayDs.addSink(MyKafkaUtil.getKafkaSink("dwd_diaplay_log"));

c 测试

# 开启三个kafka消费者
kfkcon.sh dwd_page_log
kfkcon.sh dwd_start_log
kfkcon.sh dwd_display_log
# 启动程序,生成日志数据,观察结果

结果如下图:

在这里插入图片描述

2 dwd层日志数据采集总结

日志数据分流执行流程

  • 需要启动的进程

    • zookeeper
    • kafka
    • 如果开启检查点,需要启动hdfs
    • logger.sh【日志采集 + nginx】
  • 生成模拟生成日志的jar包

  • 将生成的日志发送给Nginx

  • Nginx接收到数据之后,进行请求转发,将请求发送给101,102,103上的日志采集服务

  • 日志采集服务对数据进行输出、落盘以及发送到kafka的ods_base_log

  • BaseLogApp从ods_base_log读取数据

    • 结构转换 String -> JSONObject

    • 状态修复 分组、修复

    • 分流

    • 将分流后的数据写到kafka的dwd不同主题中

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/85586.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据链路层

文章目录数据链路层的功能ARP协议DNS-------域名解析&#xff08;浅浅的了解一下&#xff09;在浏览器中输入URL后&#xff0c;发生的事情&#xff08;经典面试题&#xff09;ICMP协议NAT技术代理服务器网络核心知识大总结数据链路层的功能 对比理解网络层。 网络层 &#xff…

飞控学习随记

常见指令 编译Arduplane程序 cd ardupilot/ ./waf plane 进入 Tools/autotest 文件夹中&#xff0c;启动3D flightgear ./fg_quad_view.sh 进入ArduPLane文件夹中&#xff0c;启动仿真 sim_vehicle.py --map --console -L KSFO&#xff08;-L 选择起飞位置&#xff09; 解锁…

字节女测试工程师万字总结的软件测试入门技巧

成为一个优秀的测试工程师需要具备哪些知识和经验&#xff1f; 针对这个问题&#xff0c;可以直接拆分以下三个小问题来详细说明&#xff1a; 1、优秀软件测试工程师的标准是什么&#xff1f; 2、一个合格的测试工程师需要具备哪些专业知识&#xff1f; 3、一个合格的测试工程…

前端vue项目部署到生产环境(包括nginx安装及配置)

一.vue3项目打包 vue3项目 使用vue-cli创建的&#xff0c;使用npm run build打包到dist 二.在服务器上安装nginx 1.去nginx的官网下载windows版本的nginx&#xff0c;下载地址&#xff1a;nginx: download 最好安装稳定版&#xff0c;下载完成后解压nginx压缩包&#xff1a…

Android Studio实现数独小游戏,休闲益智

文章目录一、项目概述二、开发环境三、详细设计3.1 界面设计3.2 逻辑设计四、运行演示一、项目概述 数独是一种逻辑解谜游戏&#xff0c;它规则稍复杂&#xff0c;解题过程富有挑战性。本次安卓数独小游戏&#xff0c;主页面有继续游戏、新游戏、关于和退出四个功能&#xff0…

【实训项目】教师工作量管理系统(超级详细)

目录 一、需求与分析 1. 项目概述 1.1 教师信息处理 1.2 教师工作量数据处理&#xff1a; 1.3 教师综合信息输出 2. 需求分析 3. 模块设计 3.1 功能模块 3.2 所有功能模块的流程图 二、设计与实现 1. 程序设计 1.1 教师工作量管理系统 1.2 登录系统 1.3 主函数…

初级算法之字符串

344. 反转字符串 编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。 思路一: 从中间开始向两边遍历,然后两边交换位置,最终获得字符串的反转 // class Solution {public void reverseString(char[] s) {int len s.length,siz…

二、JavaScript——Hello World

1. 创建文件 提前在本地新建好文件夹用于存储项目代码&#xff0c;再通过VSode打开指定存储代码的指定文件夹&#xff0c;并新建HelloWorld.html文件 HelloWorld.html文件新建成功之后&#xff0c;输入“&#xff01;”点击自动生成标签 自动生成的标签如下&#xff1a; <!…

02Golang执行流程简介

Golang执行流程简介Golang执行流程的分析两种流程的方式区别什么是编译什么是运行Go程序开发注意事项Golang执行流程的分析 如果是对源码编译后&#xff0c;再执行&#xff0c;go的执行流程如下 如果对源码直接执行go run源码&#xff0c;go的执行流程如下 两种流程的方式区…

副业想做自媒体可以选择什么领域,适合宝妈的三个自媒体领域推荐

大家好&#xff0c;我是蝶衣王的小编&#xff0c;今天说说自媒体可以选择的领域 在过去的两年里&#xff0c;最受欢迎的职业之一必须属于自媒体。无论是全职还是副业&#xff0c;每个人都可以这样做。许多人经常在互联网上看到&#xff0c;通过自媒体&#xff0c;月收入数千或…

【汽车电子】can报文和can database(candbc)

1.can就是controller area network&#xff0c;是面向汽车的通信协议&#xff0c;通俗来讲就是在汽车电子控制领域中的不同部分进行通信的&#xff08;传输数据&#xff09;。 2.can报文有标准帧和扩展帧两种&#xff0c;也就是can和canfd&#xff0c;canfd是can的升级版&…

vscode git拉下来后LF CRLF问题

点击这里可以更改红色报错&#xff0c; windows下默认是CRLF, 类unix下LF 若CRLF数量非常多&#xff0c;解决方法&#xff1a; vscode 1. 在设置里Eol 选\n 2. 在根路径.editorconfig 里end_of_line lf 以上都不生效 可以有两种解决方案&#xff1a; 一. 下载插件EditorCo…

linux系统中如何挂载数据盘

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。 前言 作为一个开发人员&#xff0c;和服务器打交道是必不可少的&#xff0c;当然要和运维相比的话还是差点儿。 但是&#xff0c;在公司&#xff0c;作为一个程序员&#xff0c;难免会遇…

微电网优化调度|基于多目标粒子群算法的微电网优化调度【风、光、储能、柴油机、电网交互燃汽轮机】(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f389;作者研究&#xff1a;&#x1f3c5;&#x1f3c5;&#x1f3c5;本科计算机专业&#xff0c;研究生电气学硕…

(十三)Vue之监测数据改变的原理

文章目录监测数据改变的原理之对象vue.set的使用监测数据改变的原理之数组Vue学习目录 上一篇&#xff1a;&#xff08;十二&#xff09;Vue之列表渲染 先看一个需求&#xff1a;使用列表渲染出一组数据&#xff0c;然后点击按钮更新其中一个信息 <!--准备好一个容器--&…

微服务中统一日志-ELK

微服务中统一日志-ELK一.简介1.介绍2.流程3.要求4.下载地址二.安装Elasticsearch1.创建文件存放目录2.进入目录3.下载4.解压5.修改配置5.1.介绍5.2.系统配置5.3修改es配置5.4启动&#xff0c;测试三.安装Kibana1.进入目录2.下载3.解压4.修改配置4.1介绍4.2修改kibana配置4.3启动…

面试八股-JVM

1.Java代码编译过程 准备过程&#xff1a;初始化插入式注解处理器解析与填充符号表过程 词法、语法分析&#xff0c;将字符流转为标记集合&#xff0c;构造抽象语法树填充符号表&#xff0c;产生符号地址和符号信息 插入式注解处理器的注解处理分析与字节码生成过程 标注检查&…

描述统计 | 学习笔记

一.导论 统计学是通过收集&#xff0c;整理&#xff0c;分析&#xff0c;描述数据等手段&#xff0c;以达到推断所测对象的本质&#xff0c;甚至预测对象未来的一门综合性科学。其目的是探索数据的内在数量规律性&#xff0c;以达到对客观事物的科学认识 统计的本业是消化数据…

java语言【#107. 七的奇倍数】(已通过)

题目描述 ​ 如果一个数既是 7 的倍数又不能被 2 整除&#xff0c;那么我们称之为七的奇倍数。 ​ 输入一个正整数 n&#xff0c;判断它是否是 7 的奇倍数。 输入 ​ 输入一个正整数 n &#xff08;0≤n≤100&#xff09; 输出 ​ 如果 n 是 7 的奇倍数 输出 YES 否则输出 NO…

校园二手市场开题报告范文

目录 一、课题意义&#xff08;包括课题的理论意义和现实意义&#xff09; &#xff08;一&#xff09;理论意义 &#xff08;二&#xff09;现实意义 二、文献综述&#xff08;包括&#xff1a;1.理论的渊源及演进过程2.国内外对本课题的研究现状和有待解决的问题3.本人对所…