采用Flink CDC操作SQL Server数据库获取增量变更数据

news2024/11/28 8:26:01

采用Flink CDC操作SQL Server数据库获取增量变更数据

Flink CDC 1.12版本引入了对SQL Server的支持,包括SqlServerCatalogSqlServerTable。在SqlServerCatalog中,你可以根据表名获取对应的字段和字段类型。

SQL Server 2008 开始支持变更数据捕获 (CDC) 功能。CDC 允许你捕获对表中数据更改的数据,这样你就可以查询更改的数据而不需要扫描整个表。

1、准备工作

软件版本

Flink 1.17.1

数据库版本 Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64)

1.1、数据库准备 启动CDC

--  开启SQL Server数据库CDC。  在需要开启CDC的数据库执行此命令
EXEC sys.sp_cdc_enable_db
-- 查询开启CDC的数据库
select name, is_cdc_enabled from sys.databases 

1.2、开启SQL Server代理

打开 SQL Server配置管理器 => 选择SQL Server服务 => 选择SQL Server代理 右击开启

在这里插入图片描述

1.3、为需要跟踪更改的表启用 CDC。

-- 开启表级别的CDC   --需要开启先SQL Server代理  然后执行 
 EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo', -- source_schema
        @source_name = 'AIR_STATION_HOUR_DATA', -- table_name
        @capture_instance = NULL, -- capture_instance
        @supports_net_changes = 1, -- supports_net_changes
        @role_name = NULL -- role_name
 --  验证表是否开启cdc成功
 EXEC sys.sp_cdc_help_change_data_capture

2、代码编写

2.1、引入依赖

    <properties>
        <flink.version>1.17.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>9.4.1.jre8</version>
        </dependency>        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
 
 
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2.2、代码编写

2.2.1 、数据库配置文件编写

public class SQLServerConstant {
    public static final String SQLSERVER_HOST = "0.0.0.0";  //数据库地址
    public static final Integer SQLSERVER_PORT = 1433; //端口
 
    public static final String SQLSERVER_DATABASE = "HBDC_AQI";  //库

    public static final String SQLSERVER_TABLE_LIST= "dbo.AIR_STATION_HOUR_DATA"; // 表
    public static final String SQLSERVER_USER_NAME = "sa"; //用户
    public static final String SQLSERVER_PASSWORD = "*******"; //密码
}

2.2.2 CDC数据实体类

@Data
public class DataChangeInfo implements Serializable {
    /**
     * 数据库名
     */
    private String database;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 变更时间
     */
    private LocalDateTime changeTime;
    /**
     * 变更类型 1新增 2修改 3删除
     */
    private Integer eventType;
    /**
     * 变更前数据
     */
    private String beforeData;
    /**
     * 变更后数据
     */
    private String afterData;

}

2.2.2 、SQLServer消息读取自定义序列化

@Slf4j
public class SQLServerJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // 获取操作类型  CREATE UPDATE DELETE  1新增 2修改 3删除
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> 		         Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            //7.输出数据
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("SQLServer消息读取自定义序列化报错:{}", e.getMessage());

        }

    }
    /**
     *
     * 从源数据获取出变更之前或之后的数据
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }



    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {

        return TypeInformation.of(DataChangeInfo.class);
    }
}

2.2.3 、功能工具类

public class FlinkSourceUtil {
   
    /**
     * 构造SQL Server CDC数据源
     */
    public static DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = SQLSERVER_TABLE_LIST.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(SQLSERVER_HOST)
                .port(SQLSERVER_PORT)
                .database(SQLSERVER_DATABASE) // monitor sqlserver database
                .tableList(tables) // monitor products table
                .username(SQLSERVER_USER_NAME)
                .password(SQLSERVER_PASSWORD)
                /*
                 *initial初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest:只进行增量导入(不读取历史变化)
                 */
                .startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.initial())
                .deserializer(new SQLServerJsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();
    }

   
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoMySqlSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.print();

        env.execute("SQLServer-stream-cdc");


    }
}

