目录
执行环境(Execution Environment)
创建执行环境
执行模式配置
触发程序执行
源算子(Source)
从集合中读取数据
从文件中读取数据
从Socket读取数据
从Kafka读取数据***
自定义 Source(数据源)
Flink对POJO的要求
类型提示
DataStream API的基本构成:
执行环境(Execution Environment)
创建执行环境
StreamExecutionEnvironment
——流处理环境
1.getExecutionEnvironment
:获取当前程序运行的环境(根据当前运行的方式,自行决定该返回什么样的运行环境)
2.createLocalEnvironment
(并行度):返回一个本地执行环境,可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3.createRemoteEnvironment
:这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager 主机名
1234, // JobManager 进程端口号
"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);
ExecutionEnvironment
——批处理API与流处理相同
执行模式配置
流处理/批处理/自动模式(将由程序根据输入数据源是否有界,来自动选择执行模式)
默认是流处理模式
设置为批处理:
命令行:bin/flink run -Dexecution.runtime-mode=BATCH
代码:env.setRuntimeMode(RuntimeExecutionMode.BATCH)
建议在命令行中配置
触发程序执行
env.execute();
Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”,所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行
源算子(Source)
从集合中读取数据
fromCollection(集合对象)
ArrayList<Event> clicks = new ArrayList<>();
clicks.add(new Event("Mary","./home",1000L));
clicks.add(new Event("Bob","./cart",2000L));
//从集合中读取数据
DataStream<Event> stream = env.fromCollection(clicks);
stream.print();
fromElements(对象1.对象2.....)
//直接将元素列举出来,调用 fromElements 方法进行读取数据
DataStreamSource<Event> stream2 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
从文件中读取数据
readTextFile()
注意:使用hdfs路径的时候需要添加相关依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
<scope>provided</scope>
</dependency>
从Socket读取数据
socketTextStream(hostname,port)
从Kafka读取数据***
Flink内部未提供与kafka进行连接的预实现方法,因此需要采用addSource 方式、实现一个 SourceFunction:
Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者:FlinkKafkaConsumer,它就是用来读取 Kafka 数据的SourceFunction
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
自定义 Source(数据源)
自定义数据源
//实现SourceFunction接口,能够自定义数据源;
//实现ParallelSourceFunction接口,能够设置并行度(否则并行度只能是1)
public class ClickSource implements SourceFunction<Event>,
ParallelSourceFunction<Event> {
// 声明一个布尔变量,作为控制数据生成的标识位
private Boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
Random random = new Random(); // 在指定的数据集中随机选取数据
String[] users = {"Mary", "Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
"./prod?id=2"};
while (running) {
sourceContext.collect(new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
));
// 隔 1 秒生成一个点击事件,方便观测
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果
读取自定义的数据源:
//有了自定义的 source function,调用 addSource 方法
DataStreamSource<Event> stream = env.addSource(new ClickSource());
注意:
SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设置为大于 1 的并行度,则会抛出异常
ParallelSourceFunction接口可用来定义并行的数据源;
Flink对POJO的要求
在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型
类型提示
在Lambda 表达式中,通过自动提取系统提取的数据类型是不够精细的,所以需要显式地提供类型信息,才能使应用程序正常工作或提高其性能
也就是对应的类型提示API
通过return说明类型
:
简单数据类型:
泛型: