Flink 学习三 Flink 流 process function API

news2024/11/25 4:25:47

Flink 学习三 Flink 流&process function API

1.Flink 多流操作

1.1.split 分流 (deprecated)

把一个数据流根据数据分成多个数据流 1.2 版本后移除

1.2.分流操作 (使用侧流输出)

public class _02_SplitStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            /**
             *
             * @param value 输出的数据
             * @param ctx A 上下文
             * @param out 主要流输出器
             * @throws Exception
             */
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx,
                                       Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0",TypeInformation.of(Integer.class)) , value);
                }if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1",TypeInformation.of(Integer.class)) , value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

1.3.connect

connect 连接 DataStream ,DataStream ==> ConnectedStream

两个DataStream 连接成一个新的ConnectedStream ,虽然两个流连接在一起,但是两个流依然是相互独立的,这个方法的最大用处是: 两个流共享State 状态

两个流在内部还是各自处理各自的逻辑 比如 CoMapFunction 内的map1,map2 还是各自处理 streamSource,streamSource2;

数据类型可以不一致

public class _03_ConnectedStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50);

        ConnectedStreams<Integer, Integer> connected = streamSource.connect(streamSource2);

        // 原来的 MapFunction ==>  CoMapFunction  ; flatMap ==> CoMapFunction
        SingleOutputStreamOperator<Object> mapped = connected.map(new CoMapFunction<Integer, Integer, Object>() {
            @Override
            public Object map1(Integer value) throws Exception {
                return value + 1;
            }

            @Override
            public Object map2(Integer value) throws Exception {
                return value * 10;
            }
        });

        mapped.print();

        env.execute();
    }
}

------------------------------------------------------------------
  --------------------         --------------------    
    streamSource         --->         map1  
  --------------------         --------------------

  --------------------         --------------------    
    streamSource2       --->          map2  
  --------------------         -------------------- 
------------------------------------------------------------------    

1.4.union

可以合并多个流,流数据类型必须一致,


public class _04_UnionStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50,80,1110);
        DataStream<Integer> unioned = streamSource.union(streamSource2);
        SingleOutputStreamOperator<String> union = unioned.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return "union" + value;
            }
        });
        union.print();
        env.execute();
    }
}


------------------------------------------------------------------
  --------------------          
    streamSource               
  --------------------              --------------------
						=====>        map
  --------------------              --------------------    
    streamSource2               
  --------------------         
------------------------------------------------------------------    

1.5.coGroup

coGroup 本质上是join 算子的底层算子

有界流的思想去处理; 比如上说是时间窗口: 5S内数据分组匹配

        <左边流>.coGroup(<右边流>)
                .where(<KeySelector>)
                .equalTo(<KeySelector>)
                .window(<窗口>)
                .apply(<处理逻辑>)

在这里插入图片描述

数据组比如说是时间窗口是5或者是10s 为一批数据, 时间窗口内的数据完成后,根据 where,和 equalTo 选择的key 数据一致 来分组

public class _05_CoGroupStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setIdCard(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==idCard==");
        //name_idCard.print();

        DataStream<Person> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setAddr(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==addr==");
        //name_addr.print();

        DataStream<Person> dataStream = name_idCard.coGroup(name_addr)
                // 左边流的key
                .where(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                // 右边流的key
                .equalTo(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                //时间窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                //处理逻辑  左边 Person ,右边  Person ,输出 Person
                .apply(new CoGroupFunction<Person, Person, Person>() {
                    /**
                     * first 协调组第一个流个数据
                     * second 协调组第二个流数据
                     */
                    @Override
                    public void coGroup(Iterable<Person> first, Iterable<Person> second, Collector<Person> out) throws Exception {
                        //左连接实现
                        Iterator<Person> iterator = first.iterator();
                        while (iterator.hasNext()) {
                            Person next1 = iterator.next();
                            Iterator<Person> iterator1 = second.iterator();
                            Boolean noDataFlag = true;
                            while (iterator1.hasNext()) {
                                Person result = new Person(next1);
                                Person next = iterator1.next();
                                result.setAddr(next.getAddr());
                                out.collect(result);
                                noDataFlag = false;
                            }
                            if (noDataFlag) {
                                out.collect(next1);
                            }
                        }
                    }
                });

        dataStream.print();

        env.execute();
    }
}

1.6. join 关联操作

用于关联两个流,需要指定join 条件;需要在窗口中进行关联后的计算逻辑

join 使用coGroup 实现的

public class _06_JoinStream {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Perple 数据打平为Tuple  name,idCard,addr
        DataStream<Tuple3<String, String,String>> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            return Tuple3.of(x.split(",")[0],x.split(",")[1],"");
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String,String>>() {
        })) ;

        DataStream<Tuple3<String, String,String>> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            return Tuple3.of(x.split(",")[0],"",x.split(",")[1]);
        }) .returns(TypeInformation.of(new TypeHint<Tuple3<String, String,String>>() {
        }));
        //name_addr.print();

        DataStream<Tuple3<String, String,String>> dataStream = name_idCard.join(name_addr)
                // 左边流的f0 字段
                .where(tp3->tp3.f0)
                // 右边流的f0 字段
                .equalTo(tp3->tp3.f0)
                //时间窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                //处理逻辑  左边 Person ,右边  Person ,输出 Person
                .apply(new JoinFunction<Tuple3<String, String,String>, Tuple3<String, String,String>, Tuple3<String, String,String>>() {
                    /**
                     * @param first 匹配到的数据  first input.
                     * @param second 匹配到的数据 second input.
                     * @return
                     * @throws Exception
                     */
                    @Override
                    public Tuple3 join(Tuple3 first, Tuple3 second) throws Exception {
                        return Tuple3.of(first.f0,first.f1,second.f2);
                    }
                });

        dataStream.print();
        env.execute();
    }
}

