尚硅谷大数据项目《在线教育之实时数仓》笔记008

news2024/12/31 6:51:22

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第10章 数仓开发之DWS层

P066

P067

P068

P069

P070

P071

P072

P073

P074

P075

P076

P077

P078

P079

P080

P081

P082


第10章 数仓开发之DWS层

P066

第10章 数仓开发之DWS层

设计要点:
(1)DWS层的设计参考指标体系。
(2)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)。
注:window 表示窗口对应的时间范围。
10.1 流量域来源关键词粒度页面浏览各窗口汇总表
10.1.1 主要任务

    从 Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。
10.1.2 思路分析

尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

在线教育实时指标体系.xlsx

P067

DwsTrafficSourceKeywordPageViewWindow

//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词
//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务

package com.atguigu.edu.realtime.util;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;

/**
 * @author yhm
 * @create 2023-04-25 16:05
 */
public class KeyWordUtil {
    public static ArrayList<String> analyze(String text) {
        StringReader reader = new StringReader(text);
        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
        ArrayList<String> strings = new ArrayList<>();
        try {
            Lexeme lexeme = null;
            while ((lexeme = ikSegmenter.next()) != null) {
                String keyWord = lexeme.getLexemeText();
                strings.add(keyWord);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return strings;
    }

    public static void main(String[] args) {
        String s = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";
        ArrayList<String> strings = analyze(s);
        System.out.println(strings);
    }
}

P068

User-defined Functions | Apache Flink

DwsTrafficSourceKeywordPageViewWindow

//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词

//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务

P069

package com.atguigu.edu.realtime.app.dws;

import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author yhm
 * @create 2023-04-25 16:01
 */
public class DwsTrafficSourceKeywordPageViewWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //TODO 2 自定义拆词函数
        tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());

        //TODO 3 读取kafka中的page_log数据
        String topicName = "dwd_traffic_page_log";
        String groupId = "dws_traffic_source_keyword_page_view_window";
        tableEnv.executeSql("create table page_log(\n" +
                "    common map<String,String>,\n" +
                "    page map<String,String>,\n" +
                "    ts bigint, \n" +
                "    row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +
                "    WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +
                ")" + KafkaUtil.getKafkaDDL(topicName, groupId));

        //TODO 4 过滤数据得到搜索的关键字
        //① page 字段下 item 字段不为 null;
        //② page 字段下 last_page_id 为 search;
        //③ page 字段下 item_type 为 keyword。
        Table searchTable = tableEnv.sqlQuery("select \n" +
                "    page['item'] full_word,\n" +
                "    row_time\n" +
                "from page_log\n" +
                "where page['item'] is not null \n" +
                "and page['item_type'] ='keyword'\n" +
                // "and page['last_page_id'] = 'search'" +
                "");
        tableEnv.createTemporaryView("search_table", searchTable);

        //TODO 5 使用自定义函数对关键字拆词
        Table splitTable = tableEnv.sqlQuery("select \n" +
                "    keyword,\n" +
                "    row_time\n" +
                "from search_table ,\n" +
                "lateral table (ik_analyze(full_word)) as t(keyword)");
        tableEnv.createTemporaryView("split_table", splitTable);
        tableEnv.executeSql("select * from split_table").print();

        //TODO 6 分组开窗合并计算

        //TODO 7 转换为流

        //TODO 8 写出到clickHouse中

        //TODO 9 运行任务
    }
}

P070

Window Aggregation | Apache Flink

P071

10.1.4 ClickHouse 建表语句

drop table if exists dws_traffic_source_keyword_page_view_window;

create table if not exists dws_traffic_source_keyword_page_view_window

(

    stt           DateTime,

    edt           DateTime,

    source        String,

    keyword       String,

    keyword_count UInt64,

    ts            UInt64

) engine = ReplacingMergeTree(ts)

      partition by toYYYYMMDD(stt)

      order by (stt, edt, source, keyword);

package com.atguigu.edu.realtime.util;

