Table of Contents
- 1. Basic Environment
- 2. Development Environment
- 2.1. pom.xml
- 2.2. log4j.properties
- 2.3. Test Code
- 2.3.1. Kafka Utility
- 2.3.2. Flink Execution Environment Utility
- 2.3.3. Testing Flink Reading/Writing Kafka
- 2.3.4. Testing FlinkSQL Reading/Writing Kafka
- 2.4. Package and Upload to the Server
- 3. Production Environment
- 3.1. Flink Installation
- 3.2. The 3 Flink-on-YARN Modes
- 3.2.1. Session-Cluster
- 3.2.2. Per-Job Cluster Mode
- 3.2.3. Application Mode
- 3.3. Shutting Down Flink
1. Basic Environment
- Development environment: Windows
  WIN10 + JDK1.8 + IDEA2022.3 + Maven3.8.1
- Production environment: Linux
  CentOS7.5 + JDK1.8 + Hadoop3.1.3 + Kafka3.0.0
In production, the most common deployment mode for a Flink cluster is YARN mode.
2. Development Environment
Create a Maven project.
2.1. pom.xml
The dependencies below cover Flink, FlinkSQL, Flink reading/writing Kafka, and JSON parsing.
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.6</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
<slf4j.version>2.0.5</slf4j.version>
<log4j.version>2.19.0</log4j.version>
<fastjson.version>1.2.83</fastjson.version>
</properties>
<!-- https://mvnrepository.com/ -->
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink_Kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 'format'='json' -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checkpoints stored on HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- JSON parsing -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Flink uses slf4j as its logging facade by default; here log4j is used as the concrete logging implementation -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
<!-- Packaging plugin -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder; otherwise using the JAR may cause SecurityExceptions -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
2.2. log4j.properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
2.3. Test Code
Code structure:
2.3.1. Kafka Utility
package org.example;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class KafkaTool {
private static final String BOOTSTRAP_SERVER = "hadoop105:9092,hadoop106:9092,hadoop107:9092";
private static final String CONSUMER_GROUP_ID = "Flink01";
//Kafka producer (sink) writing String records to the given topic
public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
return new FlinkKafkaProducer<>(BOOTSTRAP_SERVER, topic, new SimpleStringSchema());
}
//Kafka consumer reading String records from the given topic
public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
properties.setProperty("group.id", CONSUMER_GROUP_ID);
properties.setProperty("auto.offset.reset", "latest");
return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
}
public static DataStreamSource<String> getKafkaSource(StreamExecutionEnvironment env, String topic) {
return env.addSource(getFlinkKafkaConsumer(topic));
}
//WITH (...) clause for a FlinkSQL Kafka source table (JSON format, starting from the latest offset)
public static String getInputTable(String topic) {
return " WITH ("
+ "'connector'='kafka',"
+ "'topic'='" + topic + "',"
+ "'properties.bootstrap.servers'='" + BOOTSTRAP_SERVER + "',"
+ "'properties.group.id'='" + CONSUMER_GROUP_ID + "',"
+ "'scan.startup.mode'='latest-offset',"
+ "'format'='json'"
+ ")";
}
//WITH (...) clause for a FlinkSQL Kafka sink table (JSON format)
public static String getOutputTable(String topic) {
return " WITH ("
+ "'connector'='kafka',"
+ "'topic'='" + topic + "',"
+ "'properties.bootstrap.servers'='" + BOOTSTRAP_SERVER + "',"
+ "'format'='json'"
+ ")";
}
}
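As a side note, Flink 1.13 also ships the newer unified KafkaSource API, which later versions use to replace FlinkKafkaConsumer. Below is a minimal sketch of an equivalent source method that could be added to KafkaTool, reusing the BOOTSTRAP_SERVER and CONSUMER_GROUP_ID constants above; it is an optional alternative, not part of the original utility.
// Optional alternative: same behaviour as getKafkaSource(), built on the unified KafkaSource API
// (available since Flink 1.12; uses the same flink-connector-kafka dependency).
// Extra imports needed:
// import org.apache.flink.api.common.eventtime.WatermarkStrategy;
// import org.apache.flink.connector.kafka.source.KafkaSource;
// import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
public static DataStreamSource<String> getKafkaSourceNewApi(StreamExecutionEnvironment env, String topic) {
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(BOOTSTRAP_SERVER)
            .setTopics(topic)
            .setGroupId(CONSUMER_GROUP_ID)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    // "kafka-source-" + topic is only an operator name shown in the web UI
    return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source-" + topic);
}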
2.3.2. Flink Execution Environment Utility
package org.example;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkEnv {
private final static String CHECKPOINT_DIRECTORY = "hdfs://hadoop105:8020/Flink/Checkpoint";
private final static String HADOOP_USER_NAME = "hjw";
public static StreamExecutionEnvironment getEnv() {
//Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Set the parallelism (= number of Kafka partitions)
env.setParallelism(3);
//Get the checkpoint configuration
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//Enable checkpointing: every 5 minutes, exactly-once mode
env.enableCheckpointing(300 * 1000L, CheckpointingMode.EXACTLY_ONCE);
//Set the checkpoint timeout: 10 minutes
checkpointConfig.setCheckpointTimeout(600 * 1000L);
//Set the maximum number of concurrent checkpoints (timeout 10 min / interval 5 min = 2)
checkpointConfig.setMaxConcurrentCheckpoints(2);
//Set the restart strategy: 3 restart attempts with a 5-second delay
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//Set the state backend and store checkpoints on HDFS
env.setStateBackend(new HashMapStateBackend());
checkpointConfig.setCheckpointStorage(CHECKPOINT_DIRECTORY);
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
//Return the environment
return env;
}
}
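For running the test jobs directly from the IDE without an HDFS cluster, a local variant of the environment can be handy. Below is a minimal sketch under stated assumptions: the file:// checkpoint path and parallelism 1 are arbitrary local-testing choices, and the flink-runtime-web dependency from the pom provides the local web UI.
// Hypothetical local-testing variant of getEnv(); not used by the production job.
// Extra import needed: import org.apache.flink.configuration.Configuration;
public static StreamExecutionEnvironment getLocalEnv() {
    // Local mini-cluster with the Flink web UI on http://localhost:8081
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    env.setParallelism(1);
    // Same checkpointing mode as getEnv(), but stored on the local file system
    env.enableCheckpointing(300 * 1000L, CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
    return env;
}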
2.3.3. Testing Flink Reading/Writing Kafka
package org.example.test;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.FlinkEnv;
import org.example.KafkaTool;
public class FlinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = FlinkEnv.getEnv();
//Read from topic01, parse each record as JSON to validate it, then write it back out to topic02
DataStreamSource<String> kafkaSource = KafkaTool.getKafkaSource(env, "topic01");
SingleOutputStreamOperator<String> s = kafkaSource.map(JSONObject::parseObject).map(Object::toString);
s.addSink(KafkaTool.getFlinkKafkaProducer("topic02"));
env.execute();
}
}
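One caveat with the map(JSONObject::parseObject) chain above: a single malformed message on topic01 throws an exception and triggers a job restart. Below is a minimal sketch of a more defensive variant, assuming malformed JSON should simply be dropped; it replaces the two map calls and needs two extra imports, org.apache.flink.api.common.typeinfo.Types and org.apache.flink.util.Collector.
// Defensive variant: skip records that are not valid JSON instead of failing the job
SingleOutputStreamOperator<String> safe = kafkaSource
        .flatMap((String value, Collector<String> out) -> {
            try {
                out.collect(JSONObject.parseObject(value).toJSONString());
            } catch (Exception e) {
                // Malformed record: drop it (could also be logged or sent to a side output)
            }
        })
        // Lambdas lose generic type information, so declare the output type explicitly
        .returns(Types.STRING);
safe.addSink(KafkaTool.getFlinkKafkaProducer("topic02"));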
2.3.4. Testing FlinkSQL Reading/Writing Kafka
package org.example.test;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.FlinkEnv;
import org.example.KafkaTool;
public class FlinkSqlTest {
public static void main(String[] args) {
//TODO 1 Create the execution environment
StreamExecutionEnvironment env = FlinkEnv.getEnv();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//TODO 2 Source table
tableEnv.executeSql("CREATE TABLE tb1 (database STRING, ts BIGINT, data STRING, type STRING)"
+ KafkaTool.getInputTable("topic02"));
//TODO 3 Sink table
tableEnv.executeSql("CREATE TABLE tb2 (database STRING, ts BIGINT, data STRING, type STRING)"
+ KafkaTool.getOutputTable("topic03"));
//TODO 4 Insert data
tableEnv.executeSql("INSERT INTO tb2 SELECT * FROM tb1 WHERE type IN ('insert','update')");
}
}
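When debugging from the IDE it can be useful to preview the source table in the console instead of (or in addition to) writing to Kafka. A minimal sketch, which could be appended at the end of main, uses the same filter as the INSERT above; TableResult.print() blocks and streams rows until the job is cancelled.
// Console preview for debugging; not part of the production job
tableEnv.executeSql("SELECT * FROM tb1 WHERE type IN ('insert','update')").print();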
2.4. Package and Upload to the Server
After packaging (mvn clean package; the shade plugin runs in the package phase), upload the jar whose name starts with original- to the server: it is the non-shaded jar without bundled dependencies, so it is smaller and uploads faster. The Kafka connector and fastjson jars it needs at runtime are placed in Flink's lib directory in section 3.1.
3. Production Environment
Flink cluster in YARN mode:
- The Flink application is submitted to YARN's ResourceManager
- Containers are requested from the NodeManagers
- JobManager and TaskManagers are created inside those containers
- Resources are allocated to the TaskManagers dynamically, based on the number of slots required by the job running on the JobManager
3.1. Flink Installation
1. Download and extract
wget -b https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar -zxf flink-1.13.6-bin-scala_2.12.tgz
mv flink-1.13.6 /opt/module/flink
2. Environment variable
vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
3. Distribute the environment variable to all nodes and reload it
source ~/bin/source.sh
4. In Per-Job-Cluster mode, the following error may appear: Exception in thread "Thread-5" java.lang.IllegalStateException:
Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration 'classloader.check-leaked-classloader'.
To fix it, edit the configuration file
vim /opt/module/flink/conf/flink-conf.yaml
and add the following line to resolve the error above:
classloader.check-leaked-classloader: false
5. Download the Flink-Kafka connector and fastjson jars (the links can be found on the Maven repository website)
cd /opt/module/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo1.maven.org/maven2/com/alibaba/fastjson/1.2.83/fastjson-1.2.83.jar
3.2. The 3 Flink-on-YARN Modes
Name | Job-to-cluster mapping | Description | Use case |
---|---|---|---|
Session-Cluster (session mode) | Many jobs share 1 Flink cluster | The Flink cluster requests resources from YARN and stays resident, accepting multiple jobs | Testing |
Per-Job-Cluster (per-job mode) | 1 job per Flink cluster | The user's main function runs on the client | Pre-production |
Application Mode (application mode) | 1 job per Flink cluster | The user's main function runs inside the cluster (on the JobManager) | Production |
Session-Cluster
A Flink cluster is initialized in YARN, reserving a fixed amount of resources.
This Flink cluster stays resident in the YARN cluster and can keep accepting jobs.
Once the reserved resources are used up, new jobs can no longer be submitted.
Per-Job-Cluster and Application Mode
Each submission creates a new Flink cluster; jobs are independent of each other.
When a job finishes, its Flink cluster is torn down as well.
3.2.1. Session-Cluster
1. Start a session
/opt/module/flink/bin/yarn-session.sh \
-s 1 \
-jm 1024 \
-tm 1024 \
-nm a1 \
-d
Parameter | Description |
---|---|
-s,--slots <arg> | Number of slots per TaskManager |
-jm,--jobManagerMemory <arg> | JobManager memory (default unit: MB) |
-tm,--taskManagerMemory <arg> | Memory per TaskManager (default unit: MB) |
-nm,--name <arg> | Name of the application on YARN |
-d,--detached | Run in detached mode (in the background) |
-h,--help | Show help |
2. Run a jar on the session
/opt/module/flink/bin/flink run -c org.example.test.FlinkTest FlinkDW-1.0-SNAPSHOT.jar
/opt/module/flink/bin/flink run -c org.example.test.FlinkSqlTest FlinkDW-1.0-SNAPSHOT.jar
3.2.2. Per-Job Cluster Mode
/opt/module/flink/bin/flink run \
-t yarn-per-job \
-nm a2 \
-ys 1 \
-yjm 1024 \
-ytm 1024 \
-c org.example.test.FlinkSqlTest \
original-FlinkDW-1.0-SNAPSHOT.jar
Parameter | Description |
---|---|
-ys,--yarnslots <arg> | Number of slots per TaskManager |
-yjm,--yarnjobManagerMemory <arg> | Memory for JobManager Container with optional unit (default: MB) |
-ytm,--yarntaskManagerMemory <arg> | Memory per TaskManager Container with optional unit (default: MB) |
-c,--class <classname> | Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
3.2.3. Application Mode
/opt/module/flink/bin/flink run-application \
-t yarn-application \
-nm a3 \
-ys 1 \
-yjm 1024 \
-ytm 1024 \
-c org.example.test.FlinkTest \
original-FlinkDW-1.0-SNAPSHOT.jar
3.3. Shutting Down Flink
yarn top
dynamically shows the status of running applications: application name, owning user, start time, number of containers, memory and CPU consumption, and so on.
Use
yarn application --list
to look up the application ID, then pass that ID to
yarn application --kill
to shut down the session.