使用flink sqlserver cdc 同步数据到StarRocks

news2025/1/11 22:39:48

前沿: flink cdc功能越发强大,支持的数据源也越多,本篇介绍使用flink cdc实现:

sqlserver-》(using flink cdc)-〉flink -》(using flink starrocks connector)-〉starrocks整个流程

1.sqlserver 环境准备(得使用sqlserver 16以下版本,flink cdc当前只支持16以下sqlserver版本)

我这个使用的是docker环境:

xiuchenggong@xiuchengdeMacBook-Pro ~ % docker images
REPOSITORY                                          TAG                            IMAGE ID       CREATED         SIZE
starrocks.docker.scarf.sh/starrocks/allin1-ubuntu   latest                         4d3c0066a012   3 days ago      4.71GB
mcr.microsoft.com/mssql/server                      2019-latest                    e7fc0b49be3c   4 weeks ago     1.47GB
mcr.microsoft.com/mssql/server                      2022-latest                    683d523cd395   5 weeks ago     2.9GB
federatedai/standalone_fate                         latest                         6019ec787699   9 months ago    5.29GB
milvusdb/milvus                                     v2.1.4                         d9a5c977c414   11 months ago   711MB
starrocks/dev-env                                   main                           8f4edba3b115   16 months ago   7.65GB
minio/minio                                         RELEASE.2022-03-17T06-34-49Z   239acc52a73a   17 months ago   228MB
kicbase/stable                                      v0.0.29                        64d09634c60d   20 months ago   1.14GB
quay.io/coreos/etcd                                 v3.5.0                         a7908fd5fb88   2 years ago     110MB
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest

docker exec -it --user root sql_server_2019 bash

开启代理,重启sqlserver环境,连接: 

xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash
root@99e196828047:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
SQL Server needs to be restarted in order to apply this setting. Please run
'systemctl restart mssql-server.service'.
root@99e196828047:/# exit
exit
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker restart sql_server_2019
sql_server_2019
xiuchenggong@xiuchengdeMacBook-Pro ~ % docker exec -it --user root sql_server_2019 bash

root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"

开启sqlserver cdc功能:


root@99e196828047:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "abc@123456"
1> use cdc_test;
2> go
Changed database context to 'cdc_test'.
1> EXEC sys.sp_cdc_enable_db;
2> go
1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'cdc_test';
2> go
is_cdc_enabled

1> CREATE TABLE orders (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id]))
2> go
1>
2>
3> EXEC sys.sp_cdc_enable_table
4> @source_schema = 'dbo',
5> @source_name   = 'orders',
6> @role_name     = 'cdc_role';
7> go


Job 'cdc.cdc_test_capture' started successfully.
Job 'cdc.cdc_test_cleanup' started successfully.


          

插入一些数据:

1> select * from orders;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------
          1       2023-07-07           1           1           1
          2       2023-07-07           2           2           2
          3       2023-07-07           3           3           3
          4       2023-07-07           4           4           4
         45       2023-07-07           5           5           5

(5 rows affected)
1> update orders set quantity = 100 where id =1 ;
2> go

(1 rows affected)
1> select * from orders;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------
          1       2023-07-07           1         100           1
          2       2023-07-07           2           2           2
          3       2023-07-07           3           3           3
          4       2023-07-07           4           4           4
         45       2023-07-07           5           5           5

(5 rows affected)
1> update orders set quantity = 200 where id = 2;
2> go

2.准备flink环境:

  • 下载flink 1.16.2 (官网下载)
  • 下载flink sqlserver cdc 2.2.0 (Central Repository: com/ververica/flink-cdc-connectors)
  • 下载flink starrocks connector 1.15(这个应该也要下载对应版本1.16.2,但官方还没出,我拿1.15测试了也ok)下载链接:Release Release 1.2.6 · StarRocks/starrocks-connector-for-apache-flink · GitHub

3.准备starrocks docker环境:

见链接:使用 Docker 部署 StarRocks @ deploy_with_docker @ StarRocks Docs

4.启动flink环境(cd {FLINK_HOME}):

xiuchenggong@xiuchengdeMacBook-Pro bin % ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host xiuchengdeMacBook-Pro.local.
Starting taskexecutor daemon on host xiuchengdeMacBook-Pro.local.
xiuchenggong@xiuchengdeMacBook-Pro bin % ./sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/flink-1.16.2/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/xiuchenggong/flink/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Command history file path: /Users/xiuchenggong/.flink-sql-history

Flink SQL> 

建sqlsever到flink的表:

Flink SQL> CREATE TABLE t_source_sqlserver (
>     id INT,
>     order_date DATE,
>     purchaser INT,
>     quantity INT,
>     product_id INT,
>     PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
> ) WITH (
>     'connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器
>     'hostname' = 'localhost',  -- SQL Server主机名
>     'port' = '30027',               -- SQL Server端口
>     'username' = 'sa',              -- SQL Server用户名
>     'password' = 'abc@123456',      -- SQL Server密码
>     'database-name' = 'cdc_test',   -- 数据库名称
>     'schema-name' = 'dbo',          -- 模式名称
>     'table-name' = 'orders'         -- 要捕获更改的表名
> );

 再建flink到starrocks的表:

