Flink实时电商数仓(二)

news2024/12/25 15:54:54

GitLab的用户创建和推送

  1. 在root用户-密码界面重新设置密码
  2. 添加Leader用户和自己使用的用户
  3. 使用root用户创建相应的群组
  4. 使用Leader用户创建对应的项目
  5. 设置分支配置为“初始推送后完全保护”
  6. 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
  7. 安装gitlab project 2020插件
  8. 点击share project on gitlab 即可将项目上传到gitlab中

Flink集群的搭建

  • 只需要运行Yarn模式
  • 配置Hadoop的环境变量
    在这里插入图片描述
  • 将Flink1.17解压安装到对应为止即可

Hbase的配置

  1. 依赖zookeeper和hadoop这两个框架
  2. 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
  3. 解压Hbase2.4.11的安装包
  4. 添加Hbase的环境变量
    在这里插入图片描述
  5. 修改配置文件
    • hbase-env.xml
      • export hbase_manages_zk=false 不使用自带的zookeeper
    • hbase-site.xml
      • hbase.cluster.distributed = true 使用集群模式
      • hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
      • hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
      • hbase.wal.provider = filesystem 预写日志
    • regionservers: 添加hbase小弟的主机名称

Redise的配置

  1. 进入redise目录,执行make指令进行编译
  2. make instanll安装
  3. 将myredis.conf文件复制到~/目录下
  4. 将bind 127.0.0.1 注释掉,并且关闭保护模式
  5. 设置daemon 后台启动模式为yes
  6. redis-server ./my_redis.conf后台启动

实时数仓ODS层

  • 保证数据模拟器产生的数据是有序的
    • 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
    • Kafka数据有序:Flink并发度和Kafka的分区数一致
      • 设置三个kafka节点的分区个数都为4,num.partitions=4
      • Flink的并发度=4
  • 历史维度数据
    • 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
    • 编写mysql_to_kafka_init.sh脚本
    • maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可

实时数仓dim层

  • dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
  • dim层的数据存储在Hbase中
  • 开发时需要切换到dev开发分支
  • 为Flink的开发创建一个基类,名为BaseApp
    • 抽象方法handle(): 每个主程序的业务逻辑
    • 具体方法start():里面实现Flink代码的通用逻辑
  • 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group

