大数据之Flink(三)

news2024/11/15 15:48:43
9.3、转换算子
9.3.1、基本转换算子
9.3.1.1、映射map

一一映射

package transform;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: MapDemo
 * @Author lizhe
 * @Package transform
 * @Date 2024/5/31 19:55
 * @description:
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );
        SingleOutputStreamOperator<Object> map = sensorDataStreamSource.map((v) -> {
            return v.getId();
        });
        map.print();
        env.execute();
    }
}

9.3.1.2、过滤

转换操作,对数据流进行过滤,通过布尔条件表达式设置过滤条件,true正常输出,false被过滤掉

package transform;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: MapDemo
 * @Author lizhe
 * @Package transform
 * @Date 2024/5/31 19:55
 * @description:
 * s1数据:一进一出
 * s2数据:一进二出
 * s3数据:一进零出
 */
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );
        SingleOutputStreamOperator<WaterSensor> filter = sensorDataStreamSource.filter((v) -> {
            return "s1".equals(v.getId());
        });
        filter.print();
        env.execute();
    }
}

9.3.1.3、扁平映射flatMap

将数据流中的整体拆分成个体使用。消费一个元素可产生多个元素。(一进多出)flatMap为flatten和map的结合,即按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

package transform;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Title: MapDemo
 * @Author lizhe
 * @Package transform
 * @Date 2024/5/31 19:55
 * @description:
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 11),
                new WaterSensor("s2", 2L, 22),
                new WaterSensor("s3", 3L, 3)
        );
        SingleOutputStreamOperator<String> flatmap = sensorDataStreamSource.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                if ("s1".equals(value.getId())) {
                    out.collect(String.valueOf(value.getVc()));
                } else if ("s2".equals(value.getId())) {
                    out.collect(String.valueOf(value.getTs()));
                    out.collect(String.valueOf(value.getVc()));
                }
            }
        });
        flatmap.print();
        env.execute();
    }
}

map使用的是return来控制一进一出,flatMap使用Collector,可调用多次采集器实现一进多出

9.3.1.4、聚合算子Aggregation

计算结果不仅依赖当前数据,还与之前的数据有关

  1. 按键分区keyby

    DataStream没有直接聚合的API。在flink中要聚合先进行可以不用keyby分区。keyby通过指定key将一条流划分成不同的分区,分区就是并行处理的子任务。

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @Title: MapDemo
     * @Author lizhe
     * @Package transform
     * @Date 2024/5/31 19:55
     * @description:
     * s1数据:一进一出
     * s2数据:一进二出
     * s3数据:一进零出
     */
    public class KeybyDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 11),
                    new WaterSensor("s1",11L,11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
            * 按照id分组
            * 返回一个键控流KeyedStream,keyBy不是算子
            * keyby分组与分区的关系:
            * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
            * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
            * */
                 KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            keyBy.print();
            env.execute();
        }
    }
    
    
  2. 简单聚合

    按键分区后可以进行聚合操作,基本的API包括:sum、min、max、minBy、maxBy。

    sum

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 11),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            //传位置索引适用于tuple类型,不适合pojo类型
    //        SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
            SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
            sum.print();
    
    //        SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
    //        max.print();
    
    //        SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
    //        maxBy.print();
            env.execute();
        }
    }
    
    

    max

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            //传位置索引适用于tuple类型,不适合pojo类型
    //        SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
    //        SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
    //        sum.print();
    
            SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
            max.print();
    
    //        SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
    //        maxBy.print();
            env.execute();
        }
    }
    
    

    输出结果:

    WaterSensor{id='s1', ts=1, vc=1}
    WaterSensor{id='s1', ts=1, vc=11}
    WaterSensor{id='s2', ts=2, vc=22}
    WaterSensor{id='s3', ts=3, vc=3}
    

    maxby

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            //传位置索引适用于tuple类型,不适合pojo类型
    //        SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
    //        SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
    //        sum.print();
    
    //        SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
    //        max.print();
    
            SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
            maxBy.print();
            env.execute();
        }
    }
    
    

    输出结果

    WaterSensor{id='s1', ts=1, vc=1}
    WaterSensor{id='s1', ts=11, vc=11}
    WaterSensor{id='s2', ts=2, vc=22}
    WaterSensor{id='s3', ts=3, vc=3}
    

    max与maxby对比(min与minby同理):

    max只会取比较字段的最大值,非比较字段保留第一次的值

    maxby会取比较字段最大值这个对象

  3. 规约函数Reduce

    reduce:两两聚合,每个key第一条数据直接存起来并输出,聚合的结果作为下一次的第一条数据

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class ReduceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s1", 22L, 22),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            SingleOutputStreamOperator<WaterSensor> reduce = keyBy.reduce(new ReduceFunction<WaterSensor>() {
                @Override
                public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                    System.out.println("value1="+value1);
                    System.out.println("value2="+value2);
                    return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
                }
            });
            reduce.print();
            env.execute();
        }
    }
    