Flink SQL> 
> 
> CREATE TABLE IF NOT EXISTS `orders_sink` (
>      id int,
>      order_date date,
>      purchaser int,
>      quantity int,
>      product_id int,
>      PRIMARY KEY(`id`) NOT ENFORCED
> ) with (
> 'load-url' = 'localhost:8030',
> 'sink.buffer-flush.interval-ms' = '15000',
> 'sink.properties.row_delimiter' = '\x02',
> 'sink.properties.column_separator' = '\x01',
> 'connector' = 'starrocks',
> 'database-name' = 'test',
> 'table-name' = 'orders',
> 'jdbc-url' = 'jdbc:mysql://localhost:9030',
> 'password' = '',
> 'username' = 'root'
> )
> ;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+--------------------+
|         table name |
+--------------------+
|        orders_sink |
| t_source_sqlserver |
+--------------------+
2 rows in set

提交作业:

Flink SQL> insert into orders_sink select * from t_source_sqlserver;
[INFO] Submitting SQL update statement to the cluster...
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/xiuchenggong/flink/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 746cc173cd71133e96d080f25327e9bc

flink webui看到长期驻留的作业:

5.验证在sqlserver中的数据是不是已经同步到starrocks中了,insert/update/delete:


StarRocks > select * from orders;
+------+------------+-----------+----------+------------+
| id   | order_date | purchaser | quantity | product_id |
+------+------------+-----------+----------+------------+
|    1 | 2023-07-07 |         1 |      100 |          1 |
|    3 | 2023-07-07 |         3 |        3 |          3 |
|    4 | 2023-07-07 |         4 |        4 |          4 |
|   45 | 2023-07-07 |         5 |        5 |          5 |
|    2 | 2023-07-07 |         2 |      200 |          2 |
+------+------------+-----------+----------+------------+
5 rows in set (0.016 sec)

StarRocks >

数据的增删改都同步过去了;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/961730.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL注入类型与技巧

目录 一、注入类型 1.联合查询注入 2.报错注入 3.bool注入(布尔盲注) 4.sleep注入(时间盲注) 二、注入技巧 1.科学计数法绕过正则 2.过滤information 3.无列名注入 一、注入类型 1.联合查询注入 MySQL联合查询注入利用union(联合查询)可以同时执行多条SQL语句的特点…

error: ‘std::_hypot‘ has not been declared using std::hypot;

Cmake 使用qt的编译器 编译opencv时 执行mingw32-make时出现了错误 本质原因就是 _hypot 没有声明。所以找到对应的文件声明一下 就行了。 E:\*****\Qt5.14.1\Tools\mingw730_64\lib\gcc\x86_64-w64-mingw32\7.3.0\include\c 下面的math.h 文件。 可以看到这个文件有一个…

8月31日-9月1日 第六章 案例:MySQL主从复制与读写分离(面试重点,必记)

本章结构 案例概述 案例前置知识点 详细图示 1、什么是读写分离&#xff1f; 读写分离&#xff0c;基本的原理是让主数据库处理事务性增、改、删操作&#xff08;INSERT、UPDATE、DELETE&#xff09;&#xff0c;而从数据库处理SELECT查询操作。数据库复制被用来把事务性操作导…

python实现MQTT协议(发布者,订阅者,topic)

python实现MQTT协议 一、简介 1.1 概述 本文章针对物联网MQTT协议完成python实现 1.2 环境 Apache-apollo创建brokerPython实现发布者和订阅者 1.3 内容 MQTT协议架构说明 &#xff1a; 利用仿真服务体会 MQTT协议 针对MQTT协议进行测试 任务1&#xff1a;MQTT协议应…

关于Incapsula reese84加密的特征研究

最近研究了下reese84的加密算法&#xff0c;基本上两个参数的加密__utmvc和token&#xff0c;因为nodejs调用会有内存问题&#xff0c;没有采用补环境的方式解决&#xff0c;用python扣的算法 1:__utmvc参数的生成是一个ob混淆&#xff0c;ast处理之后调试难度不是很大 测试结…

说说IO多路复用

分析&回答 IO多路复用 I/O multiplexing 这里面的 multiplexing 指的其实是在单个线程通过记录跟踪每一个Sock(I/O流)的状态(对应空管塔里面的Fight progress strip槽)来同时管理多个I/O流。直白点说&#xff1a;多路指的是多个socket连接&#xff0c;复用指的是复用一个…

ROS 2官方文档(基于humble版本)学习笔记(二)

ROS 2官方文档&#xff08;基于humble版本&#xff09;学习笔记&#xff08;二&#xff09; 理解节点&#xff08;node&#xff09;ros2 runros2 node list重映射&#xff08;remap&#xff09;ros2 node info 理解话题&#xff08;topic&#xff09;rqt_graphros2 topic listr…

