大数据-268 实时数仓 - ODS层 将 Kafka 中的维度表写入 DIM

news2025/1/8 15:09:59

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)

章节内容

  • ODS
  • Lambda架构
  • Kappa架构

在这里插入图片描述

基本介绍

在 Kafka 中写入维度表(DIM)通常涉及将实时或批处理数据从 Kafka 主题(Topic)读取,并根据数据流中的信息更新维度表(DIM),这在数据仓库或数据湖的 ETL(提取、转换、加载)过程中非常常见。维度表(DIM)存储的是与业务数据相关的维度信息,例如客户、产品、地理位置等,用于支持 OLAP(联机分析处理)查询。

理解 Kafka 数据流

Kafka 是一个分布式流平台,用于高吞吐量的消息传递。在 ETL 过程中,Kafka 通常用作数据的消息队列或者流处理的来源。每当新数据生成时,它会被发布到 Kafka 中的某个主题(Topic),然后消费者(Consumer)可以从主题中获取数据进行处理。

设计维度表(DIM)

维度表通常包含业务实体的详细信息,如产品名称、客户信息、时间维度等。与事实表(Fact)不同,维度表的数据较为静态,但可能会随着时间更新(例如,客户地址变更或产品类别更新)。每个维度表通常有一个唯一的主键(如 customer_id 或 product_id)来标识记录。

Kafka 消费者(Consumer)

为了从 Kafka 中读取维度数据,需要创建一个消费者(Consumer),它会从 Kafka 的某个主题(Topic)中读取消息。这些消息通常是 JSON 格式,包含需要写入维度表的信息。消费者将从 Kafka 主题中获取数据,可能包括以下步骤:

  • 连接到 Kafka 集群。
  • 订阅一个或多个主题(Topics)。
  • 消费消息并将其传递给后续的处理逻辑。
  • 消费者的实现可以使用 Kafka 提供的客户端库,例如 Kafka 的 Java 客户端、Python 的 confluent-kafka 等。

数据处理和转换

在读取到 Kafka 消息后,消费者需要对数据进行必要的处理和转换。对于维度数据,处理逻辑可能包括:

  • 数据解析:将消息从 Kafka 中的格式(例如 JSON)解析成结构化数据。
  • 校验数据:检查数据是否符合业务规则,是否完整,是否有效。
  • 维度数据更新:如果 Kafka 中的消息包含的维度信息已经存在,则更新相关记录;如果是新维度,则插入新记录。

维度表的更新

维度表的更新通常有两种常见的方式:

  • 全量更新:每次从 Kafka 获取到新的数据时,都将其覆盖到维度表中。这种方式适用于数据变动较少或者可以接受重写的场景。
  • 增量更新:根据时间戳、有效性标志或版本号等信息,更新已有的维度记录。这种方式适用于数据会有更新(如地址或状态变更)的场
    景。

增量更新时,通常会执行以下操作:

  • 查找是否已有该维度记录(例如通过 dimension_id)。
  • 如果存在且数据发生变化,则更新该记录,同时更新 valid_to 时间,并插入一条新的记录,设置 valid_from 和 valid_to 时间。
  • 如果不存在该记录,则直接插入新的维度数据。

写入到目标存储(DIM)

在数据处理后,需要将更新后的维度数据写入目标存储。这通常是一个数据库(例如 MySQL、PostgreSQL 或 NoSQL 数据库)或数据仓库(例如 Snowflake、Google BigQuery、Redshift)中的维度表(DIM)。

数据存储更新(事务性考虑)

对于维度表的更新,通常需要确保数据的一致性。可以使用事务来确保数据在更新过程中的一致性,防止数据丢失或重复。例如,可以在事务中执行所有的更新和插入操作,确保如果操作失败,可以回滚。

TableObject

创建样例 TableObject

case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable

AreaInfo

case class AreaInfo(
  id: String,
  name: String,
  pid: String,
  sname: String,
  level: String,
  citycode: String,
  yzcode: String,
  mername: String,
  Lng: String,
  Lat: String,
  pinyin: String
  )

DataInfo

case class DataInfo(
  modifiedTime: String,
  orderNo: String,
  isPay: String,
  orderId: String,
  tradeSrc: String,
  payTime: String,
  productMoney: String,
  totalMoney: String,
  dataFlag: String,
  userId: String,
  areaId: String,
  createTime: String,
  payMethod: String,
  isRefund: String,
  tradeType: String,
  status: String
)

ConnHBase

class ConnHBase {
  def connToHbase:Connection ={
    val conf : Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum","h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
    conf.set("hbase.zookeeper.property.clientPort","2181")
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)
    val connection = ConnectionFactory.createConnection(conf)
    connection
  }
}