Flink-cdc获取维度信息

  1. 数据清洗
  2. 动态拆分维度表功能
    • 方式1:直接将维度表做成List< String > (维度表名称)保存
      • 如果将代码写死,后续想要修改,需要重新编译修改
    • 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
    • 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
    • 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
    • 方式5:cdc,变更数据抓取,类似与maxwell。
  3. 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {
    public static void main(String[] args) {
        //创建env
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(4);

        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //设置检查点和状态后端
        // 1.4 状态后端及检查点相关配置
        // 1.4.1 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

         1.4.2 开启 checkpoint
        //env.enableCheckpointing(5000);
         1.4.3 设置 checkpoint 模式: 精准一次
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         1.4.4 checkpoint 存储
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" + "test01");
         1.4.5 checkpoint 并发数
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
         1.4.6 checkpoint 之间的最小间隔
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
         1.4.7 checkpoint  的超时时间
        //env.getCheckpointConfig().setCheckpointTimeout(10000);
         1.4.8 job 取消时 checkpoint 保留策略
        //env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);

        //读取数据


        //mysql source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username(Constant.MYSQL_USER_NAME)
                .password(Constant.MYSQL_PASSWORD)
                .databaseList("gmall2023_config")
                .tableList("gmall2023_config.table_process_dim")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> ds = env.fromSource(mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "kafkasource").setParallelism(1);

        ds.print();


        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1322309.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【python VS vba】(9) 在python使用matplotlib库来画多个图形,子图,以及图中图

2 用matplotlib 画多个函数图形 2.1 在一个画布里画多个图形 &#xff08;1张画布&#xff0c;1个坐标轴系&#xff0c;多个图形叠在一起&#xff09; import numpy as np import matplotlib.pyplot as pltfig1plt.figure(num1)xnp.linspace(-5,5, 10) yx*21 y2x**2# 绘图 p…

人工智能在约会APP开发中的作用

约会APP已成为当今技术世界中结识人们的流行方式。这意味着您不必要求您的朋友去见某人约会。简而言之&#xff0c;技术改善了约会过程&#xff0c;而人工智能在约会APP开发中的兴起极大地影响了人们今天的约会方式。 在约会APP中使用人工智能技术可以改善个人寻找完美匹配对象…

基于hfl/rbt3模型的情感分析学习研究——文本挖掘

参考书籍《HuggingFace自然语言处理详解 》 什么是文本挖掘 文本挖掘&#xff08;Text mining&#xff09;有时也被称为文字探勘、文本数据挖掘等&#xff0c;大致相当于文字分析&#xff0c;一般指文本处理过程中产生高质量的信息。高质量的信息通常通过分类和预测来产生&…

基于Java SSM框架实现宠物医院信息管理系统项目【项目源码】

基于java的SSM框架实现宠物医院信息管理系统演示 java简介 Java语言是在二十世纪末由Sun公司发布的&#xff0c;而且公开源代码&#xff0c;这一优点吸引了许多世界各地优秀的编程爱好者&#xff0c;也使得他们开发出当时一款又一款经典好玩的小游戏。Java语言是纯面向对象语言…

一键修复找不到msvcp140.dll无法继续执行代码的办法,有效修复

电脑出现“找不到msvcp140.dll无法继续执行代码”是什么情况&#xff1f;如果系统中没有这个文件或文件发生损坏&#xff0c;那么在启动某些应用程序或游戏时&#xff0c;可能会遇到错误消息&#xff0c;如“程序无法启动因为msvcp140.dll丢失在您的计算机上”或“找不到msvcp1…

CPU缓存一致性问题

什么是可见性问题&#xff1f; Further Reading &#xff1a;什么是可见性问题&#xff1f; 缓存一致性 内存一致性 内存可见性 顺序一致性区别 CPU缓存一致性问题 由于CPU缓存的出现&#xff0c;很好地解决了处理器与内存速度之间的矛盾&#xff0c;极大地提高了CPU的吞吐能…

luttuce(RedisTempate)实现hash(动态数据) expire lua脚本

话不多说先放脚本&#xff1a; local argv ARGV local length #argv if length > 0 then local unpackArgs {} for i 1, length - 1 dotable.insert(unpackArgs, argv[i]) end if redis.call(exists, KEYS[1]) 1 thenredis.call(del, KEYS[1])redis.call(hset, KEYS[…

【ONE·English || 翻译作业 Development: Mendel‘s Legacy to Genetics】

总言 作业&#xff1a;没有严格按照语句结构进行翻译&#xff0c;有不规范之处。下述目录中每一小节是按照原文段落划分。   相关链接&#xff1a;pubmed中查阅的链接&#xff0c;提供了两处文章平台。 文章目录 总言part11.11.21.3 part2&#xff1a;Entwicklung and develo…

Linear Regression多重共线性

目录 介绍&#xff1a; 一、 corr ​二、pairplot 三、VIF 3.1自带vif 3.2自定义函数vif 四、heatmp&#xff08;直观感受&#xff09; 介绍&#xff1a; 多重共线性是指在线性回归模型中&#xff0c;自变量之间存在强相关性或线性关系&#xff0c;从而导致模型的稳定性…

使用相关序列方法做相位校准(附仿真代码)

TI对天线幅相校准提出了标准的方法和流程&#xff0c;可参考这篇文档使用级联毫米波传感器的成像雷达参考设计1&#xff08;TI文档&#xff09;-CSDN博客的3.3节。这里使用自相关序列的方法来对相位做校准。 自相关&#xff08;Autocorrelation&#xff09;&#xff0c;也叫序列…

Hypervisor Display架构

Hypervisor Display架构部分 1&#xff0c;所有LA侧的APP与显示相关的调用最终都会交由SurfaceFlinger处理 2&#xff0c;SurfaceFlinger会最终调用android.hardware.graphics.composer2.4-service服务 3&#xff0c;android.hardware.graphics.composer2.4-service服务会调用G…

http代理的静态ip如何实现YouTube运营?有何优势?

一、静态ip是什么&#xff1f;静态住宅ip有什么优势&#xff1f; 静态ip是指网络中某个设备&#xff08;如计算机、路由器&#xff09;拥有的永久不变的ip地址&#xff0c;它的ip地址在设备与网络连接后&#xff0c;由网络管理员手动配置或预留&#xff0c;并且不会轻易更改。…

紫光FPGA学习之常见报错

紫光pango design suite报错&#xff1a; 一、4005: [D:/**/rtl/burstORsingle.v(line number: 47)] Logic for ddr_head_addr_rr does not match a standard flip-flop. 看来看去都没有发现这个定义没有问题呀&#xff0c;检查发现&#xff1a; 原来代码&#xff1a; always…

2023本四前端社招面经

美团 全程问项目&#xff0c;根据项目提问&#xff0c;SEO优化方案&#xff0c;还出了一道动态规划的题 SEO优化方案 一、内部优化 META 标签优化&#xff1a;例如&#xff1a;TITLE&#xff0c;KEYWORDS&#xff0c;DESCRIPTION &#xff08;TDK&#xff09;等的优化 内部链接…

轻量封装WebGPU渲染系统示例<53>- 多盏灯灯光照在地面的效果

WebGPU实时渲染实现模拟多盏灯的灯光照在地面的效果灯光效果 。 当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/feature/material/src/voxgpu/sample/MultiLightsTest.ts 当前示例运行效果: 此示例基于此渲染系统实现&#xff0c;当前示例TypeScript源…

Java版直播商城规划:电商源码、小程序、三级分销与免 费搭建全攻略

【saas云平台】打造全行业全渠道全场景的saas产品&#xff0c;为经营场景提供一体化解决方案&#xff1b;门店经营区域化、网店经营一体化&#xff0c;本地化、全方位、一站式服务&#xff0c;为多门店提供统一运营解决方案&#xff1b;提供丰富多样的营销玩法覆盖所有经营场景…

git命令查看提交代码行数和次数

右键点击Git Bash Here 查看代码提交次数 git log --since2022-7-1 --before2022-8-1 --author"XXXX" --pretty%aN |sort |uniq -c | sort -k1 -n -r查看代码提交行数 git log --since2022-8-1 --before2022-9-1 --authorXXXX --prettytformat: --numstat |awk {add…

无锡市某厂区工人上岗未穿工作服,殒命车间 富维AI守护每位工友

2018年12月23日&#xff0c;凌晨6点半左右&#xff0c;江阴华士某铜业公司轧球车间内&#xff0c;独自上夜班的操作工朱某正在操作行车吊运一筐切好的铜粒&#xff0c;吊运完成后&#xff0c;他开始解除料筐上的吊具。就在这时&#xff0c;意外突然发生&#xff0c;他身上穿着的…

前端开发新趋势:Web3、区块链和虚拟现实

目录 前言 Web3&#xff1a;下一代互联网 区块链技术 去中心化应用程序&#xff08;DApps&#xff09; 区块链&#xff1a;重塑数字世界 数字钱包 NFT&#xff08;非同质化代币&#xff09; 虚拟现实&#xff1a;沉浸式体验 WebVR和WebXR 三维图形 新挑战与机会 性…