【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式

news2024/9/28 7:21:47

一、背景:

CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms 一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms 就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。

本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录

CDC数据写入略过,此处使用SQL模拟写入。

二、Phoenix旧版记录删除(DEMO)

phoenix doc

bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;

再去hbase shell中查看,hbase 关联表已经有phoenix创建了。

hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;

以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
在这里插入图片描述

现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id 是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id) 是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN

最终发下有一下函数可用
(1)NTH_VALUE 获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUESLAST_VALUES 获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC) 是全局分组,而实际使用中是需要搭配 GROUP BY 使用的。

所以可以获取到

-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES

-- 方案二:使用FIRST_VALUES获取到一个ARRAY 
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);

由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。

Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN
    (SELECT column3, column4
     FROM t2
     WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.

最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:

-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);

-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);

删除后效果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1026096.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

语义分割笔记(二):DeepLab V3对图像进行分割(自定义数据集从零到一进行训练、验证和测试)

文章目录 一、语义分割介绍1.1 语义分割和实例分割的区别1.2 DeepLab系列对比 二、代码下载2.1 代码测试2.2 视频学习 三、数据集准备3.1 Json转png3.2 数据集划分 四、模型训练五、模型测试六、模型评估 一、语义分割介绍 语义分割是计算机视觉中的一项技术&#xff0c;旨在将…

在windows下持续ping ip,将返回结果及时间记录到文件中

在纯英文路径下创建文件ping.txt 在txt中写入 Dim args, flag, unsuccOut args"" otherout"" flag0If WScript.Arguments.count 0 Then WScript.Echo "Usage: cscript tping.vbs [-t] [-a] [-n count] [-l size] [-f] [-i TTL] [-v TOS]" WScr…

java中的自定义对象排序

对于数组排序我们知道有Arrays.sort()方法&#xff0c;但是如果遇到想要对一个对象数组中的某个属性进行排序&#xff0c;我们该如何去做呢&#xff1f; 以给学生成绩排序为例&#xff0c;首先创建一个存储学生对象的数组。 package JAVA_API;public class Sort_oop {public …

解密Java多线程中的锁机制:CAS与Synchronized的工作原理及优化策略

目录 CAS什么是CASCAS的应用ABA问题异常举例 Synchronized 原理基本特征加锁过程偏向锁轻量级锁重量级锁 其他优化操作锁消除锁粗化 CAS 什么是CAS CAS: 全称Compare and swap&#xff0c;字面意思:”比较并交换“&#xff0c;CAS涉及如下操作&#xff1a; 假设内存中的原数据…

7、DVWA——SQL盲注

文章目录 一、概述二、low2.1 通关思路&#xff08;布尔盲注&#xff09;&#xff08;1&#xff09;判断是否存在SQL注入漏洞&#xff08;2&#xff09;判断属于数字型注入还是字符型注入&#xff08;3&#xff09;判断结果集中的字段数&#xff08;4&#xff09;猜数据库名长度…

常见的排序算法及时间空间复杂度

排序算法是计算机科学中的基本算法之一&#xff0c;它用于将一组数据按照某种顺序进行排列。下面是一些常见的排序算法&#xff0c;以及它们的思想和时间空间复杂度&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢…

c++qt day11

通过代码实现电子钟表 头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QPaintEvent> #include<QDebug> #include<QPainter> #include<QFont> #include<QTime> #include<QTimer>QT_BEGIN_NAMESPACE namespa…

pytorch的卷积层池化层和非线性变化 和机器学习线性回归

卷积层&#xff1a; 两个输出的情况 就会有两个通道 可以改变通道数的 最简单的神经网络结构&#xff1a; nn.Mudule就是继承父类 super执行的是 先执行父类函数里面的 forward执行的就是前向网络&#xff0c;就是往前推进的&#xff0c;当然也有反向转播&#xff0c;那就是…

基于Java的电影院管理系统设计与实现

前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f447;&#x1f3fb;…