import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.bean.TransientSink;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author yhm
 * @create 2023-04-25 18:23
 */
public class ClickHouseUtil {
    // 设计泛型 通过传入的数据类型自动补充sql 写出到clickhouse
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        return JdbcSink.<T>sink(sql, new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
                        // T是泛型,明文是不知道什么类型的,需要使用反射获取
                        Field[] declaredFields = obj.getClass().getDeclaredFields();
                        int skip = 0;
                        for (int i = 0; i < declaredFields.length; i++) {
                            Field field = declaredFields[i];
                            field.setAccessible(true);

                            // 获取属性的注解
                            TransientSink annotation = field.getAnnotation(TransientSink.class);
                            if (annotation != null) {
                                skip++;
                                continue;
                            }

                            // 使用类模板的属性名 get对象 获取值
                            try {
                                Object o = field.get(obj);
                                preparedStatement.setObject(i + 1 - skip, o);
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(5000L)
                        .withBatchSize(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(EduConfig.CLICKHOUSE_URL)
                        .withDriverName(EduConfig.CLICKHOUSE_DRIVER)
                        .build());
    }
}

[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m

node001 :) SHOW DATABASES;

node001 :) CREATE DATABASE edu_realtime;
node001 :) SHOW DATABASES;

node001 :) USE edu_realtime;
node001 :) SHOW TABLES FROM edu_realtime;

node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;

[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.

node001 :) CREATE DATABASE edu_realtime;

CREATE DATABASE edu_realtime

Ok.

0 rows in set. Elapsed: 0.044 sec. 

node001 :) SHOW DATABASES;

SHOW DATABASES

┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default                        │
│ edu_realtime                   │
│ system                         │
└────────────────────────────────┘

4 rows in set. Elapsed: 0.031 sec. 

node001 :) SHOW TABLES FROM edu_realtime;

SHOW TABLES FROM edu_realtime

Ok.

0 rows in set. Elapsed: 0.028 sec. 

node001 :) use edu_realtime;

USE edu_realtime

Ok.

0 rows in set. Elapsed: 0.002 sec. 

node001 :) create table if not exists dws_traffic_source_keyword_page_view_window
:-] (
:-]     stt           DateTime,
:-]     edt           DateTime,
:-]     source        String,
:-]     keyword       String,
:-]     keyword_count UInt64,
:-]     ts            UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-]       partition by toYYYYMMDD(stt)
:-]       order by (stt, edt, source, keyword);

