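9.3 Transformation Operators
9.3.1 Basic Transformation Operators
9.3.1.1 Map (map)
One-to-one mapping: each input element produces exactly one output element.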
package transform;
import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Title: MapDemo
* @Author lizhe
* @Package transform
* @Date 2024/5/31 19:55
* @description:
*/
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
new WaterSensor("s1", 1L, 1),
new WaterSensor("s2", 2L, 2),
new WaterSensor("s3", 3L, 3)
);
SingleOutputStreamOperator<String> map = sensorDataStreamSource.map((v) -> {
return v.getId();
});
map.print();
env.execute();
}
}
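9.3.1.2 Filter (filter)
A transformation that filters the stream with a boolean predicate: elements for which the predicate returns true are emitted, and elements for which it returns false are dropped.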
package transform;
import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Title: FilterDemo
* @Author lizhe
* @Package transform
* @Date 2024/5/31 19:55
* @description: keeps only the records whose id is "s1"
*/
public class FilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
new WaterSensor("s1", 1L, 1),
new WaterSensor("s2", 2L, 2),
new WaterSensor("s3", 3L, 3)
);
SingleOutputStreamOperator<WaterSensor> filter = sensorDataStreamSource.filter((v) -> {
return "s1".equals(v.getId());
});
filter.print();
env.execute();
}
}
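9.3.1.3 Flat Map (flatMap)
Splits each element of the stream into individual pieces: consuming one element can produce zero, one, or many elements (one in, many out). flatMap combines flatten and map: it first breaks the data apart according to some rule and then transforms each resulting piece.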
package transform;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Title: FlatMapDemo
* @Author lizhe
* @Package transform
* @Date 2024/5/31 19:55
* @description:
* s1 records: one in, one out
* s2 records: one in, two out
* s3 records: one in, zero out
*/
public class FlatMapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
new WaterSensor("s1", 1L, 11),
new WaterSensor("s2", 2L, 22),
new WaterSensor("s3", 3L, 3)
);
SingleOutputStreamOperator<String> flatmap = sensorDataStreamSource.flatMap(new FlatMapFunction<WaterSensor, String>() {
@Override
public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
if ("s1".equals(value.getId())) {
out.collect(String.valueOf(value.getVc()));
} else if ("s2".equals(value.getId())) {
out.collect(String.valueOf(value.getTs()));
out.collect(String.valueOf(value.getVc()));
}
}
});
flatmap.print();
env.execute();
}
}
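map controls its one-in-one-out behavior through return, whereas flatMap receives a Collector and can call it any number of times per input, which is how it achieves one-in-many-out (or one-in-zero-out).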
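The difference between returning a value and calling a collector can be sketched without Flink. The following is a minimal plain-Java illustration, not Flink code: `MapVsFlatMapSketch`, `mapOne`, `flatMapOne`, and `flatMapAll` are hypothetical names, and a `java.util.function.Consumer` stands in for Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MapVsFlatMapSketch {
    // map style: the single result is delivered through the return value.
    static String mapOne(String id, long ts, int vc) {
        return id;
    }

    // flatMap style: results are handed to a collector, zero or more times.
    static void flatMapOne(String id, long ts, int vc, Consumer<String> out) {
        if ("s1".equals(id)) {
            out.accept(String.valueOf(vc));   // one in, one out
        } else if ("s2".equals(id)) {
            out.accept(String.valueOf(ts));   // one in, two out
            out.accept(String.valueOf(vc));
        }
        // any other id: nothing is collected (one in, zero out)
    }

    // Feeds the three sample records from FlatMapDemo through flatMapOne.
    static List<String> flatMapAll() {
        List<String> out = new ArrayList<>();
        flatMapOne("s1", 1L, 11, out::add);
        flatMapOne("s2", 2L, 22, out::add);
        flatMapOne("s3", 3L, 3, out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mapOne("s1", 1L, 11)); // s1
        System.out.println(flatMapAll());         // [11, 2, 22]
    }
}
```

Three records go in and three strings come out, but not one per record: s1 contributes one, s2 two, and s3 none.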
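9.3.1.4 Aggregation Operators
The result of an aggregation depends not only on the current element but also on the elements that came before it.
- Keyed partitioning (keyBy)
DataStream has no aggregation API of its own. In Flink, a stream must first be partitioned with keyBy before it can be aggregated. keyBy splits one stream into partitions according to the specified key, and each partition corresponds to a parallel subtask.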
package aggreagte;
import bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Title: KeybyDemo
* @Author lizhe
* @Package aggreagte
* @Date 2024/5/31 19:55
* @description: groups the sensor records by id
*/
public class KeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
            new WaterSensor("s1", 1L, 11),
            new WaterSensor("s1", 11L, 11),
            new WaterSensor("s2", 2L, 22),
            new WaterSensor("s3", 3L, 3)
        );
        /*
         * Group by id.
         * keyBy returns a KeyedStream; keyBy itself is not an operator.
         * Grouping (keyBy) versus partitioning:
         * 1) keyBy groups the data and guarantees that records with the same key land in the same partition
         * 2) A partition can be thought of as one subtask; one partition (subtask) may hold several groups (keys)
         */
        KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        keyBy.print();
        env.execute();
    }
}
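The relationship between groups (keys) and partitions (subtasks) can be made concrete with a small plain-Java sketch. This is a deliberate simplification and an assumption of this example: it maps a key to a subtask with a plain hash modulo, whereas Flink's real assignment goes through an additional hashing and key-group step, so the actual subtask numbers in a Flink job will generally differ. `KeyToSubtaskSketch` and `subtaskFor` are hypothetical names.

```java
final class KeyToSubtaskSketch {
    // Simplified stand-in for key-to-subtask routing: hash the key, then take it modulo the parallelism.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        for (String key : new String[] {"s1", "s1", "s2", "s3"}) {
            // The same key always maps to the same subtask; different keys may share one.
            System.out.println(key + " -> subtask " + subtaskFor(key, parallelism));
        }
    }
}
```

With two subtasks and three keys, at least two keys must share a subtask, which is point 2) of the comment in KeybyDemo: one partition can hold several groups.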
- Simple aggregations
Once a stream is keyed, it can be aggregated. The basic API methods are sum, min, max, minBy, and maxBy.
sum
package aggreagte;
import bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleAggDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
            new WaterSensor("s1", 1L, 11),
            new WaterSensor("s1", 11L, 11),
            new WaterSensor("s2", 2L, 22),
            new WaterSensor("s3", 3L, 3)
        );
        /*
         * Group by id.
         * keyBy returns a KeyedStream; keyBy itself is not an operator.
         * Grouping (keyBy) versus partitioning:
         * 1) keyBy groups the data and guarantees that records with the same key land in the same partition
         * 2) A partition can be thought of as one subtask; one partition (subtask) may hold several groups (keys)
         */
        KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        // Positional indexes work for tuple types only, not for POJO types
        // SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
        SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
        sum.print();
        // SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
        // max.print();
        // SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
        // maxBy.print();
        env.execute();
    }
}
max
package aggreagte;
import bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleAggDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
            new WaterSensor("s1", 1L, 1),
            new WaterSensor("s1", 11L, 11),
            new WaterSensor("s2", 2L, 22),
            new WaterSensor("s3", 3L, 3)
        );
        /*
         * Group by id.
         * keyBy returns a KeyedStream; keyBy itself is not an operator.
         * Grouping (keyBy) versus partitioning:
         * 1) keyBy groups the data and guarantees that records with the same key land in the same partition
         * 2) A partition can be thought of as one subtask; one partition (subtask) may hold several groups (keys)
         */
        KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        // Positional indexes work for tuple types only, not for POJO types
        // SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
        // SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
        // sum.print();
        SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
        max.print();
        // SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
        // maxBy.print();
        env.execute();
    }
}
Output:
WaterSensor{id='s1', ts=1, vc=1}
WaterSensor{id='s1', ts=1, vc=11}
WaterSensor{id='s2', ts=2, vc=22}
WaterSensor{id='s3', ts=3, vc=3}
maxBy
package aggreagte;
import bean.WaterSensor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SimpleAggDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
            new WaterSensor("s1", 1L, 1),
            new WaterSensor("s1", 11L, 11),
            new WaterSensor("s2", 2L, 22),
            new WaterSensor("s3", 3L, 3)
        );
        /*
         * Group by id.
         * keyBy returns a KeyedStream; keyBy itself is not an operator.
         * Grouping (keyBy) versus partitioning:
         * 1) keyBy groups the data and guarantees that records with the same key land in the same partition
         * 2) A partition can be thought of as one subtask; one partition (subtask) may hold several groups (keys)
         */
        KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        // Positional indexes work for tuple types only, not for POJO types
        // SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
        // SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
        // sum.print();
        // SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
        // max.print();
        SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
        maxBy.print();
        env.execute();
    }
}
Output:
WaterSensor{id='s1', ts=1, vc=1}
WaterSensor{id='s1', ts=11, vc=11}
WaterSensor{id='s2', ts=2, vc=22}
WaterSensor{id='s3', ts=3, vc=3}
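max vs. maxBy (min and minBy behave the same way):
max updates only the compared field to its maximum; the other fields keep the values from the first record of that key.
maxBy returns the whole record that holds the maximum of the compared field.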
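The difference shows up in the second s1 line of the two outputs (ts=1 versus ts=11). Below is a minimal plain-Java sketch of the two update rules, not Flink's implementation: a record is modelled as a `long[]{ts, vc}` standing in for WaterSensor, and `MaxVsMaxBySketch`, `max`, and `maxBy` are hypothetical names chosen for this example.

```java
import java.util.Arrays;

final class MaxVsMaxBySketch {
    // max("vc") rule: keep the stored record's other fields, update only the compared field.
    static long[] max(long[] state, long[] incoming) {
        return new long[] {state[0], Math.max(state[1], incoming[1])};
    }

    // maxBy("vc") rule: keep the whole record with the larger compared field
    // (on a tie this sketch keeps the earlier record).
    static long[] maxBy(long[] state, long[] incoming) {
        return incoming[1] > state[1] ? incoming : state;
    }

    public static void main(String[] args) {
        long[] first = {1L, 1L};    // ts=1,  vc=1
        long[] second = {11L, 11L}; // ts=11, vc=11
        System.out.println(Arrays.toString(max(first, second)));   // [1, 11]
        System.out.println(Arrays.toString(maxBy(first, second))); // [11, 11]
    }
}
```

The first line corresponds to `ts=1, vc=11` in the max output and the second to `ts=11, vc=11` in the maxBy output.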
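- Reduce function (reduce)
reduce aggregates elements pairwise: the first record of each key is stored and emitted as is, without calling the reduce function, and each aggregated result then serves as the first argument (value1) of the next call.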
package aggreagte;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
            new WaterSensor("s1", 1L, 1),
            new WaterSensor("s1", 11L, 11),
            new WaterSensor("s1", 22L, 22),
            new WaterSensor("s2", 2L, 22),
            new WaterSensor("s3", 3L, 3)
        );
        /*
         * Group by id.
         * keyBy returns a KeyedStream; keyBy itself is not an operator.
         * Grouping (keyBy) versus partitioning:
         * 1) keyBy groups the data and guarantees that records with the same key land in the same partition
         * 2) A partition can be thought of as one subtask; one partition (subtask) may hold several groups (keys)
         */
        KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        SingleOutputStreamOperator<WaterSensor> reduce = keyBy.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                // value1 is the result aggregated so far, value2 is the newly arrived record
                System.out.println("value1=" + value1);
                System.out.println("value2=" + value2);
                return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
            }
        });
        reduce.print();
        env.execute();
    }
}
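To make the rolling behavior concrete, here is a minimal plain-Java simulation of a keyed reduce over the same vc values as ReduceDemo. It is a sketch of the semantics described above, not Flink code; `RollingReduceSketch` and `rollingReduce` are hypothetical names, and a `HashMap` stands in for Flink's per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class RollingReduceSketch {
    // Emits one result per input record, as a keyed rolling reduce does.
    static List<String> rollingReduce(String[] ids, int[] vcs, BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new HashMap<>(); // last aggregated value per key
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            Integer previous = state.get(ids[i]);
            // First record of a key: stored and emitted without calling the reducer.
            int result = (previous == null) ? vcs[i] : reducer.apply(previous, vcs[i]);
            state.put(ids[i], result);
            emitted.add(ids[i] + "=" + result);
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] ids = {"s1", "s1", "s1", "s2", "s3"};
        int[] vcs = {1, 11, 22, 22, 3};
        System.out.println(rollingReduce(ids, vcs, Integer::sum));
        // [s1=1, s1=12, s1=34, s2=22, s3=3]
    }
}
```

For key s1 the reducer runs twice (1+11, then 12+22); for s2 and s3 it never runs because each has only one record.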
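9.3.1.5 User-Defined Functions and Partitioning
User-defined functions come in three forms: function classes, anonymous functions, and rich function classes.
Physical partitioning decides which of the parallel downstream subtasks (threads) each record is sent to. Common strategies are random (shuffle), round-robin (rebalance), rescale, and broadcast.
Round-robin (rebalance): typically one source subtask reads one Kafka partition; if the partitions carry uneven amounts of data (data skew), round-robin distribution balances the load across the downstream subtasks.
Rescale (rescale): also round-robin, but each upstream subtask distributes only to its own group of downstream subtasks, which makes it more efficient than rebalance.
Broadcast (broadcast): sends every record to all downstream subtasks.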
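The difference between rebalance and rescale can be sketched as two index computations. This is a plain-Java illustration under stated assumptions, not Flink's partitioner code: the downstream parallelism is assumed to be an exact multiple of the upstream parallelism, round-robin is assumed to start at channel 0, and `PartitionSketch`, `rebalanceTarget`, and `rescaleTarget` are hypothetical names.

```java
final class PartitionSketch {
    // rebalance: each upstream subtask cycles through ALL downstream subtasks.
    static int rebalanceTarget(long recordIndex, int downstream) {
        return (int) (recordIndex % downstream);
    }

    // rescale: each upstream subtask cycles only through its own group of downstream subtasks.
    // Assumes downstream is an exact multiple of upstream.
    static int rescaleTarget(int upstreamIndex, long recordIndex, int upstream, int downstream) {
        int groupSize = downstream / upstream;
        return upstreamIndex * groupSize + (int) (recordIndex % groupSize);
    }

    public static void main(String[] args) {
        int upstream = 2;
        int downstream = 4;
        for (long record = 0; record < 4; record++) {
            System.out.println("record " + record
                + ": rebalance -> " + rebalanceTarget(record, downstream)
                + ", rescale from upstream 0 -> " + rescaleTarget(0, record, upstream, downstream)
                + ", rescale from upstream 1 -> " + rescaleTarget(1, record, upstream, downstream));
        }
    }
}
```

With 2 upstream and 4 downstream subtasks, rescale keeps upstream 0 on downstream {0, 1} and upstream 1 on {2, 3}, so each sender talks to fewer receivers than under rebalance.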
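9.3.1.6 Splitting Streams
Splitting divides one data stream into two or more fully independent streams: starting from a single DataStream, records that match each condition are routed to the corresponding stream.
The simplest approach is to call filter() several times, independently, on the same stream; each call yields one of the split streams. This is inefficient, however, because every record is evaluated by every filter.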
package split;
import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitByFilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.132.101", 7777);
dataStreamSource.filter(value -> Integer.parseInt(value) % 2 == 0).print("even");
// Use != 0 rather than == 1 so that negative odd numbers also match (in Java, -3 % 2 == -1)
dataStreamSource.filter(value -> Integer.parseInt(value) % 2 != 0).print("odd");
env.execute();
}
}
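Side outputs offer a more efficient way to split a stream and are useful for filtering, alerting, and similar tasks:
- Use the process operator
- Define an OutputTag object
- Call ctx.output
- Obtain the side output from the main stream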
package split;
import bean.WaterSensor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.lang.reflect.Type;
public class SideOutputDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Records with id s1 go to side output s1
OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
// Records with id s2 go to side output s2
OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
SingleOutputStreamOperator<WaterSensor> process = dataStreamSource.process(new ProcessFunction<WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
String id = value.getId();
if (id.equals("s1")) {
ctx.output(s1, value);
} else if (id.equals("s2")) {
ctx.output(s2, value);
} else {
// All other records go to the main stream
out.collect(value);
}
}
});
// Print the main output stream
process.print("main");
// Print the side output streams
process.getSideOutput(s1).print("side-s1");
process.getSideOutput(s2).print("side-s2");
env.execute();
}
}
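9.3.1.7 Merging Streams
1. Union (union)
The simplest merge combines several streams into one. All streams must have the same element type; the merged stream contains every element of every input stream, the element type is unchanged, and more than two streams can be merged in a single call.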
package combineDemo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33, 44, 55);
DataStreamSource<String> source3 = env.fromElements("111", "222","333","444","555");
DataStream<Integer> union = source1.union(source2).union(source3.map(value -> Integer.parseInt(value)));
union.print();
env.execute();
}
}
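2. Connect (connect)
To merge streams of different element types, Flink provides connect. Connecting two streams yields a ConnectedStreams: the streams are unified in form, but each keeps its own element type internally and the two remain independent of each other. To obtain a new DataStream, apply a co-processing operation such as map, flatMap, or process, which handles each input with its own method.
connect can join only two streams at a time.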
package combineDemo;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> source2 = env.fromElements("111", "222","333","444","555");
ConnectedStreams<Integer, String> connect = source1.connect(source2);
SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return value.toString();
}
@Override
public String map2(String value) throws Exception {
return value;
}
});
map.print();
env.execute();
}
}
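keyBy() can be called directly on a ConnectedStreams, and the result is still a ConnectedStreams. keyBy() routes records with the same key from both streams to the same subtask, where each record is then processed according to the stream it came from (the effect resembles an inner join).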
package combineDemo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConnectKeybyDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
Tuple2.of(1, "a1"),
Tuple2.of(1, "a2"),
Tuple2.of(2, "b"),
Tuple2.of(3, "c")
);
DataStreamSource<Tuple3<Integer, String,Integer>> source2 = env.fromElements(
Tuple3.of(1, "aa1",1),
Tuple3.of(1, "aa2",2),
Tuple3.of(2, "bb",1),
Tuple3.of(3, "cc",1)
);
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
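// With a parallelism above 1, key both streams by the join field so that records with the same key reach the same subtask (thread); only then can they be matched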
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> keyBy = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
SingleOutputStreamOperator<String> process = keyBy.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
// One HashMap per stream to cache the records seen so far
Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();
@Override
public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// First store the s1 record in its cache
if(!s1Cache.containsKey(id)){
ArrayList<Tuple2<Integer, String>> s1Value = new ArrayList<>();
s1Value.add(value);
s1Cache.put(id, s1Value);
}else {
s1Cache.get(id).add(value);
}
if (s2Cache.containsKey(id)){
for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
out.collect("s1"+value+"---------"+"s2"+s2Element);
}
}
}
@Override
public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// First store the s2 record in its cache
if(!s2Cache.containsKey(id)){
ArrayList<Tuple3<Integer, String, Integer>> s2Value = new ArrayList<>();
s2Value.add(value);
s2Cache.put(id, s2Value);
}else {
s2Cache.get(id).add(value);
}
if (s1Cache.containsKey(id)){
for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
out.collect("s1"+s1Element+"---------"+"s2"+value);
}
}
}
});
process.print();
env.execute();
}
}
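9.4 Output Operators (Sink)
Sinks write the computed results to external storage.
For the supported external systems, see the official Flink documentation.
9.4.1 Writing to Files (FileSink)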
package sink;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;
public class SinkFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Checkpointing must be enabled; otherwise the part files stay in the .inprogress state
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataStreamSource<String> source = env.fromElements("111", "222","333","444","555");
FileSink<String> sink = FileSink
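// Example from the official documentation
.forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
// The rollover interval is given in milliseconds here, so convert with toMillis
.withRolloverInterval(TimeUnit.MINUTES.toMillis(5))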
// .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024L)
.build())
.build();
// FileSink<String> fileSink = FileSink
// // Write a row-format file
// .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>())
// // Output file configuration
// .withOutputFileConfig(
// OutputFileConfig.builder()
// .withPartPrefix("test")
// .withPartSuffix(".log")
// .build()
// )
// // Bucket assignment
// .withBucketAssigner(new DateTimeBucketAssigner<>("yy-MM-dd", ZoneId.systemDefault()))
// // Rolling policy
// .withRollingPolicy(DefaultRollingPolicy.builder()
// .withRolloverInterval(5L * 1000L)
// .withMaxPartSize(1L * 1024L)
// .build()
// ).build();
source.sinkTo(sink);
env.execute();
}
}
9.4.2 Writing to Kafka
See the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
package sink;
import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
public class SinkKafkaDemo {
public static void main(String[] args) throws Exception {
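StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// EXACTLY_ONCE relies on Kafka transactions that are committed on checkpoints, so checkpointing must be enabled
env.enableCheckpointing(2000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.132.100:9092,192.168.132.101:9092,192.168.132.102:9092");
// Keep the producer's transaction timeout within the broker's transaction.max.timeout.ms (15 minutes by default)
properties.setProperty("transaction.timeout.ms", String.valueOf(10 * 60 * 1000));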
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
KafkaSerializationSchema<WaterSensor> serializationSchema = new KafkaSerializationSchema<WaterSensor>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(WaterSensor s,Long time ) {
return new ProducerRecord<>(
"test", // target topic
s.toString().getBytes(StandardCharsets.UTF_8)); // record contents
}
};
dataStreamSource.addSink(new FlinkKafkaProducer<WaterSensor>(
"test",serializationSchema,properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE
));
env.execute();
}
}
![image](https://i-blog.csdnimg.cn/direct/c12239189c82417f8d17f9f8312dcf97.png)
##### 9.4.3 Writing to MySQL
See the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/
```java
package sink;
import bean.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class SinkMysqlDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
dataStreamSource.print();
SinkFunction<WaterSensor> waterSensorSinkFunction = JdbcSink.sink(
"insert into ws (id,ts,vc) values (?, ?, ?)", // mandatory
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
preparedStatement.setString(1, waterSensor.id);
preparedStatement.setLong(2, waterSensor.ts);
preparedStatement.setInt(3, waterSensor.vc);
}
}, // mandatory
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(), // optional
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/testflink?" +
"autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
// .withDriverName("org.Mysql.Driver")
.withUsername("root")
.withPassword("123")
.withConnectionCheckTimeoutSeconds(60)
.build() // mandatory
);
dataStreamSource.addSink(waterSensorSinkFunction);
env.execute();
}
}
```
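10 Time and Windows
A window is usually a bounded span of time, that is, a time window. A window is essentially a way of cutting a bounded slice out of an unbounded stream so that the data inside that slice can be processed.
10.1 Window Types
- By what drives them: time windows (like a bus that departs on schedule) and count windows (like a bus that departs once it is full)
- By how elements are assigned to windows: tumbling, sliding, session, and global windows
10.2 Window API Overview
Windows can be applied to keyed or non-keyed streams.
1. Keyed
After keyBy, the stream is divided by key into multiple logical streams (a KeyedStream), and the window computation runs on several parallel subtasks at once. Records with the same key go to the same subtask, so in effect each key has its own set of windows, each computed independently.
2. Non-keyed
The original stream (dataStreamSource) is not divided into logical streams, so the window logic can run on only one task, which is equivalent to a parallelism of 1.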
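10.3 Window Assigners
A window assigner is the first step in building a window operator: it defines which window each element is assigned to, that is, it fixes the window type. Normally you call .window() with a window assigner, which returns a WindowedStream. For non-keyed streams, use .windowAll(), which returns an AllWindowedStream.
Time-based:
- Keyed tumbling window, window size 2 seconds
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));
- Keyed sliding window, window size 10 seconds, slide 2 seconds
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)));
- Keyed session window, session gap 2 seconds
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)));
Count-based:
- Keyed tumbling window, window size 5 elements
keyedStream.countWindow(5);
- Keyed sliding window, window size 5 elements, slide 2 elements
keyedStream.countWindow(5,2);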
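How a timestamp ends up in a tumbling or sliding time window comes down to simple modular arithmetic. The following plain-Java sketch shows that arithmetic for aligned windows; it does not call Flink's assigners, the names `WindowAssignSketch`, `windowStart`, and `slidingWindowStarts` are hypothetical, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignSketch {
    // Start of the aligned window of the given size that contains the timestamp.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Starts of all sliding windows [start, start + size) that contain the timestamp.
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Tumbling, size 10 s: timestamp 12345 ms falls into [10000, 20000)
        System.out.println(windowStart(12_345L, 0L, 10_000L)); // 10000
        // Sliding, size 10 s, slide 2 s: the element belongs to size / slide = 5 windows
        System.out.println(slidingWindowStarts(12_345L, 10_000L, 2_000L));
        // [12000, 10000, 8000, 6000, 4000]
    }
}
```

A tumbling window is the special case of a sliding window whose slide equals its size, so each element belongs to exactly one window; session windows have no fixed start and are instead closed by a gap of inactivity.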
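10.4 Window Functions
A window assigner only collects elements; the window function performs the computation.
(Figure: relationships among the stream types)
Window functions fall into two kinds:
- Incremental aggregation: each element is aggregated as it arrives, and the result is emitted when the window fires
- Full-window functions: elements are only stored as they arrive, and the computation and output happen when the window fires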
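The practical difference between the two kinds is how much state a window holds before it fires. The plain-Java sketch below contrasts the two strategies for a simple sum; it is an illustration only, and `IncrementalVsFullSketch`, `incrementalSum`, and `fullWindowSum` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class IncrementalVsFullSketch {
    // Incremental aggregation: the window state is a single running value.
    static int incrementalSum(int[] arrivals) {
        int accumulator = 0;
        for (int value : arrivals) {
            accumulator += value; // aggregated on arrival
        }
        return accumulator;       // emitted when the window fires
    }

    // Full-window function: every element is buffered, and the work happens on firing.
    static int fullWindowSum(int[] arrivals) {
        List<Integer> buffer = new ArrayList<>();
        for (int value : arrivals) {
            buffer.add(value);    // only stored on arrival
        }
        int sum = 0;
        for (int value : buffer) {
            sum += value;         // computed when the window fires
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] arrivals = {1, 11, 22};
        System.out.println(incrementalSum(arrivals)); // 34
        System.out.println(fullWindowSum(arrivals));  // 34
    }
}
```

Both produce the same result, but the buffered variant keeps every element in memory until the window fires, while the incremental variant keeps one value.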
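10.4.1 Incremental Aggregation Functions
Each incoming element is aggregated immediately.
1. Reduce function (ReduceFunction)
The reduce method is not called for the first element of a key in a window; from the second element on, each arrival triggers one reduce call, and the result is emitted when the window fires.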
package window;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class WindowAPIDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
WindowedStream<WaterSensor, String, TimeWindow> windowStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<WaterSensor> reduce = windowStream.reduce(new ReduceFunction<WaterSensor>() {
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
}
});
reduce.print();
env.execute();
}
}
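2. Aggregate function (AggregateFunction)
ReduceFunction covers most reduction-style aggregations, but it requires the aggregation state type, the output type, and the input type to be identical. AggregateFunction is more flexible and has three type parameters: input IN, accumulator ACC, and output OUT. IN is the element type of the input stream, ACC is the type of the intermediate aggregation state, and OUT is the type of the final result.
- The first element of a window creates the window and its accumulator
- Incremental aggregation: each element triggers one call to add
- getResult is called once when the window fires
- The input, output, and accumulator types may all differ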
package window;
import bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class WindowAggregateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
@Override
public Integer createAccumulator() {
System.out.println("createAccumulator called");
return 0;
}
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("add called");
return value.vc + accumulator;
}
@Override
public String getResult(Integer accumulator) {
System.out.println("getResult called");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
// Only used by merging windows such as session windows; combine the two partial sums instead of returning null
return a + b;
}
});
aggregate.print();
env.execute();
}
}
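The demo above uses an Integer accumulator, so it does not show why three separate types are useful. An average is the classic case: the accumulator must carry both a sum and a count, while the output is a single number. The plain-Java sketch below mirrors the four methods described above with a hypothetical local interface `Agg`; it is not Flink's `AggregateFunction` interface, and `AverageAggregateSketch`, `AVERAGE`, and `averageOf` are names invented for this example.

```java
final class AverageAggregateSketch {
    // Hypothetical stand-in with the same four methods as described in the text.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // IN = Integer (a vc value), ACC = long[]{sum, count}, OUT = Double (the average).
    static final Agg<Integer, long[], Double> AVERAGE = new Agg<Integer, long[], Double>() {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Integer value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        @Override
        public Double getResult(long[] acc) {
            return (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            // Combining two partial accumulators: add sums and counts separately.
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    };

    // Drives the lifecycle: create once, add per element, getResult once at the end.
    static double averageOf(int... values) {
        long[] acc = AVERAGE.createAccumulator();
        for (int value : values) {
            acc = AVERAGE.add(value, acc);
        }
        return AVERAGE.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(averageOf(1, 11, 22)); // 34 / 3
    }
}
```

A ReduceFunction could not express this directly, because its state would have to be of the same type as the input elements.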
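10.4.2 Full-Window Functions
1. Window function (WindowFunction)
Applied with .apply(). It exposes relatively little context information and has been fully superseded by ProcessWindowFunction.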
window.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
}
});
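2. Process window function (ProcessWindowFunction)
Besides the window's elements, ProcessWindowFunction also receives a context object, which gives access to the window itself, the current time (processing time and the event-time watermark), and state.
It is called once, when the window fires, and computes over all elements of the window together.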
package window;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowProcessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
// dataStreamSource.windowAll();
WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
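// Count by iterating: spliterator().estimateSize() is only an estimate for a general Iterable
long count = 0;
for (WaterSensor ignored : elements) {
count++;
}
out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + count + " records: " + elements.toString());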
}
});
process.print();
env.execute();
}
}
10.4.3、Combining incremental aggregation with a full window function
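Incremental aggregation (AggregateFunction) plus a full window function (ProcessWindowFunction):
- The incremental aggregation function processes the data: each element is folded in as it arrives.
- When the window fires, the incremental result (a single element) is passed to the full window function.
- The output is emitted after the full window function has processed it.
This combines the advantages of both (a ReduceFunction can also be paired with a full window function):
- Incremental aggregation: computes per element and stores only the intermediate result, so it uses little space.
- Full window function: the context object enables flexible functionality.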
package window;
import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowAggregateAndProcessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
.map(value ->{
String[] datas = value.split(",");
return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
} );
KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> outputStreamOperator = window.aggregate(new MyAgg(), new MyProcess());
outputStreamOperator.print();
env.execute();
}
public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{
@Override
public Integer createAccumulator() {
System.out.println("createAccumulator: initializing accumulator");
return 0;
}
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("add called");
return value.vc + accumulator;
}
@Override
public String getResult(Integer accumulator) {
System.out.println("getResult: emitting result");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
// Only called when windows merge (for example session windows); combine the two partial sums
return a + b;
}
}
public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
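// elements holds the pre-aggregated result from MyAgg, so the count is expected to be 1
long count = 0;
for (String ignored : elements) {
count++;
}
out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + count + " records: " + elements.toString());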
}
}
}
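To make the hand-off in the listing above concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes a single window covering [0, 10000) ms; the class `AggThenProcessSketch`, the methods `accumulate` and `process`, and the sample records are hypothetical. It only illustrates that the incremental step keeps one value per key and that the full-window step then receives exactly one element.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of "aggregate first, then process"; this is NOT Flink's API.
class AggThenProcessSketch {
    // Incremental part: keep a single running sum per key, never the raw records.
    static Map<String, Integer> accumulate(String[][] records) {
        Map<String, Integer> accByKey = new LinkedHashMap<>();
        for (String[] record : records) {
            accByKey.merge(record[0], Integer.parseInt(record[1]), Integer::sum);
        }
        return accByKey;
    }

    // Full-window part: sees the window bounds plus the single pre-aggregated element.
    static String process(String key, long start, long end, List<String> elements) {
        return "key=" + key + " window [" + start + "," + end + ") received "
                + elements.size() + " element: " + elements.get(0);
    }

    public static void main(String[] args) {
        String[][] records = {{"s1", "1"}, {"s1", "2"}, {"s2", "5"}};
        Map<String, Integer> accByKey = accumulate(records);
        // "Window fires": hand each key's single result to the full-window step.
        for (Map.Entry<String, Integer> entry : accByKey.entrySet()) {
            List<String> single = Collections.singletonList(entry.getValue().toString());
            System.out.println(process(entry.getKey(), 0L, 10_000L, single));
        }
    }
}
```

For key `s1` the full-window step receives the single element `3` rather than the two raw readings, which is where the memory saving comes from.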
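10.5、Summary
Triggers and evictors: each built-in window assigner ships with a default trigger and uses no evictor unless one is set, so you generally do not need to define your own.
Taking a tumbling time window as an example:
- When the window fires and emits output: once time has advanced to at least the window's maximum timestamp (end - 1 ms).
- How windows are divided: start is the element's timestamp rounded down to a whole multiple of the window size, end = start + window size, and the window is the half-open interval [start, end).
- Window lifecycle:
  Created: the window object is instantiated on demand when the first element belonging to it arrives, and is placed in a singleton collection.
  Destroyed (window closed): once time has advanced to at least the window's maximum timestamp (end - 1 ms) + the allowed lateness (0 by default).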
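The arithmetic in this summary can be checked with a small worked example. The sketch below is plain Java and assumes a zero window offset; the class `TumblingWindowSketch` and its method names are hypothetical and are meant only to restate the rules above, not to reproduce Flink's implementation.

```java
// Worked example of the tumbling-window arithmetic; assumes a zero window offset.
class TumblingWindowSketch {
    // start = timestamp rounded down to a whole multiple of the window size.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // end = start + window size; the window covers [start, end).
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    // The window fires once time reaches its maximum timestamp, end - 1 ms.
    static long maxTimestamp(long endMs) {
        return endMs - 1;
    }

    // The window is destroyed once time reaches maxTimestamp + allowed lateness.
    static long cleanupTime(long endMs, long allowedLatenessMs) {
        return maxTimestamp(endMs) + allowedLatenessMs;
    }

    public static void main(String[] args) {
        long size = 10_000L;     // 10-second window
        long timestamp = 13_500L; // element timestamp in ms
        long start = windowStart(timestamp, size); // 10000
        long end = windowEnd(timestamp, size);     // 20000
        System.out.println("window = [" + start + "," + end + ")");
        System.out.println("fires at >= " + maxTimestamp(end));       // 19999
        System.out.println("closes at >= " + cleanupTime(end, 0L));   // 19999 with default lateness
        System.out.println("closes at >= " + cleanupTime(end, 2_000L)); // 21999 with 2 s lateness
    }
}
```

An element stamped 13500 ms therefore lands in [10000, 20000); with the default allowed lateness of 0 the window fires and is destroyed at the same point in time.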