Flink Study Series, Article 4: Transformation Operators in Flink

2024/11/23 17:01:59

First, create a code template (an IntelliJ IDEA file template) so that every demo below starts from the same skeleton.


#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 @description:
 @program:${PROJECT_NAME}
 @author: 闫哥
 @create:${YEAR}-${MONTH}-${DAY} ${HOUR}:${MINUTE}:${SECOND}
**/
public class ${NAME} {

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        //3. transformation: process and transform data
        //4. sink: output data

        //5. execute: run the job
        env.execute();
    }
}

Using the map operator

Suppose we have the following data:

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

Convert each line into a LogBean object and print it.

Hint: read a local file like this (the access log above is the a.log file used throughout this article):

DataStream<String> lines = env.readTextFile("datas/a.log");

The fields can be defined as:

        String ip;      // visitor IP
        int userId;     // user ID
        long timestamp; // access timestamp
        String method;  // HTTP method
        String path;    // request path

If you need a date utility class, add the commons-lang3 dependency:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.12.0</version>
</dependency>
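If you would rather not add commons-lang3, the JDK's java.time API can do the same string-to-timestamp conversion. The sketch below is plain Java with no Flink, so the parsing logic that goes inside MapFunction.map() can be tested on its own. `LogLineParser` and `LogRecord` are illustrative names. The time zone is passed in explicitly, whereas DateUtils and SimpleDateFormat silently use the JVM default zone. Unlike SimpleDateFormat, a DateTimeFormatter is immutable and thread-safe, so a single shared constant is fine.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Plain-Java sketch of the per-line parsing; LogLineParser/LogRecord are illustrative names.
class LogLineParser {

    // Immutable and thread-safe, so one shared instance is fine (unlike SimpleDateFormat).
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss");

    static final class LogRecord {
        final String ip;
        final int userId;
        final long timestamp; // epoch milliseconds
        final String method;
        final String path;

        LogRecord(String ip, int userId, long timestamp, String method, String path) {
            this.ip = ip;
            this.userId = userId;
            this.timestamp = timestamp;
            this.method = method;
            this.path = path;
        }

        @Override
        public String toString() {
            return "LogRecord(" + ip + ", " + userId + ", " + timestamp + ", " + method + ", " + path + ")";
        }
    }

    // Parses "ip userId dd/MM/yyyy:HH:mm:ss method path", interpreting the time in the given zone.
    static LogRecord parse(String line, ZoneId zone) {
        String[] arr = line.trim().split("\\s+");
        if (arr.length < 5) {
            throw new IllegalArgumentException("expected 5 fields, got " + arr.length + ": " + line);
        }
        long timestamp = LocalDateTime.parse(arr[2], TIME_FORMAT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
        return new LogRecord(arr[0], Integer.parseInt(arr[1]), timestamp, arr[3], arr[4]);
    }

    public static void main(String[] args) {
        String line = "86.149.9.216 10001 17/05/2015:10:05:30 GET "
                + "/presentations/logstash-monitorama-2013/images/github-contributions.png";
        System.out.println(parse(line, ZoneId.systemDefault()));
    }
}
```

Inside a Flink job, the map function would simply return `LogLineParser.parse(line, zone)` for whatever zone the log was written in.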
package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @description: convert each log line into a LogBean (map)
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 09:10:30
 **/

public class MapDemo {

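    // public static + a public no-arg constructor + getters/setters (generated by @Data)
    // let Flink treat LogBean as a POJO instead of a GenericType
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class LogBean{
        String ip;      // visitor IP
        int userId;     // user ID
        long timestamp; // access timestamp
        String method;  // HTTP method
        String path;    // request path
    }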

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");


        //3. transformation: process and transform data
        // The fields could also be put into a Tuple; Flink provides tuples up to Tuple25
        DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split(" ");
                String ip = arr[0];
                int userId = Integer.parseInt(arr[1]);
                String createTime = arr[2];
                // Convert a time string such as 17/05/2015:10:05:30 into a timestamp.
                // Plain JDK alternative:
                /*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = simpleDateFormat.parse(createTime);
                long timeStamp = date.getTime();*/
                // DateUtils comes from commons-lang3, so that dependency must be on the classpath
                Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");
                long timeStamp = date.getTime();

                String method = arr[3];
                String path = arr[4];

                return new LogBean(ip, userId, timeStamp, method, path);
            }
        });
        //4. sink: output data
        mapStream.print();


        //5. execute: run the job
        env.execute();
    }
}

Second version:

package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @description: convert each log line into a LogBean (map), second version
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 11:40:37
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
class LogBean{
    private String ip;      // visitor IP
    private int userId;     // user ID
    private long timestamp; // access timestamp
    private String method;  // HTTP method
    private String path;    // request path
}
public class Demo04 {

    // Convert each log line into a Java bean
    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");

        //3. transformation: process and transform data
        SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                // split on one or more whitespace characters
                String[] arr = line.split("\\s+");

                // timestamp conversion, e.g. 17/05/2015:10:06:53
                String time = arr[2];
                // SimpleDateFormat is not thread-safe; creating one per call, as here, avoids sharing it
                SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = format.parse(time);
                long timeStamp = date.getTime();
                return new LogBean(arr[0], Integer.parseInt(arr[1]), timeStamp, arr[3], arr[4]);
            }
        });

        //4. sink: output data
        map.print();

        //5. execute: run the job
        env.execute();
    }
}

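Using the flatMap operator (exercise)

flatMap turns each element of a DataStream into 0...n elements.

Read the data in the flatmap.log file.

For example:

Zhang San,Apple phone,Lenovo computer,Huawei tablet
Li Si,Huawei phone,Apple computer,Xiaomi tablet

should become

Zhang San has Apple phone

Zhang San has Lenovo computer

Zhang San has Huawei tablet

Li Si has …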
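The one-to-many behaviour can be tried out in plain Java first. In the sketch below, `expand` holds the same logic as the FlatMapFunction further down, and Java's `Stream.flatMap` stands in for `DataStream.flatMap` to flatten the per-line results. It is a JDK-only analogy: `FlatMapSketch` is an illustrative name, and the input lines are the translated sample data above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only sketch of the flatMap logic: one input line -> 0..n output strings.
class FlatMapSketch {

    // Same logic as the FlatMapFunction: "name,item1,item2,..." -> "name has item1", ...
    static List<String> expand(String line) {
        String[] arr = line.split(",");
        List<String> out = new ArrayList<>();
        for (int i = 1; i < arr.length; i++) {
            out.add(arr[0] + " has " + arr[i]);
        }
        return out; // a line with only a name produces 0 elements
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Zhang San,Apple phone,Lenovo computer,Huawei tablet",
                "Li Si,Huawei phone,Apple computer,Xiaomi tablet");
        // Stream.flatMap flattens the per-line lists into one sequence, like DataStream.flatMap
        List<String> result = lines.stream()
                .flatMap(line -> expand(line).stream())
                .collect(Collectors.toList());
        result.forEach(System.out::println);
    }
}
```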

Code:

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @description: split each line into several records (flatMap)
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 09:51:59
 **/
public class FlatMapDemo {

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");
        //3. transformation: process and transform data
        DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                // e.g. Zhang San,Apple phone,Lenovo computer,Huawei tablet
                String[] arr = line.split(",");
                String name = arr[0];
                // every item after the name becomes its own output record
                for (int i = 1; i < arr.length; i++) {
                    String goods = arr[i];
                    collector.collect(name + " has " + goods);
                }
            }
        });
        //4. sink: output data
        flatMapStream.print();

        //5. execute: run the job
        env.execute();
    }
}

Using filter

Read the access log in a.log from the first exercise and keep only the entries whose visitor IP is 83.149.9.216.

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description: keep only the log entries whose IP is 83.149.9.216 (filter)
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 09:10:30
 **/

public class FilterDemo {

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");


        //3. transformation: process and transform data
        // Read the access log in a.log and keep only the entries from IP 83.149.9.216
        DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                String ip = line.split(" ")[0];
                // true keeps the record, false drops it
                return "83.149.9.216".equals(ip);
            }
        });
        //4. sink: output data
        filterStream.print();


        //5. execute: run the job
        env.execute();
    }
}
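The predicate is easier to test once it sits in a plain static method that the FilterFunction calls. Below is a JDK-only sketch. `IpFilter` and `keepIp` are illustrative names, and the sample lines are shortened versions of the data above. The sketch also tolerates leading whitespace and tab separators, which `split(" ")` does not.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// JDK-only sketch of the filter predicate, pulled out so it can be tested without Flink.
class IpFilter {

    // True when the first whitespace-separated field equals targetIp; blank lines are dropped.
    static boolean keepIp(String line, String targetIp) {
        if (line == null || line.trim().isEmpty()) {
            return false;
        }
        String ip = line.trim().split("\\s+", 2)[0];
        return targetIp.equals(ip);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "86.149.9.216 10001 17/05/2015:10:05:30 GET /a.png",
                "83.149.9.216 10002 17/05/2015:10:06:53 GET /paper.css",
                "",
                "10.0.0.1 10003 17/05/2015:10:06:53 POST /paper.css");
        // Stream.filter plays the role of DataStream.filter here
        List<String> kept = lines.stream()
                .filter(l -> keepIp(l, "83.149.9.216"))
                .collect(Collectors.toList());
        kept.forEach(System.out::println);
    }
}
```

Inside Flink, the function body would reduce to `return IpFilter.keepIp(line, "83.149.9.216");`.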

In-class code:

package com.bigdata.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description: filter by IP and write the result to a file
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 11:40:37
 **/

public class Demo06 {

    // Keep only the log entries from one IP and write them to datas/b.log
    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");

        //3. transformation: process and transform data
        // Read the access log in a.log and keep only the entries from IP 83.149.9.216
        streamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                String[] arr = line.split(" ");
                String ip = arr[0];
                return ip.equals("83.149.9.216");
            }
        })
        //4. sink: write a text file. OVERWRITE replaces an existing file; parallelism 1 writes a
        // single file instead of one file per subtask. writeAsText is marked deprecated in recent
        // Flink releases in favour of the file sink.
        .writeAsText("datas/b.log", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

        //5. execute: run the job
        env.execute();
    }
}

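KeyBy

Stream processing has no groupBy; it uses keyBy instead. keyBy partitions the stream so that all records with the same key are handled by the same parallel subtask, which is what keyed aggregations such as sum and reduce rely on.

A KeySelector can extract the key from tuple types as well as from POJOs (entity classes / JavaBeans).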

Tuple types

keyBy on a single field

// by field position (deprecated)
wordAndOne.keyBy(0);

// with a KeySelector written as a lambda
wordAndOne.keyBy(v -> v.f0);

keyBy on multiple fields

// by field positions (also deprecated)
wordAndOne.keyBy(0, 1);

// with a KeySelector that returns a composite key
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});

This is similar to GROUP BY in SQL:

select sex,count(1) from student group by sex;
GROUP BY can also list several columns, and keyBy can likewise group by several fields.
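As a batch-style analogy, `Collectors.groupingBy` does for an in-memory list what the SQL above does for a table. Its classifier function plays the role of the KeySelector passed to keyBy. This is plain Java with no Flink, and the students are made up. On an unbounded stream, a keyed aggregation would instead keep emitting a running count per key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Batch-style analogy for: select sex, count(1) from student group by sex
class GroupBySexSketch {

    static final class Student {
        final String name;
        final String sex;

        Student(String name, String sex) {
            this.name = name;
            this.sex = sex;
        }
    }

    // The classifier s -> s.sex is the "key selector"; TreeMap keeps keys sorted for stable output.
    static Map<String, Long> countBySex(List<Student> students) {
        return students.stream()
                .collect(Collectors.groupingBy(s -> s.sex, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("A", "male"), new Student("B", "female"), new Student("C", "male"));
        System.out.println(countBySex(students)); // {female=1, male=2}
    }
}
```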

POJO

public class PeopleCount {
    private String province;
    private String city;
    private Integer counts;
    public PeopleCount() {
    }
    // getters, setters and other code omitted...
}

keyBy on a single field

source.keyBy(a -> a.getProvince());

keyBy on multiple fields

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});
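The composite key works because Tuple2 has value-based `equals()`/`hashCode()`. The Flink documentation lists arrays, and POJOs that do not override `hashCode()`, among the types that cannot be used as keys. The JDK-only sketch below shows why. `java.util.List` stands in for Tuple2 as a value-based key, and the (province, city) rows are made up. `CompositeKeyDemo` and `countBy` are illustrative names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// A composite key needs value-based equals()/hashCode(); List (like Tuple2) has them, String[] does not.
class CompositeKeyDemo {

    // Counts rows per key; keyOf plays the role of KeySelector.getKey().
    static <K> Map<K, Integer> countBy(List<String[]> rows, Function<String[], K> keyOf) {
        Map<K, Integer> counts = new HashMap<>();
        for (String[] row : rows) {
            counts.merge(keyOf.apply(row), 1, Integer::sum);
        }
        return counts;
    }

    // Made-up (province, city) rows; the first two share the same key.
    static List<String[]> sampleRows() {
        return Arrays.asList(
                new String[]{"Guangdong", "Shenzhen"},
                new String[]{"Guangdong", "Shenzhen"},
                new String[]{"Guangdong", "Guangzhou"});
    }

    public static void main(String[] args) {
        // Value-based key, like Tuple2.of(province, city): equal contents land in the same group
        Map<List<String>, Integer> byList = countBy(sampleRows(), r -> Arrays.asList(r[0], r[1]));
        // Array key: identity-based hashCode, so every row becomes its own group
        Map<String[], Integer> byArray = countBy(sampleRows(), r -> new String[]{r[0], r[1]});
        System.out.println(byList.size() + " groups with List keys, " + byArray.size() + " with array keys");
    }
}
```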

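Exercise:

Given the following data:
env.fromElements(
        Tuple2.of("basketball", 1),
        Tuple2.of("basketball", 2),
        Tuple2.of("basketball", 3),
        Tuple2.of("football", 3),
        Tuple2.of("football", 2),
        Tuple2.of("football", 3)
);
Compute: how many basketballs are there in total, and how many footballs?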
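The expected totals are basketball 1 + 2 + 3 = 6 and football 3 + 2 + 3 = 8. In streaming execution, `keyBy(...).sum(1)` is a rolling aggregation: it emits an updated total for every incoming record. In batch execution only the final total per key is printed. With `RuntimeExecutionMode.AUTOMATIC` and a bounded source such as `fromElements`, Flink typically picks batch execution.

The single-threaded plain-Java simulation below (no Flink; `RollingSumSketch` is an illustrative name) reproduces both views. The returned list corresponds to the streaming output, and the state map holds the final totals.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-threaded simulation of keyBy(t -> t.f0).sum(1).
class RollingSumSketch {

    static Map.Entry<String, Integer> t(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // The exercise data, with the item names translated.
    static List<Map.Entry<String, Integer>> sampleInput() {
        return Arrays.asList(
                t("basketball", 1), t("basketball", 2), t("basketball", 3),
                t("football", 3), t("football", 2), t("football", 3));
    }

    // Emits the updated per-key total for every record (streaming view);
    // afterwards `state` holds the final totals (batch view).
    static List<String> rollingSum(List<Map.Entry<String, Integer>> input, Map<String, Integer> state) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            int total = state.merge(e.getKey(), e.getValue(), Integer::sum);
            emitted.add("(" + e.getKey() + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        System.out.println(rollingSum(sampleInput(), state));
        System.out.println(state);
    }
}
```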
package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 10:07:05
 **/
public class KeyByDemo {

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(
                Tuple2.of("basketball", 1),
                Tuple2.of("basketball", 2),
                Tuple2.of("basketball", 3),
                Tuple2.of("football", 3),
                Tuple2.of("football", 2),
                Tuple2.of("football", 3)
        );
        //3. transformation: process and transform data
        // Anonymous-class equivalent of the lambda below:
        /*KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        });*/
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(tuple -> tuple.f0);
        //4. sink: sum field 1 per key and print the result
        keyedStream.sum(1).print();


        //5. execute: run the job
        env.execute();
    }
}
package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description: the different ways to write keyBy
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 14:32:52
 **/
public class Demo07 {

    // public static + a public no-arg constructor + getters/setters (from @Data)
    // let Flink treat Ball as a POJO instead of a GenericType
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Ball{
        private String ballName;
        private int num;
    }

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        //3. transformation: process and transform data
        //4. sink: output data
        DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(
                Tuple2.of("basketball", 1),
                Tuple2.of("basketball", 2),
                Tuple2.of("basketball", 3),
                Tuple2.of("football", 3),
                Tuple2.of("football", 2),
                Tuple2.of("football", 3)
        );
        // Deprecated: 0 means "key by the tuple's first field"; records with the same key
        // are routed to the same parallel subtask
        KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = tuple2DataStreamSource.keyBy(0);
        tuple2TupleKeyedStream.print();
        // The currently recommended style:
        // a lambda KeySelector; keyBy needs no .returns(...) type hint here, so it stays concise
        tuple2DataStreamSource.keyBy(v -> v.f0).print();
        // The original, unsimplified form (an anonymous KeySelector)
        tuple2DataStreamSource.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        // The examples above use 2-tuples; the ones below use a POJO
        DataStreamSource<Ball> ballSource = env.fromElements(
                new Ball("basketball", 1),
                new Ball("basketball", 2),
                new Ball("basketball", 3),
                new Ball("football", 3),
                new Ball("football", 2),
                new Ball("football", 3)
        );
        ballSource.keyBy(ball -> ball.getBallName()).print();
        ballSource.keyBy(new KeySelector<Ball, String>() {
            @Override
            public String getKey(Ball ball) throws Exception {
                return ball.getBallName();
            }
        });

        //5. execute: run the job
        env.execute();
    }
}

If you run into the following error:

Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.bigdata.transformation.Ball>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)

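Solution:

Don't declare the entity class as a non-public class inside another class's .java file. Put it in its own file and declare it public; a public static nested class also works. The error appears when a field is referenced by name (for example sum("num")) on a type that Flink could not analyze as a POJO and therefore handled as a GenericType. According to the Flink documentation, a POJO class must be public, must have a public no-argument constructor, and every field must be either public or accessible through getters and setters.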
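The POJO rules above can be checked roughly with reflection. The sketch below is a hypothetical helper, not Flink's own TypeExtractor. It only approximates the documented rules: it ignores details such as whether each field type has a serializer, and it only looks for `getX`/`setX` accessor names, so treat its answer as a hint. `PojoRulesCheck`, `GoodBall` and `BadBall` are illustrative names. `BadBall` mimics a class that has `@AllArgsConstructor` but no `@NoArgsConstructor`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Rough approximation of the documented POJO rules (not Flink's TypeExtractor).
class PojoRulesCheck {

    static boolean looksLikeFlinkPojo(Class<?> c) {
        // The class must be public; a nested class must also be static.
        if (!Modifier.isPublic(c.getModifiers())) return false;
        if (c.isMemberClass() && !Modifier.isStatic(c.getModifiers())) return false;
        // A public no-argument constructor is required (getConstructor() only finds public ones).
        try {
            c.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Each non-static, non-transient field must be public or have public getX()/setX(...).
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(m) || Modifier.isTransient(m) || Modifier.isPublic(m)) {
                continue;
            }
            String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            if (!hasPublicMethod(c, "get" + suffix) || !hasPublicMethod(c, "set" + suffix, f.getType())) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasPublicMethod(Class<?> c, String name, Class<?>... params) {
        try {
            c.getMethod(name, params); // getMethod only returns public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Public, static, public no-arg constructor, getters and setters.
    public static class GoodBall {
        private String ballName;
        private int num;
        public GoodBall() { }
        public String getBallName() { return ballName; }
        public void setBallName(String ballName) { this.ballName = ballName; }
        public int getNum() { return num; }
        public void setNum(int num) { this.num = num; }
    }

    // Only an all-args constructor, as with @AllArgsConstructor but no @NoArgsConstructor.
    public static class BadBall {
        private final String ballName;
        private final int num;
        public BadBall(String ballName, int num) { this.ballName = ballName; this.num = num; }
        public String getBallName() { return ballName; }
        public int getNum() { return num; }
    }

    public static void main(String[] args) {
        System.out.println("GoodBall: " + looksLikeFlinkPojo(GoodBall.class));
        System.out.println("BadBall: " + looksLikeFlinkPojo(BadBall.class));
    }
}
```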

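Reduce (sum is implemented on top of reduce)

reduce aggregates a DataSet, or each group of a KeyedStream, into a single element by repeatedly combining two elements of the same type. On a KeyedStream in streaming execution it is a rolling aggregation: it emits the updated result for every incoming record, and the last value per key is the final result.

Read the a.log file, count the page views (PV) per IP address, and use reduce to aggregate them into a final result.

The result should look like this: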

(86.149.9.216,1)

(10.0.0.1,7)

(83.149.9.216,6)
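These expected counts can be checked without Flink. The sketch below replays the 14 sample log lines from the map exercise through the same three steps in plain Java: map each line to (ip, 1), group by IP, and combine with a reduce function. `PvReduceSketch` is an illustrative name, and `Map.merge` stands in for Flink's per-key state. As with a ReduceFunction, the combining function takes two values of one type and returns that same type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java replay of map(line -> (ip, 1)) + keyBy(ip) + reduce(...) over the sample log.
class PvReduceSketch {

    // Role of the ReduceFunction: (value so far, new value) -> combined value of the same type.
    static final BinaryOperator<Integer> REDUCE = (acc, current) -> acc + current;

    // The 14 sample log lines from the map exercise.
    static List<String> sampleLines() {
        List<String> lines = new ArrayList<>();
        lines.add("86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png");
        lines.addAll(Collections.nCopies(6,
                "83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css"));
        for (String time : Arrays.asList("10:06:53", "10:07:53", "10:08:53", "10:09:53", "10:10:53", "10:16:53", "10:26:53")) {
            lines.add("10.0.0.1 10003 17/05/2015:" + time + " POST /presentations/logstash-monitorama-2013/css/print/paper.css");
        }
        return lines;
    }

    static Map<String, Integer> pvPerIp(List<String> lines) {
        Map<String, Integer> state = new LinkedHashMap<>(); // one running value per key
        for (String line : lines) {
            String ip = line.split(" ")[0];                // map: line -> (ip, 1)
            state.merge(ip, 1, REDUCE);                     // keyBy(ip) + reduce
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(pvPerIp(sampleLines())); // {86.149.9.216=1, 83.149.9.216=6, 10.0.0.1=7}
    }
}
```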

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Date;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 09:10:30
 **/

public class ReduceDemo {

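    // public static + a public no-arg constructor + getters/setters (from @Data) make this a Flink POJO
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class LogBean{
        String ip;      // visitor IP
        int userId;     // user ID
        long timestamp; // access timestamp
        String method;  // HTTP method
        String path;    // request path
    }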

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");


        //3. transformation: process and transform data
        // The fields could also be put into a Tuple; Flink provides tuples up to Tuple25
        DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split(" ");
                String ip = arr[0];
                int userId = Integer.parseInt(arr[1]);
                String createTime = arr[2];
                // Convert a time string such as 17/05/2015:10:05:30 into a timestamp.
                // Plain JDK alternative:
                /*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = simpleDateFormat.parse(createTime);
                long timeStamp = date.getTime();*/
                // DateUtils comes from commons-lang3, so that dependency must be on the classpath
                Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");
                long timeStamp = date.getTime();

                String method = arr[3];
                String path = arr[4];

                return new LogBean(ip, userId, timeStamp, method, path);
            }
        });

        // Turn each LogBean into (ip, 1)
        DataStream<Tuple2<String, Integer>> mapStream2 = mapStream.map(new MapFunction<LogBean, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(LogBean logBean) throws Exception {
                return Tuple2.of(logBean.getIp(), 1);
            }
        });

        // Group by IP
        KeyedStream<Tuple2<String,Integer>, String> keyByStream = mapStream2.keyBy(tuple -> tuple.f0);

        // sum is built on reduce, so this line would produce the same counts:
        // keyByStream.sum(1).print();
        // One group looks like [ ("10.0.0.1",1), ("10.0.0.1",1), ("10.0.0.1",1) ]
        keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                // t1 = the result aggregated so far, e.g. ("10.0.0.1",10)
                // t2 = the newly arrived record, e.g. ("10.0.0.1",1)
                return Tuple2.of(t1.f0,  t1.f1 + t2.f1);
            }
        })
        //4. sink: output data
        .print();

        //5. execute: run the job
        env.execute();
    }
}

In-class notes:

package com.bigdata.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description: count PV per IP with map + keyBy + reduce
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 11:40:37
 **/

public class Demo08 {

    // Count page views per IP
    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");

        //3. transformation: process and transform data
        KeyedStream<Tuple2<String, Integer>, String> keyBy = streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] arr = value.split(" ");
                return Tuple2.of(arr[0], 1);
            }
        }).keyBy(v -> v.f0);
        // Without reduce: sum calls aggregate(), whose aggregation function is itself a ReduceFunction
        //keyBy.sum(1).print();
        // Records with the same IP are now in the same group; combine them
        keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            // v1 is the tuple aggregated so far, v2 is the current tuple of the same group
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {
                return Tuple2.of(v1.f0, v1.f1 + v2.f1);
            }
        }).print();
        // Lambda version: no .returns(...) hint is needed because reduce's output type equals its input type
        keyBy.reduce((v1, v2) -> Tuple2.of(v1.f0, v1.f1 + v2.f1)).print();


        //5. execute: run the job
        env.execute();
    }
}

Comprehensive exercise: flatMap/map/filter/keyBy/reduce

Requirement: count the words in a stream, excluding the sensitive word TMD [Tencent, Meituan, Didi].
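It matters whether you filter per word or per line. A check such as `!line.contains("TMD")` applied before flatMap discards every other word on that line as well. The plain-Java sketch below (no Flink; `SensitiveWordCount` is an illustrative name) runs the steps in order: split (flatMap), drop only the sensitive token (filter), then count per word (map + keyBy + reduce). It returns the final counts rather than Flink's running updates.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch: flatMap (split) -> filter (drop sensitive words) -> count per word.
class SensitiveWordCount {

    static Map<String, Integer> count(List<String> lines, Set<String> sensitive) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {        // flatMap: line -> words
                if (word.isEmpty() || sensitive.contains(word)) {  // filter: per word, not per line
                    continue;
                }
                counts.merge(word, 1, Integer::sum);                // (word, 1) + keyBy + reduce
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello TMD world", "hello flink");
        System.out.println(count(lines, Collections.singleton("TMD"))); // {hello=2, world=1, flink=1}
    }
}
```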

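This exercise uses netcat for Windows. Unzip it, type cmd into the Explorer address bar of that folder, and press Enter to open a command prompt there.

Start the server (listening on port 8889, the port the code below connects to):

nc -l -p 8889

For the client, simply double-click nc.exe; in the window that opens you don't type the nc command itself, only its arguments.

If you want to run nc from any directory, add its folder to the PATH environment variable.

Official site: netcat 1.11 for Win32/Win64

Netcat introduction, installation and usage (CSDN blog)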

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @description: word count over a socket stream, excluding the sensitive word TMD
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 10:44:27
 **/
public class ZongHeDemo {

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: read lines from the nc server on localhost:8889
        DataStream<String> dataStreamSource = env.socketTextStream("localhost", 8889);

        //3. transformation: split into words, drop the sensitive word, count per word
        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                for (String word : line.split("\\s+")) {
                    collector.collect(word);
                }
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // Filter per word: dropping whole lines that contain "TMD" would also lose their other words
                return !word.isEmpty() && !"TMD".equals(word);
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        }).keyBy(v -> v.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> acc, Tuple2<String, Integer> current) throws Exception {
                return Tuple2.of(acc.f0, acc.f1 + current.f1);
            }
        })
        //4. sink: output data
        .print();

        //5. execute: run the job
        env.execute();
    }
}

In-class code:

package com.bigdata.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
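 * @description: word count with a sensitive-word filter; also reads words from MySQL through a custom source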
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-22 09:54:28
 **/
class JdbcSource extends RichSourceFunction<String> {

    Connection connection;
    PreparedStatement statement;
    @Override
    public void open(Configuration parameters) throws Exception {
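         // Plain JDBC. A JDBC 4 driver such as MySQL Connector/J 8 registers itself automatically;
         // otherwise load it explicitly: Class.forName("com.mysql.cj.jdbc.Driver");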
         connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/zuoye", "root", "123456");
         statement = connection.prepareStatement("select word from fuck_words");
    }

    @Override
    public void close() throws Exception {
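        // guard against open() having failed before these were assigned
        if (statement != null) statement.close();
        if (connection != null) connection.close();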
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        ResultSet resultSet = statement.executeQuery();
        while(resultSet.next()){
            String word = resultSet.getString("word");
            ctx.collect(word);
        }
    }

    @Override
    public void cancel() {
        // run() returns on its own once the query result is exhausted, so there is nothing to interrupt
    }
}

public class _07综合案例 {

    public static void main(String[] args) throws Exception {

        //1. env: prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source: load data
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);
        //3. transformation: process and transform data

        DataStreamSource<String> jdbcSource = env.addSource(new JdbcSource());
        jdbcSource.print();
        ArrayList<String> words = new ArrayList<>();
        jdbcSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                words.add(value);
                return value;
            }
        }).print();
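        // This does not work: words stays empty. main() only builds the job graph, so this println runs
        // before env.execute() starts anything. Moreover, the MapFunction, together with the captured
        // words list, is serialized and shipped to the parallel tasks, and each task fills its own copy.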
        System.out.println(words);


        String[] arr = {"tmd","fuck"};
        // Arrays.asList returns a fixed-size list backed by the array: get/set work, but add/remove throw
        List<String> list = Arrays.asList(arr);

        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {


            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split("\\s+");
                for (String word : arr) {
                    // Unwanted words could be dropped right here; this demo uses a separate filter step instead
                    out.collect(word);
                }
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !list.contains(value);
            }
        }).map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value,1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print();
        //4. sink: output data

        //5. execute: run the job
        env.execute();
    }
}
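The empty `words` list above can be reproduced with plain Java serialization. The sketch below serializes a list and reads it back, which is roughly what happens when Flink ships a function, and the objects it captures, to a task. The task then works on that copy. It only simulates the mechanism and does not use Flink; `ClosureCopyDemo` and `roundTrip` are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Simulates shipping a captured object to a task: serialize, deserialize, mutate the copy.
class ClosureCopyDemo {

    // Java-serializes obj and reads it back, producing an independent deep copy.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>();    // the list in the client's main()
        List<String> taskCopy = roundTrip(words);  // what a task would work on
        taskCopy.add("tmd");                       // the side effect inside map()
        System.out.println("client: " + words + ", task copy: " + taskCopy);
        // prints: client: [], task copy: [tmd]
    }
}
```

To use the database words inside the job, they have to flow through the dataflow itself, for example as a stream or as state, rather than through a local variable of main().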

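union and connect: merging and connecting streams

Union

union merges several streams of the same type.

It combines multiple DataStreams into one DataStream.

[Note]: all DataStreams passed to union must have the same type.

Note: union produces the union of the inputs but does not remove duplicates.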
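A list-based analogy of that behaviour follows. Same-typed inputs go in, one output comes out, and duplicates such as "hello" are kept. It is plain Java: `UnionSketch` is an illustrative name, and the in-order concatenation is only for the analogy, since Flink does not guarantee any ordering between the merged inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// List-based analogy for DataStream.union: same element type in, one sequence out, duplicates kept.
class UnionSketch {

    @SafeVarargs
    static <T> List<T> union(List<T>... inputs) {
        List<T> out = new ArrayList<>();
        for (List<T> input : inputs) {
            out.addAll(input); // plain concatenation: nothing is deduplicated
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> stream1 = Arrays.asList("hello", "nihao");
        List<String> stream2 = Arrays.asList("hello", "kong ni qi wa");
        List<String> merged = union(stream1, stream2);
        System.out.println(merged + ", \"hello\" appears " + Collections.frequency(merged, "hello") + " times");
    }
}
```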

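connect

connect joins exactly 2 streams, which may have different types (the result must be processed before it can be output).

DataStream, DataStream → ConnectedStreams: connects two streams while each keeps its own type. After connect, the two streams are merely placed inside one stream; internally each keeps its own data and form unchanged (like "one country, two systems"), and the two remain independent. By contrast, after union the inputs really become a single stream.

It is similar to union, but connect can only connect two streams, their data types may differ, and a different processing logic can be applied to each of them.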
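The connect + CoMapFunction idea, modelled with plain Java lists, is sketched below. There are two inputs of different types, each with its own mapping function, and both functions must produce the same output type. It is an analogy, not Flink: `ConnectSketch` and `coMap` are illustrative names, and the sketch processes the first input and then the second, whereas Flink interleaves elements from both inputs as they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Analogy for connect + CoMapFunction: two differently typed inputs, one function each, one output type.
class ConnectSketch {

    static <A, B, R> List<R> coMap(List<A> first, List<B> second,
                                   Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : first) {
            out.add(map1.apply(a));  // like CoMapFunction.map1
        }
        for (B b : second) {
            out.add(map2.apply(b));  // like CoMapFunction.map2
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList("hello", "nihao");
        List<Long> longs = Arrays.asList(1L, 2L, 3L);
        List<String> merged = coMap(strings, longs, s -> s, l -> Long.toString(l));
        System.out.println(merged); // [hello, nihao, 1, 2, 3]
    }
}
```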

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @description:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 11:40:12
 **/
public class UnionConnectDemo {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load the data
        DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");
        DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");
        DataStream<String> unionStream = stream1.union(stream2);
        unionStream.print();

        DataStream<Long> stream3 = env.fromSequence(1, 10);
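        // stream1.union(stream3); // does not compile: the element types differ (String vs Long)
        //3. transformation - process and transform the data
        ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);
        // To use the connected stream, each side must be processed separately,
        // and both sides must produce the same output type
        DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {
            // handles elements of the String stream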
            @Override
            public String map1(String value) throws Exception {
                return value;
            }

            // handles elements of the Long stream
            @Override
            public String map2(Long value) throws Exception {
                return Long.toString(value);
            }
        });

        //4. sink - output the data
        mapStream.print();


        //5. execute - run the job
        env.execute();
    }
}
package com.bigdata.transforma;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @description:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-22 10:50:13
 **/
public class _08_两个流join {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<String> ds1 = env.fromElements("bigdata", "spark", "flink");
        DataStreamSource<String> ds2 = env.fromElements("python", "scala", "java");
        DataStream<String> ds3 = ds1.union(ds2);
        ds3.print();

        // Next, demonstrate connect
        DataStreamSource<Long> ds4 = env.fromSequence(1, 10);
        ConnectedStreams<String, Long> ds5 = ds1.connect(ds4);

        ds5.process(new CoProcessFunction<String, Long, String>() {
            @Override
            public void processElement1(String value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {
                System.out.println("String stream: " + value);
                out.collect(value);
            }

            @Override
            public void processElement2(Long value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {
                System.out.println("Long stream: " + value);
                out.collect(String.valueOf(value));
            }
        }).print("merged");

        //2. source - load the data
        //3. transformation - process and transform the data
        //4. sink - output the data

        //5. execute - run the job
        env.execute();
    }
}

Side Outputs侧道输出(侧输出流) --可以分流

举例说明:对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

package com.bigdata.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @description:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 16:19:56
 **/
public class Demo11 {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Side outputs
        DataStreamSource<Long> streamSource = env.fromSequence(0, 100);
        // Define two output tags
        OutputTag<Long> tag_even = new OutputTag<Long>("even", TypeInformation.of(Long.class));
        OutputTag<Long> tag_odd = new OutputTag<Long>("odd", TypeInformation.of(Long.class));
        //2. source - load the data
        SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {
                // value is the current element; route it to the even or the odd side output
                if (value % 2 == 0) {
                    ctx.output(tag_even, value);
                } else {
                    ctx.output(tag_odd, value);
                }
            }
        });
        // Get all odd numbers from their side output
        DataStream<Long> sideOutput = process.getSideOutput(tag_odd);
        sideOutput.print("odd");
        // Get all even numbers
        DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);
        sideOutput2.print("even");
        //3. transformation - process and transform the data
        //4. sink - output the data


        //5. execute - run the job
        env.execute();
    }
}
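The routing idea can be modeled in plain Java as below: each element goes into the bucket of exactly one named tag, as `ctx.output(tag, value)` does. The class name and the string tags are hypothetical. Note that in the Flink job above `out.collect` is never called, so the main output of `process` stays empty and all data flows through the two side outputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of side outputs: each element is routed to a
// named tag bucket, like ctx.output(tag, value) inside a ProcessFunction.
class SideOutputModel {

    static Map<String, List<Long>> splitEvenOdd(long from, long to) {
        Map<String, List<Long>> sideOutputs = new LinkedHashMap<>();
        sideOutputs.put("even", new ArrayList<>());
        sideOutputs.put("odd", new ArrayList<>());
        for (long value = from; value <= to; value++) {
            String tag = (value % 2 == 0) ? "even" : "odd";
            sideOutputs.get(tag).add(value); // analogous to ctx.output(tag, value)
        }
        return sideOutputs;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> result = splitEvenOdd(0, 10);
        System.out.println("even: " + result.get("even")); // analogous to getSideOutput(tag_even)
        System.out.println("odd: " + result.get("odd"));   // analogous to getSideOutput(tag_odd)
    }
}
```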

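Physical partitioning

Flink also provides the following methods that let users configure data partitioning at a finer granularity, as needed, after a transformation.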

1) Global Partitioner

This partitioner sends all data to a single downstream operator instance (subtask id = 0).

2) Shuffle Partitioner

Distributes elements randomly according to a uniform distribution.

3) Broadcast Partitioner

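Sends to all downstream operator instances: every downstream partition receives its own copy of all upstream data.

4) Rebalance Partitioner - repartitioning [key point]

Sends elements to the downstream tasks one after another in round-robin order.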
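To make the four strategies above concrete, here is a plain-Java model of how each one picks target subtasks for a single record. It is a sketch, not Flink's partitioner classes: `PartitionerModel` and its methods are hypothetical, and the rebalance cursor simply starts at subtask 0 for reproducibility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical plain-Java model of how four strategies choose target subtasks
// for one record. It mirrors the behaviour described in the text only.
class PartitionerModel {

    private final int numChannels;    // parallelism of the downstream operator
    private final Random random;
    private int nextRoundRobin = -1;  // rebalance cursor; first call returns 0

    PartitionerModel(int numChannels, long seed) {
        this.numChannels = numChannels;
        this.random = new Random(seed);
    }

    // global: every record goes to subtask 0
    int global() {
        return 0;
    }

    // shuffle: a uniformly random subtask
    int shuffle() {
        return random.nextInt(numChannels);
    }

    // broadcast: every subtask receives a copy
    List<Integer> broadcast() {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            all.add(i);
        }
        return all;
    }

    // rebalance: cycle through subtasks 0, 1, ..., n-1, 0, ...
    int rebalance() {
        nextRoundRobin = (nextRoundRobin + 1) % numChannels;
        return nextRoundRobin;
    }

    public static void main(String[] args) {
        PartitionerModel p = new PartitionerModel(3, 42L);
        System.out.println("global: " + p.global());
        System.out.println("shuffle: " + p.shuffle());
        System.out.println("broadcast: " + p.broadcast());
        // prints rebalance: 0, 1, 2, 0
        System.out.println("rebalance: " + p.rebalance() + ", " + p.rebalance() + ", "
                + p.rebalance() + ", " + p.rebalance());
    }
}
```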

Data skew: one partition holds far more data than the others.

Solution: repartition the data with rebalance.


Create an imbalance deliberately, then rebalance it, and check the result by looking at the total number of records in each partition.
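The effect can be checked with a small plain-Java model that counts how many records each downstream subtask receives when every upstream subtask deals its records out round-robin. The per-subtask input counts `{10, 20, 20, 20, 20}` follow the ranges described in the comment of the demo below (one subtask loses part of its range to the filter); the class name is hypothetical, and every sender starting at subtask 0 is a simplifying assumption.

```java
import java.util.Arrays;

// Hypothetical plain-Java model: upstreamCounts[i] is how many records upstream
// subtask i holds; each sender deals its records out round-robin over the
// downstream subtasks, starting at subtask 0 (a simplifying assumption).
class SkewRebalanceModel {

    static int[] rebalance(int[] upstreamCounts, int downstreamParallelism) {
        int[] received = new int[downstreamParallelism]; // records per downstream subtask
        for (int count : upstreamCounts) {
            int channel = 0;                              // every sender starts at subtask 0
            for (int i = 0; i < count; i++) {
                received[channel]++;
                channel = (channel + 1) % downstreamParallelism;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int[] skewed = {10, 20, 20, 20, 20}; // subtask 0 lost part of its range to the filter
        System.out.println("before: " + Arrays.toString(skewed));
        System.out.println("after:  " + Arrays.toString(rebalance(skewed, 5))); // [18, 18, 18, 18, 18]
    }
}
```

Here every subtask ends up with 18 records. If the per-sender counts were not multiples of the parallelism, starting every sender at subtask 0 would hand subtask 0 the leftovers each time; a random starting position avoids that bias.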

Class exercise code for getting familiar with each partitioner:

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 14:11:59
 **/
public class PartitionerDemo {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

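        // Manually set the parallelism to 5 (five partitions)
        env.setParallelism(5);
        //2. source - load the data
        DataStreamSource<Long> dataStream = env.fromSequence(0, 100);

        // Artificially create an imbalance between partitions
        /*
         * Per-subtask ranges:      [1,20],[21,40],[41,60],[61,80],[81,100]
         * After the filter (> 10): [11,20],[21,40],[41,60],[61,80],[81,100]
         * The split is determined by the data source; with a socket source, for example, this would not happen.
         */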
        SingleOutputStreamOperator<Long> filterStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 10;
            }
        });
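        filterStream.print("before");
        // Calling rebalance() on a stream redistributes its data evenly and returns a new stream
        // DataStream<Long> rebalanceStream = filterStream.rebalance();
        // global sends all data to a single partition
        // DataStream<Long> rebalanceStream = filterStream.global();
        // shuffle sends upstream data to randomly chosen downstream partitions
        // DataStream<Long> rebalanceStream = filterStream.shuffle();
        // forward sends each upstream partition's data to the matching downstream partition
        //DataStream<Long> rebalanceStream = filterStream.forward();
        // broadcast sends each upstream partition's data to all downstream partitions
        DataStream<Long> rebalanceStream = filterStream.broadcast();
        rebalanceStream.print("after");
        //3. transformation - process and transform the data
        //4. sink - output the data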


        //5. execute - run the job
        env.execute();
    }
}

How can you see how many records each partition holds?

package com.bigdata.transforma;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-22 11:52:43
 **/
public class _10_物理分区策略 {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.setParallelism(5);

        DataStreamSource<Long> streamSource = env.fromSequence(1, 100000);

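        //DataStream<Long> ds = streamSource.global();
        // Printed now, every record carries the "1>" prefix: all data sits in subtask 0
        //ds.print();
        // shuffle distributes data randomly and roughly evenly across partitions (tasks)
        //DataStream<Long> shuffle = streamSource.shuffle();
        // broadcast sends the data of every upstream partition to all downstream partitions
        //DataStream<Long> broadcast = streamSource.broadcast();
        // rebalance distributes data evenly (round-robin) to downstream partitions, which evens out data skew
        //DataStream<Long> rebalance = streamSource.rebalance();
        // forward: each upstream partition feeds exactly one downstream partition, so the partition counts must be 1:1
        DataStream<Long> forward = streamSource.forward();
        // streamSource.rescale();
        //shuffle.print();
        // print() shows the records but not how many land in each partition, so map every record
        // to (subtask index, 1), key by the subtask index (f0) and sum the counts
        forward.map(new RichMapFunction<Long, Tuple2<Long,Integer>>() {

            @Override
            public Tuple2<Long, Integer> map(Long value) throws Exception {
                long partition = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(partition,1);
            }
        }).keyBy(t -> t.f0).sum(1).print();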

        //5. execute - run the job
        env.execute();
    }
}

How rebalance works under the hood:
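As I understand the Flink source, `RebalancePartitioner` picks a random starting channel when it is set up and then advances one channel (modulo the number of channels) per record. The sketch below reproduces that idea in plain Java; `RandomStartRoundRobin` is a hypothetical name, not a Flink class.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of round-robin channel selection with a random starting point.
// Each sender instance picks a random first position, then moves one channel
// forward (mod n) for every record. Names are hypothetical.
class RandomStartRoundRobin {

    private final int numberOfChannels;
    private int nextChannel;

    RandomStartRoundRobin(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    int selectChannel() {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    public static void main(String[] args) {
        int[] received = new int[4];
        for (int sender = 0; sender < 3; sender++) {      // three upstream subtasks
            RandomStartRoundRobin selector = new RandomStartRoundRobin(4);
            for (int i = 0; i < 5; i++) {                  // each sender emits 5 records
                received[selector.selectChannel()]++;
            }
        }
        // 15 records in total; each channel receives between 3 and 6
        System.out.println(Arrays.toString(received));
    }
}
```

Because each sender starts at an independent random position, the leftover records of different senders tend to land on different subtasks instead of all piling onto subtask 0.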

5) Forward Partitioner

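Sends records to the first (and only) output channel, which leads to the corresponding downstream task, so upstream subtask i feeds downstream subtask i. The upstream and downstream operators must have the same parallelism, i.e. a 1:1 relationship.

When no partitioner is specified between two operators, Flink uses ForwardPartitioner if their parallelism is equal and RebalancePartitioner otherwise. ForwardPartitioner requires equal upstream and downstream parallelism; otherwise an exception is thrown.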
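The rule in the previous paragraph can be written as a small decision function. This is a plain-Java model with hypothetical names; the exception type and message Flink actually throws are not modeled, and `IllegalArgumentException` is just this sketch's choice.

```java
// Hypothetical plain-Java model of the default-partitioner rule: with no explicit
// partitioner, equal parallelism means forward, otherwise rebalance; an explicit
// forward between operators of unequal parallelism is rejected.
class DefaultPartitionerRule {

    static String choose(int upstreamParallelism, int downstreamParallelism, boolean explicitForward) {
        if (explicitForward) {
            if (upstreamParallelism != downstreamParallelism) {
                throw new IllegalArgumentException("forward requires equal parallelism: "
                        + upstreamParallelism + " vs " + downstreamParallelism);
            }
            return "FORWARD";
        }
        return upstreamParallelism == downstreamParallelism ? "FORWARD" : "REBALANCE";
    }

    // forward: upstream subtask i always sends to downstream subtask i
    static int forwardTarget(int upstreamSubtaskIndex) {
        return upstreamSubtaskIndex;
    }

    public static void main(String[] args) {
        System.out.println(choose(5, 5, false)); // FORWARD
        System.out.println(choose(5, 2, false)); // REBALANCE
        System.out.println(forwardTarget(3));    // 3
    }
}
```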

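6) Custom Partitioning

Partitioning is a concept shared by many technologies:

1. Hadoop has partitions

2. Kafka has partitions

3. Spark has partitions

4. Hive has partitions

Custom partitioning uses a user-defined Partitioner to choose the target task for each element.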
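The expected split can be worked out in advance with a plain-Java model of the same rule as the `CustomPartitioner` class in the code that follows (keys below 10000 go to partition 0, the rest to partition 1). `CustomPartitionModel` is a hypothetical name. For `fromSequence(1, 15000)` it predicts 9999 records for partition 0 and 5001 for partition 1, which is the skew that the job's `rebalance()` step then evens out.

```java
import java.util.Arrays;

// Plain-Java model of the CustomPartitioner rule; the counting loop stands in
// for the per-subtask counts the Flink job prints. It is not Flink code.
class CustomPartitionModel {

    // Same rule as CustomPartitioner.partition(key, numPartitions).
    // The returned index must be < numPartitions, so this needs at least 2 partitions.
    static int partition(long key, int numPartitions) {
        return key < 10000 ? 0 : 1;
    }

    static int[] countPerPartition(long from, long to, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (long key = from; key <= to; key++) {
            counts[partition(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countPerPartition(1, 15000, 2))); // [9999, 5001]
    }
}
```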

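The following code demonstrates these cases: a custom partitioner deliberately skews the data, and rebalance then evens it out.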

package com.bigdata.transforma;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @description:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-11-22 14:15:50
 **/
class CustomPartitioner implements Partitioner<Long>{

    @Override
    public int partition(Long key, int numPartitions) {
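        // Debug output: prints the number of downstream partitions for every record
        System.out.println(numPartitions);
        // Keys below 10000 go to partition 0, all others to partition 1;
        // the returned index must be < numPartitions, so this needs a parallelism of at least 2
        if (key < 10000) {
            return 0;
        }
        return 1;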
    }
}
public class _11_自定义分区规则 {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.setParallelism(2);
        DataStreamSource<Long> streamSource = env.fromSequence(1, 15000);
        DataStream<Long> dataStream = streamSource.partitionCustom(new CustomPartitioner(), new KeySelector<Long, Long>() {

            @Override
            public Long getKey(Long value) throws Exception {
                return value;
            }
        });

        //dataStream.print();
        // How many records does each partition hold?
        dataStream.map(new RichMapFunction<Long, Tuple2<Long,Integer>>() {

            @Override
            public Tuple2<Long, Integer> map(Long value) throws Exception {
                long partition = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(partition,1);
            }
        }).keyBy(t -> t.f0).sum(1).print("before");

        DataStream<Long> rebalance = dataStream.rebalance();

        rebalance.map(new RichMapFunction<Long, Tuple2<Long,Integer>>() {


            @Override
            public Tuple2<Long, Integer> map(Long value) throws Exception {
                long partition = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(partition,1);
            }
        }).keyBy(t -> t.f0).sum(1).print("after");



        //5. execute - run the job
        env.execute();
    }
}

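This article was submitted by an Internet user; its views are the author's own and do not represent the position of this site. This site only provides information storage space services, does not own the rights, and bears no related legal liability. If you repost it, please credit the source: http://www.coloradmin.cn/o/2246137.html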