SinkHBase

class SinkHBase extends RichSinkFunction[util.ArrayList[TableObject]] {

  var connection : Connection = _
  var hbTable : Table = _

  override def open(parameters: Configuration): Unit = {
    connection = new ConnHBase().connToHbase
    hbTable = connection.getTable(TableName.valueOf("wzk_area"))
  }

  override def close(): Unit = {
    if (hbTable != null) {
      hbTable.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = {
    value.forEach(x => {
      println(x.toString)
      val database: String = x.database
      val tableName: String = x.tableName
      val typeInfo: String = x.typeInfo
      if ((database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_trade_orders"))) {
        if (typeInfo.equalsIgnoreCase("insert")) {
          value.forEach(x => {
            val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
            insertTradeOrders(hbTable, info)
          })
        } else if (typeInfo.equalsIgnoreCase("update")) {

        } else if (typeInfo.equalsIgnoreCase("delete")) {

        }
      }

      if (database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_area")) {
        if (typeInfo.equalsIgnoreCase("insert")) {
          value.forEach(x => {
            val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
            insertArea(hbTable, info)
          })
        } else if (typeInfo.equalsIgnoreCase("update")) {
          value.forEach(x => {
            val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
            insertArea(hbTable, info)
          })
        } else if (typeInfo.equalsIgnoreCase("delete")) {
          value.forEach(x => {
            val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
            deleteArea(hbTable, info)
          })

        }
      }
    })
  }

  def insertTradeOrders(hbTable: Table, dataInfo: DataInfo): Unit = {
    val tableName = "wzk_trade_orders"
    val columnFamily = "f1"
    // 如果表不存在则创建
    createTableIfNotExists(connection, tableName, columnFamily)

    val put = new Put(dataInfo.orderId.getBytes)
    put.addColumn("f1".getBytes, "modifiedTime".getBytes, dataInfo.modifiedTime.getBytes())
    put.addColumn("f1".getBytes, "orderNo".getBytes, dataInfo.orderNo.getBytes())
    put.addColumn("f1".getBytes, "isPay".getBytes, dataInfo.isPay.getBytes())
    put.addColumn("f1".getBytes, "orderId".getBytes, dataInfo.orderId.getBytes())
    put.addColumn("f1".getBytes, "tradeSrc".getBytes, dataInfo.tradeSrc.getBytes())
    put.addColumn("f1".getBytes, "payTime".getBytes, dataInfo.payTime.getBytes())
    put.addColumn("f1".getBytes, "productMoney".getBytes, dataInfo.productMoney.getBytes())
    put.addColumn("f1".getBytes, "totalMoney".getBytes, dataInfo.totalMoney.getBytes())
    put.addColumn("f1".getBytes, "dataFlag".getBytes, dataInfo.dataFlag.getBytes())
    put.addColumn("f1".getBytes, "userId".getBytes, dataInfo.userId.getBytes())
    put.addColumn("f1".getBytes, "areaId".getBytes, dataInfo.areaId.getBytes())
    put.addColumn("f1".getBytes, "createTime".getBytes, dataInfo.createTime.getBytes())
    put.addColumn("f1".getBytes, "payMethod".getBytes, dataInfo.payMethod.getBytes())
    put.addColumn("f1".getBytes, "isRefund".getBytes, dataInfo.isRefund.getBytes())
    put.addColumn("f1".getBytes, "tradeType".getBytes, dataInfo.tradeType.getBytes())
    put.addColumn("f1".getBytes, "status".getBytes, dataInfo.status.getBytes())
    hbTable.put(put)
  }

  def insertArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
    // val tableName = "wzk_area"
    // val columnFamily = "f1"
    // 如果表不存在则创建
    // createTableIfNotExists(connection, tableName, columnFamily)

    println(areaInfo.toString)
    val put = new Put(areaInfo.id.getBytes())
    put.addColumn("f1".getBytes(), "name".getBytes(), areaInfo.name.getBytes())
    put.addColumn("f1".getBytes(), "pid".getBytes(), areaInfo.pid.getBytes())
    put.addColumn("f1".getBytes(), "sname".getBytes(), areaInfo.sname.getBytes())
    put.addColumn("f1".getBytes(), "level".getBytes(), areaInfo.level.getBytes())
    put.addColumn("f1".getBytes(), "citycode".getBytes(), areaInfo.citycode.getBytes())
    put.addColumn("f1".getBytes(), "yzcode".getBytes(), areaInfo.yzcode.getBytes())
    put.addColumn("f1".getBytes(), "mername".getBytes(), areaInfo.mername.getBytes())
    put.addColumn("f1".getBytes(), "lng".getBytes(), areaInfo.Lng.getBytes())
    put.addColumn("f1".getBytes(), "lat".getBytes(), areaInfo.Lat.getBytes())
    put.addColumn("f1".getBytes(), "pinyin".getBytes(), areaInfo.pinyin.getBytes())
    hbTable.put(put)
  }

  def deleteArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
    val delete = new Delete(areaInfo.id.getBytes)
    hbTable.delete(delete)
  }

  def createTableIfNotExists(connection: Connection, tableName: String, columnFamily: String): Unit = {
    val admin = connection.getAdmin
    try {
      val table = TableName.valueOf(tableName)

      // 检查表是否存在
      if (!admin.tableExists(table)) {
        val tableDescriptor = new HTableDescriptor(table)
        val columnDescriptor = new HColumnDescriptor(columnFamily.getBytes())
        tableDescriptor.addFamily(columnDescriptor)

        // 创建表
        admin.createTable(tableDescriptor)
        println(s"表 $tableName 创建成功")
      } else {
        println(s"表 $tableName 已存在")
      }
    } finally {
      admin.close()
    }
  }

}

SourceKafka

class SourceKafka {