9.3.1.5、自定义函数及分区

自定义函数分为:函数类、匿名函数、富函数类

物理分区即数据进入到多个线程中的哪个线程。常见分区策略:随机分配、轮询分配、重缩放、广播。

轮询(rebalance):一般一个source对应一个kafka的partition,如果partition数据源不均匀(数据倾斜),可通过轮询分配进行负载均衡。

缩放(rescale):实现轮询,局部组队,比rebalance高效。

广播(broadcast):下发到下游所有子任务

9.3.1.6、分流

将一条数据拆分成完全独立的两条或多条流。基于一个DataStream通过筛选条件将符合条件的数据放到对应的流里。
在这里插入图片描述
只要针对同一条流进行多次独立调用filter()方法进行筛选就可以得到拆分之后的流,但是效率较低,所有数据都要过滤多次

package split;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SplitByFilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.132.101", 7777);
        dataStreamSource.filter(value -> Integer.parseInt(value)%2==0).print("偶数流");
        dataStreamSource.filter(value -> Integer.parseInt(value) % 2 == 1).print("奇数流");
        env.execute();
    }
}

使用侧输出流实现分流,可实现数据筛选、告警等

  1. 使用process算子
  2. 定义OutputTag对象
  3. 调用ctx.output
  4. 通过主流获取侧输出流
package split;

import bean.WaterSensor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.lang.reflect.Type;


public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //如果是s1放到侧输出流s1中
        OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        //如果是s2放到侧输出流s2中
        OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );

                } );
        SingleOutputStreamOperator<WaterSensor> process = dataStreamSource.process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                String id = value.getId();
                if (id.equals("s1")) {


                    ctx.output(s1, value);
                } else if (id.equals("s2")) {

                    ctx.output(s2, value);
                } else {
                    //其他放到主流中
                    out.collect(value);
                }
            }
        });
        //打印主输出流
        process.print("主输出流");
        //打印侧输出流
        process.getSideOutput(s1).print("s1侧输出流");
        process.getSideOutput(s2).print("s2侧输出流");
        env.execute();
    }
}

9.3.1.7、合流

1、联合union

最简单的合流操作就是将多条流合到一起,要求流中的数据类型必须相同,合并后新流包括所有流的元素,数据类型不变,一次可以合并多条流。
在这里插入图片描述

package combineDemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33, 44, 55);
        DataStreamSource<String> source3 = env.fromElements("111", "222","333","444","555");
        DataStream<Integer> union = source1.union(source2).union(source3.map(value -> Integer.parseInt(value)));
        union.print();
        env.execute();
    }
}

2、连接connect
为合并不同数据类型的数据flink提供connect合流操作。connect连接后得到的是ConnectedStream,形式上统一但内部内部各自数据形式不变,彼此之间相互独立。如要得到新的DataStream要使用“同处理”(co-process),如map、flatmap等,各自处理各自的。

connect一次只能连接两条流。

package combineDemo;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;


public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> source2 = env.fromElements("111", "222","333","444","555");
        ConnectedStreams<Integer, String> connect = source1.connect(source2);
        SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return value;
            }
        });
        map.print();
        env.execute();
    }
}

ConnectedStreams可以直接调用keyBy()进行按键分区得到的还是一个ConnectedStreams,通过keyBy()将两条流中key相同的数据放到了一起,然后针对来源的流再各自处理。(类似inner join)