CREATE TABLE IF NOT EXISTS dws_traffic_source_keyword_page_view_window
(
    `stt` DateTime, 
    `edt` DateTime, 
    `source` String, 
    `keyword` String, 
    `keyword_count` UInt64, 
    `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, source, keyword)

Ok.

0 rows in set. Elapsed: 0.016 sec. 

node001 :) use edu_realtime;
USE edu_realtime

Ok.

0 rows in set. Elapsed: 0.002 sec. 

node001 :) SHOW TABLES FROM edu_realtime;
SHOW TABLES FROM edu_realtime

┌─name────────────────────────────────────────┐
│ dws_traffic_source_keyword_page_view_window │
└─────────────────────────────────────────────┘

1 rows in set. Elapsed: 0.007 sec. 

node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;

SELECT *
FROM dws_traffic_source_keyword_page_view_window

┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ java    │            19 │ 1699513951000 │
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ 前端    │            27 │ 1699513951000 │
│ 2022-02-21 23:58:50 │ 2022-02-21 23:59:00 │ SEARCH │ 前端    │            14 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 大      │            19 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据库  │             4 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ hadoop  │            19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ python  │            19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端    │            39 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库  │            19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据    │            19 │ 1699513951000 │
│ 2022-02-21 23:59:20 │ 2022-02-21 23:59:30 │ SEARCH │ java    │            33 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ flink   │            20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ java    │            20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ 前端    │            19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程  │            20 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ flink   │             4 │ 1699513951000 │
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ java    │            27 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 20:51:40 │ 2022-02-21 20:51:50 │ SEARCH │ 多线程  │             1 │ 1699513903000 │
│ 2022-02-21 20:52:00 │ 2022-02-21 20:52:10 │ SEARCH │ hadoop  │             1 │ 1699449059000 │
│ 2022-02-21 20:53:10 │ 2022-02-21 20:53:20 │ SEARCH │ 多线程  │             1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 大      │             1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 数据    │             1 │ 1699447298000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端    │             3 │ 1699449067000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库  │             1 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程  │             2 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘

1003 rows in set. Elapsed: 0.114 sec. Processed 1.00 thousand rows, 54.17 KB (8.79 thousand rows/s., 474.47 KB/s.) 

node001 :) 

package com.atguigu.edu.realtime.app.dws;

import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author yhm
 * @create 2023-04-25 16:01
 */
public class DwsTrafficSourceKeywordPageViewWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //TODO 2 自定义拆词函数
        tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());

        //TODO 3 读取kafka中的page_log数据
        String topicName = "dwd_traffic_page_log";
        String groupId = "dws_traffic_source_keyword_page_view_window";
        tableEnv.executeSql("create table page_log(\n" +
                "    common map<String,String>,\n" +
                "    page map<String,String>,\n" +
                "    ts bigint, \n" +
                "    row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +
                "    WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +
                ")" + KafkaUtil.getKafkaDDL(topicName, groupId));

        //TODO 4 过滤数据得到搜索的关键字
        //① page 字段下 item 字段不为 null;
        //② page 字段下 last_page_id 为 search;
        //③ page 字段下 item_type 为 keyword。
        Table searchTable = tableEnv.sqlQuery("select \n" +
                "    page['item'] full_word,\n" +
                "    row_time\n" +
                "from page_log\n" +
                "where page['item'] is not null \n" +
                "and page['item_type'] ='keyword'\n" +
                // "and page['last_page_id'] = 'search'" +
                "");
        tableEnv.createTemporaryView("search_table", searchTable);

        //TODO 5 使用自定义函数对关键字拆词
        Table splitTable = tableEnv.sqlQuery("select \n" +
                "    keyword,\n" +
                "    row_time\n" +
                "from search_table ,\n" +
                "lateral table (ik_analyze(full_word)) as t(keyword)");
        tableEnv.createTemporaryView("split_table", splitTable);
        //tableEnv.executeSql("select * from split_table").print();

        //TODO 6 分组开窗合并计算
        Table keywordBeanTable = tableEnv.sqlQuery("select \n" +
                "    date_format(TUMBLE_START(\n" +
                "    row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "    date_format(TUMBLE_END(\n" +
                "    row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') edt,\n" +
                "\n" + "'" + EduConstant.KEYWORD_SEARCH + "' source," +
                "    0 keywordLength,\n" +
                "    keyword,\n" +
                "    count(*) keyword_count,\n" +
                "    UNIX_TIMESTAMP()*1000 ts\n" +
                "from split_table\n" +
                "group by TUMBLE(row_time, INTERVAL '10' second),keyword");

        //TODO 7 转换为流
        DataStream<KeywordBean> keywordBeanDataStream = tableEnv.toDataStream(keywordBeanTable, KeywordBean.class);
        keywordBeanDataStream.print();

        //TODO 8 写出到clickHouse中
        keywordBeanDataStream.addSink(ClickHouseUtil.<KeywordBean>getJdbcSink("insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)"));

        //TODO 9 运行任务
        env.execute();
    }
}

P072

package com.atguigu.edu.realtime.bean;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author yhm
 * @create 2023-04-25 18:37
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

P073

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表
10.2.1 主要任务

DWS 层是为 ADS 层服务的,通过对指标体系的分析,本节汇总表中需要有会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数五个度量字段。我们的任务是统计这五个指标,并将数据写入 ClickHouse 汇总表。

P074

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 1 ~ TODO 6

P075

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 7 ~ TODO 9

P076

PhoenixUtil、public static <T> List<T> queryList(String sql, Class<T> clazz) {}

P077

DimUtil、public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>... columnNamesAn {}

[atguigu@node001 ~]$ start-hbase.sh
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ bin/sqlline.py node001:2181

P078

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 10

P079

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表

10.2.2 思路分析

4)旁路缓存优化

外部数据源的查询常常是流式计算的性能瓶颈。以本程序为例,每次查询都要连接 Hbase,数据传输需要做序列化、反序列化,还有网络传输,严重影响时效性。可以通过旁路缓存对查询进行优化。

P080

DimUtil、public static JSONObject getDimInfo(String tableName, Tuple2<String, String>... columnNamesAndValues) {}

P081

DimUtil 、public static void deleteCached(String tableName, String id) {}

[atguigu@node001 ~]$ redis-server ./my_redis.conf 
[atguigu@node001 ~]$ redis-cli 
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> 
[atguigu@node001 ~]$ /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin/sqlline.py node001:2181

P082

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表

10.2.2 思路分析

6)异步 IO

DwsTrafficVcSourceArIsNewPageViewWindow

//TODO 10 维度关联

[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.

node001 :) show databases;

SHOW DATABASES

┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default                        │
│ edu_realtime                   │
│ system                         │
└────────────────────────────────┘

4 rows in set. Elapsed: 0.019 sec. 

node001 :) use edu_realtime;

USE edu_realtime

Ok.

0 rows in set. Elapsed: 0.007 sec. 

node001 :) drop table if exists dws_traffic_vc_source_ar_is_new_page_view_window;

DROP TABLE IF EXISTS dws_traffic_vc_source_ar_is_new_page_view_window

Ok.

0 rows in set. Elapsed: 0.007 sec. 

node001 :) create table dws_traffic_vc_source_ar_is_new_page_view_window(
:-] stt DateTime,
:-] edt DateTime,
:-] version_code String,
:-] source_id String,
:-] source_name String,
:-] ar String,
:-] province_name String,
:-] is_new String,
:-] uv_count UInt64,
:-] total_session_count UInt64,
:-] page_view_count UInt64,
:-] total_during_time UInt64,
:-] jump_session_count UInt64,
:-] ts UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-] partition by toYYYYMMDD(stt)
:-] order by(stt, edt, version_code, source_id, source_name, ar, province_name, is_new);

CREATE TABLE dws_traffic_vc_source_ar_is_new_page_view_window
(
    `stt` DateTime, 
    `edt` DateTime, 
    `version_code` String, 
    `source_id` String, 
    `source_name` String, 
    `ar` String, 
    `province_name` String, 
    `is_new` String, 
    `uv_count` UInt64, 
    `total_session_count` UInt64, 
    `page_view_count` UInt64, 
    `total_during_time` UInt64, 
    `jump_session_count` UInt64, 
    `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, version_code, source_id, source_name, ar, province_name, is_new)