  def getKafkaSource(topicName: String) : FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "h121.wzk.icu:9092")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("group.id", "hbase-test")
    props.setProperty("auto.offset.reset", "earliest")
    new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
  }

}

KafkaToHBase

object KafkaToHBase {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer = new SourceKafka().getKafkaSource("dwshow")
    kafkaConsumer.setStartFromLatest()
    val sourceStream = env.addSource(kafkaConsumer)
    val mapped: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => {
      val jsonObj: JSONObject = JSON.parseObject(x)
      val database: AnyRef = jsonObj.get("database")
      val table: AnyRef = jsonObj.get("table")
      val typeInfo: AnyRef = jsonObj.get("type")
      val objects = new util.ArrayList[TableObject]()
      jsonObj.getJSONArray("data").forEach(x => {
        objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))
        println(x.toString)
      })
      objects
    })
    mapped.addSink(new SinkHBase)
    env.execute()
  }
}

启动项目

我们对表进行修改:
在这里插入图片描述
可以看到控制台对饮输出了内容:
在这里插入图片描述
别的表也尝试修改一下:
在这里插入图片描述
查看 HBase 可以看到数据已经有了:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2273277.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

把vue项目或者vue组件发布成npm包或者打包成lib库文件本地使用

将vue项目发布成npm库文件,第三方通过npm依赖安装使用;使用最近公司接了一个项目,这个项目需要集成到第三方页面,在第三方页面点击项目名称,页面变成我们的项目页面;要求以npm库文件提供给他们;…

Kafka为什么要放弃Zookeeper

1.Kafka简介 Apache Kafka最早是由Linkedin公司开发,后来捐献给了Apack基金会。 Kafka被官方定义为分布式流式处理平台,因为具备高吞吐、可持久化、可水平扩展等特性而被广泛使用。目前Kafka具体如下功能: 消息队列,Kafka具有系统解耦、流…

【JVM】总结篇-类的加载篇之 类的加载器 和ClassLoader分析

文章目录 类的加载器ClassLoader自定义类加载器双亲委派机制概念源码分析优势劣势如何打破Tomcat 沙箱安全机制JDK9 双亲委派机制变化 类的加载器 获得当前类的ClassLoader clazz.getClassLoader() 获得当前线程上下文的ClassLoader Thread.currentThread().getContextClassLoa…

java 转义 反斜杠 Unexpected internal error near index 1

代码: String str"a\\c"; //出现异常,Unexpected internal error near index 1 //System.out.println(str.replaceAll("\\", "c"));//以下三种都正确 System.out.println(str.replace(\\, c)); System.out.println(str.r…

el-table 实现纵向多级表头

为了实现上图效果,最开始打算用el-row、el-col去实现,但发现把表头和数据分成两大列时,数据太多时会导致所在格高度变高。但由于每一格数据肯定不一样,为保持高度样式一致,就需要我们手动去获取最高格的高度之后再设置…

2025最新版Visual Studio Code安装使用指南