百度收录和权重怎么提升-网站如何获得百度权重

你是否一直苦恼于网站权重的低迷&#xff1f;不知道如何开始提升网站权重&#xff0c;缺乏优质内容更新网站。不清楚如何进行关键词优化来提升网站排名和权重。SEO是一个需要持续投入时间和资源的过程。每个网站的情况都会有所不同&#xff0c;因此所花费的时间也会有所差异。然…

蓝桥杯 题库 简单 每日十题 day6

01 删除字符 题目描述 给定一个单词&#xff0c;请问在单词中删除t个字母后&#xff0c;能得到的字典序最小的单词是什么&#xff1f; 输入描述 输入的第一行包含一个单词&#xff0c;由大写英文字母组成。 第二行包含一个正整数t。 其中&#xff0c;单词长度不超过100&#x…

网络编程day03(UDP中的connect函数、tftp)

今日任务&#xff1a;tftp的文件上传下载&#xff08;服务端已经准备好&#xff09; 服务端&#xff08;已上传&#xff09; 客户端&#xff1a; 代码&#xff1a; #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/types.h…

全球南方《乡村振兴战略下传统村落文化旅游设计》许少辉八一新枝——2023学生开学季辉少许

全球南方《乡村振兴战略下传统村落文化旅游设计》许少辉八一新枝——2023学生开学季辉少许

QT实现简易时钟

头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent> #include <QDebug> #include <QPainter> #include <QTimerEvent> #include <QTimer> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { cl…

51单片机项目(13)——基于51单片机的智能台灯protues仿真

本次设计&#xff0c;使用protues软件进行仿真&#xff0c;详情如下&#xff1a; 1.输入部分:由热释电红外传感器、光敏传感器、超声波测距传感器所构成的子电路组成。 2.输出模块:由1602液晶显示及其蜂鸣器报警系统组成。 3.中央处理器:主要有AT89C52单片机构成。 4.工作过…

C++ PrimerPlus 复习 第八章 函数探幽

第一章 命令编译链接文件 make文件 第二章 进入c 第三章 处理数据 第四章 复合类型 &#xff08;上&#xff09; 第四章 复合类型 &#xff08;下&#xff09; 第五章 循环和关系表达式 第六章 分支语句和逻辑运算符 第七章 函数——C的编程模块&#xff08;上&#xff…

一、Stable Diffusion WebUI 安装

Mac 配置 类别配置机型Macbook pro m2核总数12 核中央处理器、38 核图形处理器和 16 核神经网络引擎内存64 G系统Sonoma 安装 Homebrew 打开终端执行&#xff08;使用了国内镜像源安装&#xff09; /bin/bash -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/r…

深入理解HttpSecurity的设计

文章目录 HttpSecurity的应用HttpSecurity的类图结构SecurityBuilder接口AbstractConfiguredSecurityBuilderadd方法doBuild方法 HttpSecurity HttpSecurity的应用 在上文介绍了基于配置文件的使用方式以及实现细节&#xff0c;如下&#xff1a; 也就是在配置文件中通过 secur…

javascript使用正则表达式去除字符串中括号的方法

如下面的例子&#xff1a; (fb6d4f10-79ed-4aff-a915-4ce29dc9c7e1,39996f34-013c-4fc6-b1b3-0c1036c47119,39996f34-013c-4fc6-b1b3-0c1036c47169,39996f34-013c-4fc6-b1b3-0c1036c47111,2430bf64-fd56-460c-8b75-da0a1d1cd74c,39996f34-013c-4fc6-b1b3-0c1036c47112) 上面是前…

华为HCIA(六)

LACPDU中携带接口优先级&#xff0c;系统MAC地址&#xff0c;设备优先级 Mac-vlan命令是配置基于MAC地址的VLAN 二层ACL匹配源目MAC二层协议类型等 HTTP为超文本传输协议&#xff0c;用于网页访问 二层组网指的是AC与AP同在一个网段内 IPV6全球单播地址 华为OSPF内部路由…