1.基本环境
<flink.version>1.17.0</flink.version>
2. 类文件
package com.flink.tablesql;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.io.File;
import java.util.List;
public class Main2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";
List<String> list = FileUtils.readLines(new File(path),"UTF-8");
StringBuilder stringBuilder = new StringBuilder("");
String sql = "";
for(String var : list){
if(StringUtils.isNotBlank(var)){
stringBuilder.append(var);
if(var.contains("$")){
sql = stringBuilder.toString().replace("$","");
System.out.println(sql);
System.out.println("end-----");
tabEnv.executeSql(sql);
stringBuilder = new StringBuilder("");
}else{
stringBuilder.append("\n");
}
}
}
}
}
3.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flinktestsql</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- <!– 指定仓库位置,依次为aliyun、apache和cloudera仓库 –>-->
<repositories>
<repository>
<id>aliyun</id>
<url>https://maven.aliyun.com/repository/public</url>
</repository>
<repository>
<id>apache</id>
<url>https://maven.aliyun.com/repository/apache-snapshots</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://maven.aliyun.com/repository/gradle-plugin</url>
</repository>
<repository>
<id>central</id>
<url>https://maven.aliyun.com/repository/central</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<!--版本-->
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<slf4j.version>2.0.5</slf4j.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
</properties>
<dependencies>
<!-- Flink相关依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>11.2.1.jre8</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils</artifactId>
<version>1.17.1</version>
<scope>test</scope>
</dependency>
<!-- 日志管理相关依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.19.0</version>
</dependency>
</dependencies>
</project>
4.后续只需要修改增加 .sql文件即可
String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";
5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebf
at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)
at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)
6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以
7.以上遇到很多问题,总算解决了,
如增加:
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>