Flink CDC -Sqlserver to Sqlserver java 模版编写

news2024/11/30 4:58:43

1.基本环境

     <flink.version>1.17.0</flink.version>

2. 类文件

package com.flink.tablesql;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.io.File;
import java.util.List;

public class Main2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        String path = "E:\\test\\flinktestsql\\orders.sql";
//        String path = "/data/flink/flinksql/orders.sql";
        List<String> list = FileUtils.readLines(new File(path),"UTF-8");
        StringBuilder stringBuilder = new StringBuilder("");
        String sql = "";
        for(String var : list){
            if(StringUtils.isNotBlank(var)){
                stringBuilder.append(var);
                if(var.contains("$")){
                    sql = stringBuilder.toString().replace("$","");
                    System.out.println(sql);
                    System.out.println("end-----");
                    tabEnv.executeSql(sql);
                    stringBuilder = new StringBuilder("");
                }else{
                    stringBuilder.append("\n");
                }
            }
        }
    }
}

3.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinktestsql</artifactId>
    <version>1.0-SNAPSHOT</version>

<!--    &lt;!&ndash; 指定仓库位置,依次为aliyun、apache和cloudera仓库 &ndash;&gt;-->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://maven.aliyun.com/repository/apache-snapshots</url>
        </repository>

        <repository>
            <id>cloudera</id>
            <url>https://maven.aliyun.com/repository/gradle-plugin</url>
        </repository>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--版本-->
    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <slf4j.version>2.0.5</slf4j.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <!-- Flink相关依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>11.2.1.jre8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-test-utils</artifactId>
            <version>1.17.1</version>
            <scope>test</scope>
        </dependency>

        <!-- 日志管理相关依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.19.0</version>
        </dependency>

    </dependencies>

</project>

4.后续只需要修改增加 .sql文件即可

       String path = "E:\\test\\flinktestsql\\orders.sql";

        // String path = "/data/flink/flinksql/orders.sql";

5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebf
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)
	at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)

6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以

7.以上遇到很多问题,总算解决了,

如增加:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1253622.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetCode 1026. 节点与其祖先之间的最大差值 + 递归

1026. 节点与其祖先之间的最大差值 - 力扣&#xff08;LeetCode&#xff09; 给定二叉树的根节点 root&#xff0c;找出存在于 不同 节点 A 和 B 之间的最大值 V&#xff0c;其中 V |A.val - B.val|&#xff0c;且 A 是 B 的祖先。&#xff08;如果 A 的任何子节点之一为 B&am…

【单片机学习笔记】STC8H1K08参考手册学习笔记

STC8H1K08参考手册学习笔记 STC8H系列芯片STC8H1K08开发环境串口烧录 STC8H系列芯片 STC8H 系列单片机是不需要外部晶振和外部复位的单片机&#xff0c;是以超强抗干扰/超低价/高速/低功耗为目标的 8051 单片机,在相同的工作频率下,STC8H 系列单片机比传统的 8051约快12 倍速度…

求两对整点坐标连线之间是否存在其他的整点坐标。

证明过程非常的简单&#xff1a; 有两对整点&#xff08;x1&#xff0c;y1&#xff09;&#xff08;x2,y2)&#xff0c;我们现在以(x1&#xff0c;y1)为原点&#xff0c;那么&#xff08;x2&#xff0c;y2)的相对坐标就是&#xff08;x2-x1&#xff0c;y2-y1&#xff09; 设 …

C++ day36 贪心算法 无重叠区间 划分字母区间 合并区间

题目1&#xff1a;435 无重叠区间 题目链接&#xff1a;无重叠区间 对题目的理解 移除数组中的元素&#xff0c;使得区间互不重叠&#xff0c;保证移除的元素数量最少&#xff0c;数组中至少包含一个元素 贪心算法 局部最优&#xff0c;使得重叠区间的个数最大&#xff0c…

mysql高级知识点

一、mysql架构 连接层&#xff1a;负责接收客户端的连接请求&#xff0c;可以进行授权、认证(验证账号密码)。服务层&#xff1a;负责调用sql接口&#xff0c;对sql语法进行解析&#xff0c;对查询进行优化&#xff0c;缓存。引擎层&#xff1a;是真正进行执行sql的地方&#x…

WIFI模块(esp-01s)获取网络时间与天气信息

目录 一、硬件连接 二、获取网络时间 1、AT指令集 2、具体操作 三、获取天气信息 1、心知天气注册 2、AT指令集 3、具体操作 4、json格式检查 一、硬件连接 WiFi模块的RX连接TTL模块的TX&#xff0c; WiFi模块的TX连接TTL模块的RX&#xff0c;电源与地接对。 插入电脑…

Python NeuralProphet库: 高效时间序列预测的利器

更多Python学习内容&#xff1a;ipengtao.com 时间序列数据在许多领域中都扮演着关键的角色&#xff0c;从股票价格到气象数据。为了更准确地预测未来趋势&#xff0c;机器学习领域涌现出许多时间序列预测的方法和工具。其中&#xff0c;NeuralProphet库是一个强大的工具&#…