2025最新版Visual Studio Code安装使用指南 Installation and Usage Guide for the Latest Visual Studio Code in 2024 By JacksonML 2025-1-7 1. Visual Studio Code背景 早在二十年前,通用的集成开发环境(Integrated Deveopment Environment, 简称…

Flutter 鸿蒙化 flutter和鸿蒙next混和渲染

前言导读 这一个节课我们讲一下PlatformView的是使用 我们在实战中有可能出现了在鸿蒙next只加载一部分Flutter的情况 我们今天就讲一下这种情况具体实现要使用到我们的PlatformView 效果图 具体实现: 一、Native侧 使用 DevEco Studio工具打开 platform_view_example\oho…

LabVIEW语言学习过程是什么?

学习LabVIEW语言的过程可以分为几个阶段,每个阶段的重点内容逐步加深,帮助你从入门到精通。以下是一个简洁的学习过程: ​ 1. 基础入门阶段 理解图形化编程:LabVIEW是一种图形化编程语言,与传统的文本编程语言不同&am…

Kubernetes Gateway API-4-TCPRoute和GRPCRoute

1 TCPRoute 目前 TCP routing 还处于实验阶段。 Gateway API 被设计为与多个协议一起工作,TCPRoute 就是这样一个允许管理TCP流量的路由。 在这个例子中,我们有一个 Gateway 资源和两个 TCPRoute 资源,它们按照以下规则分配流量&#xff1…

嵌入式SD/TF卡通用协议-SDIO协议

SD卡(SecureDigital MemoryCard)即:安全数码卡,它是在MMC的基础上发展而来,是一种基于半导体快闪记忆器的新一代记忆设备,它被广泛地于便携式装置上使用,例如数码相机、个人数码助理(PDA)和多媒…

性能测试05|JMeter:分布式、报告、并发数计算、性能监控

目录 一、JMeter分布式 1、应用场景 2、原理 3、分布式相关注意事项 4、分布式配置与运行 二、JMeter报告 1、聚合报告 2、HTML报告 三、并发用户数(线程数)计算 四、JMeter下载第三方插件 五、性能监控 1、Concurrency Thread Group 线程组…

wujie无界微前端框架初使用

先说一下项目需求:将单独的四套系统的登录操作统一放在一个入口页面进行登录,所有系统都使用的是vue3,(不要问我为啥会这样设计,产品说的客户要求) 1.主系统下载wujie 我全套都是vue3,所以直接…

SpringIOC循环依赖与三级缓存

SpringIOC循环依赖与三级缓存 Spring解决循环依赖的核心机制就是通过三级缓存: 一级缓存(singletonObjects):存储完全初始化好的Bean;二级缓存(earlySingletonObjects):存储原始实例…

【顶刊TPAMI 2025】多头编码(MHE)之极限分类 Part 3:算法实现

目录 1 三种多头编码(MHE)实现1.1 多头乘积(MHP)1.2 多头级联(MHC)1.3 多头采样(MHS)1.4 标签分解策略 论文:Multi-Head Encoding for Extreme Label Classification 作者…

前端 图片上鼠标画矩形框,标注文字,任意删除

效果: 页面描述: 对给定的几张图片,每张能用鼠标在图上画框,标注相关文字,框的颜色和文字内容能自定义改变,能删除任意画过的框。 实现思路: 1、对给定的这几张图片,用分页器绑定…

【办公利器】ReNamer (批量文件重命名工具) Pro v7.6.0.4 多语便携版,海量文件秒速精准改名!

ReNamer是一款功能强大的文件重命名工具,它可以帮助用户快速方便地批量重命名文件和文件夹。 软件功能 批量重命名:ReNamer可以同时处理多个文件和文件夹,并对其进行批量重命名,从而节省时间和劳动力。灵活的重命名规则&#xff…

unity学习13:gameobject的组件component以及tag, layer 归类

目录 1 gameobject component 是unity的基础 1.1 类比 1.2 为什么要这么设计? 2 从空物体开始 2.1 创建2个物体 2.2 给 empty gameobject添加组件 3 各种组件和新建组件 3.1 点击 add component可以添加各种组件 3.2 新建组件 3.3 组件的操作 3.4 特别的…

数据库模型全解析:从文档存储到搜索引擎

目录 前言1. 文档存储(Document Store)1.1 概念与特点1.2 典型应用1.3 代表性数据库 2. 图数据库(Graph DBMS)2.1 概念与特点2.2 典型应用2.3 代表性数据库 3. 原生 XML 数据库(Native XML DBMS)3.1 概念与…

CSS——1.优缺点

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title></title><link rel"stylesheet" type"text/css" href"1-02.css"/></head><body><!--css&#xff1a;层叠样式表…

UE5本地化和国际化语言

翻译语言 工具 - 本地化控制板 Localization Dashboard 修改图中这几个地方就可以 点击箭头处&#xff0c;把中文翻译成英语&#xff0c;如果要更多语言就点 添加新语言 最后点击编译即可 编译完&#xff0c;会在目录生成文件夹 设置界面相关蓝图中设置 切换本地化语言 必须在…