FlinkCDC基础篇章2-数据源 SqlServerCDC写入到ES中

news2025/2/26 15:48:41

接着 上期FlinkCDC基础篇章1-安装使用  

  1. 下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/ 下:

    下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

    • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
    • flink-sql-connector-mysql-cdc-2.4.0.jar
    • flink-sql-connector-postgres-cdc-2.4.0.jar

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH (
    'connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器
    'hostname' = '10.194.183.120',  -- SQL Server主机名
    'port' = '30027',               -- SQL Server端口
    'username' = 'sa',              -- SQL Server用户名
    'password' = 'abc@123456',      -- SQL Server密码
    'database-name' = 'cdc_test',   -- 数据库名称
    'schema-name' = 'dbo',          -- 模式名称
    'table-name' = 'orders'         -- 要捕获更改的表名
);

-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED  -- 主键定义(可选)
)
WITH (
    'connector' = 'jdbc',                        -- 使用JDBC连接器
    'url' = 'jdbc:mysql://10.194.183.120:30025/test',  -- MySQL的JDBC URL
    'username' = 'root',                        -- MySQL用户名
    'password' = 'root',                        -- MySQL密码
    'table-name' = 'orders'                     -- 要写入的MySQL表名
);

-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
CREATE TABLE income_distribution (

serviceCode STRING,

accountPeriod STRING,

subjectCode STRING,

subjectName STRING,

amt DECIMAL(13,2),

PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED

) WITH (

'connector' = 'elasticsearch-7',

'hosts' = 'http://xxxx:9200',

'index' = 'income_distribution',

'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'

);

可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

参考文献:

使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC

flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1608791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云原生Kubernetes: K8S 1.29版本 部署Jenkins