1.7.broadcast

   datastream1: 用户id|行为|操作数据                   datastream2: 用户id|用户name|用户phone   
windows time1 ---------------------------------- 	---------------------------------
				12  |click| xxdssd						12  |aa| 131	
				13  |click| dasd             			 13  |cc| 1331					
				14  |click| ad    						14  |dd| 1321	
windows time2 ---------------------------------- 	---------------------------------
				12  |click| sfs          															
				13  |click| sdfs       
				15  |click| ghf     					17  |dd| 1321											
windows time3 ----------------------------------  	---------------------------------
				14  |click| ghf   
				17  |click| ghf 												
       
       注: 左边流数据是基础数据,使用 join不合适 ,适合 broadcast
           broadcast 适用于关联字典表 
  
       主流算子		<<<----------------------------------	广播状态				
       
       

public class _07_BroadcastStream {

	public static void main(String[] args) throws Exception {

		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 数据打平为 用户id|行为|操作数据
		DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
				.map(x -> {
					return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
				}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));

		// 数据打平为 用户id|用户name|用户phone
		DataStream<Tuple3<String, String, String>> baseInfo = env.socketTextStream("192.168.141.131", 7777).map(x -> {
			return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
		}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
		}));

        //状态描述
		MapStateDescriptor<String, Tuple3<String, String, String>> userBaseInfoStateDesc = new MapStateDescriptor<>(
				"user base info", TypeInformation.of(String.class),
				TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));
		// 基础信息 变成广播流
		BroadcastStream<Tuple3<String, String, String>> userBaseInfoBroadcast = baseInfo
				.broadcast(userBaseInfoStateDesc);

		// 关联行为流和广播流
		BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> connected = operationInfo
				.connect(userBaseInfoBroadcast);

		SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed =
				// 连接后,处理的逻辑
				// connected 如果是keyedStream ===> 参数就是 KeyedBroadcastProcessFunction
				// connected 如果不是keyedStream ===> 参数就是 BroadcastProcessFunction
				connected.process(new BroadcastProcessFunction<Tuple3<String, String, String>, // 左流的数据
						Tuple3<String, String, String>, // 广播的类型
						Tuple5<String, String, String, String, String> // 返回数据类型
				>() {

					/**
					 * 此方法是处理主流方法 主流来一条处理一下
					 * 
					 * @throws Exception
					 */
					@Override
					public void processElement(Tuple3<String, String, String> value, // 左流 主流 数据
							BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx, // 上下文
							Collector<Tuple5<String, String, String, String, String>> out // 输出器
					) throws Exception {
						// 基础数据还没有 broadcastStateReadOnly
						// 和 processBroadcastElement 里面获取的 broadcastState 数据一致,只是是只读的
						// 数据是一致的
						ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastStateReadOnly = ctx
								.getBroadcastState(userBaseInfoStateDesc);
						if (broadcastStateReadOnly == null) {
							out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
						} else {
							Tuple3<String, String, String> baseInfo = broadcastStateReadOnly.get(value.f0);
							// 基础数据为空
							if (baseInfo == null) {
								out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
							} else {
								out.collect(Tuple5.of(value.f0, value.f1, value.f2, baseInfo.f1, baseInfo.f2));
							}
						}
					}

					/**
					 *
					 * 处理广播流数据:拿到数据后,存到状态里面
					 */
					@Override
					public void processBroadcastElement(Tuple3<String, String, String> value, // 广播流里面的一条数据
							BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx, // 上下文
							Collector<Tuple5<String, String, String, String, String>> out // 输出器
					) throws Exception {
						// 上下文 里面获取状态
						BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
								.getBroadcastState(userBaseInfoStateDesc);
                          //状态里面 以用户id 作为key , 基础信息为value
						broadcastState.put(value.f0, value);
					}
				});

		processed.print();

		env.execute();
	}
}

