Flink实时电商数仓之DWS层

news2025/1/11 10:01:22

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>

<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);

        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分

        Lexeme next = ikSegmenter.next();
        while (next!= null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * title:
 *
 * @Author 浪拍岸
 * @Create 28/12/2023 上午11:06
 * @Version 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {
    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");
    }
    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        //1. 读取主流dwd页面主题数据
        tableEnv.executeSql("create table page_info(\n" +
                "    `common` map<string,string>,\n" +
                "    `page` map<string,string>,\n" +
                "    `ts` bigint,\n" +
                "    `row_time` as to_timestamp_ltz(ts,3),\n" +
                "     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));

        //测试是否获取到数据
        //tableEnv.executeSql("select * from page_info").print();

        //2. 筛选出关键字keywords
        Table keywrodTable = tableEnv.sqlQuery("select\n" +
                "    page['item'] keywords,\n" +
                "    `row_time`,\n" +
                "    ts\n" +
                " from page_info\n" +
                " where page['last_page_id'] = 'search'\n" +
                " and page['item_type'] = 'keyword'\n" +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywrodTable);

        // 测试是否获取到数据
        //tableEnv.executeSql("select * from keywords_table").print();

        //3. 自定义分词函数并注册
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );

        //4. 调用分词函数对keywords进行拆分
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +
                " from keywords_table" +
                " left join lateral Table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);

        //tableEnv.executeSql("select * from split_kw_table").print();

        //5. 对keyword进行分组开窗聚合
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                "    keyword,\n" +
                "    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +
                "    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +
                "    cast(current_date as string)  cur_date,\n" +
                "    count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");

        //tableEnv.createTemporaryView("result_table",table);
        //tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();

        //6. 写出到doris
        tableEnv.executeSql("create table doris_sink\n" +
                "(\n" +
                "    keyword                STRING,\n" +
                "    wStart                 STRING,\n" +
                "    wEnd                   STRING,\n" +
                "    cur_date               STRING,\n" +
                "    keyword_count          BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));

        windowAggTable.insertInto("doris_sink").execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1341659.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于java选择结构switch及反编译

关于java选择结构switch及反编译 在上一篇文章中&#xff0c;我们了解了选择结构中的if else等&#xff0c;本章内容让我们说明一下上一篇文章中的伏笔&#xff0c;switch选择结构&#x1f914; switch多选择结构 多选择结构&#xff1a;多选择结构除了else if &#xff0c;…

Github 2023-12-29 开源项目日报 Top10

根据Github Trendings的统计&#xff0c;今日(2023-12-29统计)共有10个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Java项目2HTML项目2TypeScript项目2Python项目2非开发语言项目2C项目1JavaScript项目1 精选面试问题列表 创…

Xamarin开发:商场促销(策略设计模式)

Xamarin开发:商场促销&#xff08;策略设计模式&#xff09; 一、介绍二、需求分析三、实现四、需求分析问题1解决方案问题2解决方案 五、增加新需求六、代码优化与分析总结 一、介绍 本文引用《大话设计模式》第二章节的内容进行学习分析&#xff0c;仅供学习使用 这里接着我…

centos7.9 TCP 加速

BBR是谷歌开发的新的TCP加速算法&#xff0c;在网络状况不好的服务器上开启TCP的bbr&#xff0c;可以在无需增加任何硬件投入的情况下实现网络加速&#xff0c;并且客户端无需做任何配置&#xff0c;因此使用起来非常的方便。TCP加速对网络状况较好的内网环境&#xff0c;或者大…

【数据结构和算法】找出两数组的不同

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 哈希类算法题注意事项 2.2 方法一&#xff1a;哈希法 三、代码 3.1 方法一&#xff1a;哈希法 四…

Oracle(4)

子查询 子查询语法很简单&#xff0c;就是select 语句的嵌套使用。 查询工资比SCOTT高的员工信息 分析&#xff1a;两步即可完成 1. 查出SCOTT的工资 SQL> select ename, sal from emp where enameSCOTT 其工资3000 2. 查询比3000高的员工 SQL> select * from emp…

【零基础入门VUE】VueJS - 环境设置

✍面向读者&#xff1a;所有人 ✍所属专栏&#xff1a;零基础入门VUE专栏https://blog.csdn.net/arthas777/category_12537076.html 直接在 HTML 文件中使用 <script> 标签 <html><head><script type "text/javascript" src "vue.min.j…

Spark 集群搭建

文章目录 搭建前准备安装搭建解压并重命名环境变量配置配置文件yarn-site.xmlspark-env.sh 官网求 π(PI) 案例启动spark-shell通过浏览器查看显示查看 Spark 的网页信息展示 搭建前准备 下载地址&#xff1a;Index of /dist/spark (apache.org) 配置好 hadoop 环境&#xff…

实战 | 使用OpenCV快速去除文档中的表格线条(步骤 + 源码)

导 读 本文主要介绍如何使用OpenCV快速去除文档中的表格线条,并给详细步骤和代码。 背景介绍 测试图如下,目标是去除下面三张图中的表格线条,方便后续图像处理。 实现步骤 下面演示详细步骤,以图1为例: 【1】获取二值图像:加载图像、转为灰度图、OTSU二值化 i…

Awesome Chrome Form UI - 框架设计与基础实现

Money is not evil by itself. Its just paper with perceived value to obtain other things we value in other ways. If not money what is evil you may ask? Evil is the unquenchable, obsessive and moral bending desire for more. Evil is the bottomless,soulless …

多模态大模型-CogVLm 论文阅读笔记

多模态大模型-CogVLm 论文阅读笔记 COGVLM: VISUAL EXPERT FOR LARGE LANGUAGEMODELS 论文地址 :https://arxiv.org/pdf/2311.03079.pdfcode地址 : https://github.com/THUDM/CogVLM时间 : 2023-11机构 : zhipuai,tsinghua关键词: visual language model效果:&#xff08;2023…

C++面向对象(OOP)编程-C++11新特性详解

C11作为一个重要的版本&#xff0c;引入了很多新的特性&#xff0c;解决了C语言本身很多遗留的内存泄露问题&#xff0c;并且提供了很多比较灵活的用法。引入的auto&#xff0c;智能指针、线程机制都使得C语言的灵活性、安全性、并发性有了很大的提升。 本文会比较详细的介绍C1…

医疗行业的信息安全现状

文章目录 前言一、医疗行业相关政策法规二、“互联网+医疗健康”推进信息安全建设三、医疗行业网络安全形势依然严峻1、等级保护工作未全面开展落实2、医疗行业网络安全风险较高医疗行业网络安全隐患普遍存在遭受勒索病毒攻击严重3、安全防护水平相对落后缺乏必要的网络安全防护…

QT应用篇 三、QML自定义显示SpinBox的加减按键图片及显示值效果

QT应用篇 一、QT上位机串口编程 二、QML用Image组件实现Progress Bar 的效果 三、QML自定义显示SpinBox的加减按键图片及显示值效果 文章目录 QT应用篇前言一、qml需求二、使用组件1.SpinBox组件2.SpinBox中QML的使用 总结 前言 记录自己学习QML的一些小技巧方便日后查找 QT的…

rsync备份工具

有了同步源服务器之后&#xff0c;就可以使用rsync工具来执行远程同步了&#xff0c;本节介绍的备份操作均在客 户机&#xff08;发起端&#xff09;执行&#xff0c;实际上&#xff0c;同步源与发起端可以是同一台主机&#xff08;当然这种情况不常见&#xff09;&#xff0c;…

腾讯云轻量服务器和云服务器区别对比(超详细)

腾讯云轻量服务器和云服务器CVM该怎么选&#xff1f;不差钱选云服务器CVM&#xff0c;追求性价比选择轻量应用服务器&#xff0c;轻量真优惠呀&#xff0c;活动 https://curl.qcloud.com/oRMoSucP 轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三…

【12.28】转行小白历险记-刷算法04

01两两交换链表中的节点 整体思路 1.要修改后一个节点的指向一定要知道前一个节点的指向才可以改变后面一个节点的 2.分情况奇数和偶数节点&#xff0c;终止条件很重要 3.虚拟头节点&#xff0c;是对我们操作的指针是不是头节点进行判断 02删除链表的倒数第N个节点 思路 …

大数据与人工智能|万物皆算法(第三节)

要点一&#xff1a;数据与智能的关系 1. 一切的核心都是数据&#xff0c;数据和智能之间是密切相关的。 数据是对客观现实的描述&#xff0c;而信息是数据转化而来的。 例如&#xff0c;24是数据&#xff0c;但说“今天的气温是24摄氏度”是信息&#xff0c;而说“班可以分成24…

oracle与mysql的分析函数(窗口函数)

分析函数定义 在SQL语句中&#xff0c;很多查询语句需要进行GROUP BY分组汇总&#xff0c;但是一旦经过分组&#xff0c;SELECT返回的记录数就会减少。为了保留所有原始行记录&#xff0c;并且仍可以进行分组数据分析&#xff0c;分析函数应运而生。 Oracle 8i 版本开始支持窗…

java在线票务系统(选座)Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java servlet 在线票务系统&#xff08;选座&#xff09;管理系统是一套完善的java web信息管理系统 系统采用serlvetdaobean&#xff08;mvc模式)&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要…