目录 一、实验 1.环境 2.K8S 1.29版本 部署Jenkins 服务 3.jenkins安装Kubernetes插件 二、问题 1.创建pod失败 2.journalctl如何查看日志信息 2.容器内如何查询jenkins初始密码 3.jenkins离线安装中文包报错 4.jenkins插件报错 一、实验 1.环境 (1&…

微服务使用SockJs+Stomp实现Websocket 前后端实例 | Vuex形式断开重连、跨域等等问题踩坑(一)

大家好,我是程序员大猩猩。 之前几篇文章,我们讲了Spring Cloud Gateway的轻量级实现,Nginx的配置概念与实现,如以下往期文章。 轻量级的Spring Cloud Gateway实践,实现api和websocket转发轻松实现Nginx的HTTP与WebS…

OpenHarmony实战开发-搜索功能实现案例、如何使用includes方法对数据实现模糊查询

介绍 本示例介绍使用includes方法对数据实现模糊查询 效果图预览 使用说明 点击首页搜索框跳转到搜索页面在搜索页面输入框中输入搜索的内容,下方列表自动根据搜索的内容进行筛选渲染点击筛选后的列表跳转到相应的页面跳转后会保存搜索历史,搜索历史使…

14-Error Handling (错误处理)

ESP32-S3错误处理:理解并掌握其重要性 在编程中,错误处理是一种重要的编程实践,它可以帮助我们的程序在遇到错误时能够优雅地恢复,而不是崩溃或产生不可预测的结果。在IDF中官方提供一些实用的错误处理技巧。👩‍&…

java扩展jmeter依赖

前置条件 创建一个maven项目&#xff0c; 引入依赖 <dependency><groupId>org.apache.jmeter</groupId><artifactId>ApacheJMeter_core</artifactId><version>3.2</version> </dependency> <dependency><groupId&g…

吴恩达2022机器学习专项课程(一) 6.1 动机第三周课后实验:Lab1使用逻辑回归进行分类

问题预览/关键词 回归和分类的区别&#xff1f;逻辑回归的作用是&#xff1f;什么是二分类问题&#xff1f;二分类问题案例如何表达二分类的结果&#xff1f;逻辑回归通常用哪种表达形式&#xff1f;什么是正样本和负样本&#xff1f;什么是阈值&#xff1f;可视化线性回归解决…

论婚恋相亲交友软件的市场前景和开发方案H5小程序APP源码

随着移动互联网的快速发展和社交需求的日益增长&#xff0c;婚恋相亲交友软件小程序成为了越来越多单身人士的选择。本文将从市场前景、使用人群、盈利模式以及竞品分析等多个角度&#xff0c;综合论述这一领域的现状与发展趋势。 一、市场前景 在快节奏的现代生活中&#xf…

ubuntu 22.04 编译 ORBSLAM3

源码地址&#xff08;带注释&#xff09;&#xff1a;ORBSLAM3 根据你安装的openCv版本修改cmake 修改2个文件的2个地方&#xff1a; ORB_SLAM3_detailed_comments-master/CMakeLists.txt ORB_SLAM3_detailed_comments-master/Thirdparty/DBoW2/CMakeLists.txt 查找openCv的地…

Python 爬虫如何配置代理 IP (Py 采集)

在Python中配置代理IP&#xff0c;可以通过设置requests库的proxies参数来实现。以下是一个示例&#xff1a; import requests# 则立可以获取稳定代理Ip&#xff1a;https://www.kuaidaili.com/?refrg3jlsko0ymg # 推荐使用私密动态 IP proxies {"http": "ht…

Mini-Gemini: Mining the Potential of Multi-modality Vision Language Models论文解读

文章目录 前言一、摘要二、引言三、文献1、大语言模型文献2、视觉语言模型文献3、LLM作为生成助手文献 四、模型方法与结果1、Dual Vision Encoders3、Patch Info Mining4、Text and Image Generation1、Text-image Instructions2、Generation-related Instructions 五、实验结…

webAssembly学习及使用rust

学习理解 webAssembly 概念知识&#xff0c;使用 API 进行 web 前端开发。 概念 是一种运行在现代网络浏览器中的新型代码&#xff0c;并且提供新的性能特性和效果。它有一种紧凑的二进制格式&#xff0c;使其能够以接近原生性能的速度运行。C/C、 C#、Rust等语言可以编译为 …

手写Java设计模式之抽象工厂模式,附源码解读

接上篇&#xff0c;抽象工厂模式将汽车的一些属性可以抽象出来&#xff0c;可以理解为给不同汽车品牌生成时加上不同的特性&#xff0c;如颜色等&#xff0c;具体代码如下&#xff1a; 引入颜色接口&#xff1a; public interface Colour {void fill(); }将颜色与汽车生成品牌…

Django项目使用uwsgi+nginx部署上线

Django项目使用uwsginginx部署上线 前言settings 配置安装uwsgi 和配置uwsgi推荐配置文件启用wsgi不使用nginx的配置&#xff08;不推荐&#xff09;使用nginx的配置 安装 nginx和配置niginx 配置 运行参考资料 前言 代码已经开发完成&#xff0c;正式部署上线 settings 配置…

公网IP地址如何申请SSL证书?有免费的IP ssl吗?

如果用户没有域名或只有公网IP地址或者不方便使用域名&#xff0c;IP地址ssl证书这一特殊的证书可以为IP地址实现HTTPS的安全保护&#xff0c;提高网站数据传输的安全性。 IP地址申请SSL证书的基本步骤 IP ssl证书下载---注册填写230916https://www.joyssl.com/certificate/sel…

Stable Diffusion UI 从安装到实现文字图片融合(光影字,错觉图)图片制作详细教程

前言 最近在实践大模型本地部署&#xff0c;前几天在本地部署了一个ChatGLM大模型&#xff0c;刚好环境搭好了&#xff0c;也支持跑Stable Diffusion&#xff0c;所以就安装了再尝试一下。 原因是之前在B站上有大佬做了一个Windows电脑能一键运行的Stable Diffusion的安装包&…

镜舟科技荣获金科创新社 2024 年度金融数据智能解决方案奖

近日&#xff0c; 镜舟科技凭借领先的金融实时数仓构建智能经营解决方案&#xff0c;在“金科创新社第六届金融数据智能优秀解决方案评选”活动中&#xff0c;成功入选“数据治理与数据平台创新优秀解决方案”榜单。 金科创新社主办的“鑫智奖”评选活动&#xff0c;旨在展示…

密码学 | 承诺:基本概念

目录 正文 1 承诺的交互 2 承诺的属性 3 硬币抛掷问题 3.1 朴素版方案 3.2 承诺版方案 &#x1f951;源自&#xff1a;https://en.wikipedia.org/wiki/Commitment_scheme &#x1f951;写在前面&#xff1a;英文的承诺是 commitment scheme&#xff0c;否则很难进行…

Unity射击游戏开发教程:(1)玩家控制

玩家的移动 玩家控制和移动是视频游戏中最酷的事情之一,因为你正在控制游戏中的某些东西 现在游戏中的玩家是我们的蓝色方块英雄。我在游戏开发中了解到,游戏是用简单的对象制作原型,然后添加所有漂亮的艺术和声音。代码… 我们要做的第一件事是在游戏开始时为玩家提供一个…

uniapp H5项目 获取接口的二进制流转化成图片url(base64)

如果你使用的是uniapp, 并且你从接口获取下来的数据长这样&#xff1a; 想要把取到的数据展示成图片&#xff0c;那么你可以这样做&#xff1a; // 这是我们的项目封装的请求方法const res await this.$api.getKaptcha({originResponse: true, // 这样写是为了在request那边特…

js中let和var的区别

在JavaScript中&#xff0c;var、let和const都用于声明变量&#xff0c;但它们之间存在一些重要的区别。特别是let和var之间的区别&#xff0c;我们可以概括为以下几点&#xff1a; 作用域&#xff08;Scope&#xff09;&#xff1a;var有函数作用域或全局作用域&#xff0c;而…