什么是Flink
Flink是一个开源的流处理
和批处理
框架,它能够处理无界和有界的数据流,具有高吞吐量、低延迟和容错性等特点
Flink 可以应用于多个领域如:实时数据处理、数据分析、机器学习、事件驱动等。
什么是流式处理?什么是批处理
流处理是一种针对实时数据流进行连续处理的技术。它的数据通常是无界,数据以持续不断的流的形式到达。
批处理是一种将大量数据集合在一起进行统一处理的技术。在批处理中,首先要收集存储数据,批处理通常用于处理历史数据或离线数据
下载与安装
flink 依赖jdk ,版本推荐 Java 8 or 11
flink 下载与安装
本文使用的是 flink-1.17.2-bin-scala_2.12.tgz
tar -xzf flink-*.tgz
web UI 配置
vim ./conf/flink-conf.yaml
rest.bind-address: 0.0.0.0
启动与停止
./bin/start-cluster.sh
输入 ip:8081
进入UI 管理页面
Flink WebUI 页面
一个简单的例子
新建Maven 项目
添加maven 依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.4</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
官方文档一个简单的Demo
package com.codetonight;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
本地 idea 运行
本地启动报java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
时,
idea 需要勾选 add dependencies with provided scope to classpath
操作路径 Edit Configurations
提交任务到集群
通过UI页面提交Flink 任务,操作路径
Submit New Job
->Add New
任务提交
上传jar,填写处理任务类(包含main 方法)的类全路径
Jobs菜单下可以查看 运行中 和 已完成的 任务
查看任务的日志