Flink实时数仓同步:实时表、流水表、快照表整合实战详解

news2024/11/17 13:37:48

一、背景

在大数据领域,数据分析、实时数仓已经成为平台上常见的功能之一。无论是进行实时分析还是离线分析,都离不开数仓中的表数据。

特别是在实时分析领域,查阅实时数据、历史数据以及历史变更数据是非常常见的需求。而这些功能的实现主要依赖于数仓中的实时表、流水表和快照表。

本文将结合前几篇关于实时数仓同步的内容,介绍实际应用中的案例场景,帮助读者更深入地理解这些功能的实际应用价值。

  1. [Mysql] 业务数据 - 用户表全量数据:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. [大数据平台] 2023-06-03 日业务人员在大数据平台中查看2023-06-02日用户表的历史数据,期望数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [大数据平台] 2023-06-02日业务人员在大数据分析平台中查看用户表实时变更记录,数据如下:
idnamephonegendercreate_timeupdate_timeop(操作类型)before(变更前数据)dt
3tom4442023-06-01 13:00:002023-06-02 09:00:00u{3,tom,333,男,…}2023-06-02
4tony5552023-06-02 10:00:002023-06-02 10:00:00cnull2023-06-02

根据以上需求,我们可以按照业务表一比三的表比例创建三张实时数仓表:实时表、快照表和流水表。

然而,离线数仓并不适合满足这一需求,原因如下:

  1. 数据精确性问题: 对于对数据精确度要求较高的场景,采用 T+1 的同步方式可能会导致数据不一致的问题。更详细的问题分析和解决方案可以参考我另一篇文章:深入数仓离线数据同步:问题分析与优化措施。
  2. 同步延迟问题: 离线数仓的同步通常为 T+1,而上述需求要求实时查看当天业务数据的变更情况。

接下来,我们将探讨更适合此需求的实现方案。

二、技术架构

鉴于业务数据通常存储在关系型数据库中,这里选择采用Flink-CDC持续读取binlog日志进行实时同步。为了保证实时数据能够高效写入下游并支持用户OLAP查询分析,这里选择了企业中常见的MMP库Doris作为实时数仓的存储层。整体架构如下图所示:

在这里插入图片描述

关于图片中的实时表、流水表、快照表各自详细的实现方式均可在文章结尾相关资料中查看对应文章。

三、实现方式

3.1、表设计

  1. 实时表:以id 为unique key从而保证表数据唯一性
CREATE TABLE `example_user_real`
(
    `id` INT NOT NULL COMMENT '用户id',
    `name` STRING NULL COMMENT '用户昵称',
    `phone` STRING NULL COMMENT '手机号',
    `gender` CHAR(5) NULL COMMENT '用户性别',
    `create_time` DATETIMEV2(0) NULL COMMENT '用户注册时间',
    `update_time` DATETIMEV2(0) NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT '用户实时表'
DISTRIBUTED BY HASH(id) BUCKETS AUTO;
  1. 流水表:以id、update_time、dt 为unique key从而保证天粒度秒级唯一性,同时采用动态分区按天进行划分
CREATE TABLE `example_user_stream`
(
    `id` largeint(40) NOT NULL COMMENT '用户id',
    `update_time` datetime NULL COMMENT '用户更新时间',
    `dt` date NULL COMMENT '流水日期',
    `create_time` datetime NULL COMMENT '用户注册时间',
    `name` varchar(50) NOT NULL COMMENT '用户昵称',
    `phone` largeint(40) NULL COMMENT '手机号',
    `gender` varchar(5) NULL COMMENT '用户性别',
    `op` varchar(4) NOT NULL COMMENT '每条数据的操作类型:r/c/u/d',
    `before` STRING NULL COMMENT '变更前数据',
    `binlog` STRING NULL COMMENT 'binlog全量日志'
) ENGINE=OLAP
UNIQUE KEY(`id`, `update_time`, `dt`)
COMMENT '用户流水表'
PARTITION BY RANGE(dt)()
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-90",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "8"
);
  1. 快照表:以id、dt 为unique key从而保证天粒度唯一性,同时采用动态分区按天进行划分
