【Flink实时数仓】数据仓库项目实战 《五》流量域来源关键词粒度页面浏览各窗口汇总表 【DWS】

news2025/1/8 21:49:26

文章目录

  • 【Flink实时数仓】数据仓库项目实战 《五》流量域来源关键词粒度页面浏览各窗口汇总表 【DWS】
    • 1.1流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL)
      • 1.1.1 主要任务
      • 1.1.2 思路分析
      • 1.1.3 图解
      • 1.1.4 代码

【Flink实时数仓】数据仓库项目实战 《五》流量域来源关键词粒度页面浏览各窗口汇总表 【DWS】

设计要点:
(1)DWS层的设计参考指标体系;
(2)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)
注:window 表示窗口对应的时间范围。

1.1流量域来源关键词粒度页面浏览各窗口汇总表(FlinkSQL)

1.1.1 主要任务

从 Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。

1.1.2 思路分析

分词是个一进多出的过程,需要一个 UDTF 函数来实现,FlinkSQL 没有提供相关的内置函数,所以要自定义 UDTF 函数。此处将借助 IK 分词器完成分词。最终要将数据写入 ClickHouse,需要补充相关依赖,封装 ClickHouse 工具类和方法。

1.1.3 图解

在这里插入图片描述

1.1.4 代码

代码来自尚硅谷,微信关注尚硅谷公众号 回复: 大数据 即可获取源码及资料。

展示主流程代码。具体工具类及实现请下载源码。

package com.atguigu.app.dws;

import com.atguigu.app.func.SplitFunction;
import com.atguigu.bean.KeywordBean;
import com.atguigu.utils.MyClickHouseUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

//数据流:web/app -> Nginx -> 日志服务器(.log) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)
//程  序:     Mock(lg.sh) -> Flume(f1) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK) -> DwsTrafficSourceKeywordPageViewWindow > ClickHouse(ZK)
public class DwsTrafficSourceKeywordPageViewWindow {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 状态后端设置
//        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.setRestartStrategy(RestartStrategies.failureRateRestart(
//                3, Time.days(1), Time.minutes(1)
//        ));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage(
//                "hdfs://hadoop102:8020/ck"
//        );
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2.使用DDL方式读取Kafka page_log 主题的数据创建表并且提取时间戳生成Watermark
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_source_keyword_page_view_window_211126";
        tableEnv.executeSql("" +
                "create table page_log( " +
                "    `page` map<string,string>, " +
                "    `ts` bigint, " +
                "    `rt` as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)), " +
                "    WATERMARK FOR rt AS rt - INTERVAL '2' SECOND " +
                " ) " + MyKafkaUtil.getKafkaDDL(topic, groupId));

        //TODO 3.过滤出搜索数据
        Table filterTable = tableEnv.sqlQuery("" +
                " select " +
                "    page['item'] item, " +
                "    rt " +
                " from page_log " +
                " where page['last_page_id'] = 'search' " +
                " and page['item_type'] = 'keyword' " +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("filter_table", filterTable);

        //TODO 4.注册UDTF & 切词
        tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
        Table splitTable = tableEnv.sqlQuery("" +
                "SELECT " +
                "    word, " +
                "    rt " +
                "FROM filter_table,  " +
                "LATERAL TABLE(SplitFunction(item))");
        tableEnv.createTemporaryView("split_table", splitTable);
        tableEnv.toAppendStream(splitTable, Row.class).print("Split>>>>>>");

        //TODO 5.分组、开窗、聚合
        Table resultTable = tableEnv.sqlQuery("" +
                "select " +
                "    'search' source, " +
                "    DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt, " +
                "    DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt, " +
                "    word keyword, " +
                "    count(*) keyword_count, " +
                "    UNIX_TIMESTAMP()*1000 ts " +
                "from split_table " +
                "group by word,TUMBLE(rt, INTERVAL '10' SECOND)");

        //TODO 6.将动态表转换为流
        DataStream<KeywordBean> keywordBeanDataStream = tableEnv.toAppendStream(resultTable, KeywordBean.class);
        keywordBeanDataStream.print(">>>>>>>>>>>>");

        //TODO 7.将数据写出到ClickHouse
        keywordBeanDataStream.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)"));

        //TODO 8.启动任务
        env.execute("DwsTrafficSourceKeywordPageViewWindow");

    }

}

clickhouse表结果输出


hadoop102 :) select * from dws_traffic_source_keyword_page_view_window;
SELECT *
FROM dws_traffic_source_keyword_page_view_window