Ok.

0 rows in set. Elapsed: 0.043 sec. 

node001 :) select * from edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_window;

SELECT *
FROM edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_window

Ok.

0 rows in set. Elapsed: 0.071 sec. 

node001 :) 
package com.atguigu.edu.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimAsyncFunction;
import com.atguigu.edu.realtime.bean.DwsTrafficForSourcePvBean;
import com.atguigu.edu.realtime.util.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * @author yhm
 * @create 2023-04-26 16:08
 */
public class DwsTrafficVcSourceArIsNewPageViewWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        //TODO 2 读取pageLog主题数据
        String pageTopic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_vc_source_ar_is_new_page_view_window";
        KafkaSource<String> pageSource = KafkaUtil.getKafkaConsumer(pageTopic, groupId);
        DataStreamSource<String> pageStream = env.fromSource(pageSource, WatermarkStrategy.noWatermarks(), "page_log");

        //TODO 3 读取独立访客数据
        String uvTopic = "dwd_traffic_unique_visitor_detail";
        KafkaSource<String> uvSource = KafkaUtil.getKafkaConsumer(uvTopic, groupId);
        DataStreamSource<String> uvStream = env.fromSource(uvSource, WatermarkStrategy.noWatermarks(), "uv_detail");

        //TODO 4 读取跳出用户数据
        String jumpTopic = "dwd_traffic_user_jump_detail";
        KafkaSource<String> jumpSource = KafkaUtil.getKafkaConsumer(jumpTopic, groupId);
        DataStreamSource<String> jumpStream = env.fromSource(jumpSource, WatermarkStrategy.noWatermarks(), "jump_detail");

        //TODO 5 转换数据结构
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> pageBeanStream = pageStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(String value) throws Exception {
                // 将page_log的一条日志转换为一个对应的javaBean
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject common = jsonObject.getJSONObject("common");
                JSONObject page = jsonObject.getJSONObject("page");
                Long ts = jsonObject.getLong("ts");

                return DwsTrafficForSourcePvBean.builder()
                        .versionCode(common.getString("vc"))
                        .sourceId(common.getString("sc"))
                        .ar(common.getString("ar"))
                        .isNew(common.getString("is_new"))
                        .uvCount(0L)
                        .totalSessionCount(page.getString("last_page_id") == null ? 1L : 0L)
                        .pageViewCount(1L)
                        .totalDuringTime(page.getLong("during_time"))
                        .jumpSessionCount(0L)
                        .ts(ts)
                        .build();
            }
        });

        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> uvBeanStream = uvStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(String value) throws Exception {
                // 将page_log的一条日志转换为一个对应的javaBean
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject common = jsonObject.getJSONObject("common");
                Long ts = jsonObject.getLong("ts");

                return DwsTrafficForSourcePvBean.builder()
                        .versionCode(common.getString("vc"))
                        .sourceId(common.getString("sc"))
                        .ar(common.getString("ar"))
                        .isNew(common.getString("is_new"))
                        .uvCount(1L)
                        .totalSessionCount(0L)
                        .pageViewCount(0L)
                        .totalDuringTime(0L)
                        .jumpSessionCount(0L)
                        .ts(ts)
                        .build();
            }
        });

        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> jumpBeanStream = jumpStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(String value) throws Exception {
                // 将page_log的一条日志转换为一个对应的javaBean
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject common = jsonObject.getJSONObject("common");
                Long ts = jsonObject.getLong("ts");

                return DwsTrafficForSourcePvBean.builder()
                        .versionCode(common.getString("vc"))
                        .sourceId(common.getString("sc"))
                        .ar(common.getString("ar"))
                        .isNew(common.getString("is_new"))
                        .uvCount(0L)
                        .totalSessionCount(0L)
                        .pageViewCount(0L)
                        .totalDuringTime(0L)
                        .jumpSessionCount(1L)
                        .ts(ts)
                        .build();
            }
        });

        //TODO 6 合并3条数据流
        DataStream<DwsTrafficForSourcePvBean> unionStream = pageBeanStream.union(uvBeanStream).union(jumpBeanStream);

        //TODO 7 添加水位线
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> withWaterMarkStream = unionStream.assignTimestampsAndWatermarks(WatermarkStrategy.<DwsTrafficForSourcePvBean>forBoundedOutOfOrderness(Duration.ofSeconds(15L)).withTimestampAssigner(new SerializableTimestampAssigner<DwsTrafficForSourcePvBean>() {
            @Override
            public long extractTimestamp(DwsTrafficForSourcePvBean element, long recordTimestamp) {
                return element.getTs();
            }
        }));

        //TODO 8 分组开窗
        WindowedStream<DwsTrafficForSourcePvBean, String, TimeWindow> windowStream = withWaterMarkStream.keyBy(new KeySelector<DwsTrafficForSourcePvBean, String>() {
            @Override
            public String getKey(DwsTrafficForSourcePvBean value) throws Exception {
                return value.getVersionCode()
                        + value.getSourceId()
                        + value.getAr()
                        + value.getIsNew();
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10L)));

        //TODO 9 聚合统计
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> reduceStream = windowStream.reduce(new ReduceFunction<DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean reduce(DwsTrafficForSourcePvBean value1, DwsTrafficForSourcePvBean value2) throws Exception {
                // 合并相同common信息的数据
                value1.setTotalSessionCount(value1.getTotalSessionCount() + value2.getTotalSessionCount());
                value1.setUvCount(value1.getUvCount() + value2.getUvCount());
                value1.setTotalDuringTime(value1.getTotalDuringTime() + value2.getTotalDuringTime());
                value1.setJumpSessionCount(value1.getJumpSessionCount() + value2.getJumpSessionCount());
                value1.setPageViewCount(value1.getPageViewCount() + value2.getPageViewCount());
                return value1;
            }
        }, new ProcessWindowFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<DwsTrafficForSourcePvBean> elements, Collector<DwsTrafficForSourcePvBean> out) throws Exception {
                TimeWindow timeWindow = context.window();
                String start = DateFormatUtil.toYmdHms(timeWindow.getStart());
                String end = DateFormatUtil.toYmdHms(timeWindow.getEnd());
                for (DwsTrafficForSourcePvBean element : elements) {
                    element.setStt(start);
                    element.setEdt(end);
                    // 修正时间戳
                    element.setTs(System.currentTimeMillis());
                    out.collect(element);
                }
            }
        });
        reduceStream.print();

        //TODO 10 维度关联
        reduceStream.map(new MapFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(DwsTrafficForSourcePvBean value) throws Exception {
                // 关联来源名称
                String sourceId = value.getSourceId();
                String provinceId = value.getAr();
                JSONObject dimBaseSource = DimUtil.getDimInfo("DIM_BASE_SOURCE", sourceId);
                String sourceName = dimBaseSource.getString("SOURCE_SITE");
                value.setSourceName(sourceName);
                JSONObject dimBaseProvince = DimUtil.getDimInfo("DIM_BASE_PROVINCE", provinceId);
                String provinceName = dimBaseProvince.getString("NAME");
                value.setProvinceName(provinceName);
                return value;
            }
        }).print();

        // 异步操作
        // 关联来源表
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> sourceBeanStream = AsyncDataStream.unorderedWait(reduceStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_SOURCE") {
            @Override
            public void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {
                String sourceName = jsonObject.getString("SOURCE_SITE");
                obj.setSourceName(sourceName);
            }

            @Override
            public String getKey(DwsTrafficForSourcePvBean obj) {
                return obj.getSourceId();
            }
        }, 1, TimeUnit.MINUTES);

        // 关联省份
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> dimBeanStream = AsyncDataStream.unorderedWait(sourceBeanStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_PROVINCE") {
            @Override
            public void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {
                String provinceName = jsonObject.getString("NAME");
                obj.setProvinceName(provinceName);
            }

            @Override
            public String getKey(DwsTrafficForSourcePvBean obj) {
                return obj.getAr();
            }
        }, 1, TimeUnit.MINUTES);

        //TODO 11 写出到clickHouse
        dimBeanStream.addSink(ClickHouseUtil.getJdbcSink(" " +
                "insert into dws_traffic_vc_source_ar_is_new_page_view_window values" +
                "(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        // TODO 12 执行任务
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1260678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

互联网金融智能风险防控技术要求

《互联网金融智能风险防控技术要求》 8月6日&#xff0c;国家市场监督管理总局和国家标准化管理委员会发布《互联网金融智能风险防控技术要求》&#xff08;GB/T 42929-2023&#xff09;&#xff08;以下简称“《要求》”&#xff09;&#xff0c;将于2023年12月1日实施。 《要…

MySQL数据库:外键、唯一键、唯一索引

目录 说明 一、如果要使用外键&#xff0c;表的存储引擎选择哪个&#xff1f; 1.1 答 1.2 示范 1.2.1 主表 &#xff08;1&#xff09;MyISAM的表&#xff1a;masterTable2 &#xff08;2&#xff09;InnoDB的表&#xff1a;masterTable1 1.2.2 从表 &#xff08;1&am…

图书管理系统源码,图书管理系统开发,图书借阅系统源码四TuShuManager应用程序MVC视图View

Asp.net web应用程序MVC之View视图 .ASP.NET MVC页面也就是要说的视图基本被放在Views文件夹下&#xff1b; 2.利用APS.NET MVC模板生成框架&#xff0c;Views文件夹下的默认页面为.cshtml页面&#xff1b; 3.ASP.NET MVC默认页面为Razor格式的页面&#xff0c;因此默认页面为.…

无人机光伏巡检代替人工,贵州电站运维升级

无人机光伏巡检如何做到降本增效&#xff1f;贵州省光伏电站有新招&#xff01;某70MWp的光伏电站通过引入复亚智能无人机光伏巡检系统&#xff0c;专注于使用无人机对区域内的光伏面板进行自动巡航巡查&#xff0c;利用自动化巡检和故障识别技术&#xff0c;显著提升了光伏电站…

UniPro集成华为云WeLink 为企业客户构建互为联接的协作平台

华为云WeLink是华为开启数字化办公体验、帮助企业实现数字化转型的实践&#xff0c;类似钉钉。UniPro的客户企业中&#xff0c;有使用WeLink作为协作工具的&#xff0c;基于客户的实际业务需求&#xff0c;UniPro实现了与WeLink集成的能力&#xff0c;以帮助客户企业丰富和扩展…

【触想智能】无风扇工控电脑一体机使用优势分析

无风扇工控电脑一体机是属于工控一体机分类中的其中一种&#xff0c;看名字&#xff0c;很明显就是没有散热风扇的工控电脑一体机&#xff0c;而平常我们使用的电脑主机是带有电源风扇、CPU散热风扇的。 无风扇工控电脑一体机的配置组成和商用电脑主机的配置基本一样&#xff0…

【读懂AUTOSAR】DoIP模块(1)-- 使用场景和链接的建立规范

引子 --什么是?为什么使用DoIP? DoIP就是通过IP进行诊断的意思(Diagnostic Over IP)。我们熟悉的诊断都是通过CAN总线的啊,为什么要通过IP?IP是什么? IP就是Internet Protocol,就是”互联网协议“啦! 那DoIP就是通过互联网进行的诊断喽,也可以叫做“基于以太网的诊…

JMeter之压力测试——混合场景并发

在实际的压力测试场景中&#xff0c;有时会遇到多个场景混合并发的情况&#xff0c;这时就需要设置不同的并发比例对不同场景请求数量的控制&#xff0c;下面提供两种方案。 一、多线程组方案 1.业务场景设计如下&#xff1a;场景A、场景B、场景C&#xff0c;三个场景按照并发…

FPGA模块——AD高速转换模块(并行输出转换的数据)

FPGA模块——AD高速转换模块&#xff08;并行输出转换的数据&#xff09; &#xff08;1&#xff09;AD9280/3PA9280芯片&#xff08;2&#xff09;代码 &#xff08;1&#xff09;AD9280/3PA9280芯片 AD9280/3PA9280芯片的引脚功能&#xff1a; 工作电压2.7到5.5v 数据对应&a…

<Linux> 文件理解与操作

目录 前言&#xff1a; 一、关于文件的预备知识 二、C语言文件操作 1. fope 2. fclose 3. 文件写入 3.1 fprintf 3.2 snprintf 三、系统文件操作 1. open 2. close 3. write 4. read 四、C文件接口与系统文件IO的关系 五、文件描述符 1. 理解文件描述符 2. 文…

商用车自动驾驶政策现状及趋势预判

一、我国自动驾驶法规政策体系 二、重点领域法规政策进展 1、战略引导 2、法律法规 3、标准体系 4、测试认证 5、创新支持 6、配套环境 三、“十四五”期间政策发展趋势 1、应用场景 2、法规标准趋势

Vue框架学习笔记——条件渲染:v-show和v-if

文章目录 前文提要条件渲染v-showv-ifv-else-if和v-else特殊写法&#xff0c;很多个一致的v-if如何消除 总结 前文提要 本人仅做个人学习记录&#xff0c;如有错误&#xff0c;请多包涵 主要学习链接&#xff1a;尚硅谷Vue2.0Vue3.0全套教程丨vuejs从入门到精通 条件渲染 条…

QT QComBox实现模糊查询

一、概述 在Qt中&#xff0c;可以通过QComboBox和QLineEdit实现模糊查询的功能。模糊查询是指根据用户输入的文本&#xff0c;在下拉框的选项中进行模糊匹配&#xff0c;并动态地显示匹配的选项。 二、基础知识 1、QCompleter (1)QCompleter 是 Qt 框架中提供的一个用于自动…

12 网关实战:Spring Cloud Gateway基础理论

为什么需要网关? 传统的单体架构中只有一个服务开放给客户端调用,但是微服务架构中是将一个系统拆分成多个微服务,那么作为客户端如何去调用这些微服务呢?如果没有网关的存在,只能在本地记录每个微服务的调用地址。 无网关的微服务架构往往存在以下问题: 客户端多次请求…

ElasticSearch学习笔记(一)

计算机软件的学习&#xff0c;最重要的是举一反三&#xff0c;只要大胆尝试&#xff0c;认真验证自己的想法就能收到事办功倍的效果。在开始之前可以看看别人的教程做个快速的入门&#xff0c;然后去官方网站看看官方的教程&#xff0c;有中文教程固然是好&#xff0c;没有中文…

从容应对高并发:RabbitMQ与消息限流策略的完美结合

在当今互联网时代&#xff0c;高并发访问已成为许多应用系统面临的常见挑战之一。对于需要处理大量请求的系统来说&#xff0c;如何保证系统的稳定性和可靠性是一个关键问题。RabbitMQ作为一种可靠的消息队列中间件&#xff0c;可以帮助解决高并发环境下的消息处理问题。而结合…

“智”护城市生命线,宏电亮相第十届中国(上海)国际管网展

11月22-24日&#xff0c;第十届中国&#xff08;上海&#xff09;国际管网展览会在国家会展中心盛大举办&#xff0c;展会旨在配合推进国家基础建设工作&#xff0c;推动管网改造建设&#xff0c;汇聚了三百余家优秀企业参展&#xff0c;展示产品及技术覆盖管网建设、智慧水务、…

LeetCode-面试题08.01 三步问题 C/C++实现 超详细思路及过程[E]

&#x1f388;归属专栏&#xff1a;深夜咖啡配算法 &#x1f697;个人主页&#xff1a;Jammingpro &#x1f41f;记录一句&#xff1a;摆了一个周末了&#xff0c;不能摆了&#xff0c;努力起来&#xff01;&#xff01; 文章目录 LeetCode-面试题08.01 三步问题&#x1f697;题…

Hiera实战:使用Hiera实现图像分类任务(一)

文章目录 摘要安装包安装timm 数据增强Cutout和MixupEMA项目结构计算mean和std生成数据集_pickle.PicklingError: Cant pickle <function Head.<lambda> at 0x000001DE8DD7F240>: attribute lookup Head.<lambda> on hiera.hiera failed 摘要 现代层次视觉变…

基于DSP/SOC音乐灯效系统设计方法

音乐灯效系统设计方法 是否需要申请加入数字音频系统研究开发交流答疑群(课题组)?可加我微信hezkz17, 本群提供音频技术答疑服务,+群赠送语音信号处理降噪算法,蓝牙耳机音频,DSP音频项目核心开发资料, 三种方法: (1)MIC 采集音乐信号变化,(2)直接获取SPK 模拟音频…