package combineDemo;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String,Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1",1),
                Tuple3.of(1, "aa2",2),
                Tuple3.of(2, "bb",1),
                Tuple3.of(3, "cc",1)
        );
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
        //多并行度下,要根据关联条件进行keyby,才能保证key相同的数据在一个子任务(线程)里,这样才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> keyBy = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
        SingleOutputStreamOperator<String> process = keyBy.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
            //每条流定一个hashmap用来存储数据
            Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
            Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

            @Override
            public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                Integer id = value.f0;
                //先把s1数据存到map中
                if(!s1Cache.containsKey(id)){
                    ArrayList<Tuple2<Integer, String>> s1Value = new ArrayList<>();
                    s1Value.add(value);
                    s1Cache.put(id, s1Value);
                }else {
                    s1Cache.get(id).add(value);
                }
                if (s2Cache.containsKey(id)){
                    for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                        out.collect("s1"+value+"---------"+"s2"+s2Element);
                    }
                }
            }

            @Override
            public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                Integer id = value.f0;
                //先把s1数据存到map中
                if(!s2Cache.containsKey(id)){
                    ArrayList<Tuple3<Integer, String, Integer>> s2Value = new ArrayList<>();
                    s2Value.add(value);
                    s2Cache.put(id, s2Value);
                }else {
                    s2Cache.get(id).add(value);
                }
                if (s1Cache.containsKey(id)){
                    for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                        out.collect("s1"+s1Element+"---------"+"s2"+value);
                    }
                }
            }
        });
        process.print();
        env.execute();
    }
}

9.4、输出算子sink

将计算结果写到外部存储
在这里插入图片描述
输出到外部系统参考官网。

9.4.1、输出到文件FileSink
package sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;


public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //必须开启,否则文件一直是.inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> source = env.fromElements("111", "222","333","444","555");
        FileSink<String> sink = FileSink
                //官网示例
                .forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toSeconds(5))
//                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024L)
                                .build())
                .build();

//        FileSink<String> fileSink = FileSink
//                //输出行式存储文件
//                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>())
//                //输出文件配置
//                .withOutputFileConfig(
//                        OutputFileConfig.builder()
//                                .withPartPrefix("test")
//                                .withPartSuffix(".log")
//                                .build()
//                )
//                //文件分桶
//                .withBucketAssigner(new DateTimeBucketAssigner<>("yy-MM-dd", ZoneId.systemDefault()))
//                //文件滚动策略
//                .withRollingPolicy(DefaultRollingPolicy.builder()
//                        .withRolloverInterval(5L * 1000L)
//                        .withMaxPartSize(1L * 1024L)
//                        .build()
//                ).build();
        source.sinkTo(sink);


        env.execute();

    }
}

9.4.2、输出到kafka

参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

package sink;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;


public class SinkKafkaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.132.100:9092,192.168.132.101:9092,192.168.132.102:9092");
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KafkaSerializationSchema<WaterSensor> serializationSchema = new KafkaSerializationSchema<WaterSensor>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(WaterSensor s,Long time  ) {
                return new ProducerRecord<>(
                        "test", // target topic
                        s.toString().getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };
        dataStreamSource.addSink(new FlinkKafkaProducer<WaterSensor>(
                "test",serializationSchema,properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        ));
        env.execute();

    }
}

![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/c12239189c82417f8d17f9f8312dcf97.png)

##### 9.4.3、输出到MySQL

参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/

