Flink 1.18安装 及配置 postgres12 同步到mysql5.7(Flink sql 方式)

news2024/11/28 13:37:07

文章目录

  • 1、参考
  • 2、flink 常见部署模式组合
  • 3、Standalone 安装
    • 3.1 单节点安装
    • 3.2 问题1
    • 3.3 修改ui 端口
    • 3.4 使用ip访问
  • 4 flink sql postgres --->mysql
    • 4.1 配置postgres 12
    • 4.2 新建用户并赋权
    • 4.3. 发布表
    • 4.4 Flink sql
    • 4.5 Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory'
    • 4.6 Caused by: java.io.StreamCorruptedException: unexpected block data
    • 4.7 FLink:Missing required options are: slot.name
    • 4.8 ERROR: relation "pg_publication" does not exist
    • 4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、参考

Flink -3- 一文详解安装部署以及使用和调优(standalone 模式 | yarn 模式)

flink-cdc

2、flink 常见部署模式组合

在这里插入图片描述

3、Standalone 安装

3.1 单节点安装

flink 下载地址:https://flink.apache.org/downloads/

下载 flink 安装包:flink-1.18.1-bin-scala_2.12.tgz

安装在基础环境 192.168.1.51


cd /home/module


tar -xzf flink-1.18.1-bin-scala_2.12.tgz


mv flink-1.18.1 flink

3.2 问题1

The file .flink-runtime.version.properties has not been generated correctly. You MUST run ‘mvn generate-sources’ in the flink-runtime module

解决:把jdk 升级1.8.421 就可以了

3.3 修改ui 端口

conf/flink-conf.yaml
rest.port: 8086

3.4 使用ip访问

在这里插入图片描述

4 flink sql postgres —>mysql

4.1 配置postgres 12

vi /var/lib/postgresql/data/postgresql.conf

vi /var/lib/postgresql/data/postgresql.conf



# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s

重启 postgres

4.2 新建用户并赋权

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;

-- 连接到新创建的数据库 test_db
\c test_db

-- 创建 t_user 表CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);

新建用户并且给用户权限:


-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限
ALTER ROLE test1 replication;

-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;

-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

4.3. 发布表


-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

更改表的复制标识包含更新和删除的值:


-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

4.4 Flink sql

Flink sql 客户端开启: ./sql-client.sh

CREATE TABLE `table_source_pg` (
      id BIGINT,
      name STRING,
      age INT
      ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = '192.168.1.115',
      'port' = '5432',
      'username' = 'test1',
      'password' = 'xxxxxx',
      'database-name' = 'test_db',
      'schema-name' = 'public',
      'table-name' = 't_user',
      'decoding.plugin.name' = 'pgoutput',
			'slot.name'= 'flink'
);


CREATE TABLE `table_sink_mysql` (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://192.168.1.51:3306/test',
      'username' = 'root',
      'password' = 'xxxxxx',
      'table-name' = 't_user_copy'
);

INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`);

4.5 Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’

Flink SQL> INSERT INTO table_sink_mysql (id, name, age) (SELECT id, name, age FROM table_source_pg);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

解决: 在flink -->lib 目录下增加如下jar

-rw-r--r--. 1 root root 23715175 10月 15 19:04 flink-sql-connector-mysql-cdc-3.0.1.jar
-rw-r--r--. 1 root root 19379756 10月 15 17:01 flink-sql-connector-postgres-cdc-3.0.1.jar
-rw-r--r--. 1 root root 385471 10月 15 19:27 flink-connector-jdbc-3.2.0.jar
-rw-r--r--. 1 root root 2480823 10月 15 19:33 mysql-connector-j-8.0.32.jar

4.6 Caused by: java.io.StreamCorruptedException: unexpected block data

解决方案:在flink的flink-conf.yaml文件中添加classloader.resolve-order: parent-first 改成parent-first,重启集群即可

4.7 FLink:Missing required options are: slot.name

在这里插入图片描述

4.8 ERROR: relation “pg_publication” does not exist

这个问题在postgres 12上不存在,是在9.6中存在的

4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、修改如下参数:

jobmanager.memory.process.size: 2600m

taskmanager.memory.process.size: 2728m

taskmanager.memory.flink.size: 2280m

taskmanager.numberOfTaskSlots: 50

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2222423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

落实“双碳”行动,深兰科技推动分子能源技术在AI硬件产品领域的应用及产业化进程

10月21日,上海气候周分子能研究中心(筹)成立仪式在上海环境能源交易所举行。仪式上,深兰科技践行“双碳”目标,与上海东八能源技术有限公司签署分子能源AI应用产业化合作协议。 根据协议,国际分子能量发电开拓者、上海气候周分子能…

Notepad++将搜索内容所在行选中,并进行复制等操作

背景 Notepad在非常多的数据行内容中,按照指定内容检索,并定位到具体行,而后对内容行的数据进行复制、剪切、删除等处理动作。 操作说明 检索并标记所在行 弹出搜索框:按下 Ctrl F。 输入查找字符串:在搜索框中输入要…

YOLOv11入门到入土使用教程(含结构图)

一、简介 YOLOv11是Ultralytics公司在之前的YOLO版本上推出的最新一代实时目标检测器,支持目标检测、追踪、实力分割、图像分类和姿态估计等任务。官方代码:ultralytics/ultralytics:ultralytics YOLO11 🚀 (github.com)https://g…

【小洛的VLOG】Web 服务器高并发压力测试(Reactor模型测试)

目录 引言 工具介绍 环境介绍 测试结果 个人主页:东洛的克莱斯韦克-CSDN博客 引言 大部分的网络通信都是支持TCP/IP协议栈,为了保证通信的可靠性,客户端和服务端之间需要建立链接。服务端能并发处理多少个链接,平均每秒钟能处理…

QML----Webengineview点击网页上的下载没反应,下载文件

问题 使用webe加载网页时,点击下载页面会没有反应。原因就是它默认是关闭下载功能 解决 需要在profile里监听下载事件打开onDownloadRequested,当有下载时会触发这个信号,会获取到一个WebEngineDownloadItem这是下载的东西,查询它的一些相关参数,可以修改路径和开…

项目一:3-8译码器的设计与实现(FPGA)

本文以Altera公司生产的Cyclone IV系列的EP4CE15F17C8为主芯片的CRD500开发板作为项目的硬件实现平台,并以Quarter 18.1和ModelSim为开发工具和仿真工具。 目录 一、3-8译码器工作原理 二、设计步骤 1、创建工程文件夹和编辑设计文件 (1)…

(三)将PaddleOCR编译成dll通过Java调用实现ocr识别

说明: 本文编译的PaddleOCR版本:v2.8.1,关于windows下如何生成c项目及如何编译PaddleOCR请参照我的上一篇文章《(二)Windows通过vs c编译PaddleOCR-2.8.1-CSDN博客》,本文是上一个篇文章的延伸。 背景&…

douyin uid转sec_uid 各种进行转换

第一步输入uid: 进行转换: 同时支持接口转换,批量转换,是一个很实用的工具 uid转sec_uid

微信小程序上传图片添加水印

微信小程序使用wx.chooseMedia拍摄或从手机相册中选择图片并添加水印&#xff0c; 代码如下&#xff1a; // WXML代码&#xff1a;<canvas canvas-id"watermarkCanvas" style"width: {{canvasWidth}}px; height: {{canvasHeight}}px;"></canvas&…

如何使用 Spring Cloud 实现客户端负载平衡

微服务系统通常运行每个服务的多个实例。这是实施弹性所必需的。因此&#xff0c;在这些实例之间分配负载非常重要。执行此操作的组件是负载均衡器。Spring 提供了一个 Spring Cloud Load Balancer 库。在本文中&#xff0c;您将学习如何使用它在 Spring Boot 项目中实现客户端…

QPainterPath路径类

函数drawPath()绘制的是一个复合的图形&#xff0c;它使用一个QPainterPath类型的参数作为绘图的对象,QPainterPath类用于记录绘图的操作顺序&#xff0c;优点是绘制复杂图形时只需要创建一个painterpath,然后重复调用就可以了 在使用QPainterPath把路径画好之后&#xff0c;我…

脚本-把B站缓存m4s文件转换成mp4格式

js脚本&#xff0c;自动处理视频 1. 需求简介1.1 pc安装b站客户端1.2 设置视频缓存目录1.3 找个视频缓存1.4 打开缓存文件夹![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/0eb346a84d5f42a7908f1d39bf410c3b.png)1.5 用notepad编辑后缀m4s文件&#xff0c;删除文件内…

Windows系统启动MongoDB报错无法连接服务器

文章目录 发现问题解决办法 发现问题 1&#xff09;、先是发现执行 mongo 命令&#xff0c;启动报错&#xff1a; error: MongoNetworkError: connect ECONNREFUSED 127.0.0.1:27017&#xff1b; 2&#xff09;、再检查 MongoDB 进程 tasklist | findstr mongo 发现没有进程&a…

澳元/美元价格预测:不排除跌至0.6600的可能

澳元/美元一路下跌至0.6620附近。美元保持强劲上涨势头&#xff0c;升至创下三个月新高。汇价的下跌让关键的200日均线受到考验。 澳元/美元周三再度遭遇抛售兴趣&#xff0c;迅速扭转周二的多头尝试&#xff0c;滑落至0.6630附近的新低。这次急剧下跌也对关键的200日均线构成…

yjs机器学习常见算法01——KNN(02)Kd树

1.什么是Kd树&#xff0c;为什么要引入Kd树 knn是寻找k个邻近的点&#xff0c;在这个过程中&#xff0c;需要一个点一个点的与未分类点进行比较&#xff0c;这样的时间复杂度非常高&#xff0c;因此引入了一种原理类似二叉树的Kd树&#xff0c;以减少比较搜索的次数。 kd树的本…

PyTorch求导相关

PyTorch是动态图&#xff0c;即计算图的搭建和运算是同时的&#xff0c;随时可以输出结果&#xff1b;而TensorFlow是静态图。 在pytorch的计算图里只有两种元素&#xff1a;数据&#xff08;tensor&#xff09;和 运算&#xff08;operation&#xff09; 运算包括了&#xf…

Psychophysiology:脑-心交互如何影响个体的情绪体验?

摘要 情绪的主观体验与对身体(例如心脏)活动变化的情境感知和评估相关。情绪唤醒增加与高频心率变异性(HF-HRV)降低、EEG顶枕区α功率降低以及心跳诱发电位(HEP)振幅较高有关。本研究使用沉浸式虚拟现实(VR)技术来研究与情绪唤醒相关的脑心相互作用&#xff0c;以实现自然而可…

SSM考研科目学习APP-计算机毕业设计源码90377

摘 要 基于Android的考研科目学习系统的设计与实现&#xff0c;旨在为广大考研学子提供一个便捷、高效的学习平台。该系统充分利用Android操作系统的广泛普及与灵活定制性&#xff0c;结合考研科目的特点和需求&#xff0c;实现了个性化的学习方案、丰富的题库资源以及智能化…

【个人同步与备份】电脑(Windows)与手机/平板(Android)之间文件同步

文章目录 1. syncthing软件下载2. syncthing的使用2.1. 添加设备2.1.1. syncthing具备设备发现功能&#xff0c;因此安装好软件&#xff0c;只需确认设备信息是否对应即可2.1.2. 如果没有发现到&#xff0c;可以通过设备ID连接2.1.3. 设置GUI身份验证用户&#xff0c;让无关设备…

LeetCode: 3274. 检查棋盘方格颜色是否相同

一、题目 给你两个字符串 coordinate1 和 coordinate2&#xff0c;代表 8 x 8 国际象棋棋盘上的两个方格的坐标。   以下是棋盘的参考图。   如果这两个方格颜色相同&#xff0c;返回 true&#xff0c;否则返回 false。   坐标总是表示有效的棋盘方格。坐标的格式总是先…