FlinkSQL ChangeLog

news2024/11/16 11:37:39

01 Changelog相关优化规则

0101 运行upsert-kafka作业

登录sql-client,创建一个upsert-kafka的sql作业(注意,这里发送给kafka的消息必须带key,普通只有value的消息无法解析,这里的key即是主键的值)

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv STRING,
  PRIMARY KEY (user_region) NOT ENFORCED  -- 设置主键
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'xxxxxx:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);

select * from pageviews_per_region;

发送消息带key和消费消息显示key方式如下

kafka-console-producer.sh --broker-list xxxxxx:9092 --topic pageviews_per_region --property "parse.key=true" --property "key.separator=:"
key1:value1,value1
key2:value2,value2

kafka-console-consumer.sh --bootstrap-server xxxxxx:9092 --topic pageviews_per_region --from-beginning --property print.key=true

作业的DAG图如下
在这里插入图片描述

0102 StreamPhysicalChangelogNormalize

DAG图中有一个ChangelogNormalize,代码中搜索到对应的类是StreamPhysicalChangelogNormalize,这是一个对changelog数据做规范化的类,注释如下

/**
 * Stream physical RelNode which normalizes a changelog stream which maybe an upsert stream or a
 * changelog stream containing duplicate events. This node normalize such stream into a regular
 * changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE records without
 * duplication.
 */
class StreamPhysicalChangelogNormalize(

功能就是转成对应的exec节点

override def translateToExecNode(): ExecNode[_] = {
  val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
  new StreamExecChangelogNormalize(
    unwrapTableConfig(this),
    uniqueKeys,
    generateUpdateBefore,
    InputProperty.DEFAULT,
    FlinkTypeFactory.toLogicalRowType(getRowType),
    getRelDetailedDescription)
}

0103 StreamPhysicalTableSourceScanRule

StreamPhysicalChangelogNormalize是在优化规则StreamPhysicalTableSourceScanRule当中创建的,如下流式的FlinkLogicalTableSourceScan会应用该规则

class StreamPhysicalTableSourceScanRule
  extends ConverterRule(
    classOf[FlinkLogicalTableSourceScan],
    FlinkConventions.LOGICAL,
    FlinkConventions.STREAM_PHYSICAL,
    "StreamPhysicalTableSourceScanRule") {

创建StreamPhysicalChangelogNormalize,也就是转为changelog的条件如下

if (
  isUpsertSource(resolvedSchema, table.tableSource) ||
  isSourceChangeEventsDuplicate(resolvedSchema, table.tableSource, config)
) {

isUpsertSource判断是否为upsert流,判断逻辑如下

public static boolean isUpsertSource(
        ResolvedSchema resolvedSchema, DynamicTableSource tableSource) {
    if (!(tableSource instanceof ScanTableSource)) {
        return false;
    }
    ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
    boolean isUpsertMode =
            mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);
    boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
    return isUpsertMode && hasPrimaryKey;
}

其中ChangelogMode在各自数据源实现类的getChangelogMode接口中定义,如JDBC只支持insert

@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.insertOnly();
}

isSourceChangeEventsDuplicate判断不是upsert的更改流,判断逻辑如下

public static boolean isSourceChangeEventsDuplicate(
        ResolvedSchema resolvedSchema,
        DynamicTableSource tableSource,
        TableConfig tableConfig) {
    if (!(tableSource instanceof ScanTableSource)) {
        return false;
    }
    ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
    boolean isCDCSource =
            !mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);
    boolean changeEventsDuplicate =
            tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
    boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
    return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
}

综合来说要走StreamPhysicalChangelogNormalize这一条调用链,就不能是insertOnly的数据源,但目前大部分Flink实现的数据源包括Iceberg都是insertOnly的

0104 更新模式

Flink相关的更新模式类有如下几个:RowKind、ChangelogMode、UpdateKind

  • RowKind

