Flink CDC: Installing, Configuring and Verifying Multiple Data Sources
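Table of Contents
- 1. Preface
- 2. Data Source Installation and Configuration
- 2.1 MySQL
- 2.1.1 Installation
- 2.1.2 CDC Configuration
- 2.2 PostgreSQL
- 2.2.1 Installation
- 2.2.2 CDC Configuration
- 2.3 Oracle
- 2.3.1 Installation
- 2.3.2 CDC Configuration
- 2.4 SQL Server
- 2.4.1 Installation
- 2.4.2 CDC Configuration
- 3. Verification
- 3.1 Mapping Between Flink Versions and Flink CDC Versions
- 3.2 Downloading the Required Packages
- 3.3 Adding the CDC JARs to the lib Directory
- 3.4 Verification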
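1. Preface
The official documentation (the "CDC Connectors for Apache Flink®" documentation) already has tutorials on how to use and configure Flink CDC. However, they are not very detailed. For example, they do not walk step by step through how to install each data source or how to configure it. I have therefore published several earlier posts on the data sources used with Flink CDC and their configuration. Interested readers can refer to: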
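- "Installing Oracle 11g under Docker (Works on the First Try)"
- "Installing SQL Server 2019 under Docker"
- "Flink PostgreSQL CDC Real-Time Synchronization (Including PostgreSQL Installation and Configuration)"
- "Flink Oracle CDC Real-Time Synchronization (In Great Detail)"
- "Flink SQL Server CDC Real-Time Synchronization (Including SQL Server Installation and Configuration)"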
This article records how to install and configure the following common data sources in Docker so that they can be used with Flink CDC:
| Data source | Version |
|---|---|
| MySQL | 8.0.25 |
| PostgreSQL | 10.6 |
| Oracle | 11g |
| SQL Server | 2019 |
2. Data Source Installation and Configuration
2.1 MySQL
Version: 8.0.25
2.1.1 Installation
Step1: Pull the MySQL image:
```shell
docker pull mysql:8.0.25
```
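Step2: Create and start the MySQL container. Host port 30025 maps to the container's port 3306, and the root password is `root`: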
```shell
docker run -d -p 30025:3306 --name mysql8.0.25 -e MYSQL_ROOT_PASSWORD=root mysql:8.0.25
```
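A freshly started container usually needs some time before the database accepts connections, and the Oracle image needs especially long. Commands issued right after `docker run` may therefore fail. Below is a minimal POSIX sh sketch of a retry loop. `wait_until` and the `WAIT_INTERVAL` variable are hypothetical names introduced here; they are not part of Docker or MySQL:

```shell
#!/bin/sh
# Hypothetical helper: retry a command until it succeeds or the attempts run out.
# Usage: wait_until <max_attempts> <command> [args...]
# The pause between attempts defaults to 2 seconds and can be changed with WAIT_INTERVAL.
wait_until() {
  local max_attempts attempt
  max_attempts="$1"
  shift
  attempt=1
  while [ "$attempt" -le "$max_attempts" ]; do
    # Run the command as separate arguments (no re-parsing) and discard its output.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Only sleep if another attempt follows.
    if [ "$attempt" -lt "$max_attempts" ]; then
      sleep "${WAIT_INTERVAL:-2}"
    fi
    attempt=$((attempt + 1))
  done
  echo "still failing after ${max_attempts} attempts: $*" >&2
  return 1
}
```

For example, `wait_until 30 docker exec mysql8.0.25 mysqladmin ping -uroot -proot --silent` waits for MySQL. `wait_until 30 docker exec postgres-10.6 pg_isready -U postgres` does the same for the PostgreSQL container created in 2.2. The command is passed as separate arguments (`"$@"`) rather than as one string, which avoids a second round of shell parsing.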
2.1.2 CDC Configuration
Step1: Open a MySQL client session in the running container:
```shell
docker exec -it mysql8.0.25 mysql -uroot -proot
```
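Step2: Configure the binary log for CDC. In MySQL 8.0, binary logging is enabled by default and `binlog_format` defaults to `ROW`. `log_bin` is read-only at runtime and cannot be changed with `SET GLOBAL`, so only check its value. `binlog_format` can be set explicitly: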
```sql
-- Check that binary logging is enabled (expected: ON); log_bin can only be changed at server startup
mysql> SHOW VARIABLES LIKE 'log_bin';
-- Use row-based binary logging; SET PERSIST (MySQL 8.0+) also keeps the value after a restart
mysql> SET PERSIST binlog_format = 'ROW';
```
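Step3 (optional): If the settings do not seem to take effect, restart the container. Values set with `SET PERSIST` survive the restart, while values set only with `SET GLOBAL` are lost: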
```shell
docker restart mysql8.0.25
```
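The Flink CDC MySQL connector is built on Debezium. It reads the row-based binlog and also expects `binlog_row_image` to be `FULL`, which is the MySQL 8.0 default. The POSIX sh sketch below checks all three variables. `check_binlog_vars` is a hypothetical helper that reads the tab-separated `name value` lines the `mysql` client prints in batch mode with `-N`:

```shell
#!/bin/sh
# Hypothetical helper: read "name<TAB>value" lines, as printed by
#   mysql -N -e "SHOW VARIABLES WHERE Variable_name IN (...)"
# from stdin and check the binlog settings that the MySQL CDC connector relies on.
check_binlog_vars() {
  local tab name value status log_bin binlog_format binlog_row_image
  tab=$(printf '\t')
  status=0
  log_bin=""
  binlog_format=""
  binlog_row_image=""
  # Collect the values of the three variables we care about.
  while IFS="$tab" read -r name value; do
    case "$name" in
      log_bin) log_bin="$value" ;;
      binlog_format) binlog_format="$value" ;;
      binlog_row_image) binlog_row_image="$value" ;;
    esac
  done
  # Report every mismatch instead of stopping at the first one.
  if [ "$log_bin" != "ON" ]; then
    echo "log_bin is '$log_bin', expected ON" >&2
    status=1
  fi
  if [ "$binlog_format" != "ROW" ]; then
    echo "binlog_format is '$binlog_format', expected ROW" >&2
    status=1
  fi
  if [ "$binlog_row_image" != "FULL" ]; then
    echo "binlog_row_image is '$binlog_row_image', expected FULL" >&2
    status=1
  fi
  return "$status"
}
```

Example: `docker exec mysql8.0.25 mysql -uroot -proot -N -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')" | check_binlog_vars && echo "binlog OK"`.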
2.2 PostgreSQL
Version: PostgreSQL 10.6 (Debian 10.6-1.pgdg90+1)
2.2.1 Installation
Step1: Pull the PostgreSQL 10.6 image:
```shell
docker pull postgres:10.6
```
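Step2: Create and start the PostgreSQL container. The container's port 5432 is mapped to host port 30028, and the password of the `postgres` account is set to `postgres`. The command also lists the `pgoutput` plugin in `shared_preload_libraries`. This is optional: pgoutput is built into PostgreSQL 10 and later and is loaded on demand for logical decoding.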
```shell
docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
```
Step3: Check that the container was created and is running:
```shell
docker ps | grep postgres-10.6
```
2.2.2 CDC Configuration
Step1: Open a shell in the PostgreSQL container:
```shell
docker exec -it postgres-10.6 bash
```
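Step2: Edit the `postgresql.conf` configuration file. If `vi` is not available in the image, install an editor first, for example with `apt-get update && apt-get install -y vim`: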
```shell
vi /var/lib/postgresql/data/postgresql.conf
```
Set the following parameters:
```ini
# Change the WAL level to logical (allowed values: minimal, replica, logical)
wal_level = logical
# Maximum number of replication slots (default 10); each Flink CDC postgres-cdc source (in Flink SQL usually one per table) occupies one slot
max_replication_slots = 20
# Maximum number of concurrent WAL sender processes (default 10); set it to the same value as max_replication_slots
max_wal_senders = 20
# Terminate replication connections that have been inactive for longer than this (default 60s, 0 disables); a larger value is more tolerant
wal_sender_timeout = 180s
```
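If you prefer not to edit the file interactively, the same change can be scripted. The sketch below is minimal POSIX sh and assumes `grep` and `sed` with `-E` (extended regex) support are available in the container. `set_pg_conf` is a hypothetical helper that rewrites an existing line for the key, whether active or commented out, or appends a new one:

```shell
#!/bin/sh
# Hypothetical helper: set "key = value" in a postgresql.conf-style file.
# An existing line for the key (active or commented out) is rewritten; otherwise one is appended.
set_pg_conf() {
  local file key value pattern tmp
  file="$1"
  key="$2"
  value="$3"
  # Matches "key = ...", "#key = ..." and "  # key=...", but not lines for other parameters.
  pattern="^[#[:space:]]*${key}[[:space:]]*="
  tmp=$(mktemp) || return 1
  if grep -Eq "$pattern" "$file"; then
    # Replace every matching line with the new setting.
    if ! sed -E "s|${pattern}.*|${key} = ${value}|" "$file" > "$tmp"; then
      rm -f "$tmp"
      return 1
    fi
  else
    cp "$file" "$tmp"
    printf '%s = %s\n' "$key" "$value" >> "$tmp"
  fi
  # Write back with cat (not mv) so the file keeps its owner and permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}
```

After pasting the function into the container shell from Step1, call it once per parameter, for example `set_pg_conf /var/lib/postgresql/data/postgresql.conf wal_level logical`. Another option is `ALTER SYSTEM SET wal_level = logical;`, which stores the value in `postgresql.auto.conf`. Either way, `wal_level` only takes effect after a restart.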
Step3: Restart the container:
```shell
docker restart postgres-10.6
```
Connect to the database and run the following statement. If it returns `logical`, the change has taken effect:
```sql
SHOW wal_level;
```
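`SHOW wal_level` only covers one parameter. The sketch below checks all three replication-related settings at once from `pg_settings`. `check_pg_cdc_settings` is a hypothetical POSIX sh helper that reads the `name|setting` lines printed by `psql -At`. Its argument is the minimum number of slots and WAL senders you expect to need:

```shell
#!/bin/sh
# Hypothetical helper: read "name|setting" lines (psql -At output of pg_settings) from stdin
# and check wal_level plus the slot and WAL sender limits needed by Flink CDC.
# Usage: ... | check_pg_cdc_settings <min_slots>
check_pg_cdc_settings() {
  local min name setting status wal_level slots senders
  min="${1:-1}"
  status=0
  wal_level=""
  slots=0
  senders=0
  while IFS='|' read -r name setting; do
    case "$name" in
      wal_level) wal_level="$setting" ;;
      max_replication_slots) slots="$setting" ;;
      max_wal_senders) senders="$setting" ;;
    esac
  done
  if [ "$wal_level" != "logical" ]; then
    echo "wal_level is '$wal_level', expected logical" >&2
    status=1
  fi
  if [ "$slots" -lt "$min" ]; then
    echo "max_replication_slots is $slots, expected at least $min" >&2
    status=1
  fi
  if [ "$senders" -lt "$min" ]; then
    echo "max_wal_senders is $senders, expected at least $min" >&2
    status=1
  fi
  return "$status"
}
```

Example: `docker exec postgres-10.6 psql -U postgres -At -c "SELECT name, setting FROM pg_settings WHERE name IN ('wal_level','max_replication_slots','max_wal_senders')" | check_pg_cdc_settings 20`.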
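Step4: Create a database, a table and a user, and grant privileges. Log in to PostgreSQL with the account set when the container was created (postgres/postgres), for example with `psql -U postgres` inside the container: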
```sql
-- Create database test_db
CREATE DATABASE test_db;
-- Connect to the new database test_db (psql meta-command)
\c test_db
-- Create table t_user
CREATE TABLE "public"."t_user" (
  "id" int8 NOT NULL,
  "name" varchar(255),
  "age" int2,
  PRIMARY KEY ("id")
);
-- Create a new user
CREATE USER test1 WITH PASSWORD 'test123';
-- Give the user the REPLICATION attribute (required for streaming replication)
ALTER ROLE test1 replication;
-- Allow the user to connect to test_db
GRANT CONNECT ON DATABASE test_db to test1;
-- Grant the user all privileges on all existing tables in schema public
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
```
Step5: Publish the tables:
```sql
-- Publish all tables; FOR ALL TABLES sets pg_publication.puballtables = true (creating it requires a superuser).
-- dbz_publication is the default publication name used by the Debezium-based connector.
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List the tables that are published
select * from pg_publication_tables;
-- Log the complete old row for UPDATE and DELETE. The default replica identity (DEFAULT) only logs the
-- old primary-key values, so FULL is needed for the change events to carry full before-images.
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- Check the replica identity: f = FULL (set successfully), d = DEFAULT, n = NOTHING, i = INDEX
select relreplident from pg_class where relname='t_user';
```
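`relreplident` returns a single-letter code. The small POSIX sh sketch below maps each code to the replica identity it stands for. `replident_name` is a hypothetical helper, and the meanings follow the PostgreSQL documentation of `ALTER TABLE ... REPLICA IDENTITY`:

```shell
#!/bin/sh
# Hypothetical helper: translate a pg_class.relreplident code into a readable description.
replident_name() {
  case "$1" in
    d) echo "DEFAULT (old values of the primary key columns, if any)" ;;
    n) echo "NOTHING (no old row values)" ;;
    f) echo "FULL (old values of all columns)" ;;
    i) echo "INDEX (old values of the columns in the chosen unique index)" ;;
    *) echo "unknown replica identity code: $1" >&2; return 1 ;;
  esac
}
```

Example: `replident_name "$(docker exec postgres-10.6 psql -U postgres -d test_db -At -c "select relreplident from pg_class where relname='t_user'")"` should print the FULL description after the `ALTER TABLE` above.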
2.3 Oracle
Version: Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production
2.3.1 Installation
Step1: Pull the Oracle 11g image (about 6 GB, so the download takes a while):
```shell
docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
```
Step2: Run the following command to create and start the Oracle 11g container. Host port 30026 is mapped to the listener port 1521:
```shell
docker run -d -p 30026:1521 -p 8081:8080 \
  --name oracle_11g \
  -e ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2 \
  -e ORACLE_SID=helowin \
  registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
```
Step3: Check whether the container is running:
```shell
docker ps -a | grep oracle_11g
```
Step4: Open a shell in the container:
```shell
docker exec -it oracle_11g bash
```
Step5: Set the account passwords and create a test user:
```shell
# 1. Switch to root (the default user is oracle); the root password is helowin
su root
# 2. Create a symbolic link so that sqlplus is on the PATH
ln -s $ORACLE_HOME/bin/sqlplus /usr/bin
# 3. Switch back to the oracle user
su oracle
# 4. Start SQL*Plus and connect as SYSDBA; the statements below are entered inside SQL*Plus
sqlplus /nolog
conn /as sysdba
-- 4.1 Change the password of SYSTEM to system
alter user system identified by system;
-- 4.2 Change the password of SYS to system
alter user sys identified by system;
-- 4.3 Create a test user (user name: test, password: test123)
create user test identified by test123;
-- 4.4 Grant the CONNECT, RESOURCE and DBA roles to the test user
grant connect,resource,dba to test;
-- 4.5 Make passwords in the DEFAULT profile never expire
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;
-- 4.6 Raise the maximum number of processes (this limits connections); takes effect after a restart
alter system set processes=1000 scope=spfile;
-- 4.7 Restart the database
shutdown immediate;
startup;
-- 5. Exit SQL*Plus
exit
```
2.3.2 CDC Configuration
Step1: Open a shell in the container:
```shell
docker exec -it oracle_11g bash
```
Step2: Log in to the database with DBA privileges:
```sql
sqlplus /nolog
CONNECT sys/system AS SYSDBA
```
Step3: Enable archive logging:
```sql
-- Set the size limit of the recovery area to 10 GB
alter system set db_recovery_file_dest_size = 10G;
-- Set the location of the recovery area
alter system set db_recovery_file_dest = '/home/oracle/app/oracle/product/11.2.0' scope=spfile;
-- Shut down the database immediately
shutdown immediate;
-- Start the instance and mount the database without opening it
startup mount;
-- Switch the database to ARCHIVELOG mode
alter database archivelog;
-- Open the database for users
alter database open;
```
Step4: Check whether archive logging is enabled. It is enabled if `Database log mode` shows `Archive Mode`:
```sql
archive log list;
```
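To run this check from the host, pipe the `archive log list` output through a filter. Below is a minimal POSIX sh sketch. `is_archive_mode` is a hypothetical helper. The example after the code assumes that `sqlplus` is on the PATH of the container's default `oracle` user through the symbolic link created in Step5 of 2.3.1:

```shell
#!/bin/sh
# Hypothetical helper: read SQL*Plus "archive log list" output from stdin and succeed only if
# the "Database log mode" line says "Archive Mode" (and not "No Archive Mode").
is_archive_mode() {
  grep -Eq '^Database log mode[[:space:]]+Archive Mode[[:space:]]*$'
}
```

Example: `printf 'archive log list\nexit\n' | docker exec -i oracle_11g sqlplus -S / as sysdba | is_archive_mode && echo "ARCHIVELOG enabled"`. The anchored pattern makes sure that `No Archive Mode` is not mistaken for `Archive Mode`.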
Step5: Create a tablespace:
```sql
-- Log in with DBA privileges (if you are already in SQL*Plus, only the CONNECT line is needed)
sqlplus /nolog
CONNECT sys/system AS SYSDBA
-- Create a tablespace named logminer_tbs:
--   DATAFILE: the data file is /home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf
--   SIZE 25M: initial size of 25 MB
--   REUSE: reuse the data file if it already exists, otherwise create it
--   AUTOEXTEND ON: grow the data file automatically when the tablespace runs out of space
--   MAXSIZE UNLIMITED: no user-defined limit on growth (up to the maximum data file size)
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
```
Step6: Create the user and grant privileges:
```sql
-- Create user flinkuser with password flinkpw, default tablespace LOGMINER_TBS and an unlimited quota on it
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
-- Allow flinkuser to create sessions, i.e. to connect to the database
GRANT CREATE SESSION TO flinkuser;
-- (Not supported on Oracle 11g) Allow flinkuser to switch containers in a multitenant (CDB) database
-- GRANT SET CONTAINER TO flinkuser;
-- Allow flinkuser to query V_$DATABASE, which contains information about the database
GRANT SELECT ON V_$DATABASE TO flinkuser;
-- Allow flinkuser to run flashback operations on any table
GRANT FLASHBACK ANY TABLE TO flinkuser;
-- Allow flinkuser to query any table
GRANT SELECT ANY TABLE TO flinkuser;
-- Grant SELECT_CATALOG_ROLE, which allows querying the data dictionary and metadata
GRANT SELECT_CATALOG_ROLE TO flinkuser;
-- Grant EXECUTE_CATALOG_ROLE, which allows executing certain data dictionary procedures and functions
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
-- Allow flinkuser to query transaction information (the FLASHBACK_TRANSACTION_QUERY view)
GRANT SELECT ANY TRANSACTION TO flinkuser;
-- (Not supported on Oracle 11g) Allow flinkuser to use LogMiner
-- GRANT LOGMINING TO flinkuser;
-- Allow flinkuser to create tables
GRANT CREATE TABLE TO flinkuser;
-- Allow flinkuser to lock any table
GRANT LOCK ANY TABLE TO flinkuser;
-- Allow flinkuser to alter any table
GRANT ALTER ANY TABLE TO flinkuser;
-- Allow flinkuser to create sequences
GRANT CREATE SEQUENCE TO flinkuser;
-- Allow flinkuser to execute the procedures in package DBMS_LOGMNR
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
-- Allow flinkuser to execute the procedures in package DBMS_LOGMNR_D
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
-- Allow flinkuser to query V_$LOG, which contains information about the online redo log groups
GRANT SELECT ON V_$LOG TO flinkuser;
-- Allow flinkuser to query V_$LOG_HISTORY, which contains the redo log history
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
-- Allow flinkuser to query V_$LOGMNR_LOGS, which lists the log files registered with LogMiner
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
-- Allow flinkuser to query V_$LOGMNR_CONTENTS, which contains the changes read by LogMiner
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
-- Allow flinkuser to query V_$LOGMNR_PARAMETERS, which contains the LogMiner session parameters
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
-- Allow flinkuser to query V_$LOGFILE, which lists the redo log files
GRANT SELECT ON V_$LOGFILE TO flinkuser;
-- Allow flinkuser to query V_$ARCHIVED_LOG, which contains information about archived logs
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
-- Allow flinkuser to query V_$ARCHIVE_DEST_STATUS, which shows the status of the archive destinations
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
```
Step7: Enable supplemental logging for the database and the table:
```sql
-- Switch to flinkuser (inside SQL*Plus, CONNECT switches users directly;
-- start SQL*Plus with "sqlplus /nolog" first if you are in the container shell)
CONNECT flinkuser/flinkpw
-- Create the customers table in tablespace LOGMINER_TBS
CREATE TABLE customers (
  customer_id NUMBER PRIMARY KEY,
  customer_name VARCHAR2(50),
  email VARCHAR2(100),
  phone VARCHAR2(20)
) TABLESPACE LOGMINER_TBS;
-- List the current user's tables in tablespace LOGMINER_TBS
select tablespace_name, table_name from user_tables
where tablespace_name = 'LOGMINER_TBS';
-- Log in again with DBA privileges
CONNECT sys/system AS SYSDBA
-- Enable supplemental logging of all columns for the customers table
ALTER TABLE FLINKUSER.CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for the database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
```
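Every table captured by Flink CDC needs table-level supplemental logging, as `CUSTOMERS` has above. With several tables, the statements can be generated. `gen_supplemental_log_sql` is a hypothetical POSIX sh helper that only prints SQL and does not connect to Oracle:

```shell
#!/bin/sh
# Hypothetical helper: print one table-level supplemental logging statement per table.
# Usage: gen_supplemental_log_sql <OWNER> <TABLE> [TABLE...]
gen_supplemental_log_sql() {
  local owner table
  owner="$1"
  shift
  for table in "$@"; do
    printf 'ALTER TABLE %s.%s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;\n' "$owner" "$table"
  done
}
```

Example, assuming the container's default exec user is `oracle` as noted in Step5 of 2.3.1: `{ gen_supplemental_log_sql FLINKUSER CUSTOMERS; echo exit; } | docker exec -i oracle_11g sqlplus -S / as sysdba`. Once database-level supplemental logging is on, `SELECT supplemental_log_data_min FROM v$database;` should return `YES`.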
2.4 SQL Server
Version: Microsoft SQL Server 2019 (RTM-CU21) (KB5025808) - 15.0.4316.3 (X64)
2.4.1 Installation
Step1: Pull the SQL Server 2019 image:
```shell
docker pull mcr.microsoft.com/mssql/server:2019-latest
```
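Step2: Run the SQL Server container. The SA password must be at least 8 characters long. It must also contain characters from at least three of these four sets: uppercase letters, lowercase letters, digits and symbols (for example `abc@123456`). The host port is mapped to 30027: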
```shell
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=abc@123456' -p 30027:1433 --name sql_server_2019 -d mcr.microsoft.com/mssql/server:2019-latest
```
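If the password does not meet this policy, the container typically stops shortly after it starts, so it can help to check the password beforehand. The following is a rough POSIX sh sketch of the rule stated above. `check_sa_password` is a hypothetical helper, not an exact reimplementation of SQL Server's own validation:

```shell
#!/bin/sh
# Hypothetical helper: check a candidate SA password against the stated policy:
# at least 8 characters, drawn from at least 3 of uppercase, lowercase, digits and symbols.
check_sa_password() {
  local pw classes
  pw="$1"
  classes=0
  if [ "${#pw}" -lt 8 ]; then
    echo "password must be at least 8 characters long" >&2
    return 1
  fi
  # Count how many of the four character sets occur in the password.
  case "$pw" in *[[:upper:]]*) classes=$((classes + 1)) ;; esac
  case "$pw" in *[[:lower:]]*) classes=$((classes + 1)) ;; esac
  case "$pw" in *[[:digit:]]*) classes=$((classes + 1)) ;; esac
  case "$pw" in *[![:alnum:]]*) classes=$((classes + 1)) ;; esac
  if [ "$classes" -lt 3 ]; then
    echo "password uses ${classes} of the 4 character sets; at least 3 are required" >&2
    return 1
  fi
  return 0
}
```

For example, `check_sa_password 'abc@123456' && echo OK` succeeds, because the password contains lowercase letters, digits and a symbol.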
Step3: Verify that the SQL Server container is running:
```shell
docker ps -a | grep sql_server_2019
```
2.4.2 CDC Configuration
Step1: Enable SQL Server Agent, which runs the CDC capture and cleanup jobs:
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#6272a4">## Open a shell in the container as root</span>
<span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> <span style="color:#ee9900">--user</span> root sql_server_2019 <span style="color:#8be9fd">bash</span>
<span style="color:#6272a4">## Inside the container, enable SQL Server Agent</span>
/opt/mssql/bin/mssql-conf <span style="color:#f1fa8c">set</span> sqlagent.enabled <span style="color:#8be9fd">true</span>
<span style="color:#6272a4">## Exit the container and restart it so the setting takes effect</span>
<span style="color:#f1fa8c">exit</span>
<span style="color:#8be9fd">docker</span> restart sql_server_2019
</code></span></span></span></span>
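To confirm that the setting survived the restart, one option is to read /var/opt/mssql/mssql.conf, where mssql-conf stores its settings in INI format. The bash sketch below assumes sqlagent.enabled is stored as an "enabled = true" line under a [sqlagent] section; the function name sqlagent_enabled is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read an mssql.conf (INI format) on stdin and succeed only
# if the [sqlagent] section contains "enabled = true".
sqlagent_enabled() {
  awk '
    # Section header: remember whether we are inside [sqlagent]
    /^\[/ { in_agent = ($0 ~ /^\[sqlagent\][ \t\r]*$/); next }
    in_agent && /^[ \t]*enabled[ \t]*=/ {
      val = $0
      sub(/^[^=]*=[ \t]*/, "", val)   # drop "enabled ="
      sub(/[ \t\r]*$/, "", val)       # drop trailing whitespace
      found = (val == "true")
    }
    END { exit (found ? 0 : 1) }
  '
}

# Usage (after docker restart sql_server_2019):
# docker exec --user root sql_server_2019 cat /var/opt/mssql/mssql.conf | sqlagent_enabled && echo "SQL Server Agent enabled"
```

As an alternative to mssql-conf, the SQL Server container images also accept -e 'MSSQL_AGENT_ENABLED=true' on docker run, which enables the agent when the container starts.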
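Step2: Create the test database 'cdc_test', connect to it with a client tool, and enable CDC with the following SQL commands
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- Create the database</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">DATABASE</span> cdc_test<span style="color:#999999">;</span>
GO
<span style="color:#6272a4">-- Switch to it: sp_cdc_enable_db acts on the current database (GO is the sqlcmd/SSMS batch separator)</span>
<span style="color:#ff79c6">USE</span> cdc_test<span style="color:#999999">;</span>
GO
<span style="color:#6272a4">-- Enable CDC for the current database</span>
<span style="color:#ff79c6">EXEC</span> sys<span style="color:#999999">.</span>sp_cdc_enable_db<span style="color:#999999">;</span>
<span style="color:#6272a4">-- Check whether CDC is enabled for the database (1 means enabled)</span>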
<span style="color:#ff79c6">SELECT</span> is_cdc_enabled <span style="color:#ff79c6">FROM</span> sys<span style="color:#999999">.</span><span style="color:#ff79c6">databases</span> <span style="color:#ff79c6">WHERE</span> name <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_test'</span><span style="color:#999999">;</span>
</code></span></span></span></span>
Step3: Choose the tables to track with CDC (the orders table is used as the example here)
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- Create the sample table (orders)</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> orders <span style="color:#999999">(</span>
id <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
order_date <span style="color:#ff79c6">date</span><span style="color:#999999">,</span>
purchaser <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
quantity <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
product_id <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
<span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span><span style="color:#999999">[</span>id<span style="color:#999999">]</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>
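<span style="color:#6272a4">-- @source_schema: the schema that owns the table.</span>
<span style="color:#6272a4">-- @source_name: the name of the table to track with CDC.</span>
<span style="color:#6272a4">-- @role_name: database role that gates access to the change data; it is created if it does not exist (pass NULL to disable gating).</span>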
<span style="color:#ff79c6">EXEC</span> sys<span style="color:#999999">.</span>sp_cdc_enable_table
<span style="color:#ee9900">@source_schema</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'dbo'</span><span style="color:#999999">,</span>
<span style="color:#ee9900">@source_name</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'orders'</span><span style="color:#999999">,</span>
<span style="color:#ee9900">@role_name</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_role'</span><span style="color:#999999">;</span>
</code></span></span></span></span>
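Both the database flag and the table flag can also be checked from the host without a GUI client. The bash sketch below runs sqlcmd inside the container through docker exec. The sqlcmd path /opt/mssql-tools/bin/sqlcmd is an assumption (newer images may ship /opt/mssql-tools18/bin/sqlcmd, which also needs -C to trust the self-signed certificate), and the names mssql_scalar, cdc_ready, MSSQL_CONTAINER, MSSQL_SA_PASSWORD and SQLCMD are hypothetical; their defaults follow the container name and password used above.

```shell
#!/usr/bin/env bash
# Hypothetical settings; defaults follow the container created above.
MSSQL_CONTAINER="${MSSQL_CONTAINER:-sql_server_2019}"
MSSQL_SA_PASSWORD="${MSSQL_SA_PASSWORD:-abc@123456}"
SQLCMD="${SQLCMD:-/opt/mssql-tools/bin/sqlcmd}"   # assumed path inside the image

# Run one query in database $1 and print the bare result.
mssql_scalar() {
  local db="$1" query="$2"
  # -h -1 drops the column header, -W trims trailing spaces,
  # SET NOCOUNT ON suppresses the "(1 rows affected)" footer.
  docker exec "$MSSQL_CONTAINER" "$SQLCMD" -S localhost -U sa -P "$MSSQL_SA_PASSWORD" \
    -d "$db" -h -1 -W -Q "SET NOCOUNT ON; $query"
}

# Succeeds if CDC is enabled for the database and the given table is tracked.
cdc_ready() {
  local db="$1" schema="$2" table="$3" db_flag tbl_flag
  db_flag="$(mssql_scalar "$db" "SELECT is_cdc_enabled FROM sys.databases WHERE name = '$db'")" || return 1
  tbl_flag="$(mssql_scalar "$db" "SELECT t.is_tracked_by_cdc FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE s.name = '$schema' AND t.name = '$table'")" || return 1
  [ "$(echo "$db_flag" | tr -d '[:space:]')" = "1" ] && [ "$(echo "$tbl_flag" | tr -d '[:space:]')" = "1" ]
}

# Usage:
# cdc_ready cdc_test dbo orders && echo "CDC is active on dbo.orders"
```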
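3. Verification
To verify Flink CDC, first download the Flink distribution, then download the matching CDC connector jar and add it as a dependency, and finally write Flink SQL in the sql-client that ships with the distribution.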
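3.1 Flink and Flink CDC Version Compatibility
Before downloading the Flink distribution and the jars, confirm which Flink CDC versions support your Flink version:
Flink CDC version | Flink version |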
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.* , 1.14.* |
2.3.* | 1.13.* , 1.14.* , 1.15.* , 1.16.0 |
2.4.* | 1.13.* , 1.14.* , 1.15.* , 1.16.* , 1.17.0 |
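The table can also be encoded as a small lookup, for example in a setup script that picks the connector version. This is a bash sketch that covers only the rows listed above; the function name compatible_cdc_versions is hypothetical and it expects a full x.y.z Flink version.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the Flink CDC versions whose supported-Flink list
# (per the table above) covers the given Flink version, e.g. 1.13.6.
compatible_cdc_versions() {
  local flink="$1"
  local minor="${flink%.*}"   # 1.13.6 -> 1.13
  case "$minor" in
    1.11) echo "1.0.0 1.1.0" ;;
    1.12) echo "1.2.0 1.3.0" ;;
    1.13) echo "1.4.0 2.0.* 2.1.* 2.2.* 2.3.* 2.4.*" ;;
    1.14) echo "2.2.* 2.3.* 2.4.*" ;;
    1.15) echo "2.3.* 2.4.*" ;;
    # 2.3.* only lists 1.16.0, while 2.4.* covers every 1.16 patch release
    1.16) if [ "$flink" = "1.16.0" ]; then echo "2.3.* 2.4.*"; else echo "2.4.*"; fi ;;
    # Only 1.17.0 is listed (for 2.4.*)
    1.17) if [ "$flink" = "1.17.0" ]; then echo "2.4.*"; else
            echo "no compatible Flink CDC version in the table" >&2; return 1; fi ;;
    *) echo "no compatible Flink CDC version in the table" >&2; return 1 ;;
  esac
}

# Usage: compatible_cdc_versions 1.13.6
```

For Flink 1.13.6 the lookup includes 2.2.*, which matches the combination used in this article.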
This article uses Flink 1.13.6 with Flink CDC 2.2.0 for the demonstration.
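3.2 Downloading the Packages
Download the Flink distribution from: Downloads | Apache Flink
Download the CDC jars you need from: Central Repository: com/ververica (for the SQL client, pick the flink-sql-connector-xxx-cdc jars, e.g. flink-sql-connector-sqlserver-cdc-2.2.0.jar)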
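Both downloads can be scripted. The bash sketch below only builds URLs; it assumes the Apache archive layout for Flink binaries and the Maven Central layout under groupId com.ververica (the one used by Flink CDC 2.x). The function names are hypothetical, and the connector names mysql, postgres, oracle and sqlserver follow the artifact naming on Maven Central.

```shell
#!/usr/bin/env bash
# Hypothetical helpers that build download URLs from version numbers.

# flink_dist_url VERSION [SCALA], e.g. flink_dist_url 1.13.6 2.12
flink_dist_url() {
  local version="$1" scala="${2:-2.12}"
  echo "https://archive.apache.org/dist/flink/flink-${version}/flink-${version}-bin-scala_${scala}.tgz"
}

# cdc_jar_url CONNECTOR VERSION, e.g. cdc_jar_url sqlserver 2.2.0
cdc_jar_url() {
  local connector="$1" version="$2"
  local artifact="flink-sql-connector-${connector}-cdc"
  echo "https://repo1.maven.org/maven2/com/ververica/${artifact}/${version}/${artifact}-${version}.jar"
}

# Usage (Flink 1.13.6 + Flink CDC 2.2.0):
# wget "$(flink_dist_url 1.13.6)" && tar -xzf flink-1.13.6-bin-scala_2.12.tgz
# wget "$(cdc_jar_url sqlserver 2.2.0)"
```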
3.3 Adding the CDC Jar to the lib Directory
Put the CDC jars you want to test into the lib directory of the extracted Flink distribution (<FLINK_HOME>/lib/):
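A small bash sketch of this step, assuming the jar has already been downloaded; the function name install_cdc_jar and the example paths are hypothetical. Flink only scans lib when its processes start, so a cluster that is already running has to be restarted to pick up the new jar.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy a CDC connector jar into <flink_home>/lib and
# list the CDC connector jars now present there.
install_cdc_jar() {
  local jar="$1" flink_home="$2"
  if [ ! -f "$jar" ]; then
    echo "jar not found: $jar" >&2
    return 1
  fi
  if [ ! -d "$flink_home/lib" ]; then
    echo "no lib directory under: $flink_home" >&2
    return 1
  fi
  cp "$jar" "$flink_home/lib/" || return 1
  ls "$flink_home/lib" | grep -E '^flink-sql-connector-.+-cdc-.+\.jar$'
}

# Usage (paths are examples):
# install_cdc_jar ~/Downloads/flink-sql-connector-sqlserver-cdc-2.2.0.jar ~/flink-1.13.6
```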
3.4 Verification
From the root of the extracted Flink directory, start the Flink cluster with the following command:
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/start-cluster.sh
</code></span></span></span></span>
Once it has started, the Flink Web UI is available at http://localhost:8081:
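The same port also serves Flink's REST API, so the cluster can be checked from a script instead of a browser. A bash sketch, assuming the default port 8081 and the GET /overview endpoint; FLINK_REST and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Assumed REST address (same port as the Web UI); override FLINK_REST if needed.
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Print the integer value of a top-level field (e.g. taskmanagers) from /overview JSON.
overview_field() {
  local json="$1" field="$2"
  printf '%s' "$json" | grep -o "\"$field\": *[0-9][0-9]*" | head -n 1 | sed 's/.*: *//'
}

# Succeeds once at least one TaskManager has registered with the JobManager.
flink_cluster_ready() {
  local json tms
  json="$(curl -sf --max-time 5 "$FLINK_REST/overview")" || return 1
  tms="$(overview_field "$json" taskmanagers)" || return 1
  [ -n "$tms" ] && [ "$tms" -ge 1 ]
}

# Usage: flink_cluster_ready && echo "Flink cluster is up"
```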
Start the Flink SQL CLI with the following command:
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/sql-client.sh
</code></span></span></span></span>
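When the following screen appears, the Flink SQL client has started successfully:
Run the following Flink SQL ('hostname' is the address of the Docker host running SQL Server, and 'port' is the mapped host port 30027):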
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> t_source_sqlserver <span style="color:#999999">(</span>
id <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
order_date <span style="color:#ff79c6">DATE</span><span style="color:#999999">,</span>
purchaser <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
quantity <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
product_id <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
<span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span>id<span style="color:#999999">)</span> <span style="color:#6272a4">NOT</span> ENFORCED
<span style="color:#999999">)</span> <span style="color:#ff79c6">WITH</span> <span style="color:#999999">(</span>
<span style="color:#f1fa8c">'connector'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'sqlserver-cdc'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'hostname'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'10.194.183.120'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'port'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'30027'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'username'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'sa'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'password'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'abc@123456'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'database-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_test'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'schema-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'dbo'</span><span style="color:#999999">,</span>
<span style="color:#f1fa8c">'table-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'orders'</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>
</code></span></span></span></span>
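To avoid hard-coding the host and credentials, the DDL can be rendered from parameters into a file. A bash sketch, assuming the option names used in the DDL above and assuming the SQL client accepts an init file via -i (introduced in Flink 1.13); the function name sqlserver_cdc_ddl is hypothetical, and values containing a single quote would need escaping.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the sqlserver-cdc DDL shown above with the
# connection settings passed as arguments.
sqlserver_cdc_ddl() {
  local host="$1" port="$2" user="$3" password="$4" db="$5" schema="$6" table="$7"
  cat <<EOF
CREATE TABLE t_source_sqlserver (
  id INT,
  order_date DATE,
  purchaser INT,
  quantity INT,
  product_id INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'sqlserver-cdc',
  'hostname' = '${host}',
  'port' = '${port}',
  'username' = '${user}',
  'password' = '${password}',
  'database-name' = '${db}',
  'schema-name' = '${schema}',
  'table-name' = '${table}'
);
EOF
}

# Usage: write an init file and let the SQL client run it on startup
# sqlserver_cdc_ddl 10.194.183.120 30027 sa 'abc@123456' cdc_test dbo orders > init.sql
# ./bin/sql-client.sh -i init.sql
```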
The statement executes successfully:
Run a SELECT statement to watch the table's data change in real time:
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">select</span> <span style="color:#6272a4">*</span> <span style="color:#ff79c6">from</span> t_source_sqlserver<span style="color:#999999">;</span>
</code></span></span></span></span>
As the figure below shows, whenever the data on the left is modified, the inserted and deleted rows appear in the console in real time.
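To produce such changes without a GUI client, a few T-SQL statements can be piped into sqlcmd inside the container. A bash sketch: the function name sample_order_dml and the row values are made up for illustration, and the sqlcmd path /opt/mssql-tools/bin/sqlcmd in the usage line is an assumption about the image.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print T-SQL that inserts, updates and deletes rows in
# dbo.orders so that change events appear in the Flink SQL client.
# The values are illustrative only.
sample_order_dml() {
  cat <<'EOF'
INSERT INTO dbo.orders (id, order_date, purchaser, quantity, product_id) VALUES (1, '2023-01-01', 1001, 1, 101);
INSERT INTO dbo.orders (id, order_date, purchaser, quantity, product_id) VALUES (2, '2023-01-02', 1002, 2, 102);
UPDATE dbo.orders SET quantity = 5 WHERE id = 1;
DELETE FROM dbo.orders WHERE id = 2;
GO
EOF
}

# Usage:
# sample_order_dml | docker exec -i sql_server_2019 /opt/mssql-tools/bin/sqlcmd \
#   -S localhost -U sa -P 'abc@123456' -d cdc_test
```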
The running job is also visible in the Flink Web UI:
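The job list behind this page is also exposed by the REST API at GET /jobs/overview. A bash sketch that counts running jobs, assuming the default address; FLINK_REST and count_running_jobs are hypothetical names.

```shell
#!/usr/bin/env bash
# Assumed REST address (same port as the Web UI).
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Count the jobs whose state is RUNNING in /jobs/overview JSON read from stdin.
count_running_jobs() {
  grep -o '"state": *"RUNNING"' | wc -l | tr -d ' '
}

# Usage:
# curl -sf "$FLINK_REST/jobs/overview" | count_running_jobs
```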
Finally, stop the Flink cluster with the following command:
<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/stop-cluster.sh
</code></span></span></span></span>