JVM工具-1. jps 虚拟机进程状态工具

文章目录 1. jps介绍2. jps命令格式3. jps工具主要选项4. jps -q5. jps -m6. jps -l7. jps -v 1. jps介绍 jps(JVM Process Status Tool)&#xff1a;虚拟机进程状态工具&#xff0c;可以列出正在运行的虚拟机进程&#xff0c;并显示虚拟机执行主类&#xff08;Main Class&…

【校招VIP】操作系统考点之sleep和wait

考点介绍&#xff1a; 多线程可以说是进阶必备的知识点&#xff0c;也是面试中必备的考点。 可能不少人能对多线程说上一二&#xff0c;但这还远远不够&#xff0c;如果碰到比较有经验的面试官再继续追问&#xff0c;很可能会被吊打。 操作系统考点之sleep和wait 相关题目及解…

2023-9-1-虚拟网卡学习

&#x1f37f;*★,*:.☆(&#xffe3;▽&#xffe3;)/$:*.★* &#x1f37f; &#x1f4a5;&#x1f4a5;&#x1f4a5;欢迎来到&#x1f91e;汤姆&#x1f91e;的csdn博文&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f49f;&#x1f49f;喜欢的朋友可以关注一下&#xf…

使用boost::geometry::union_ 合并边界(内、外):方案二

使用boost::geometry::union_ 合并边界&#xff08;内、外&#xff09;&#xff1a;方案二 typedef boost::geometry::model::d2::point_xy<double> boost_point; typedef boost::geometry::model::polygon<boost_point> boost_Polygon;struct Point {float x;floa…

kubernetes进阶 (二) 搭建harbor仓库及镜像制作

我遇到的场景规模较大&#xff0c;F5后有很多个仓库。 并且仓库直接还存在同步关系&#xff0c;因为拉取镜像走的是F5&#xff0c;当碰到莫名其妙的原因(仓库挂了&#xff0c;或者维护、磁盘满了&#xff09;&#xff0c;上送到根仓库的镜像没有同步到其他所以的仓库&#xff0…

高版本springboot3.1配置Eureka客户端问题

只需要按上面配置好&#xff0c;然后高版本的Eureka&#xff0c;不需要EnableEurekaClient这个注解了&#xff0c;直接SpringBoot启动&#xff0c;就可以注册到注册中心。 /*********************************************************/ /** * 开启eureka客户端功能 */ //E…

go锁-互斥锁

go锁-互斥锁 sema初始值是0&#xff0c;waitershift等待协程的数量 正常枷锁&#xff1a; 尝试CAS直接加锁&#xff0c;通过原子包给lockerd 为枷锁 若无法直接获取&#xff0c;进行多次自旋尝试&#xff0c;未获取到的锁的g &#xff0c;多次执行空语句&#xff0c;多次尝试…

indeogram用法

特点&#xff1a; indeogram.ai 是一种基于人工智能的图形设计工具&#xff0c;可以帮助用户快速和轻松地创建专业级的图形。它使用人工智能来识别图形的元素&#xff0c;并自动生成设计方案。这使得 indeogram.ai 非常适合没有任何图形设计经验的用户。 登录网站&#xff1a…

参数和BigDecimal zero比较失效的异常

记录Bigdecimal中参数和BigDecimal zero比较失效的异常 List<StockBatchDTO> b a.stream().filter(v->!v.getQuantity().equals(BigDecimal.ZERO)).collect(Collectors.toList());//失效List<StockBatchDTO> c a.stream().filter(v->v.getQuantity().comp…

高忆管理:股票最基本的知识?

股票是一个经济体中的一份所有权。持有股票的人成为公司的股东&#xff0c;代表他们在公司中有一定的决策权和分红权。股票商场是一个重要的金融商场&#xff0c;关于企业和出资者都具有重要的含义。那么&#xff0c;股票出资的基本知识是什么呢&#xff1f; 一、 股票的界说 …

DEAP库文档教程三-----创建类型

本节将继续展示如何通过creator创建类型以及如何使用toolbox如何对复杂问题进行初始化。 Particle的初始化--粒子初始化 一个Particle是另一个特殊类型的个体&#xff0c;这是因为通常情况下它有一个速度&#xff0c;并且有一个最优的位置需要去记忆。这种类型个体的创建与通…

六、高并发内存池--Central Cache

六、高并发内存池–Central Cache 6.1 Central Cache的工作原理 central cache也是一个哈希桶结构&#xff0c;他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构&#xff0c;不过每个映射桶下面的span中的大内存块被按映射关系切…

pycharm创建的虚拟环境为什么用conda env list命令查询不到?

问题描述&#xff1a;pycharm创建的虚拟环境为什么用conda env list命令查询不到。 pycharm开发环境可以创建虚拟环境&#xff0c;目的是为隔绝其他环境种库带来的版本干扰&#xff0c;但是发现一个问题&#xff0c;无论是在windows终端、anaconda终端、Pycharm开发环境中的终…