RowKind是定义更新流每条数据的类型,其中对于更新有;两条数据,一条删除旧数据,一条插入新数据

/** Insertion operation. */
INSERT("+I", (byte) 0),

/**
 * Update operation with the previous content of the updated row.
 *
 * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
 * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
 * i.e., an update of a row that is not uniquely identifiable by a key.
 */
UPDATE_BEFORE("-U", (byte) 1),

/**
 * Update operation with new content of the updated row.
 *
 * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
 * update of a row that is uniquely identifiable by a key.
 */
UPDATE_AFTER("+U", (byte) 2),

/** Deletion operation. */
DELETE("-D", (byte) 3);
  • ChangelogMode

ChangelogMode定义数据源的更新模式,主要三种,就是包含不同的RowKind的类型

private static final ChangelogMode INSERT_ONLY =
        ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();

private static final ChangelogMode UPSERT =
        ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();

private static final ChangelogMode ALL =
        ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
  • UpdateKind

UpdateKind是针对update这种更新类型细分

/** NONE doesn't represent any kind of update operation. */
NONE,

/**
 * This kind indicates that operators should emit update changes just as a row of {@code
 * RowKind#UPDATE_AFTER}.
 */
ONLY_UPDATE_AFTER,

/**
 * This kind indicates that operators should emit update changes in the way that a row of {@code
 * RowKind#UPDATE_BEFORE} and a row of {@code RowKind#UPDATE_AFTER} together.
 */
BEFORE_AND_AFTER

02 StreamExecChangelogNormalize

StreamExecChangelogNormalize的处理流程中根据是否启用table.exec.mini-batch.enabled分为微批处理和单数据的流处理

微批处理使用ProcTimeMiniBatchDeduplicateKeepLastRowFunction,流式使用ProcTimeDeduplicateKeepLastRowFunction,两者的核心差别就是微批会缓存数据使用一个for循环处理

这两个函数除了StreamPhysicalChangelogNormalize这一条链路外,还有StreamExecDeduplicate这一条链路,对应StreamPhysicalRankRule,是一个排序的东西

for (Map.Entry<RowData, RowData> entry : buffer.entrySet()) {
    RowData currentKey = entry.getKey();
    RowData currentRow = entry.getValue();
    ctx.setCurrentKey(currentKey);
    if (inputInsertOnly) {
        processLastRowOnProcTime(
                currentRow,
                generateUpdateBefore,
                generateInsert,
                state,
                out,
                isStateTtlEnabled,
                equaliser);
    } else {
        processLastRowOnChangelog(
                currentRow, generateUpdateBefore, state, out, isStateTtlEnabled, equaliser);
    }
}
  • processLastRowOnProcTime

对数据按照时间语义进行去重,将当前数据作为最新,这个函数只针对insert only的数据

static void checkInsertOnly(RowData currentRow) {
    Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
}

整套处理逻辑就是对数据根据场景修改数据的RowKind类型

} else {
    if (generateUpdateBefore) {
        preRow.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(preRow);
    }
    currentRow.setRowKind(RowKind.UPDATE_AFTER);
    out.collect(currentRow);
}
  • processLastRowOnChangelog

这个函数就是按Key去重,本质上也是针对数据修改RowKind

核心的一块功能就是更新的时候要将前一个数据修改为UPDATE_BEFORE

} else {
    if (generateUpdateBefore) {
        preRow.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(preRow);
    }
    currentRow.setRowKind(RowKind.UPDATE_AFTER);
    out.collect(currentRow);
}

函数整体借用的是Flink的state功能,从状态中获取前面的数据,所以对状态缓存由要求;另外针对非删除型的数据,如果TTL没有开的话,就不会更新前面的数据

if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
    // currentRow is the same as preRow and state cleaning is not enabled.
    // We do not emit retraction and update message.
    // If state cleaning is enabled, we have to emit messages to prevent too early
    // state eviction of downstream operators.
    return;
}