2.Flink 编程 process function

2.1 process function 简介

process function相对于前面的map , flatmap ,filter 的区别就是,对数据的处理有更大的自由度; 可以获取到数据的上下文,数据处理逻辑 ,如何控制返回等交给编写者;

在事件驱动的应用中,使用最频繁的api 就是process function

注: 在对不同的流的时候, process function 的类型也不一致

数据流的转换

在这里插入图片描述

不同的DataStream 的process 处理方法需要的参数类型有如下几种

2.2 ProcessFunction


public class _01_ProcessFunction {

	public static void main(String[] args) throws Exception {

		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 数据打平为 用户id|行为|操作数据
		DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1",
				"22,doubleclick,data22");
		DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
			return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
		}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
		}));

		// ProcessFunction
		SingleOutputStreamOperator<String> processed = operationInfo
				.process(new ProcessFunction<Tuple3<String, String, String>, String>() {
					// 处理元素
					@Override
					public void processElement(Tuple3<String, String, String> value,
							ProcessFunction<Tuple3<String, String, String>, String>.Context ctx, Collector<String> out)
							throws Exception {
						// 可以做主流输出
						out.collect(value.f0 + value.f1 + value.f2);
						// 可以做侧流输出
						ctx.output(new OutputTag<Tuple3<String, String, String>>("adasd",
								TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
								})), value);
					}

					// 其余 声明周期方法 ... 任务状态 ... 都可以获取
					@Override
					public void open(Configuration parameters) throws Exception {
						super.open(parameters);
					}
				});

		processed.print();

		env.execute();
	}

}

2.3 KeyedProcessFunction

public class _02_KeyedProcessFunction {

	public static void main(String[] args) throws Exception {

		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 数据打平为 用户id|行为|操作数据
		DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1",
				"22,doubleclick,data22", "2,doubleclick,data22");
		DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
			return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
		}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
		}));

		// keyedStream
		KeyedStream<Tuple3<String, String, String>, String> keyedStream = operationInfo.keyBy(tp3 -> tp3.f0);

		// ProcessFunction
		SingleOutputStreamOperator<String> processed = keyedStream
				.process(new ProcessFunction<Tuple3<String, String, String>, String>() {
					@Override
					public void processElement(Tuple3<String, String, String> value,
							ProcessFunction<Tuple3<String, String, String>, String>.Context ctx, Collector<String> out)
							throws Exception {
                        out.collect((value.f0 + value.f1 + value.f2).toUpperCase(Locale.ROOT));
					}
				});

        processed.print();

		env.execute();
	}

}

2.4 ProcessWindowFunction

2.5 ProcessAllWindowFunction

2.6 CoProcessFunction

2.7 ProcessJoinFunction

2.8 BroadcastProcessFunction

参考1.7

2.9 KeyedBroadcastProcessFunction

3.测试

package demo.sff.flink.exercise;

import demo.sff.flink.source.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

/**
 * 创建流 Stream 1: id | event | count 1,event1,3 2,event1,5 3,event1,4
 *
 * Stream 2: id | gender | city 1 , male ,beijin 2 ,female,shanghai
 *
 * 需求 : 1.Stream 1 按照 count字段展开为对应的个数 比如id=1 展开为3条 1,event1,随机1 1,event1,随机2
 * 1,event1,随机3 ,id=2 展开为5 条
 *
 * 2.Stream 1 关联上 Stream 2 数据
 *
 * 3.关联不上 测流 其余主流
 *
 * 4.主流,性别分组,取出最大随机数
 *
 * 5.主流写入mysql
 *
 * 6.测流写入parquet
 */
