文章目录
- Flink基础
- 今日课程内容目标
- 为什么要学Flink
- 技术更新迭代
- 市场需求
- 流式计算
- 批量计算
- 概念
- 特点
- 批量计算的优势和弊端
- 流式计算
- 生活中流场景
- 流式计算的概念
- Flink简介
- Flink历史
- Flink介绍
- Flink架构体系
- 已学过的框架技术
- Flink架构
- Flink集群搭建
- Flink的集群模式
- Standalone模式集群搭建
- 安装部署配置
- demo案例运行
- Flink入门案例
- Flink分层API
- Flink程序开发流程
- 搭建Flink工程
- 基于mvn创建项目
- 引入的基本依赖
- 入门案例
- 需求
- 分析
- 实现
- 批处理 - DataStream(从文件中读取批数据)
- 流处理 - DataStream(从socket中读取流数据)
- 流处理 - DataStream(扩展1:从socket中读取流数据,Lambda的方式实现)
- 流处理 - DataStream(扩展2:从socket中读取流数据,Lambda的方式实现)
- 流处理 -Table API
- 流处理 - SQL
- Flink程序提交部署
- Flink程序提交部署
- 以UI的方式递交
- 以命令的方式递交
- 今日总结
Flink基础
今日课程内容目标
- 为什么要学Flink
- 技术更新迭代
- 市场趋势
- 流式计算
- 批量计算
- 流式计算
- Flink简介
- Flink架构体系
- Flink安装部署
- Local
- Standalone
- Yarn【最后一天学习】
- Flink入门案例
- 批处理(已过期)
- 流处理(DataStream API、Table API、SQL)
为什么要学Flink
技术更新迭代
- 离线计算
Hadoop(MR) -> Tez(MR增强版) -> Spark(内存计算)
- 流式计算(实时计算)
Storm -> StructuredStreaming -> Flink
市场需求
小结:流式计算需求趋向于火热。同时,由于大公司在推进,因此,互联网实时需求越来越旺盛。
流式计算
批量计算
概念
批量计算,数据是一批一批地计算,来一批处理一批。
特点
数据是有界的,数据是有开始,也有结束的。
数据一旦产生,不会更改
时效性低
批量计算的优势和弊端
批量计算的优势,是对历史数据的处理。对于时效性要求不高。
但是,对于一些时效性要求高的场景:
- 实时监控网站的异常情况
- 实时监控道路拥堵情况
- 实时监控全国疫情爆发情况
- 实时监控网站成交情况
这个时候,就需要流式计算了。
流式计算
生活中流场景
生活中的流式场景比较多,比如水流,车流,人流(行人),气流,电流,如下图(以水流为例)
这些流式场景,他们的共同点是:
-
数据是源源不断,也就是不间断
-
有开始,没有结束
-
来一条处理一条
流式计算的概念
基于数据流的计算,就叫做流式计算。
数据流:数据是流动的,是源源不断的,是没有结束的。
流式计算的框架:
- Storm
- StructuredStreaming
- Flink(主角)
Flink简介
Flink历史
2010-2014年,起源于欧洲柏林大学的一个StratoSphere项目
2014年4月,捐赠给了Apache软件基金会
在2014年底,称为Apache的顶级项目
2019年,Flink的母公司,被阿里巴巴收购
Flink的最新版:1.20.0
我们这次课程,也是基于1.20.0来讲解。
Flink介绍
Flink官网:https://flink.apache.org/
Flink:基于数据流上的有状态的计算。
数据流:流动的数据。
有状态:Flink会保存每个算子的计算中间结果,不需要用户操心。这也是相比Storm框架的优势。
Flink的编程模型【扩展】
- 数据输入
- MySQL数据
- 日志数据
- 物联网数据
- 点击埋点数据
- 数据处理
- Flink程序
- 数据输出
- 关系型数据库
- 文件
- K-V存储介质
Flink架构体系
已学过的框架技术
- HDFS
- NameNode(主)
- DataNode(从)
- Yarn
- ResourceManager(主)
- NodeManager(从)
- Spark
- Master(主)
- Worker(从)
- Flink
- JobManager(主)
- TaskManager(从)
Flink架构
Flink也是主从架构,分为如下:
- JobManager:负责集群管理,资源管理、任务调度、容错等。
- TaskManager:负责任务执行,心跳汇报
- Slot(槽)就是Flink具体任务的场所。Standalone模式下,槽位在集群启动时,就固定了。在Yarn下,可以动态申请TaskManager,因此可以动态增加槽位。
Flink集群搭建
Flink的集群模式
- Local模式【本地模式, 开发环境可用】
- 一个进程模拟全部的角色,处理所有的代码流程。
- Standalone模式【独立模式,测试或者生产环境可用】
- 每个进程都是互相独立的。
- Yarn模式【生产模式常用,基础课最后一天介绍】
- 不需要额外的搭建,只需要把Yarn、HDFS启动即可。
- 基于Yarn来运行Flink。(需要添加Flink基于HDFS的依赖jar包)
Standalone模式集群搭建
安装部署配置
#0.准备
cd /export/software
#1.下载
wget https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
#2.解压
tar -zxvf flink-1.20.0-bin-scala_2.12.tgz -C /export/server/
#3.进入
cd /export/server/
#4.创建软连接
ln -s flink-1.20.0 flink
#5.修改配置
82行:numberOfTaskSlots: 4
170行:address: node1
177行:bind-address: node1
随便找一行,添加:classloader.check-leaked-classloader: false
#6.启动Flink
bin/start-cluster.sh
#7.停止Flink
bin/stop-cluster.sh
#8.FLINK_HOME配置
#FLINK_HOME
export FLINK_HOME=/export/server/flink
export PATH=$PATH:$FLINK_HOME/bin
#9.source环境变量
source /etc/profile
#10.查看WebUI登录页面
http://node1:8081
Flink安装目录介绍
demo案例运行
cd $FLINK_HOME
bin/flink run examples/batch/WordCount.jar
WebUI运行结果如下:
后台结果如下:
Flink入门案例
Flink分层API
Flink还是一个非常易于开发的框架,因为它拥有易于使用的分层API,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大,如下图所示:
- SQL/Table API(最顶层)StreamTableEnvironment
- DataStream API(中间层)StreamExecutionEnvironment
- Stateful Function(最底层)
注意:2020年12月8日发布的新版本1.12.0,已经完全实现了真正的流批一体,DataSetAPI已经处于软性弃用(soft deprecated)的状态,用DataStream API写好的一套代码,既可以处理流数据,也可以处理批数据,只需要设置不同的执行模式,这与之前版本处理有界流的方式是不一样的,Flink已专门对批处理数据做了优化处理,本课程基于Flink1.20版本研发,因此后续的学习以介绍DataStream API为主。
Flink程序开发流程
一个完整的flink作业无论简单与复杂,flink程序都由如下几个部分组成:
- 构建流式执行环境:获取一个编程、执行入口环境env【固定写法】
- 数据输入:通过数据源组件,加载、创建datastream
- 数据处理: 对datastream调用各种处理算子表达计算逻辑
- 数据输出:通过sink算子指定计算结果的输出方式
- 启动流式任务:在env上触发程序提交运行【固定写法】
注意:写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
搭建Flink工程
基于mvn创建项目
-
创建一个新项目:
Create New Project
-
下一步,选择
maven
项目,并且勾选:Create from archetype
选项(目的是使用flink
官方提供的项目模板快速生成项目结构)如果是第一次创建项目,那么则需要添加一个新的模板文件,请选择:
Add Archetype
按钮,并把官方提供的模板内容填写完整:
这里需要注意的是
Version
字段,请确认你开发时的flink
版本和你运行的环境版本是一致的,以免带来不必要的麻烦。比如:公司的
flink
集群是是基于1.20.0
版本,而你是基于1.10.0
开发的代码,那么最终上线肯定会遇到兼容性问题的,所以请注意。添加好官方的模板后,我们便可以在以下的列表中选择基于该模板来创建项目基本结构:
-
下一步,配置项目名称,并且取一个唯一的
groupId
名称: -
最后,直接下一步选择默认操作完成即可。整个项目目录结构创建完成,如下:
引入的基本依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<parquet-avro>1.12.2</parquet-avro>
<log4j.version>2.17.1</log4j.version>
<mysql.version>5.1.48</mysql.version>
<lombok.version>1.18.22</lombok.version>
<hadoop.version>3.3.0</hadoop.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.0-1.17</version>
</dependency>
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<!-- flink连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!--lombok插件-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!--第三方工具包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.53</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.itcast.DataStreamJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
入门案例
需求
使用Flink程序,从文件里读取单词,进行Wordcount单词统计。
分析
#3.数据处理
#3.1,进行扁平化处理
hello hadoop hello
hello hive => 转换成如下 hadoop
hello
hive
#3.2把上述每个单词进行转换,转成(单词,1)
hello (hello,1)
hadoop => (hadoop,1)
hello (hello,1)
hive (hive,1)
#3.3 把上述单词,按照word(单词)进行分组
(hello,1) (hello,1),(hello,1)
(hadoop,1) => (hadoop,1)
(hello,1) (hive,1)
(hive,1)
#3.4 把相同组内的单词,进行sum求和
(hello,1),(hello,1) (hello,2)
(hadoop,1) => (hadoop,n)
(hive,1) (hive,n)
实现
批处理 - DataStream(从文件中读取批数据)
package day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author: itcast
* @date: 2022/10/26 16:48
* @desc: Flink 程序实现Wordcount单词统计(批处理)
*/
public class Demo01_WordCountBatch {
public static void main(String[] args) throws Exception {
//1.构建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//2.数据输入(数据源)
DataStreamSource<String> source = env.readTextFile("D:\\word.txt");
//3.数据处理,匿名内部类 new 接口类(){}
//3.1 flatMap进行扁平化处理
SingleOutputStreamOperator<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});
//3.2 使用map方法,进行转换(单词,1)int -> Integer
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//3.3 使用keyBy算子进行单词分组 (hello,1)
KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//3.4进行reduce(sum)操作(hello,1),(hello,1)
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});
//4.数据输出
result.print();
//5.启动流式任务
env.execute();
}
}
运行结果如下:
流处理 - DataStream(从socket中读取流数据)
package day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author: itcast
* @date: 2022/10/26 17:18
* @desc: Flink 代码实现流处理,进行单词统计。数据源来自于socket数据。
*/
public class Demo02_WordCountStream {
public static void main(String[] args) throws Exception {
//1.构建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);
//2.数据输入(数据源)
//从socket读取数据,socket = hostname + port
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
//3.数据处理
//3.1 使用flatMap进行扁平化处理
SingleOutputStreamOperator<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});
//3.2 使用map进行转换,转换成(单词,1)
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//3.3使用keyBy进行单词分组
KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//3.4 使用reduce(sum)进行聚合操作,sum:就是根据第一个元素(Integer)进行sum操作
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);
//4.数据输出
result.print();
//5.启动流式任务
env.execute();
}
}
运行结果如下:
流处理 - DataStream(扩展1:从socket中读取流数据,Lambda的方式实现)
package day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author: itcast
* @date: 2022/10/27 9:21
* @desc: 扩展2:采用Lambda表达式的方式来编写Flink wordcount入门案例
*/
public class Demo04_WordCountStream_03 {
public static void main(String[] args) throws Exception {
//1.构建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.数据输入
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
//3.数据处理
SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap((String value, Collector<String> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}).returns(Types.STRING).map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
//4.数据输出
result.print();
//5.启动流式任务
env.execute();
}
}
运行结果如下:
流处理 - DataStream(扩展2:从socket中读取流数据,Lambda的方式实现)
package day01;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* @author: itcast
* @date: 2022/10/27 9:21
* @desc: 扩展3:采用Lambda表达式的方式来编写Flink wordcount入门案例
*/
public class Demo04_WordCountStream_04 {
public static void main(String[] args) throws Exception {
//1.构建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.数据输入
DataStreamSource<String> source = env.socketTextStream("node1", 9999);
//3.数据处理
SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap((String value, Collector<String> out) -> {
Arrays.stream(value.split(" ")).forEach(out::collect);
}).returns(Types.STRING).map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
//4.数据输出
result.print();
//5.启动流式任务
env.execute();
}
}
任务运行截图:
流处理 -Table API
package day01;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.concurrent.ExecutionException;
/**
* @author: itcast
* @date: 2022/10/27 9:55
* @desc: 使用Flink Table API进行wordcount单词统计。
* Table:表,(MySQL、Hive、Spark)
* 是不是需要先准备好表?
* 在Flink里面,同样如此。
* //1.构建流式执行环境
* //2.数据输入(数据输入表)
* //3.数据输出(数据输出表)
* //4.数据处理(基于数据输入表、数据输出表进行业务处理(单词统计)
* //5.启动流式任务
*/
public class Demo05_WordCountTable {
public static void main(String[] args) throws Exception {
//1.构建流式执行环境
//env 对象是基于DataStream API构建的,如果需要使用Table API/SQL来提交Flink任务,则需要使用Flink里的StreamTableEnvironment对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment t_env = StreamTableEnvironment.create(env);
t_env.getConfig().set("parallelism.default","1");
//2.数据输入(数据输入表)
/**
* createTemporaryTable(String tableName,TableDescriptor tableDescriptor);
* tableName:表名
* tableDescriptor:描述表的schema,column等信息的
* connector: 就类似于jdbc的驱动类,但是Flink不叫驱动包(驱动类),Flink叫做Connector,连接器。
* 连接器:就是用来连接外部数据源的。
*/
/**
* | word |
* | hello |
* | hive |
* | flink |
*/
t_env.createTemporaryTable("source", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("word", DataTypes.STRING()).build())
.option("rows-per-second","1")
.option("fields.word.kind","random")
.option("fields.word.length","1")
.build());
//3.数据输出(数据输出表)
/**
* | word | counts |
* | a | 2 |
* | 1 | 3 |
*/
t_env.createTemporaryTable("sink",TableDescriptor.forConnector("print")
.schema(Schema.newBuilder()
.column("word",DataTypes.STRING())
.column("counts",DataTypes.BIGINT()).build())
.build());
//4.数据处理(基于数据输入表、数据输出表进行业务处理(单词统计)
/**
* 处理逻辑:
* 首先从源表把数据读取出来,根据单词进行分组,然后按照分组后的字段(word,count(*))进行统计。
* from:从源表读取数据
* groupBy:根据xx字段分组
* select:分组后选择需要的数据,选择的数据&类型需要和目标表匹配
* executeInsert:把最终结果插入到目标表中去
* insert into sink
* select word ,count(*) from source group by word
*/
t_env.from("source")
.groupBy(Expressions.$("word"))
.select(Expressions.$("word"),Expressions.lit(1).count())
.executeInsert("sink")
.await();
//5.启动流式任务
env.execute();
}
}
执行结果如下:
流处理 - SQL
package day01;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.concurrent.ExecutionException;
/**
* @author: itcast
* @date: 2022/10/27 10:42
* @desc: 使用Flink SQL完成单词统计
*/
public class Demo06_WordCountSQL {
public static void main(String[] args) throws Exception {
//1.构建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment t_env = StreamTableEnvironment.create(env);
t_env.getConfig().set("parallelism.default","1");
//2.构建数据源表(数据输入)
/**
* | word |
* | hello |
* | hive |
* | spark |
* | flink |
*/
t_env.executeSql("create table source(" +
"word varchar" +
") with (" +
"'connector' = 'datagen'," +
"'rows-per-second' = '1'," +
"'fields.word.kind' = 'random'," +
"'fields.word.length' = '1'" +
")");
//3.构建数据输出表(数据输出)
/** 表结构如下:
* | word | counts |
* | hello | 1 |
* | hive | 2 |
* | flink | 3 |
*/
t_env.executeSql("create table sink(" +
"word varchar," +
"counts bigint" +
") with (" +
"'connector' = 'print'" +
")");
//4.数据处理
/**
* 数据处理逻辑SQL如下:
* insert into sink select word,count(*) from source group by word
*/
t_env.executeSql("insert into sink select word,count(*) from source group by word")
.await();
//5.启动流式任务
env.execute();
}
}
执行结果如下:
Flink程序提交部署
Flink程序提交部署
Flink程序递交方式有两种:
- 以UI的方式递交
- 以命令的方式递交
以UI的方式递交
提交步骤:
#1.使用idea自带的打包工具进行打包(双击package即可)
#2.使用瘦包即可(小的包)传到webUI上
#3.设置Entry class(day01.Demo02_WordCountStream)Parallelism(1)
#4.开启socket(nc -lk 9999)
#5.Submit提交
#6.在linux终端输入单词
-
指定递交参数
-
查看任务运行概述
-
查看任务运行结果
以命令的方式递交
-
上传作业jar包到linux服务器
-
配置执行模式(可选)
-
指定递交命令
flink run -c day01.Demo02_WordCountStream original-flinkbase-1.0-SNAPSHOT.jar
-
查看任务运行概述
今日总结
- 学习 Flink 的入门和综述,主要介绍了 Flink 的起源和应用场景,引出了流处理相关 的一些重要概念,并通过介绍数据处理架构发展演变的过程,展示了 Flink 作为新一代分布式流处理器的架构思想。
- 实现了一个Flink 开发的入门程序——词频统计 WordCount。通过批处理和流处理两种不同模式的实现,可以对Flink的API风格和编程方式有所熟悉,并且可以更加深刻地理解批处理和流处理的不同。另外,通过读取有界数据(文件)和无界数据(Socket 文本流)进行流处理的比较,可以更加直观地体会Flink流处理的方式和特点。