Query id: f9c8f52a-d23d-4250-b39b-c6d3621da797
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword──┬─keyword_count─                      ┬────────────ts─┐
│ 2022-06-14 23:44:40 │ 2022-06-14 23:44:50 │ search │ 口红     │             3                       │ 1669218554000 │
│ 2022-06-14 23:44:40 │ 2022-06-14 23:44:50 │ search │ 图书     │             2 					  │ 1669218554000 │
└─────────────────────┴─────────────────────┴────────┴──────────┴─────────────────────────┘
                   

idea输出

Split>>>>>>> +I[图书, 2022-11-23T20:55:34]
Split>>>>>>> +I[图书, 2022-11-23T20:55:36]
Split>>>>>>> +I[口红, 2022-11-23T20:55:37]
Split>>>>>>> +I[口红, 2022-11-23T20:55:39]
Split>>>>>>> +I[口红, 2022-11-23T20:55:39]
>>>>>>>>>>>>> KeywordBean(stt=2022-11-23 20:55:30, edt=2022-11-23 20:55:40, source=search, keyword=图书, keyword_count=2, ts=1671454538000)
>>>>>>>>>>>>> KeywordBean(stt=2022-11-23 20:55:30, edt=2022-11-23 20:55:40, source=search, keyword=口红, keyword_count=3, ts=1671454538000)
>```

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/102191.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第七章Servlet

文章目录什么是Servlet为什么需要Servlet从不同角度来看Servlet总过程Servlet之间的继承关系Servlet接口接口中方法GenericServlet抽象方法HttpServlet 抽象子类小结Servlet生命周期从Servlet接口方法开始修改Servlet创建对象的时机Servlet容器Servlet相关的保存作用域request&…

零基础如何学好Python开发?

作为一个零基础小白想学好Python开发应该先确定明确目标、做好学习Python系统规划、选择适合的开发工具、进阶提升学习规划、多练多看加深对Python程序的理解&#xff0c;想入门一门编程语言就需要不断的进行练习。 一、明确目标 很多人在学习Python之前了解很少&#xff0c;很…

ShareSDK 安装教程

一、ShareSDK简介 ShareSDK是一种社会化分享组件&#xff0c;为iOS、Android、WP8 的APP提供社会化功能&#xff0c;集成了一些常用的类库和接口&#xff0c;缩短开发者的开发时间&#xff0c;还有社会化统计分析管理后台。ShareSDK移动开发者服务平台由广州掌淘网络科技有限公…

【C++进阶之路第一卷】预编译头加快编译速度

一、前言 最近在写项目的时候&#xff0c;发现随着项目越来越大&#xff0c;编译需要的时间也越来越长&#xff0c; 然后使用了预编译头&#xff0c;时间减少了很多&#xff01; 这个谁用谁知道&#xff0c;很 Nice&#xff01; 1. 预编译头的原理 简单来说就是将一些你认…

广域网简介、PE/CE/P基本概念理解、PPP协议详细介绍、PAP/CHAP认证介绍与配置、PPPOE会话建立详细介绍并配合实验抓包理解报文交互。

3.1.0 广域网&#xff08;简介、PPP、PAP、CHAP、PPPOE&#xff09; 观前温馨提示&#xff1a; 篇幅较大&#xff0c;本章主要有以下大点&#xff0c;可通过目录与右侧导航跳转观看&#xff1a; &#xff08;1&#xff09;广域网基本概念 &#xff08;2&#xff09;PPP协议介…

【Numpy基础知识】在ndarrays上索引

在ndarrays上索引 来源&#xff1a;Numpy官网&#xff1a;https://numpy.org/doc/stable/user/basics.html 文章目录在ndarrays上索引导包【1】基本索引【2】高级索引【3】结合高级索引和基本索引【3】现场访问【4】展开迭代器索引【5】为索引数组赋值【6】处理程序中可变数量的…

Python3 环境搭建

本章节我们将向大家介绍如何在本地搭建 Python3 开发环境。 Python3 可应用于多平台包括 Windows、Linux 和 Mac OS X。 Unix (Solaris, Linux, FreeBSD, AIX, HP/UX, SunOS, IRIX, 等等。)Win 9x/NT/2000Macintosh (Intel, PPC, 68K)OS/2DOS (多个DOS版本)PalmOSNokia 移动手…

浅谈转行Python的看法,分享我的学习方法

今天跟大家聊一下转行Python的看法和经验。本人之前是做Java开发的&#xff0c;后面因为公司需要Python技术&#xff0c;就接触到了Python&#xff0c;我发现Python比Java更加容易理解&#xff0c;简洁&#xff0c;后面随着Python项目的增多干脆就转行做Python开发了。 Python…

LaTeX教程(三)——文档格式排版

文章目录1. 章节目录1.1 生成章节1.2 生成目录2. 交叉引用和脚注2.1 交叉引用2.2 脚注3. 特殊环境3.1 列表3.2 文本对齐3.3 引用环境3.4 代码环境1. 章节目录 1.1 生成章节 写文章或者论文的时候&#xff0c;章节目录可谓是必不可少的&#xff0c;下面我们来聊聊LaTeX怎么处理…

Linux——安装和使用vmtools

实验1 Linux系统初识 一、安装和使用vmtools vmware tools是虚拟机VMware Workstation自带的一款工具&#xff0c;现在介绍ubuntu linux安装VMare tools。它的作用就是使用户可以从物理主机直接往虚拟机里面拖文件。如果不安装它&#xff0c;我们是无法进行虚拟机和物理…

【火电机组、风能、储能】高比例风电电力系统储能运行及配置分析附Matlab代码

​✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法…

第十三章ThreadLocal

文章目录什么是ThreadLocal基本使用常用方法使用ThreadLocal来解决使用sychronized解决ThreadLocal与synchronized的区别运用场景_事务案例场景构建引入事务JDBC中关于事务的操作的api常规解决方案常规方案的弊端ThreadLocal解决方案ThreadLocal方案的好处ThreaLocal的内部结构…

Java堆排序和代码实现详解

堆的定义 堆是计算机科学中一类特殊的数据结构的统称&#xff0c;堆通常可以被看做是一棵完全二叉树的数组对象。 堆的特性 1.它是完全二叉树&#xff0c;除了树的最后一层结点不需要是满的&#xff0c;其它的每一层从左到右都是满的&#xff0c;如果最后一层结点不是满的&…

Hadoop(入门)

一、Hadoop概述 1.1 Hadoop是什么 1&#xff09;Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 2&#xff09;主要解决&#xff0c;海量数据的存储和海量数据的分析计算问题。 3&#xff09;广义上来说&#xff0c;Hadoop通常是指一个更广泛的概念—Hadoop生态圈。…

[Vue3]自定义指令实现组件元素可拖拽移动

实现思路&#xff1a; 元素移动设计思路 1.在光标按下的时刻记录下光标的绝对位置坐标&#xff08;以视窗左上角为原点&#xff09;&#xff08;const {clientX, clientY} evt&#xff09; clientX / clientY 事件属性返回当事件被触发时光标指针相对于浏览器页面当前 body …

flutter系列之:移动端手势的具体使用

文章目录简介赋予widget可以点击的功能会动的组件可删除的组件总结简介 之前我们介绍了GestureDetector的定义和其提供的一些基本的方法&#xff0c;GestureDetector的好处就是可以把任何一个widget都赋予类似button的功能。 今天将会通过几个具体的例子来讲解一下GestureDet…

用ChatGPT写一段嵌入式代码

已剪辑自: https://mp.weixin.qq.com/s/uKkUwXx32LPkUYQK44z1lw 废话不多说&#xff0c;开整&#xff01; ChatGPT: Optimizing Language Models for Dialogue&#xff0c;即优化对话的语言模型&#xff0c;它以对话的方式进行交互。对话形式使ChatGPT能够回答后续问题&#…

性能测试---LoadRunner

目录 1.LoadRunner对比Jmeter的优势 2.LoadRunner三个组件之间的关系 3.学习VUG的使用 3.1创建性能测试脚本并进行录制 第一步:打开VUG,创建一个新的性能测试的脚本 第二步:对新建的脚本进行设置 第三步:启动WebTours服务 第四步:回到VUG中,点击录制按钮并设置录制选项…

学习编程的五个关键点!你需要get它,并运用!

总体来说&#xff0c;学习如何编程是一件较难的事情。我最近发现大学里的计算机课程和各种编程训练营错过了编程的一些重要因素&#xff0c;对新手的教学用了不太恰当的方法。于是&#xff0c;我准备分享一个成功的编程课程应该具备的五大基本支柱。 菜鸟的目标是掌握编程的基…

form表单发送put、delete、patch请求的实现过程

关于发送put、delete、patch请求底层实现过程 对于put这些请求&#xff0c;我们无法直接通过form表单发送&#xff0c;form表单仅支持get和post请求&#xff1b; 虽然我们无法直接通过form表单发送这些请求&#xff0c;但我们可以以form表单为载体做二次请求&#xff1a;使用f…