CREATE TABLE `example_user_snapshot`
(
    `id` largeint(40) NOT NULL COMMENT '用户id',
    `dt` date NULL COMMENT '流水日期',
    `name` varchar(50) NOT NULL COMMENT '用户昵称',
    `phone` largeint(40) NULL COMMENT '手机号',
    `gender` varchar(5) NULL COMMENT '用户性别',
    `create_time` datetime NULL COMMENT '用户注册时间',
    `update_time` datetime NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`, `dt`)
COMMENT '用户快照表'
PARTITION BY RANGE(dt)()
DISTRIBUTED BY HASH(id) BUCKETS 8
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-90",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "8"
);

关于doris数据模型及动态分区语法可参考文章底部相关资料

3.2、实时同步逻辑

3.2.1、前提介绍

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。

    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。

    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

  2. 这里采用initial模式作为实时同步方式,先全量后增量,这是由于此次同步多张表需要对 binlog 数据进行解析及判断更新操作类型,因此,Flink CDC SQL 文件方式的表建立不再满足我们的要求。为了更好地实现这一功能,我们需要采用 API 方式来构建解决方案,代码如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
        .tableList("yourDatabaseName.yourTableName") // 设置捕获的表
        .username("yourUsername")
        .password("yourPassword")
        .startupOptions(StartupOptions.timestamp(1685548800000L)) // 从2023-06-01零点处读取binlog
        .deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 设置 3s 的 checkpoint 间隔
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // 设置 source 节点的并行度为 4
      .setParallelism(4)
      .print().setParallelism(1); // 设置 sink 节点并行度为 1 

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

代码摘自mysql-cdc-connector官网示例

3.2.2、实时表同步阶段

实时表同步十分简单,只需要在Flink程序中编写flinkSQL即可实现,FlinkSQL如下:

# FlinkSQL 创建Mysql User表
create table mysql_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` CHAR(5),
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0),
PRIMARY KEY(id) NOT ENFORCED
) WITH ( 
'connector'='mysql-cdc',
'hostname'='10.185.163.177',
'port' = '80',
'username'='rouser',
'password'='123456',
'database-name' = 'database',
'table-name'='user'
);