【涂鸦T2-U】1、开发环境搭建

前言 本章介绍T2-U的开发环境搭建流程&#xff0c;以及一些遇到的问题。 一、资料 试用网址&#xff1a; 【新品体验】涂鸦 T2-U 开发板免费试用 涂鸦官网文档&#xff1a; 涂鸦 T2-U 开发板 T2-U 模组规格书 T2-U 开发板 淘宝(资料较全)&#xff1a; 涂鸦智能 TuyaOS开发…

第二十章 解读PASCAL VOC2012与MS COCO数据集(工具)

PASCAL VOC2012数据集 Pascal VOC2012官网地址&#xff1a;http://host.robots.ox.ac.uk/pascal/VOC/voc2012/ 官方发表关于介绍数据集的文章 《The PASCALVisual Object Classes Challenge: A Retrospective》&#xff1a;http://host.robots.ox.ac.uk/pascal/VOC/pubs/everi…

1.1 C语言之入门:使用Visual Studio Community 2022运行hello world

1.1 使用Visual Studio Community 2022运行c语言的hello world 一、下载安装Visual Studio Community 2022 与 新建项目二、编写c helloworld三、编译、链接、运行 c helloworld1. 问题记录&#xff1a;无法打开源文件"stdio.h"2. 问题记录&#xff1a;调试和执行按钮…

第81篇:JSONP劫持漏洞获取敏感信息原理、复现与坑点总结

Part1 前言 大家好&#xff0c;我是ABC_123。今天我们研究一下JSONP劫持漏洞&#xff0c;早些年这个漏洞主要被攻击者用来窃取个人信息&#xff0c;如姓名、身份证号、家庭住址等&#xff0c;现在更多的用于蜜罐之中&#xff0c;间接溯源红队攻击者的个人身份。好多朋友至今对…

数据结构 / 计算机内存分配

1. Linux 32位系统内存分配 栈(stack): 先进后出, 栈区变量先定义的后分配内存, 栈区地址从高到低分配堆(heap): 先进先出, 栈区变量先定义的先分配内存, 堆区地址从低到高分配堆栈溢出: 表示的是栈区内存耗尽, 称为溢出. 例如: 每次调用递归都需要在栈区申请内存, 如果递归太深…

【深度学习笔记】04 概率论基础

04 概率论基础 概率论公理联合概率条件概率贝叶斯定理边际化独立性期望和方差模拟投掷骰子的概率随投掷次数增加的变化 概率论公理 概率&#xff08;probability&#xff09;可以被认为是将集合映射到真实值的函数。 在给定的样本空间 S \mathcal{S} S中&#xff0c;事件 A \m…

OpenCV快速入门:相机标定——单目视觉和双目视觉

文章目录 前言一、相机标定的基本原理1.1 相机模型与坐标系1.1.1 相机模型1.1.2 坐标系 1.2 相机内参与外参1.2.1 内部参数1.2.2 外部参数 1.3 镜头畸变1.4 透视变换1.5 标定的重要性和应用场景 二、单目视觉2.1 单目视觉的原理2.1.1 单目视觉的原理2.1.2 单目视觉的公式2.1.3 …

Feign 远程调用

目录 代码架构 feign-api 模块解析 架构 依赖 定义接口类 lead-news-article模块 架构 yml配置 依赖 实现类 启动类 lead-news-wemedia模块 架构 调用 启动类 代码架构 feign-api 模块解析 架构 依赖 <dependency><groupId>org.springframework.clo…

vue+elementui如何实现在表格中点击按钮预览图片?

效果图如上&#xff1a; 使用el-image-viewer 重点 &#xff1a; 引入 import ElImageViewer from "element-ui/packages/image/src/image-viewer"; <template><div class"preview-table"><el-table border :data"tableData" …

【计算机网络笔记】多路访问控制(MAC)协议——随机访问MAC协议

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

Linux操作系统之apt常用命令记录

文章目录 apt 命令apt 语法apt 常用命令列出所有可更新的软件清单命令升级软件包列出可更新的软件包及版本信息升级软件包&#xff0c;升级前先删除需要更新软件包安装指定的软件命令&#xff1a;安装多个软件包&#xff1a;更新指定的软件命令显示软件包具体信息,例如&#xf…

3. 内存单元

1位的内存单元 对于一个内存单元需要有:1个锁存器,数据输入,可写控制,是否读取(也是是否输出), 行和列(内存地址), 数据输出这几部分组成写入: 当行和列, 数据输入,可写全为1时则写入,(行 & 列 & 输入 & 可写)读出(输出): 当 行,列, 是否读取(也是是否输出) ( 行 …

C语言进阶之路-运算符小怪篇

目录 一、学习目标 二、运算符详谈 算术运算符 关系运算符 逻辑运算符 位运算符 特殊运算符 条件运算符 sizeof 运算符 打怪实战 三、控制流 二路分支 多路分支 const while与 do…while循环 语法&#xff1a; for循环 break与continue goto语句&#xff08…