public class Test1 {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		// 创建流 后面可以使用socket 替换 stream2 先写入广播 不然关联不上
		DataStreamSource<String> stream1 = env.fromElements("1,event1,3", "2,event1,5", "3,event3,4");
		DataStreamSource<String> stream2 = env.fromElements("1,male,beijin", " 2,female,shanghai");

		DataStream<Tuple3<String, String, String>> streamOperator1 = stream1
				.map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
				.returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));
		DataStream<Tuple3<String, String, String>> streamOperator2 = stream2
				.map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
				.returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));

		// 需求1
		DataStream<Tuple3<String, String, String>> mapDataStream = streamOperator1
				.flatMap(new FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
					@Override
					public void flatMap(Tuple3<String, String, String> value,
							Collector<Tuple3<String, String, String>> out) throws Exception {
						Integer integer = Integer.valueOf(value.f2);
						for (Integer i = 0; i < integer; i++) {
							int r = new Random().nextInt(100);
							out.collect(Tuple3.of(value.f0, value.f1, r + ""));
						}
					}
				}).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));

		// mapDataStream.print();

		// 需求2 stream2 数据广播
		MapStateDescriptor<String, Tuple3<String, String, String>> descriptor = new MapStateDescriptor<String, Tuple3<String, String, String>>(
				"userinfo", TypeInformation.of(String.class),
				TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {
				}));
		BroadcastStream<Tuple3<String, String, String>> tuple3BroadcastStream = streamOperator2.broadcast(descriptor);

		BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> tuple3BroadcastConnectedStream = mapDataStream
				.connect(tuple3BroadcastStream);

		SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed = tuple3BroadcastConnectedStream
				.process(
						new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>() {

							@Override
							public void processElement(Tuple3<String, String, String> value,
									BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx,
									Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
								ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
										.getBroadcastState(descriptor);
								// 需求3.关联不上 测流 其余主流
								if (broadcastState == null) {
									// out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
									ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
											value.f0 + value.f1 + value.f2);
								} else {
									Tuple3<String, String, String> stringTuple3 = broadcastState.get(value.f0);
									if (stringTuple3 == null) {
										// out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
										ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
												value.f0 + value.f1 + value.f2);
									} else {
										out.collect(Tuple5.of(value.f0, value.f1, value.f2, stringTuple3.f1,
												stringTuple3.f2));
									}
								}
							}

							@Override
							public void processBroadcastElement(Tuple3<String, String, String> value,
									BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx,
									Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
								BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
										.getBroadcastState(descriptor);
								broadcastState.put(value.f0, value);

							}
						})
				.returns(TypeInformation.of(new TypeHint<Tuple5<String, String, String, String, String>>() {
				}));
		// 主流
		processed.print();
		// 测流
		DataStream<String> sideOutput = processed
				.getSideOutput(new OutputTag<String>("nojoin", TypeInformation.of(String.class)));

		// sideOutput.print();

		// 需求4 主流,性别分组,取出最大随机数
		SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> streamOperator = processed
				.keyBy(x -> x.f3)
				.map(new MapFunction<Tuple5<String, String, String, String, String>, Tuple5<String, String, Integer, String, String>>() {
					@Override
					public Tuple5<String, String, Integer, String, String> map(
							Tuple5<String, String, String, String, String> value) throws Exception {
						return Tuple5.of(value.f0, value.f1, Integer.valueOf(value.f2), value.f3, value.f4);
					}
				}).returns(TypeInformation.of(new TypeHint<Tuple5<String, String, Integer, String, String>>() {
				}));
		SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> maxBy = streamOperator
				.keyBy(tp5 -> tp5.f3).maxBy(2);
		maxBy.print();

		// 5.主流写入mysql  未验证 待测试
		String sql = " insert into testa values (?,?,?,?,?) on duplicate key a=?,b=?,c=?,d=?,e=?  ";
		SinkFunction<Tuple5<String, String, Integer, String, String>> jdbcSink = JdbcSink.sink(sql,
				new JdbcStatementBuilder<Tuple5<String, String, Integer, String, String>>() {
					@Override
					public void accept(PreparedStatement preparedStatement,
							Tuple5<String, String, Integer, String, String> tuple5) throws SQLException {
						preparedStatement.setString(0, tuple5.f0);
						preparedStatement.setString(1, tuple5.f1);
						preparedStatement.setInt(2, tuple5.f2);
						preparedStatement.setString(3, tuple5.f3);
						preparedStatement.setString(4, tuple5.f4);
						preparedStatement.setString(5, tuple5.f0);
						preparedStatement.setString(6, tuple5.f1);
						preparedStatement.setInt(7, tuple5.f2);
						preparedStatement.setString(8, tuple5.f3);
						preparedStatement.setString(9, tuple5.f4);
					}
				}, JdbcExecutionOptions.builder().withBatchSize(2) // 两条数据一批插入
						.withMaxRetries(3) // 失败插入重试次数
						.build(),
				new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withPassword("root") // jdbc 连接信息
						.withUsername("root")// jdbc 连接信息
						.withUrl("jdbc:mysql://192.168.141.131:3306/flinkdemo").build());
		streamOperator.addSink(jdbcSink);

		// 6.测流写入parquet  未验证 待测试
		ParquetWriterFactory<String> writerFactory = ParquetAvroWriters.forReflectRecord(String.class);
		FileSink<String> build = FileSink.forBulkFormat(new Path("d:/sink"), writerFactory)
				.withBucketAssigner(new DateTimeBucketAssigner<String>()) // 文件分桶策略
				.withBucketCheckInterval(5)// 文件夹异步线程创建和检测周期
				.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("flinkdemo") // 文件前缀
						.withPartSuffix(".txt") // 文件后缀
						.build())// 文件的输出格式对象
				.build();

		sideOutput.sinkTo(build);

		env.execute();

	}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/668133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python接口自动化之登录接口测试