2.3、运行main方法测试

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1581897.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数码相框-显示JPG图片

LCD控制器会将LCD上的屏幕数据映射在相应的显存位置上。 通过libjpeg把jpg图片解压出来RGB原始数据。 libjpeg是使用c语言实现的读写jpeg文件的库。 使用libjpeg的应用程序是以"scanline"为单位进行图像处理的。 libjpeg解压图片的步骤&#xff1a; libjpeg的使…

FPGA:图像数字细节增强算法(工程+仿真+实物,可用毕设)

目录 日常唠嗑一、视频效果二、硬件及功能1、硬件选择2、功能3、特点 未完、待续……四、工程设计五、板级验证六、工程获取 日常唠嗑 有2个多月没写文章了&#xff0c;又是老借口&#xff1a;“最近实在是很忙”&#x1f923;&#xff0c;不过说真&#xff0c;确实是比较忙&am…

AWS服务器有哪些优势?

作为一家总部在美国的公司&#xff0c;AWS为什么会受到中国企业的喜爱&#xff1f;他有什么优势&#xff1f;九河云作为AWS合作伙伴&#xff0c;将会带读者展现使用AWS的优势。 首先是作为跨国企业&#xff0c;AWS在全球有数十个区域节点&#xff0c;这种广泛的地域覆盖不仅有…

【简单讲解下Kotlin】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

[大模型]基于 ChatGLM3 和 LangChain 搭建知识库助手

基于 ChatGLM3 和 LangChain 搭建知识库助手 环境配置 在已完成 ChatGLM3 的部署基础上&#xff0c;还需要安装以下依赖包&#xff1a; pip install langchain0.0.292 pip install gradio4.4.0 pip install chromadb0.4.15 pip install sentence-transformers2.2.2 pip inst…

详解TCP和UDP协议的区别

一、前言 TCP和UDP协议是TCP/IP协议的核心。TCP 传输协议&#xff1a;TCP 协议是一TCP (Transmission Control Protocol)和UDP(User Datagram Protocol)协议属于传输层协议。其中TCP提供IP环境下的数据可靠传输&#xff0c;它提供的服务包括数据流传送、可靠性、有效流控、全双…

实时时钟模块RX8900CE为电子产品设备提供精准时间,能够适应极度紧凑的空间

随着电子技术飞速发展&#xff0c;越来越多的设备需要用到实时时钟电路。而过往的实时时钟电路&#xff0c;大多是分立式的架构&#xff0c;外围有不少的元器件&#xff0c;不但成本高昂&#xff0c;而且稳定性也不高&#xff0c;在严苛的工作条件下就显得有点力不从心。作为设…

ARM单片机的GPIO口在控制不同LED、按键时的设置

个人备忘&#xff0c;不喜勿喷。 GPIO口在驱动共阴极、共阳极LED灯时需要不同的初始化设置 对于这一类的led灯&#xff1a; 最好选择推挽、上拉、高速输出&#xff0c;同时IO口初始化时需要拉高。 上面这种需要下拉输入&#xff1b; 上图这种需要上拉输入&#xff0c;这样才…

聊一聊一些关于npm、pnpm、yarn的事

前言 整理了最近的闲聊&#xff0c;话题是前端各个包管理器&#xff0c;如果分享的不对或者有异议的地方&#xff0c;麻烦请及时告诉我~ 耐心看完&#xff0c;也许你会有所收获~ 概述 本文阅读时间&#xff1a;10-15分钟左右&#xff1b; 难度&#xff1a;初级&#xff0c…

LeetCode 2529. 正整数和负整数的最大计数——每日一题

上一篇博客&#xff1a;LeetCode 993. 二叉树的堂兄弟节点——每日一题 写在前面&#xff1a;大家好&#xff01;我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正&#xff0c;感谢大家的不吝赐教。我的唯一博客更新地址是&#xff1a;https://ac-fun.…

【计算机考研】408网课汇总+资源分享