# FlinkSQL 创建Doris User实时表
create table doris_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` STRING,
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0)
) WITH ( 
'password'='password',
'connector'='doris',
'fenodes'='11.113.208.103:8030',
'table.identifier'='database.user',
'sink.label-prefix'='唯一任务标识,每次启动都要唯一',
'username'='username' 
);

# 实时写入
insert into doris_user select * from mysql_user;

3.2.3、流水表、快照表全量同步阶段

  1. 接下来我们将从全量同步开始逐步演示同步过程,这里我们以2023-06-0日的[Mysql]业务数据为例,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. 此时Flink应用启动获取到的数据如下:仅展示一条
{
	"before": null,
	"after": {		 # 实际数据
		"id": 1,
		"name": "jack",
		"phone": "111",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",  # 该日期是UTC时间,只需增加8小时即可转化为北京时间
		"update_time": "2023-06-01T05:00:00Z"	# 该日期是UTC时间,只需增加8小时即可转化为北京时间
	},
	"source": {		 # 元数据
		"version": "1.6.4.Final",
		"connector": "mysql",
		"name": "mysql_binlog_source",
		"ts_ms": 0,
		"snapshot": "false",
		"db": "yushu_dds",
		"sequence": null,
		"table": "user",
		"server_id": 0,
		"gtid": null,
		"file": "",
		"pos": 0,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "r",  	 # 记录每条数据的操作类型[重要]
	"ts_ms": 1705471382867,
	"transaction": null
}
  1. 在我们使用 Flink CDC MySQL 同步数据时,默认采用 initial 模式,这意味着首先进行全量同步,然后再进行增量同步。因此,在区分全量和增量同步时,关键在于观察获取到的数据中的 op 字段。op 字段是用来记录每条数据的操作类型的标志。具体的操作类型如下:

    • op=d 代表删除操作

    • op=u 代表更新操作

    • op=c 代表新增操作

    • op=r 代表全量读取,而不是来自 binlog 的增量读取

  2. 在 Flink 程序中,只需要通过 op=r 即可筛选出全量数据。在全量数据同步阶段只需将op=r的业务数据直接同步至快照表(之所以将全量数据同步至快照表是为了次日凌晨与流水表变更数据合并成完整数据),流水表在全量阶段无需同步,导入语句如下:

# 快照表
INSERT INTO example_user_snapshot (id, dt, name, phone, gender, create_time, update_time)
VALUES
    (1, '2023-06-01', 'jack', 111, '男', '2023-06-01 13:00:00', '2023-06-01 13:00:00'),
    (2, '2023-06-01', 'jason', 222, '男', '2023-06-01 13:00:00', '2023-06-01 13:00:00'),
    (3, '2023-06-01', 'tom', 333, '男', '2023-06-01 13:00:00', '2023-06-01 13:00:00');
  1. 此时doris快照表数据如下所示:
iddtnamephonegendercreate_timeupdate_time
12023-06-01jack1112023-06-01 13:00:002023-06-01 13:00:00
22023-06-01jason2222023-06-01 13:00:002023-06-01 13:00:00
32023-06-01tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. 此时doris流水表数据如下所示:全量阶段流水表无需同步
idupdate_timedtcreate_timenamephonegenderopbeforebinlog
NULLNULLNULLNULLNULLNULLNULLNULLNULLNULL

3.2.4、流水表、快照表增量同步阶段

  1. 这里我们以2023-06-02日的[Mysql]业务数据为例,新增了一名tony用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. 此时Flink应用获取到的数据如下:
# 新增tony变更数据如下
{
	"before": null,
	"after": {
		"id": 4,
		"name": "tony",
		"phone": "666",
		"gender": "男",
		"create_time": "2023-06-02T02:00:00Z",
		"update_time": "2023-06-02T02:00:00Z"
	},
	"source": {
		# 元数据信息忽略
	},
	"op": "c", # 操作类型
	"ts_ms": 1706768344113,
	"transaction": null
}
# tom手机号333->444变更数据如下
{
	"before": {
		"id": 3,
		"name": "tom",
		"phone": "333",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-01T05:00:00Z"
	},
	"after": {
		"id": 3,
		"name": "tom",
		"phone": "444",
		"gender": "男",
		"create_time": "2023-06-01T05:00:00Z",
		"update_time": "2023-06-01T23:00:00Z"
	},
	"source": {
		# 元数据信息忽略
	},
	"op": "u", # 操作类型
	"ts_ms": 1706768454904,
	"transaction": null
}
  1. 当 Flink 同步程序接收到 op=c/u/d 表示增量更新数据时,提取其中的 opbeforeafter 数据。接着将这些信息拼装成 Doris 的 INSERT 语句后插入到流水表中,此时流水表数据如下所示:
idupdate_timedtcreate_timenamephonegenderopbeforebinlog
42023-06-02 10:00:002023-06-022023-06-02 10:00:00tony555cNULL{“before”:null,“after”:{“id”:4,“name”:“tony”,“phone”:“666”,“gender”:“男”,“create_time”:“2023-06-02T02:00:00Z”,“update_time”:“2023-06-02T02:00:00Z”},“source”:{“version”:“1.6.4.Final”,“connector”:“mysql”,“name”:“mysql_binlog_source”,“ts_ms”:1706768344000,“snapshot”:“false”,“db”:“yushu_dds”,“sequence”:null,“table”:“user”,“server_id”:2307031958,“gtid”:“71221bfd-56e8-11ee-8275-fa163e4ecceb:33719321”,“file”:“3509-binlog.000191”,“pos”:643757739,“row”:0,“thread”:null,“query”:null},“op”:“c”,“ts_ms”:1706768344113,“transaction”:null}
32023-06-02 08:00:002023-06-022023-06-02 13:00:00tom444u{“id”:3,“name”:“tom”,“phone”:“333”,“gender”:“男”,“create_time”:“2023-06-01T05:00:00Z”,“update_time”:“2023-06-01T05:00:00Z”}{“before”:{“id”:3,“name”:“tom”,“phone”:“333”,“gender”:“男”,“create_time”:“2023-06-01T05:00:00Z”,“update_time”:“2023-06-01T05:00:00Z”},“after”:{“id”:3,“name”:“tom”,“phone”:“444”,“gender”:“男”,“create_time”:“2023-06-01T05:00:00Z”,“update_time”:“2023-06-01T23:00:00Z”},“source”:{“version”:“1.6.4.Final”,“connector”:“mysql”,“name”:“mysql_binlog_source”,“ts_ms”:1706768454000,“snapshot”:“false”,“db”:“yushu_dds”,“sequence”:null,“table”:“user”,“server_id”:2307031958,“gtid”:“71221bfd-56e8-11ee-8275-fa163e4ecceb:33719761”,“file”:“3509-binlog.000191”,“pos”:692873739,“row”:0,“thread”:null,“query”:null},“op”:“u”,“ts_ms”:1706768454904,“transaction”:null}
  1. 因增量数据无需同步至快照表,故此时快照表与之前06-01号一样保持不变,快照表数据如下:
iddtnamephonegendercreate_timeupdate_time
12023-06-01jack1112023-06-01 13:00:002023-06-01 13:00:00
22023-06-01jason2222023-06-01 13:00:002023-06-01 13:00:00
32023-06-01tom3332023-06-01 13:00:002023-06-01 13:00:00

3.2.5、合并阶段

在合并阶段,我们将流水表前一天的数据与快照表中前两天的数据进行整合,最终得到前一天的全量数据,并将其写入至快照表的前一天分区。

合并任务会在满足以下任意一个条件时触发:

  1. 当binlog数据中的日期为第二天。
  2. 当凌晨过了5分钟(这是一个自定义的时间阈值)。

第二个条件的存在是因为业务数据很可能在凌晨00:00 ~ 00:05 分之间没有增量数据。因此,即使在没有业务数据同步的情况下,我们仍然可以通过第二个条件触发合并阶段,确保数据的完整性和准确性。


  1. 这里我们假设2023-06-03 00:05:00 触发合并阶段为例,此时业务数据如下所示:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. flink程序中无新增数据,但由于满足第二个触发条件,在flink程序中将会触发合并任务[可用单独线程实现],此时执行的doris合并语句如下:
INSERT INTO example_user_snapshot (id, dt, name, phone, gender, create_time, update_time)
SELECT
    id,
    '2023-06-02' as dt, -- 通过固定dt字段值从而写入快照表p20230602分区中
    name,
    phone,
    gender,
    create_time,
    update_time
FROM (
         SELECT
             snap.id,
             snap.name,
             snap.phone,
             snap.gender,
             snap.create_time,
             snap.update_time
         FROM example_user_snapshot PARTITION p20230601 snap
    LEFT JOIN example_user_stream PARTITION p20230602 stream ON snap.id = stream.id
         WHERE stream.id IS NULL
         UNION
         SELECT
             id,
             name,
             phone,
             gender,
             create_time,
             update_time
         FROM (
             SELECT
             id,
             name,
             phone,
             gender,
             create_time,
             update_time,
			 -- 使用窗口函数的目的是处理流水表中可能存在多条相同id的记录,例如tom在06-02日更改多次手机号则会有多条相同id的数据,故此窗口函数用于确保选择每个id对应的update_time最大的记录;如果流水表设计的unique key = (id) 则不会出现重复情况无需此处的窗口函数。
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num 
             FROM example_user_stream PARTITION p20230602
             ) ranked
         WHERE row_num = 1
     ) AS temp;

该 SQL 查询是先获取两表联接中未更新的数据,与已更新的数据合并,最后写入到快照表中,确保了 2023-06-02 分区的数据是完整的全量数据。

若想详细剖析此sql的运算逻辑可参考笔者另一篇文章:数仓日常维护:剖析每日增量同步的内部机制

  1. 此时快照表的数据如下:
iddtnamephonegendercreate_timeupdate_time
12024-02-02jack1112023-06-01 13:00:002023-06-01 13:00:00
22024-02-02jason2222023-06-01 13:00:002023-06-01 13:00:00
32024-02-02tom3332023-06-01 13:00:002023-06-01 13:00:00
12024-02-03jack1112023-06-01 13:00:002023-06-01 13:00:00
22024-02-03jason2222023-06-01 13:00:002023-06-01 13:00:00
32024-02-03tom5552023-06-02 13:00:002023-06-02 09:00:00
42024-02-03tony5552023-06-02 10:00:002023-06-02 10:00:00
  1. 用户可以通过如下语句查询2023-06-02全量数据:
SELECT * FROM example_user_snapshot PARTITION p20230602;
12024-02-03jack1112023-06-01 13:00:002023-06-01 13:00:00
22024-02-03jason2222023-06-01 13:00:002023-06-01 13:00:00
32024-02-03tom5552023-06-02 13:00:002023-06-02 09:00:00
42024-02-03tony5552023-06-02 10:00:002023-06-02 10:00:00

合并阶段的主要压力是Doris,Flink程序只是传递sql执行后获取结果即可;至此实时快照表同步逻辑结束。

5.3、数据一致性设计

  1. 在上述快照表同步过程中,如果Flink程序挂掉或者重启,是否会影响数据一致性?由于Flink程序是通过定时执行checkpoint且binlog可重读溯源,因此在数据获取阶段不会出现数据一致性问题。

  2. 需要考虑的地方在于合并阶段,如果触发了合并任务,而此时Flink程序还在不断消费业务变更数据,这里是异步还是阻塞?笔者建议使用异步:即Flink程序仍实时同步业务变更数据至流水表,而快照表的合并阶段主要是下沉到Doris库中执行。

  3. 需要注意的是如果在合并阶段时Flink程序挂掉,重启后该如何处理?笔者建议在Flink程序中采用有状态的计算,即Rich functions 富函数中的ValueState,用于记录当前合并阶段是否成功,如下:

javaCopy codeimport org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;

public class TestMapFunction extends RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>  {
    // state 用于存放合并后的分区,例如: state=p20230601
    private transient ValueState<String> state;

    @Override
    public Tuple2<String, Integer> map(Tuple2<String, Integer> in) throws Exception {
        // 业务逻辑
    }

    public void open(Configuration parameters) throws Exception {
        // 初始化 state
    }
}

通过这种方式,即便Flink在同步过程中宕掉,只要根据checkpoint重启后便可检测到上一个分区任务失败,即state != 20230602,从而再次触发合并阶段!

关于flink有状态的计算可参考Flink官网介绍

四、总结

本文主要整合了实时表、快照表和流水表在实际应用中的使用场景。在实践中,通常一张业务数据表会对应三张实时数仓表供大数据平台使用。虽然也可以通过一张快照表来满足上述需求,但从查询性能的角度来看,并不如分表性能优秀。读者可以根据实际使用场景进行选择,以确保系统的性能和效率。

六、相关资料

  • Flink实时数仓同步:实时表实战详解

  • Flink实时数仓同步:流水表实战详解

  • Flink实时数仓同步:快照表实战详解

  • Flink实时数仓同步:拉链表实战详解

  • Doris 数据模型

  • Flink状态计算

  • MySQL CDC Connector

  • 数仓日常维护:剖析每日增量同步的内部机制

  • 深入数仓离线数据同步:问题分析与优化措施

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1509636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于android的物业管理系统的设计与实现19.8

目录 基于android的物业管理系统的设计与实现 3 摘 要 3 Android property managemengt system 5 Abstract 5 1 绪论 6 1.1 选题背景 6 1.2 课题研究现状 6 1.3 设计研究主要内容 7 1.4 系统主要设计思想 8 2 开发环境 8 2.1 Android系统的结构 8 图2-1 Android系统架构图 9 2…

【C++精简版回顾】21.迭代器,实现迭代器

1.什么是迭代器&#xff1f; 用来遍历容器&#xff0c;访问容器数据。 2.迭代器使用 1.初始化 //初始化 list<int> mylist;//list的整数对象 list<int>::iterator iter;//list内部类&#xff0c;迭代器对象(正向输出) list<int>::reverse_iterator riter;//…

详解数据挖掘

数据挖掘&#xff08;Data Mining&#xff09;&#xff0c;又译为资料探勘、数据采矿&#xff0c;是数据库知识发现&#xff08;Knowledge-Discovery in Databases&#xff0c;简称&#xff1a;KDD&#xff09;中的一个步骤。数据挖掘主要是指从大量的数据中&#xff0c;通过算…

如何选择软文推广渠道?媒介盒子分享

想要做好一个品牌&#xff0c;除了软文文案、推广方式要不断更新外&#xff0c;软文推广渠道也十分重要。有许多企业在创立之初容易踩平台没选好的坑。渠道是品牌触达用户的关键点&#xff0c;今天媒介盒子就来和大家聊聊&#xff1a;如何选择软文发布平台。 一、 了解平台用户…

Java项目:47 ssm007亚盛汽车配件销售业绩管理统+jsp(含文档)

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 亚盛汽车配件销售业绩管理系统根据调研&#xff0c;确定管理员管理客户&#xff0c;供应商&#xff0c;员工。 管理配件和配件的进货以及出售…

Python中的异常处理及最佳实践【第125篇—异常处理】

Python中的异常处理及最佳实践 异常处理是编写健壮、可靠和易于调试的Python代码中不可或缺的一部分。在本文中&#xff0c;我们将深入探讨Python中的异常处理机制&#xff0c;并分享一些最佳实践和代码示例&#xff0c;以帮助您更好地处理错误情况和提高代码的稳定性。 异常…

VUE3项目学习系列--Axios二次封装(五)

Axios中文文档 | Axios中文网 (axios-http.cn) Axios 是一个基于 promise 网络请求库&#xff0c;作用于node.js 和浏览器中。 它是 isomorphic 的(即同一套代码可以运行在浏览器和node.js中)。在服务端它使用原生 node.js http 模块, 而在客户端 (浏览端) 则使用 XMLHttpRequ…

【SpringCloud微服务实战01】Eureka 注册中心

前言 在 Eureka 架构中,微服务角色有两类: EurekaServer :服务端,注册中心 记录服务信息 心跳监控 EurekaClient :客户端 Provider :服务提供者,例如案例中的 user-service 注册自己的信息到 EurekaS…

java SSM在线学习网站系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM在线学习网站系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用…

ubuntu搭建HTTP/3 协议的 Nginx QUIC

ubuntu搭建HTTP/3 协议的 Nginx QUIC 什么是 HTTP/3 和 QUIC&#xff1f; HTTP/3 是一种基于 QUIC (Quick UDP Internet Connections) 协议的 HTTP 协议版本&#xff0c;它是 HTTP/2 的后继者&#xff0c;旨在改进 Web 性能和安全性。 HTTP/3 与之前的 HTTP 协议有很大的不同…

谷粒商城实战(004 整合elasticSearch(es)搜索引擎)

Java项目《谷粒商城》架构师级Java项目实战&#xff0c;对标阿里P6-P7&#xff0c;全网最强 总时长 104:45:00 共408P 此文章包含第125p-第p127的内容 整合es 最好使用Eleasticsearch-Rest-Client 24年改用Java API Client 其实可以直接用js直接调用es 进行查询&#xff0c;这…

力扣大厂热门面试算法题 12-14

12. 整数转罗马数字&#xff0c;13. 罗马数字转整数&#xff0c;14. 最长公共前缀&#xff0c;每题做详细思路梳理&#xff0c;配套Python&Java双语代码&#xff0c; 2024.03.11 可通过leetcode所有测试用例。 目录 12. 整数转罗马数字 解题思路 完整代码 Java Pytho…

5G CA频段组合与带宽的射频标准

先来复习一下我们前面学习过的章节后缀所代表的含义&#xff1a; None Single CarrierA Carrier Aggregation (CA)B Dual-Connectivity (DC)C Supplement Uplink (SUL)D UL MIMOE V2XF Shared spectrum channel accessG Tx Diversity (TxD)I …

【MySQL探索之旅】数据表的基本操作(附带思维导图)

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 |《MySQL探索之旅》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &…

maven运行spring boot项目

我用idea想跑一个整合flowable的spring boot项目&#xff0c;但是跑不起来&#xff0c;原因是jdk版本不够高。但是我的idea是2018版本&#xff0c;最高只能支持到jdk11。就想办法不用idea编译、打包、运行项目。因为spring boot是maven项目&#xff0c;所以可以用maven进行打包…

04-微服务 面试题

1.Spring Cloud 常见的组件有哪些? Spring Cloud 5大组件有哪些? 基础的内容考察回答原则:简单的问题不能答错(一道面试题就能淘汰一个人)新手和老手都要注意面试参考回答: 面试官:Spring Cloud 5大组件有哪些? 候选人:早期我们一般认为的Spring Cloud五大组件是 …

EXCEL根据某列的数字N,增加N-1行相同的数据

因为工作需要&#xff0c;需要将表格数据拆分&#xff0c;类似于相同的订单有6笔&#xff0c;数据表中就是一行数据但是订单数为6&#xff0c;但是需要将其拆分成相同6笔的订单数为1的数据行。 需要使用VBA代码&#xff0c;具体做法如下&#xff1a; Dim i As Long, j As Long…

web3D三维引擎(Direct3D、OpenGL、UE、U3D、threejs)基础扫盲

Hi&#xff0c;我是贝格前端工场的老司机&#xff0c;本文介绍文web3D的几个引擎&#xff0c;做个基础扫盲&#xff0c;如果还不能解决问题&#xff0c;可以私信我&#xff0c;搞私人订制呦。 三维引擎是指用于创建和渲染三维图形的软件框架。它们通常提供了图形处理、物理模拟…

Springboot+vue的疫情管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的疫情管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层…

蓝桥杯2019年第十届省赛真题-修改数组

查重类题目&#xff0c;想到用标记数组记录是否出现过 但是最坏情况下可能会从头找到小尾巴&#xff0c;时间复杂度O(n2)&#xff0c;数据范围106显然超时 再细看下题目&#xff0c;我们重复进行了寻找是否出现过&#xff0c;干脆把每个元素出现过的次数k记录下来&#xff0c;直…