01 什么是接口&#xff1f; 接口&#xff1a;检测外部系统与系统之间以及内部各个子系统之间的交互点。 通俗来说&#xff0c;接口就是连接前后端的桥梁&#xff0c;接口测试可以简单理解为脱离了前端的功能测试。一个又一个的接口就对应功能测试内一个又一个的功能。但注意&am…

前端vue入门(纯代码)10

【10.TodoList-自定义事件】 TodoList案例的完整代码请点击此处粉色文字 TodoList案例中的子组件TodoHeader给父组件App传递数据 App.vue文件中需要修改的代码 原本&#xff1a; Todo案例中子给父传递数据【通信】的方法&#xff1a;props <!-- 把App组件里的方法addTodo(…

2023 Nature 健康系统规模的语言模型是通用预测引擎

文章目录 一、论文关键信息二、论文主要内容三、总结与讨论🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 一、论文关键信息 论文标题:Health system-scale language models are all-purpose prediction engines 期刊信息:2023 Nature 论文地址:h

Armbian 23.05(代号Suni)操作系统已全面上市

Armbian社区通知我们&#xff0c;适用于ARM和RISC-V单板计算机以及其他平台的Armbian 23.05&#xff08;代号Suni&#xff09;操作系统已全面上市。 在Armbian 23.02发布三个月后&#xff0c;Armbian 23.05版本是第一个在完全重构的构建框架上创建的版本&#xff0c;基于即将发…

Flink 学习八 Flink 容错机制 checkpoint savepoint

Flink 学习八 Flink 容错机制 & checkpoint & savepoint https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/ 1.容错基础概念 上一节讲述状态后端;Flink是一个 带状态stateful 的数据处理系统,在处理数据的过程…

基于深度学习的高精度蜜蜂检测识别系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度蜜蜂检测识别系统可用于日常生活中或野外来检测与定位蜜蜂目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的蜜蜂目标检测识别&#xff0c;另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5目标检测模型…

VUE 2X ClassStyle ⑦

目录 文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持✨ V u e j s Vuejs Vuejs C l a s s Class Class与 S t y l e Style Style绑定总结 文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持…

初始java String类型

文章目录 初始java String类型理解 next和nextLine的区别new String(); 括号里面可以放什么呢放byte类型的数组放byte类型的数组&#xff0c;索引&#xff0c;长度放char类型的数组放char类型的数组&#xff0c;索引&#xff0c;长度 String 类型对应同一字符串&#xff0c;是否…

boost 异步服务器开发

目录 1、 异步服务器简介 2、异步服务器开发 2.1 会话类 2.1.1 会话类头文件 2.1.2 会话类源文件 2.2 服务类 2.2.1 服务类头文件 2.2.2 服务类源文件 2.3 主函数 3、异步服务器测试 4、当前异步服务器存在的问题及后续优化 1、 异步服务器简介 boost 异步服务器分为…

【Pytest实战】Pytest 如何生成优美的测试报告(allure-pytest)

&#x1f604;作者简介&#xff1a; 小曾同学.com,一个致力于测试开发的博主⛽️&#xff0c;主要职责&#xff1a;测试开发、CI/CD 如果文章知识点有错误的地方&#xff0c;还请大家指正&#xff0c;让我们一起学习&#xff0c;一起进步。&#x1f60a; 座右铭&#xff1a;不想…