03 初始RowKind来源

前面的流程里,在进行changelog转换的时候,数据是已经存在一个RowKind的值了,这一章追踪初始RowKind的来源

基于Flink-27的设计,Kafka数据源处理任务有一个KafkaRecordEmitter,emitRecord当中做数据的反序列化

deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper);

最终走到DeserializationSchema.deserialize完成最终的反序列化

default void deserialize(byte[] message, Collector<T> out) throws IOException {
    T deserialize = deserialize(message);
    if (deserialize != null) {
        out.collect(deserialize);
    }
}

这里message是一个二进制数组,实际是Kafka的数据类型ConsumerRecord。根据SQL当中的配置,value反序列化使用的是csv,所以走到CsvRowDataDeserializationSchema当中处理

final JsonNode root = objectReader.readValue(message);
return (RowData) runtimeConverter.convert(root);

这里读出来的root是数据的key,convert的转化的实现类是CsvToRowDataConverters,其createRowConverter接口当中创建了转化函数,函数中将数据转化为了Flink的数据类型GenericRowData

GenericRowData row = new GenericRowData(arity);

GenericRowData的定义当中,有初始化RowKind,就是insert

public GenericRowData(int arity) {
    this.fields = new Object[arity];
    this.kind = RowKind.INSERT; // INSERT as default
}

04 Iceberg流式更新

使用方式

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://xxxx:19083',
  'clientimecol'='5',
  'property-version'='1',
  'warehouse'='hdfs://nameservice/spark'  //是否HADOOP_CONF_DIR要export一下
);

use CATALOG hive_catalog;

CREATE TABLE test2(
id BIGINT COMMENT 'unique id',
data STRING,
primary key(id) not ENFORCED
);
ALTER TABLE test2 SET('format-version'='2');

SET table.exec.iceberg.use-flip27-source = true;

SELECT * FROM test2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s')*/ ;

Iceberg流式更新目前只支持Append的数据,不支持更新删除

参考kafka,追踪IcebergSourceRecordEmitter,发现没有做数据转换,直接做了数据转发

public void emitRecord(
    RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
  output.collect(element.record());
  split.updatePosition(element.fileOffset(), element.recordOffset());
}

数据格式的构建在更前面读数据的时候就完成了,读数据的核心逻辑在DataIterator