```java
package sink;

import bean.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;


public class SinkMysqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        dataStreamSource.print();
        SinkFunction<WaterSensor> waterSensorSinkFunction = JdbcSink.sink(
                "insert into ws (id,ts,vc)  values (?, ?, ?)",                       // mandatory
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.id);
                        preparedStatement.setLong(2, waterSensor.ts);
                        preparedStatement.setInt(3, waterSensor.vc);

                    }
                },                  // mandatory
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),                  // optional
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/testflink?" +
                                "autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
//                        .withDriverName("org.Mysql.Driver")
                        .withUsername("root")
                        .withPassword("123")
                        .withConnectionCheckTimeoutSeconds(60)
                        .build()                  // mandatory
        );
        dataStreamSource.addSink(waterSensorSinkFunction);
        env.execute();

    }
}

10、时间和窗口

窗口一般是划定的一段时间范围,即时间窗。窗口本事是截取有界数据的一种方式,对这个范围内的数据进行处理。

10.1、窗口分类
  1. 按照驱动类型分:时间窗口(定点发车)、计数窗口(人齐发车)
  2. 按照窗口分配数据的规则分:滚动窗口、滑动窗口、会话窗口、全局窗口
10.2、窗口API概述

按键分区和非按键分区

1、按键分区

按键分区后数据流被key分成多条逻辑流KeyedStream,窗口计算会在多个并行子任务上同时执行。相同key的数据会在一个子任务中,相当于每个key都定义了一组窗口各自独立进行统计计算。

2、非按键分区

原始流dataStreamSource不会分成多条逻辑流,窗口逻辑只能在一个任务上执行,相当于并行度为1。

10.3、窗口分配器

Window Assigners 是构建窗口算子的第一步,用来定义数据被分配到哪个窗口,即指定窗口的类型。一般使用.window()方法,传入Window Assigners参数,返回WindowedStream。非按键分区使用.windowAll(),返回AllWindowedStream.

基于时间:

  • 按键分区滚动窗口,窗口长度2秒

    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));
    
  • 按键分区滑动时间窗口,窗口长度10s,滑动步长2s

    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)));
    
  • 按键分区会话窗口,窗口长度2s

    keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)));
    

基于计数:

  • 按键分区滚动窗口,窗口长度为5个元素

    keyedStream.countWindow(5);
    
  • 按键分区滑动窗口,窗口长度5个元素,滑动步长2个元素

    keyedStream.countWindow(5,2);
    
10.4、窗口函数

窗口分配器只收集数据,窗口函数Window Function进行计算操作。

各种流的相互关系
在这里插入图片描述

  • 增量聚合:来一条算一条,窗口触发时输出计算结果
  • 全窗口函数:数据来了不计算先存上,等窗口触发时计算并输出结果
10.4.1、增量聚合函数

每来一个数据就聚合一次

1、归约函数ReduceFunction

相同key的第一条数据来的时候不会调用reduce方法,来一条数据就算一条,窗口触发输出计算结果

package window;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;



public class WindowAPIDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> windowStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<WaterSensor> reduce = windowStream.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
            }
        });
        reduce.print();
        env.execute();

    }
}

2、聚合函数Aggregate Function

ReduceFunction能解决大多归约聚合问题,但聚合状态类型、输出结果类型和输入数据类型必须一样。Aggregate Function更加灵活,有三种类型:输入IN、累加器ACC、输出OUT。输入IN是输入流中元素的数据类型;累加器类型ACC是聚合中间状态类型;输出OUT是最终计算结果类型。

  • 第一条数据来创建窗口和累加器
  • 增量聚合:来一条算一条(调用一次add方法)
  • 窗口输出调用一次getresult方法
  • 输入、输出、中间累加器的类型可以不一样
package window;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


public class WindowAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
            @Override
            public Integer createAccumulator() {
                System.out.println("初始化累加器");
                return 0;
            }

            @Override
            public Integer add(WaterSensor value, Integer accumulator) {
                System.out.println("调用add");
                return value.vc + accumulator;
            }

            @Override
            public String getResult(Integer accumulator) {
                System.out.println("输出结果");
                return accumulator.toString();
            }

            @Override
            public Integer merge(Integer a, Integer b) {
                //只有会话窗口才用
                return null;
            }
        });

        aggregate.print();
        env.execute();

    }
}

10.4.2、全窗口函数

1、窗口函数

.apply(),但是该方法能提供的上下文信息比较少,已经被ProcessWindowFunction全覆盖

window.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {

            }
        });

2、处理窗口函数

ProcessWindowFunction除了能拿到窗口数据外还能获取上下文对象。上下文包括窗口信息、当前的时间和状态信息(处理时间、事件时间水位线)

窗口触发时才调用一次,统一计算窗口内的所有数据

package window;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
//        dataStreamSource.windowAll();
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
            }
        });
        process.print();
        env.execute();

    }
}

10.4.3、增量聚合与全窗口函数接合使用

增量聚合Aggregate+全窗口的ProcessWindow

  1. 增量聚合函数处理数据:来一条算一条
  2. 窗口触发时,增量聚合结果(只有一条数据)传给全窗口函数
  3. 经过全窗口函数的处理后输出

从而实现了两者的优点(reduce函数也能传全窗口函数)

  1. 增量聚合:来一条算一条只存储中间计算结果,占用空间少
  2. 全窗口函数:可以通过上下文实现灵活的功能
package window;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> outputStreamOperator = window.aggregate(new MyAgg(), new MyProcess());
        outputStreamOperator.print();
        env.execute();

    }
    public static class MyAgg implements  AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("初始化累加器");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add");
            return value.vc + accumulator;
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("输出结果");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            //只有会话窗口才用
            return null;
        }
    }
    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {

        @Override
        public void process(String key, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
            String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
            long l = elements.spliterator().estimateSize();
            out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
        }
    }
}

10.5、小结

触发器、移除器:现成的几个窗口都有默认的实现,一般不需要定义

以时间滚动窗口为例:

  • 窗口什么时候触发输出:时间进展>=窗口的最大时间戳(end-1ms)

  • 窗口是怎么划分的:start=取窗口长度的整数倍,向下取整,end=start+窗口长度,窗口左闭右开[start,end)

  • 窗口生命周期:

    创建:属于本窗口的第一条数据来的时候现new的,放入一个singleton单例的集合中;

    销毁(关窗):时间进展>=窗口的最大时间戳(end-1ms)+允许迟到时间(默认为0)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2117331.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Go开源日志库Logrus的使用

一、Logrus简介 Logrus 是一个流行的 Go 语言日志库&#xff0c;以其功能强大、性能高效和高度灵活性而闻名。有关更多介绍可查看 Logrus。 主要特点 丰富的日志级别&#xff1a;Logrus 支持多种日志级别&#xff0c;包括 Debug、Info、Warn、Error、Fatal 和 Panic&#xf…

深入理解数据库的 4NF:多值依赖与消除数据异常

在数据库设计中&#xff0c; "范式" 是一个常常被提到的重要概念。许多初学者在学习数据库设计时&#xff0c;经常听到第一范式&#xff08;1NF&#xff09;、第二范式&#xff08;2NF&#xff09;、第三范式&#xff08;3NF&#xff09;以及 BCNF&#xff08;Boyce-…

RESTful 还是 JSON-RPC

前言 RESTful 比较简单地说就是&#xff0c;大家请求一样的url&#xff08;GET方法有一个例外&#xff0c;url中带了一个id&#xff09;&#xff0c;通过不同的请求方法&#xff0c;分别进行不同的操作&#xff08;CRUD&#xff09;。 JSON-RPC JSON-RPC是一个无状态且轻量级…

SpringBoot学习(7)(Bean对象注册)(自定义组合注解)

目录 一、引言 二、案例学习 &#xff08;一&#xff09;Bean &#xff08;二&#xff09;Import 三、补充 &#xff08;1&#xff09;关于Java中collection.toArray(new String[0])解释 &#xff08;2&#xff09;组合注解 一、引言 上次学习了解到&#xff0c;springb…

基于机器学习的阿尔兹海默症智能分析预测系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 阿尔兹海默症&#xff08;Alzheimers Disease, AD&#xff09;是一种常见的神经退行性疾病&#xff0c;主要影响老年人的认知功能。随着全球人口老龄化的加剧&#xff0c;阿尔兹海默症的患病率逐年…

nodejs 使用kafka案例,node-red配置kafka案例,从安装配置kafka开始

生产者测试&#xff1a; bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test-topic消费者测试&#xff1a; bin/kafka-console-consumer.sh --bootstrap-server 1.2.3.4:9092 --topic test-topic --from-beginningconst { Kafka } require(kafkajs)con…

【AIGC数字人】EchoMimic:基于可编辑关键点条件的类人音频驱动肖像动画

GitHub&#xff1a;https://github.com/BadToBest/EchoMimic 论文&#xff1a; https://arxiv.org/pdf/2407.08136 comfyui&#xff1a; https://github.com/smthemex/ComfyUI_EchoMimic 相关工作 Wav2Lip Wav2Lip是一个开创性的工作 &#xff0c;但输出会出现面部模糊或扭…

粒子群算法原理的示例介绍

一&#xff1a;粒子群优化算法的介绍 粒子群优化算法&#xff08;PSO&#xff09;是一种基于群体智能的优化算法&#xff0c;于1995年提出。它受到鸟群狩猎行为的启发&#xff0c;通过模拟鸟群或鱼群的社会行为来进行问题的求解。 基本原理 粒子群算法中&#xff0c;每个解决…

顶刊算法 | Matlab实现鹈鹕算法POA-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测,优化前后对比

顶刊算法 | Matlab实现鹈鹕算法POA-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测&#xff0c;优化前后对比 目录 顶刊算法 | Matlab实现鹈鹕算法POA-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测&#xff0c;优化前后对比预测效果基本介绍…

一种小众且适合发文的智能优化算法应用——三维TSP问题

声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 今天给大家介绍一种非常小众的智能优化算法应用&am…

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大&#xff0c;但不同的语言在不同领域都有着自己的优势&#xff0c;为了强强联合&#xff0c;LabVIEW提供了强大的外部程序接口能力&#xff0c;包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等…

2024/9/9 408“回头看”:

B树是什么&#xff1f;有什么作用&#xff1f;B树的插入和删除具体细节是什么&#xff1f;除了B树还有一个是B&#xff0b;树、还是B-树&#xff0c;他们有什么区别&#xff0c;又有什么相同点&#xff1f; b树在王道考研查找这一章&#xff0c;所以他的主要作用就是查找。 在…

MySQL中binary放在判断语句之前有什么作用

为什么要加binary进行判断 ① 因为 mysql中等号比较是不区分大小写的&#xff0c;select aA这个输出结果为1。 ② 在判断语句之前 加上binary可以以区分大小写比较 &#xff0c;因为这样底层会用二进制形式比较&#xff0c;实现精确匹配。 代码比较 忽略大小写比较 select …

Ftrans跨域文件传输方案,数据流动无阻的高效路径

大型集团企业由于其规模庞大、业务广泛且往往将分支机构、办事处分布在多个地域&#xff0c;因此会涉及到跨域文件传输的需求。主要源于以下几个方面&#xff1a; 1.业务协同&#xff1a;集团内部的不同部门或子公司可能位于不同的地理位置&#xff0c;但需要进行紧密的业务协…

【C++】STL学习——priority_queue(了解仿函数)

目录 priority_queue介绍迭代器种类priority_queue实现仿函数仿函数的使用 priority_queue介绍 优先队列是一种容器适配器&#xff0c;根据严格的弱排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大的。此上下文类似于堆&#xff0c;在堆中可以随时插入元素&#x…

Linux 磁盘管理-RAID磁盘冗余阵列看这一篇就够了

今天给伙伴们分享一下Linux 磁盘管理-RAID磁盘冗余阵列&#xff0c;希望看了有所收获。 我是公众号「想吃西红柿」「云原生运维实战派」作者&#xff0c;对云原生运维感兴趣&#xff0c;也保持时刻学习&#xff0c;后续会分享工作中用到的运维技术&#xff0c;在运维的路上得到…

机器学习深度学习

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl1. 深度学习概述 1.1 定义与历史背景 深度学习作为机器学习领域的一个重要分支,其核心在于构建由多层(深层)的人工神经网络组成的计算模型,这些模型能够学习数据的多层次抽象表示。深度学习的…

东大成贤资源库-数据挖掘技术与应用 实验一:数据预处理_熟悉数据挖掘数据预处理流程。 基于给定的数据集(csv)文件,完成下列数据处理。 1

【实验内容】 程序清单 import pandas as pd import numpy as np# 读入文件&#xff0c;存放在字典data里 data pd.read_csv("D:\\Desktop\\data1.csv")# 填入Id列空缺的值 for i in range(1,len(data)):if pd.isnull(data[Id][i]):data[Id][i]i1# Id属性列去重&a…

虚拟机的安装步骤

我这里使用的是VMware 1.下载centos7 2.配置 跟这图来就好 开启虚拟机 第一个页面直接回车,忘了截图 等待安装 选择语言,看自己 点击完成 点击继续安装 设置账号密码 然后等待就行 安装完成之后会有一个重启,点击(又忘了截图) 完成许可和网络 最后就可以了

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代&#xff0c;文本到语音&#xff08;Text-to-Speech, TTS&#xff09;技术已成为人机交互的关键桥梁&#xff0c;无论是为视障人士提供辅助阅读&#xff0c;还是为智能助手注入声音的灵…