施耐德电气:以数字化利器,助力中国产业“双转型”

近日&#xff0c;以“创新融生态&#xff0c;加速双转型”为主题的2023施耐德电气创新峰会在乌镇圆满举行。大会上&#xff0c;数千位行业专家、业界领袖和专业人士共聚一堂&#xff0c;共同探讨中国产业如何迈向“数字化”和“绿色低碳”的双转型&#xff0c;旨在为中国产业的…

SpringBoot构造流程源码分析------阶段一

SpringApplication的初始化简介 在入口类主要通过SpringApplication的静态方法–run方法进行SpringApplication类的实例化操作&#xff0c;然后再针对实例化对象调用另一个run方法完成整个项目的初始化和启动。本章节重点围绕此过程的前半部分&#xff08;即SpringApplication…

嵌入式系统开发复习指北

【嵌入式系统】20计科3-4班 第1讲 文件IO操作测试 【嵌入式系统】20计科3-4班 第2讲第4讲进程控制与线程测试 【嵌入式系统】20计科3-4班 第3讲进程通信测试 【嵌入式系统】20计科3-4班 第5-6讲内核和BootLoader开发测试 【嵌入式系统】20计科3-4班 第7讲驱动程序开发测试 大题…

PCB设计系列分享-高速ADC布局布线技巧

目录 概要 整体架构流程 技术名词解释 技术细节 1.裸露焊盘 2.最佳连接 3.去耦和层电容 4.PDS的高频层电容 5.分离接地 小结 概要 在当今的工业领域&#xff0c;系统电路板布局已成为设计本身的一个组成部分。因此&#xff0c;设计工程师必须了解影响高速信号链设计性能的机制。…

【操作系统】期末复习汇总最全版本!电子科技大学2023期末考试

操作系统 【考后感悟】本次考试考察了&#xff1a;操作系统的4大特征、线程和进程的区别、页表与页的基本地址变换机构、磁盘调度算法、银行家算法、调度算法&#xff08;短作业优先、时间片轮转&#xff09;、Linux的一些基本知识、shell读程序题以及PV操作编程。知识点基本涵…

目标检测经典工作发展(超详细对比):R-CNN vs SPPNet vs Fast R-CNN vs Faster R-CNN

序 网上关于两阶段目标检测&#xff08;two-stage object detection&#xff09;的几个经典工作R-CNN&#xff0c;SPPNet&#xff0c;Fast R-CNN&#xff0c;Faster R-CNN的发展&#xff0c;各自的优缺点缺乏一个比较清楚的描述&#xff0c;大部分文章讲的比较细节&#xff0c…

代码随想录算法训练营第四十一天| 背包问题

标准背包问题 有n件物品和一个最多能背重量为w 的背包。 第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品只能用一次&#xff0c;求解将哪些物品装入背包里物品价值总和最大。 举一个例子&#xff1a; 背包最大重量为4。 物品为&#xff1a; 重量价…

c++之qt学习 基本介绍 界面设计 串口

这里写目录标题 qt基类介绍qt不同版本qt下载打开qt creater制作简单qt界面ui界面点击forms&#xff0c;双击ui文件&#xff0c;就可以进入ui编辑器 qt信号和槽给界面增加图片界面布局布局不会影响代码 界面切换更改代码验证账号密码 qt的三驾马车串口助手为下拉框加入属性信息串…

力扣动态规划专题(四)劫舍问题与股票问题 打家劫舍Ⅰ Ⅱ Ⅲ 买卖股票最佳时机Ⅰ Ⅱ Ⅲ IV 步骤及C++实现

文章目录 198. 打家劫舍213. 打家劫舍 II337. 打家劫舍 III121. 买卖股票的最佳时机动态规划贪心算法 122. 买卖股票的最佳时机 II动态规划贪心算法 123.买卖股票的最佳时机III188.买卖股票的最佳时机IV309.最佳买卖股票时机含冷冻期714.买卖股票的最佳时机含手续费 198. 打家劫…

为摸鱼助力:一份Vue3的生成式ElementPlus表单组件

目录 一、实现背景 二、简介 三、组织架构设计 四、实现方式 五、代码示例 六、示例代码效果预览 七、项目预览地址 & 项目源码地址 目前项目还有诸多待完善的地方&#xff0c;大家有好的想法、建议、意见等欢迎再次评论&#xff0c;或于github提交Issues 一、实现…