private void updateCurrentIterator() {
    try {
      while (!currentIterator.hasNext() && tasks.hasNext()) {
        currentIterator.close();
        currentIterator = openTaskIterator(tasks.next());
        fileOffset += 1;
        recordOffset = 0L;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

主要的功能类就是currentIterator,实现类为RowDataFileScanTaskReader,最终调用下一层iterator,下一层的实现类根据文件类型不同,parquet的实现类为ParquetReader,在next中读取数据

public T next() {
  if (valuesRead >= nextRowGroupStart) {
    advance();
  }

  if (reuseContainers) {
    this.last = model.read(last);
  } else {
    this.last = model.read(null);
  }
  valuesRead += 1;

  return last;
}

model实现类为ParquetValueReaders

public final T read(T reuse) {
  I intermediate = newStructData(reuse);

  for (int i = 0; i < readers.length; i += 1) {
    set(intermediate, i, readers[i].read(get(intermediate, i)));
    // setters[i].set(intermediate, i, get(intermediate, i));
  }

  return buildStruct(intermediate);
}

newStructData构建数据,创建了GenericRowData

protected GenericRowData newStructData(RowData reuse) {
  if (reuse instanceof GenericRowData) {
    return (GenericRowData) reuse;
  } else {
    return new GenericRowData(numFields);
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1483001.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux系统Jenkins工具参数化构建

Jenkins参数化构建 web服务器jenkins服务器编写主机清单编写脚本代码 jenkins服务web页面操作 web服务器 下载nginx 下载gitcd /usr/share/nginxrm -rf htmlgit clone http://root:Qq123456192.168.188.176/ximu/test-nginx.gitmv test-nginx/ htmljenkins服务器 下载ansible…

前端Vue自定义勾选协议组件的开发与应用

摘要&#xff1a; 随着前端技术的不断发展&#xff0c;用户体验成为了软件开发中的关键要素。在登录、注册等场景中&#xff0c;勾选协议是常见的需求。本文旨在介绍一款基于 Vue.js 的自定义勾选协议组件的开发与应用&#xff0c;该组件适用于多种场景&#xff0c;并且具备良…

虚拟机部署Sentry步骤,国内地址

Unity3D特效百例案例项目实战源码Android-Unity实战问题汇总游戏脚本-辅助自动化Android控件全解手册再战Android系列Scratch编程案例软考全系列Unity3D学习专栏蓝桥系列ChatGPT和AIGC &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分…

【计算机网络】TCP 如何实现可靠传输

TCP通过三次握手建立连接&#xff0c;四次挥手释放连接&#xff0c;确保连接建立和连接释放的可靠。 序列号、检验和、确认应答信号、重发机制、连接管理、窗口控制、流量控制、拥塞控制 标准回答 可靠传输就是通过TCP连接传送的数据是没有差错、不会丢失、不重复并且按序到达的…

【C++ map和set】

文章目录 map和set序列式容器和关联式容器键值对setset的主要操作 mapmap主要操作 multiset和multimap map和set 序列式容器和关联式容器 之前我们接触的vector,list,deque等&#xff0c;这些容器统称为序列式容器&#xff0c;其底层为线性序列的的数据结构&#xff0c;里面存…

Mac专用投屏工具AirServer 7.27 for Mac中文版2024最新图文教程

Mac专用投屏工具AirServer 7.27 for Mac中文版是一款适用于Mac的投屏工具&#xff0c;可以将Mac屏幕快速投影到其他设备上&#xff0c;如电视、投影仪、平板等。 Mac专用投屏工具AirServer 7.27 for Mac中文版具有优秀的兼容性&#xff0c;可以与各种设备配合使用。无论是iPhon…

[方案实操]中国电子副总陆志鹏:《数据资产化路径的思考与探索》演讲实录和解析

中国数字经济发展和治理学术年会&#xff08;2023&#xff09;上&#xff0c;中国电子党组成员、副总经理&#xff0c;50人论坛委员陆志鹏先生《数据资产化路径的思考与探索》为题进行了主旨演讲&#xff0c;提出“如果简单把资源进行评估定价&#xff0c;价值非常有限&#xf…

STM32标准库开发—实时时钟(BKP+RTC)

BKP配置结构 注意事项 BKP基本操作 时钟初始化 RCC_APB1PeriphClockCmd(RCC_APB1Periph_PWR, ENABLE);RCC_APB1PeriphClockCmd(RCC_APB1Periph_BKP, ENABLE);PWR_BackupAccessCmd(ENABLE);//设置PWR_CR的DBP&#xff0c;使能对PWR以及BKP的访问读写寄存器操作 uint16_t ArrayW…

springboot基于web的网上摄影工作室的开发与实现论文

网上摄影工作室 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了网上摄影工作室的开发全过程。通过分析网上摄影工作室管理的不足&#xff0c;创建了一个计算机管理网上摄影工作室的方案。文章介绍了网上摄影工…

详细介绍如何用windows自带Hyper-V安装虚拟机(windows11和ubuntu22)

通过系统自带的hyper-v安装windows11&#xff0c;舒服又惬意&#xff0c;相比用第三方虚拟机软件速度快很多。 硬件准备 准备 系统需要符合能安装 Hyper-V 的最低要求windows版本含Hyper-V的功能 电脑空间 电脑要有足够的空间来安装你这个虚拟机。根据自己的磁盘容量情况来规…

鸿蒙系统的开发与学习:一、安装工具与处理报错

前言&#xff1a; 鸿蒙系统的学习与记录。 1 、使用开发工具&#xff1a;deveco-studio 1&#xff09;这个是工具的安装 2&#xff09;这个是工具包&#xff0c;里面包含了 obpm&#xff0c;如果你装不上这个&#xff0c;可以使用工具包内部的 2、安装 官方安装教程&#xff…

泰迪智能科技企业数据挖掘平台使用场景

企业数据挖掘平台助力企业数据挖掘&#xff0c;数据挖掘平台也在多个领域发挥着重要的作用。 企业数据挖掘平台具有数据抓取、数据清洗、数据分析、机器学习等多项功能&#xff0c;广泛应用于企业的各个领域&#xff0c;包括&#xff1a;金融行业、医疗行业、交通领域、教育、制…

[BUUCTF]-PWN:oneshot_tjctf_2016解析(字符串输入,onegadget)

查看保护 查看ida 这道题的大致思路就是泄露libc地址&#xff0c;然后用onegadget来getshell 但是要注意&#xff0c;这里要我们输入的数据类型是long int&#xff0c;所以不能用我们常用的p64函数了。 完整exp&#xff1a; from pwn import* from LibcSearcher import* con…

Python的循环结构练习

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 生命对某些人来说是美丽的&#xff0c…

任职资格经典案例:企业任职资格体系搭建项目纪实

传统的任职资格体系主要考虑年限、经验、资历等因素&#xff0c;部分企业在任职资格体系中也引入了能力指标&#xff0c;但是&#xff0c;实际管理过程中仍然存在很多问题&#xff1a;员工“熬年头”意识严重、工作积极性差、优秀人员因得不到晋升而流失、各方面能力都不错的人…

深入理解Lambda表达式:基础概念与实战演练【第114篇—python:Lambda表达式】

深入理解Lambda表达式&#xff1a;基础概念与实战演练 在现代编程语言中&#xff0c;Lambda表达式作为一种轻量级的匿名函数形式&#xff0c;越来越受到程序员的青睐。特别是在函数式编程兴起的今天&#xff0c;Lambda表达式在简化代码、提高可读性方面发挥着重要作用。本文将…

《AI纪元:幻域探险》

游戏项目名称&#xff1a;《AI纪元&#xff1a;幻域探险》 游戏类型&#xff1a;AI驱动的角色扮演探险游戏&#xff08;RPG&#xff09; 背景设定&#xff1a; 《AI纪元&#xff1a;幻域探险》设定在一个名为“幻域”的广阔虚拟世界。这个世界由高度发达的AI技术支持&#xff0…

windows环境下部署k8s

1、安装docker Desktop; 2、打开setting勾选启用k8s&#xff08;参考了许多帖子&#xff0c;说需要预先下载镜像&#xff0c;直接勾选会被墙&#xff0c;应该是跟版本有关&#xff0c;目前使用的版本没有出现这类问题&#xff0c;只是确实会稍慢&#xff0c;如果需要加快可以先…

c++/c图的邻近矩阵表示

#include<iostream> using namespace std;#define MaxVerterNum 100 typedef char VerterType; typedef int EdgeType; typedef struct {VerterType vexs[MaxVerterNum]; // 存储顶点EdgeType edges[MaxVerterNum][MaxVerterNum]; // 存储邻接矩阵int n, e; // 顶点数和边…

【洛谷 P9240】[蓝桥杯 2023 省 B] 冶炼金属 题解(二分答案)

[蓝桥杯 2023 省 B] 冶炼金属 题目描述 小蓝有一个神奇的炉子用于将普通金属 O 冶炼成为一种特殊金属 X。这个炉子有一个称作转换率的属性 V V V&#xff0c; V V V 是一个正整数&#xff0c;这意味着消耗 V V V 个普通金属 O 恰好可以冶炼出一个特殊金属 X&#xff0c;当普…