王道的四件套无疑是大多数同学的首选。相比其他课程来说&#xff0c;也是属于市面上最好的408课程了。 从今年的难度来看选择题部分和计网&#xff0c;比起往年来看是有很多偏题&#xff0c;大题除了计网的冷门外&#xff0c;其他倒是中规中矩。总体来看24考研的408难度是非常…

Win11 使用 WSL2 安装 linux 子系统 ubuntu

Win11 使用 WSL2 安装 linux 子系统 ubuntu 段子手168 1、用 部署映像服务和管理工具 dism.exe 命令&#xff0c;开启 WSL2 按【WIN R】&#xff0c;打开【运行】&#xff0c;输入&#xff1a;【cmd】&#xff0c;管理员打开【命令行提示符】。 启用适用于 Linux 的 Windo…

单例模式(饿汉模型,懒汉模型)

在着里我们先了解什么是单例模式。 就是某个类在进程中只能有单个实例&#xff0c;这里的单例模式需要一定的编程技巧&#xff0c;做出限制&#xff0c;一旦程序写的有问题&#xff0c;创建了多个实例&#xff0c;编程就会报错。 如果我们学会了单例模式&#xff0c;这种模式…

ORAN C平面 Section Extension 22

ORAN C平面Section扩展22用于ACK/NACK请求。除section type 7外&#xff0c;section扩展22可以用于从O-DU发送到O-RU的所有section type和section扩展。 对于一个section描述&#xff0c;O-DU可以使用section扩展22要求O-RU使用section type 8 C平面消息进行ACK/NACK反馈。关于…

Spring Validation解决后端表单校验

NotNull&#xff1a;从前台传递过来的参数不能为null,如果为空&#xff0c;会在控制台日志中把message打印出来 Range&#xff1a;范围&#xff0c;最大多少&#xff0c;最小多少 Patten&#xff0c;标注的字段值必须符合定义的正则表达式&#xff08;按照业务规则&#xff0…

智慧公厕是智慧城市建设中不可或缺的一部分

智慧城市的数字化转型正在取得显著成效&#xff0c;各项基础设施的建设也在迅速发展&#xff0c;其中智慧公厕成为了智慧城市体系中不可或缺的一部分。作为社会生活中必要的设施&#xff0c;公共厕所的信息化、数字化、智慧化升级转型能够实现全区域公共厕所管理的横向打通和纵…

T527 Qt 触摸 ----- TSLIB

一、调试 1、驱动路径 bsp/drivers/input/ctp/gt9xx/gt9xx_ts.c 2、硬件接口 挂载在TWI0下 3、中断复位脚 4、设备树 &twi0 {clock-frequency <400000>;pinctrl-0 <&twi0_pins_default>;pinctrl-1 <&twi0_pins_sleep>;pinctrl-names &quo…

vue通过echarts实现数据可视化

1、安装echarts cnpm install echarts -Sechart官方图表示例大全&#xff1a;https://echarts.apache.org/examples/zh/index.html#chart-type-line 2、代码实现 <template><div><div class"box" ref"zhu"></div><div class&…

设计模式之创建型模式---建造者模式

文章目录 建造者模式概述经典的建造者模式建造者模式的变种总结 建造者模式概述 建造者模式是一种广泛使用的设计模式&#xff0c;在三方开源库和各种SDK中经常见到。建造者设计模式在四人帮的经典著作《设计模式&#xff1a;可复用面向对象软件基础》中被提及&#xff0c;它的…

赛氪网|2024中国翻译协会年会“AI科技时代竞赛与就业”分论坛

在2024年中国翻译协会年会期间&#xff0c;赛氪网与中西部翻译协会共同体多边合作平台共同承办&#xff0c;于3月30日下午在长沙成功举办了“AI科技时代竞赛与就业分论坛”。该论坛汇聚了众多翻译界、科技界和教育界的专家学者&#xff0c;共同探讨科技、实践、就业与竞赛人才培…