大数据学习之Flink基础(补充)

news2024/11/27 15:38:01

Flink基础

1、系统时间与事件时间

系统时间(处理时间)

在Sparksreaming的任务计算时,使用的是系统时间。

假设所用窗口为滚动窗口,大小为5分钟。那么每五分钟,都会对接收的数据进行提交任务.

但是,这里有个要注意的点,有个概念叫时间轴对齐。若我们在12:12开始接收数据,按道理我们会在12:17进行提交任务。事实上我们会在12:20进行提交任务,因为会进行时间轴对齐,将一天按照五分钟进行划分,会对应到12:20。在此时提交任务,后面每个五分钟提交任务,都会对应到我们所划分的时间轴。

事件时间

flink支持带有事件时间的窗口(Window)操作

事件时间区别于系统时间,如下举例:

flink处理实时数据,对数据进行逐条处理。设定事件时间为5分钟,12:00开始接收数据,接收的第一条数据时间为12:01,接收的第二条数据为12:02。假设从此时起没有收到数据,那么将不会进行提交任务。**到了12:06,接收到了第三条数据。第三条数据的接收时间自12:00起,已经超过了五分钟,**那么此时便会进行任务提交。

2、wordcount简单案例的实现
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo01StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、构建Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // 2、通过Socket模拟无界流环境,方便FLink处理
        // 虚拟机启动:nc -lk 8888
        // 从Source构建第一个DataStream
        // TODO C:\Windows\System32\drivers\etc\hosts文件中配置了master与IP地址的映射,所以这里可以使用master
        DataStream<String> lineDS = env.socketTextStream("master", 8888);

        // 统计每个单词的数量
        // 第一步:将每行数据的每个单词切出来并进行扁平化处理
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            /**
             *FlatMapFunction<String, String>: 表示输入、输出数据的类型
             * @param line DS中的一条数据
             * @param out 通过collect方法将数据发送到下游
             * @throws Exception
             */
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(",")) {
                    // 将每个单词发送到下游
                    out.collect(word);
                }
            }
        });

        // 第二步:将每个单词变成 KV格式,V置为1;返回的数据是一个二元组Tuple2
        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        /**
         * 第三步:按每一个单词进行分组; 无法再使用其父类DataStream进行定义(无法向上转型)
         * KeyedStream<T, K> 是 DataStream<T> 的一个特殊化版本,它添加了与键控操作相关的特定方法(如 reduce、aggregate、window 等)。
         * 由于 KeyedStream 提供了额外的功能和方法,它不能简单地被视为 DataStream 的一个简单实例,
         * 因为它实现了额外的接口(如 KeyedOperations<T, K>)并可能覆盖了某些方法的行为以支持键控操作。
         */
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                // 对Key进行分组
                return tuple2.f0;
            }
        });
        // 第四步:对1进行聚合sum,下标是从0开始的
        DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);

        // 3、打印结果:将DS中的内容Sink到控制台
        wordCntDS.print();

        // 执行任务
        env.execute();
    }
}
3、设置任务执行的并行度

本机为8核,可并行16的线程

手动改变任务的并行度,若不设置则会显示1-16,设置后只会显示1-2
env.setParallelism(2);
setBufferTimeout():设置输出缓冲区刷新的最大时间频率(毫秒)。
env.setBufferTimeout(200);

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo01StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、构建Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // 手动改变任务的并行度,默认并行度为最大,
        env.setParallelism(2);
        // setBufferTimeout():设置输出缓冲区刷新的最大时间频率(毫秒)。
        env.setBufferTimeout(200);

        // 2、通过Socket模拟无界流环境,方便FLink处理
        // 虚拟机启动:nc -lk 8888
        // 从Source构建第一个DataStream
        DataStream<String> lineDS = env.socketTextStream("master", 8888);
        System.out.println("lineDS并行度:" + lineDS.getParallelism());

        // 统计每个单词的数量
        // 第一步:将每行数据的每个单词切出来并进行扁平化处理
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            /**
             *
             * @param line DS中的一条数据
             * @param out 通过collect方法将数据发送到下游
             * @throws Exception
             */
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(",")) {
                    // 将每个单词发送到下游
                    out.collect(word);
                }
            }
        });
        System.out.println("wordsDS并行度:" + wordsDS.getParallelism());

        // 第二步:将每个单词变成 KV格式,V置为1
        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        System.out.println("wordKVDS并行度:" + wordKVDS.getParallelism());

        // 第三步:按每一个单词进行分组
        // keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
        // 传递数据的规则:hash取余(线程总数,默认CPU的总线程数)原理
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
                return tuple2.f0;
            }
        });
        System.out.println("keyedDS并行度:" + keyedDS.getParallelism());

        // 第四步:对1进行聚合sum
        DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);
        System.out.println("wordCntDS并行度:" + wordCntDS.getParallelism());
        // 3、打印结果:将DS中的内容Sink到控制台
        keyedDS.print();

        env.execute();

    }
}
img
4、设置批/流处理方式,使用Lambda表达式,使用自定类实现接口中抽象的方法
package com.shujia.flink.core;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo02BatchWordCount {
    public static void main(String[] args) throws Exception {

        // 1、构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置Flink程序的处理方式:默认是流处理
        /**
         * BATCH:批处理,只能处理有界流,底层是MR模型,可以进行预聚合
         * STREAMING:流处理,可以处理无界流,也可以处理有界流,底层是持续流模型,数据一条一条处理
         * AUTOMATIC:自动判断,当所有的Source都是有界流则使用BATCH模式,当Source中有一个是无界流则会使用STREAMING模式
         */
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // 2、获得第一个DS
        // 通过readTextFile可以基于文件构建有界流
        DataStream<String> wordsFileDS = env.readTextFile("flink/data/words.txt");

        // 3、DS之间的转换
        // 统计每个单词的数量
        // 第一步:将每行数据的每个单词切出来并进行扁平化处理

        // Flink处理逻辑传入的方式
        // new XXXFunction 使用匿名内部类
//        DataStream<String> wordsDS = wordsFileDS.flatMap(new FlatMapFunction<String, String>() {
//            /**
//             * @param line DS中的一条数据
//             * @param out 通过collect方法将数据发送到下游
//             * @throws Exception
//             * Type parameters:
//             * FlatMapFunction<T, O>
//             * <T> – Type of the input elements. <O> – Type of the returned elements.
//             */
//            @Override
//            public void flatMap(String line, Collector<String> out) throws Exception {
//                for (String word : line.split(",")) {
//                    // 将每个单词发送到下游
//                    out.collect(word);
//                }
//            }
//        });

        /**
         * 使用Lambda表达式
         * 使用时得清楚FlatMapFunction中所要实现的抽象方法flatMap的两个参数的含义
         * ()->{}
         *  通过 -> 分隔,左边是函数的参数,右边是函数实现的具体逻辑
         *  并且需要给出 flatMap函数的输出类型,Types.STRING
         *  line: 输入数据类型, out: 输出数据类型
         */
        DataStream<String> wordsDS = wordsFileDS.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                out.collect(word);
            }
        }, Types.STRING);


        //TODO 使用自定类实现接口中抽象的方法,一般不使用这种方法
        wordsFileDS.flatMap(new MyFunction()).print();

        // 第二步:将每个单词变成 KV格式,V置为1
//        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String word) throws Exception {
//                return Tuple2.of(word, 1);
//            }
//        });

        // TODO 此处需要给出 map函数的输出类型,Types.TUPLE(Types.STRING, Types.INT),是一个二元组
        DataStream<Tuple2<String, Integer>> wordKVDS = wordsDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));


        /**
         * 第三步:按每一个单词进行分组
         *    keyBy之后数据流会进行分组,相同的key会进入同一个线程中被处理
         *    传递数据的规则:hash取余(线程总数,默认CPU的总线程数,本机为16)原理
         */
//        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
//                return tuple2.f0;
//            }
//        });


        // TODO 此处的Types.STRING 并不是直接表示某个方法的输出类型,而是用来指定 keyBy 方法中键(key)的类型。这里可以省略!
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordKVDS.keyBy(kv -> kv.f0, Types.STRING);
        // 第四步:对1进行聚合sum,无需指定返回值类型
        DataStream<Tuple2<String, Integer>> wordCntDS = keyedDS.sum(1);

        // 4、最终结果的处理(保存/输出打印)
        wordCntDS.print();

        env.execute();

    }
}

class MyFunction implements FlatMapFunction<String,String>{

    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        for (String word : line.split(",")) {
                    // 将每个单词发送到下游
                    out.collect(word);
                }
    }
}
5、source

Flink 在流处理和批处理上的 source 大概有 4 类:
基于本地集合的 source、
基于文件的 source、
基于网络套接字的 source、
自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。

1、从本地集合source中读取数据
package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Demo01ListSource {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 本地集合Source
        ArrayList<String> arrList = new ArrayList<>();
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");

        //TODO 有界流,fromCollection
        DataStream<String> listDS = env.fromCollection(arrList);
        listDS.print();

        env.execute();
    }
}
2、新版本从本地文件中读取数据,有界流和无界流两种方式
package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.time.Duration;

public class Demo02FileSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 历史版本读文件的方式,有界流
        DataStream<String> oldFileDS = env.readTextFile("flink/data/words.txt");
//        oldFileDS.print();

        //TODO 读取案例一: 新版本加载文件的方式:FileSource,默认是有界流
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat()
                        , new Path("flink/data/words.txt")
                )
                .build();
        
        //TODO 从Source加载数据构建DS,使用自带source类,使用 fromSource
        DataStream<String> fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
        fileSourceDS.print();

        //TODO 读取案例二: 将读取文件变成无界流
        FileSource<String> fileSource2 = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat()
                        , new Path("flink/data/words")
                )
                //TODO 使成为无界流读取一个文件夹中的数据,类似Flume中的spool dir,可以监控一个目录下文件的变化
                // Duration.ofSeconds(5) 以5秒为间隔持续监控
                .monitorContinuously(Duration.ofSeconds(5))
                .build();

        DataStream<String> fileSourceDS2 = env.fromSource(fileSource2,WatermarkStrategy.noWatermarks(),"fileSource2");
        fileSourceDS2.print();
        env.execute();

    }
}
3、自定义source类,区分有界流与无界流
  • 只有在Source启动时会执行一次
  • run方法如果会结束,则Source会得到一个有界流
    
  • run方法如果不会结束,则Source会得到一个无界流
    
package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Demo03MySource {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 使用自定义source类,通过addSource对其进行添加
        DataStream<String> mySourceDS = env.addSource(new MySource());
        mySourceDS.print();

        env.execute();
        
    }
}


class MySource implements SourceFunction<String>{

    /**
     * 只有在Source启动时会执行一次
     *     run方法如果会结束,则Source会得到一个有界流
     *     run方法如果不会结束,则Source会得到一个无界流
     *  下面的例子Source会得到一个无界流
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        System.out.println("run方法启动了");
        // ctx 可以通过collect方法向下游发送数据
        long cnt = 0L;
        while(true){
            ctx.collect(cnt+"");
            cnt ++;
            // 休眠一会
            Thread.sleep(1000);
        }


    }

    // Source结束时会执行
    @Override
    public void cancel() {
        System.out.println("Source结束了");

    }
}
4、自定义source类,读取MySQL中的数据,并进行处理
package com.shujia.flink.source;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Demo04MyMySQLSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Students> studentDS = env.addSource(new MyMySQLSource());

        // 统计班级人数
        DataStream<Tuple2<String, Integer>> clazzCntDS = studentDS
                .map(stu -> Tuple2.of(stu.clazz, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1);
        clazzCntDS.print();

        // 统计性别人数
        DataStream<Tuple2<String, Integer>> genderCntDS = studentDS
                .map(stu -> Tuple2.of(stu.gender, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .sum(1);
        genderCntDS.print();

        env.execute();

    }
}

// TODO 自定义source类从MySQL中读取数据
class MyMySQLSource implements SourceFunction<Students> {

    @Override
    public void run(SourceContext<Students> ctx) throws Exception {
        //TODO run方法只会执行一次创建下列对象的操作
        // 建立连接
        Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata_30", "root", "123456");
        // 创建Statement
        Statement st = conn.createStatement();
        // 执行查询
        ResultSet rs = st.executeQuery("select * from students2");
        // 遍历rs提取每一条数据
        while (rs.next()) {
            long id = rs.getLong("id");
            String name = rs.getString("name");
            int age = rs.getInt("age");
            String gender = rs.getString("gender");
            String clazz = rs.getString("clazz");

            Students stu = new Students(id, name, age, gender, clazz);
            ctx.collect(stu);

            /**
             * 16> (文科四班,1)
             * 15> (女,1)
             * 15> (女,2)
             * 2> (男,1)
             * 7> (文科六班,1)
             * 15> (女,3)
             * 2> (男,2)
             * 17> (理科六班,1)
             * 17> (理科六班,2)
             * 13> (理科五班,1)
             * 20> (理科二班,1)
             * 13> (理科四班,1)
             */
        }

        rs.close();
        st.close();
        conn.close();
    }

    @Override
    public void cancel() {

    }
}

// TODO 创建一个类,用于存储从MySQL中取出的数据
class Students {
    Long id;
    String name;
    Integer age;
    String gender;
    String clazz;

    public Students(Long id, String name, Integer age, String gender, String clazz) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.clazz = clazz;
    }
}
6、sink

Flink 将转换计算后的数据发送的地点 。
Flink 常见的 Sink 大概有如下几类:

写入文件、
打印出来、
写入 socket 、
自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

1、构建FileSink,监控一个端口中的数据并将其写入到本地文件夹中
package com.shujia.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
public class Demo01FileSink {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);

        // 构建FileSink
        FileSink<String> fileSink = FileSink
                .<String>forRowFormat(new Path("flink/data/fileSink"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // 这个设置定义了滚动的时间间隔。
                                .withRolloverInterval(Duration.ofSeconds(10))
                                // 这个设置定义了一个不活动间隔。
                                .withInactivityInterval(Duration.ofSeconds(10))
                                // 这个设置定义了单个日志文件可以增长到的最大大小。在这个例子中,每个日志文件在被滚动之前可以增长到最多1MB。
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();


        lineDS.sinkTo(fileSink);
        env.execute();

    }
}
2、自定义sink类
package com.shujia.flink.sink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;

public class Demo02MySink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> arrList = new ArrayList<>();
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");
        arrList.add("flink");
        DataStreamSource<String> ds = env.fromCollection(arrList);

        ds.addSink(new MySinkFunction());

        env.execute();

        /**
         * 进入了invoke方法
         * flink
         * 进入了invoke方法
         * flink
         * 进入了invoke方法
         * flink
         * 进入了invoke方法
         * flink
         */

    }
}

class MySinkFunction implements SinkFunction<String>{

    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("进入了invoke方法");
        // invoke 每一条数据会执行一次
        // 最终数据需要sink到哪里,就对value进行处理即可
        System.out.println(value);
    }
}
7、Transformation:数据转换的常用操作
1、Map
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo01Map {
    public static void main(String[] args) throws Exception {
        // 传入一条数据返回一条数据
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        // 1、使用匿名内部类
        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

//        mapDS.print();

        // 2、使用lambda表达式
        DataStream<Tuple2<String, Integer>> mapDS2 =
                ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        mapDS2.print();

        env.execute();
    }
}
2、FlatMap
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo02FlatMap {
    public static void main(String[] args) throws Exception {
        // 传入一条数据返回多条数据,类似UDTF函数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        // 1、使用匿名内部类
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapDS01 = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : line.split(",")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        flatMapDS01.print();

        // 2、使用lambda表达式
        SingleOutputStreamOperator<Tuple> flatMapDS02 = ds.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                out.collect(Tuple2.of(word, 1));
            }
        }, Types.TUPLE(Types.STRING, Types.INT));
        flatMapDS02.print();


        env.execute();


    }
}
3、Filter
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03Filter {
    public static void main(String[] args) throws Exception {
        // 过滤数据,注意返回值必须是布尔类型,返回true则保留数据,返回false则过滤数据
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        /**
         * Integer.valueOf:该方法将字符串参数转换为 Integer 对象。返回的是 Integer 类型,即 java.lang.Integer 的一个实例。
         * Integer.parseInt:该方法将字符串参数解析为基本数据类型 int 的值。返回的是 int 类型的值,而不是对象。
         * 无需指定返回值类型
         */
        // 只输出大于10的数字
        SingleOutputStreamOperator<String> filterDS = ds.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return Integer.parseInt(value) > 10;
            }
        });
        filterDS.print();

        ds.filter(value -> Integer.parseInt(value) > 10).print();


        env.execute();

    }
}
4、KeyBy

// 两种不同的简写方式
ds.keyBy(value -> value.toLowerCase(), Types.STRING).print();
ds.keyBy(String::toLowerCase, Types.STRING).print();

package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo04KeyBy {
    public static void main(String[] args) throws Exception {
        // 用于就数据流分组,让相同的Key进入到同一个任务中进行处理,后续可以跟聚合操作
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        KeyedStream<String, String> keyByDS = ds.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });
        keyByDS.print();

        // 两种不同的简写方式
        ds.keyBy(value -> value.toLowerCase(), Types.STRING).print();
        ds.keyBy(String::toLowerCase, Types.STRING).print();


        env.execute();

    }
}
5、Reduce
package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo05Reduce {
    public static void main(String[] args) throws Exception {

        // 用于对KeyBy之后的数据流进行聚合计算
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);

        // 统计班级的平均年龄
        /*
         * 文科一班,20
         * 文科一班,22
         * 文科一班,21
         * 文科一班,20
         * 文科一班,22
         *
         * 理科一班,20
         * 理科一班,21
         * 理科一班,20
         * 理科一班,21
         * 理科一班,20
         *
         */
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> kvDS = ds.map(line -> {
            String[] split = line.split(",");
            String clazz = split[0];
            int age = Integer.parseInt(split[1]);
            return Tuple3.of(clazz, age, 1);
        }, Types.TUPLE(Types.STRING, Types.INT, Types.INT));

        KeyedStream<Tuple3<String, Integer, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0, Types.STRING);

        keyByDS.reduce(new ReduceFunction<Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> value1, Tuple3<String, Integer, Integer> value2) throws Exception {
                        return Tuple3.of(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
                    }
                }).map(t3 -> Tuple2.of(t3.f0, (double) t3.f1 / t3.f2),Types.TUPLE(Types.STRING,Types.DOUBLE))
                .print();


        keyByDS.reduce((v1,v2)->Tuple3.of(v1.f0, v1.f1 + v2.f1, v1.f2 + v2.f2))
                .map(t3 -> Tuple2.of(t3.f0, (double) t3.f1 / t3.f2),Types.TUPLE(Types.STRING,Types.DOUBLE))
                .print();

        env.execute();
    }
}
6、Window
package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Demo06Window {
    public static void main(String[] args) throws Exception {
        // Flink窗口操作:时间、计数、会话

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.socketTextStream("master", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // 每隔5s统计每个单词的数量 ---> 滚动窗口实现(与spark中的定义相同)
        SingleOutputStreamOperator<Tuple2<String, Integer>> outputDS01 = kvDS
                // 按照Tuple2中的第一个元素进行分组
                .keyBy(kv -> kv.f0, Types.STRING)
                // 设置滚动时间
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // 对Tuple2中的第二个元素(索引为1的元素,即Integer类型)进行求和
                .sum(1);

//        outputDS01.print();
        // 每隔5s统计最近10s内的每个单词的数量 ---> 滑动窗口实现(与spark中的定义相同)
        kvDS
                .keyBy(kv -> kv.f0, Types.STRING)
                // 设置窗口大小和滑动大小
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1)
                .print();

        env.execute();
    }
}
7、Union
package com.shujia.flink.tf;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo07Union {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);
        DataStream<String> ds02 = env.socketTextStream("master", 9999);

        DataStream<String> unionDS = ds01.union(ds02);

        // union 就是将两个相同结构的DS合并成一个DS(上下合并)
        unionDS.print();

        env.execute();

    }
}
8、Process

通过processElement实现Map算子操作、flatMap算子操作(实现扁平化)、filter算子操作

package com.shujia.flink.tf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Demo08Process {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);

        ds01.process(new ProcessFunction<String, Object>() {
            /*
             * 每进来一条数据就会执行一次
             * value :一条数据
             * ctx:可以获取任务执行时的信息
             * out:用于输出数据
             * ProcessFunction<String, Object>.Context ctx:flink的上下文对象
             */
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) throws Exception {
                // 通过processElement实现Map算子操作
                out.collect(Tuple2.of(value, 1));
                // 通过processElement实现flatMap算子操作(实现扁平化)
                for (String word : value.split(",")) {
                    out.collect(word);
                }
                // 通过processElement实现filter算子操作
                if("java".equals(value)){
                    out.collect("java ok");
                }
            }
        }).print();


        env.execute();

    }
}

通过processElement实现KeyBy算子操作

package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;

public class Demo09KeyByProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds01 = env.socketTextStream("master", 8888);

        KeyedStream<Tuple2<String, Integer>, String> keyedDS = ds01.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : value.split(",")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(t2 -> t2.f0, Types.STRING);


        // 基于分组之后的数据流同样可以调用process方法
        /**
         * KeyedProcessFunction<K, I, O>
         * Type parameters:
         * <K> – Type of the key. <I> – Type of the input elements. <O> – Type of the output elements.
         */
        keyedDS
                .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, String>() {
                    HashMap<String, Integer> wordCntMap;

                    // 当KeyedProcessFunction构建时只会执行一次,这样就避免了重复创建HashMap对象
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        wordCntMap = new HashMap<String, Integer>();
                    }

                    // 每一条数据会执行一次
                    @Override
                    public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
                        // 通过process实现word count
                        // 判断word是不是第一次进入,通过HashMap查找word是否有count值
                        String word = value.f0;
                        int cnt = 1;
                        if (wordCntMap.containsKey(word)) {
                            //get 在集合中通过value来获取对应的值
                            int newCnt = wordCntMap.get(word) + 1;
                            wordCntMap.put(word, newCnt);
                            cnt = newCnt;
                        } else {
                            wordCntMap.put(word, 1);
                        }
                        out.collect(word + ":" + cnt);
                    }
                }).print();

        env.execute();
    }
}
8、Flink并行度

如何设置并行度?

1、考虑吞吐量

有聚合操作的任务:1w条/s 一个并行度

无聚合操作的任务:10w条/s 一个并行度

2、考虑集群本身的资源

注:

Task的数量由并行度以及有无Shuffle一起决定(可在shuffle之前观察是否有可合并的Task,可以来减少Task数量)

Task Slot数量 是由任务中最大的并行度决定

TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定

FLink 并行度设置的几种方式:

1、通过env设置,不推荐,如果需要调整并行度得修改代码重新打包提交任务
2、每个算子可以单独设置并行度,视实际情况决定,一般不常用
3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式
命令行:flink run 可以通过 -p 参数设置全局并行度
4、配置文件flink-conf.yaml中设置
web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml

package com.shujia.flink.core;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03Parallelism {
    public static void main(String[] args) throws Exception {
        /**
         * 如何设置并行度?
         * 1、考虑吞吐量
         *      有聚合操作的任务:1w条/s 一个并行度
         *      无聚合操作的任务:10w条/s 一个并行度
         * 2、考虑集群本身的资源
         *
         * Task的数量由并行度以及有无Shuffle一起决定(可在shuffle之前观察是否有可合并的Task,可以来减少Task数量)
         * Task Slot数量 是由任务中最大的并行度决定
         * TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定
         *
         */
        // FLink 并行度设置的几种方式
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1、通过env设置,不推荐,如果需要调整并行度得修改代码重新打包提交任务
        env.setParallelism(3);

        // socketTextStream的并行度为1,无法调整
        DataStreamSource<String> ds = env.socketTextStream("master", 8888);

        // 2、每个算子可以单独设置并行度,视实际情况决定,一般不常用
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = ds
                        .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                        .setParallelism(4);


        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCntDS2P =
                kvDS.keyBy(kv -> kv.f0)
                .sum(1)
                        .setParallelism(2);


        // 如果算子不设置并行度则以全局为准
        wordCntDS2P.print();

        /**
         * 3、还可以在提交任务的时候指定并行度,最常用 比较推荐的方式
         *  命令行:flink run 可以通过 -p 参数设置全局并行度
         *  
         *  web UI:填写parallelism输入框即可设置,优先级:算子本身的设置 > env做的全局设置 > 提交任务时指定的 > 配置文件flink-conf.yaml
         */

        env.execute();

    }
}

上述代码执行如下:
在这里插入图片描述
在这里插入图片描述

9、事件时间

事件时间:指的是数据产生的时间或是数据发生的时间。它是数据本身所携带的时间信息,代表了事件真实发生的时间。在Flink中,事件时间通过数据元素自身带有的时间戳来表示,这个时间戳具有业务含义,并与系统时间独立。

1、案例一:基于事件事件的滚动窗口的实现

窗口的触发条件:

1、水位线大于等于窗口的结束时间

2、窗口内有数据

水位线:某个线程中所接收到的数据中最大的时间戳
水位线设置1: 单调递增时间戳策略,不考虑数据乱序问题。所传入数据的最大事件时间作为水位线
.<Tuple2<String, Long>>forMonotonousTimestamps()
水位线设置2 设置水位线前移,容忍5s的数据乱序到达,本质上将水位线前移5s,缺点:导致任务延时变大

.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))

package com.shujia.flink.core;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class Demo04EventTime {
    public static void main(String[] args) throws Exception {
        // 事件时间:数据本身自带的时间
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置全局并行度
        env.setParallelism(1);

        /*
        数据格式:单词,时间戳(很大的整数,Long类型)
         a,1722233813000
         a,1722233814000
         a,1722233815000
         a,1722233816000
         a,1722233817000
         a,1722233818000
         a,1722233819000
         a,1722233820000
         a,1722233822000
         a,1722233827000
         */
        DataStreamSource<String> wordTsDS = env.socketTextStream("master", 8888);

        SingleOutputStreamOperator<Tuple2<String, Long>> mapDS = wordTsDS
                .map(line -> Tuple2.of(line.split(",")[0], Long.parseLong(line.split(",")[1])), Types.TUPLE(Types.STRING, Types.LONG));

        // 指定数据的时间戳,告诉Flink,将其作为事件时间进行处理
        SingleOutputStreamOperator<Tuple2<String, Long>> assDS = mapDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                  // 水位线:某个线程中所接收到的数据中最大的时间戳
//                                //水位线设置1: 单调递增时间戳策略,不考虑数据乱序问题。所传入数据的最大事件时间作为水位线
//                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                //TODO :水位线设置2 设置水位线前移,容忍5s的数据乱序到达,本质上将水位线前移5s,缺点:导致任务延时变大
                                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                // 指定事件时间,可以提取数据的某一部分作为事件时间
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> t2, long recordTimestamp) {
                                        return t2.f1;
                                    }
                                })
                );

        // 不管是事件时间还是处理时间都需要搭配窗口操作一起使用
        assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                /**
                 * 窗口的触发条件
                 * 1、水位线大于等于窗口的结束时间
                 * 2、窗口内有数据
                 *TumblingEventTimeWindows:滚动窗口
                 */
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute();

    }
}
2、案例二:自定义水平线策略

多并行度,map之后指定水位线生成策略

注:必须两个线程中的水位线都超过了窗口的大小,才能触发窗口的执行

当窗口满足执行条件:

1、所有线程的水位线都超过了窗口的结束时间 (依次每两个不同编号的线程为一组,该组均超过)

2、窗口有数据 触发一次process方法

package tfTest;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Demo05WaterMarkStrategy {
    public static void main(String[] args) throws Exception {
        // 自定义水位线策略
        // 参考链接:https://blog.csdn.net/zznanyou/article/details/121666563
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        DataStreamSource<String> eventDS = env.socketTextStream("master", 8888);


        // 将每条数据变成MyEvent类型
        eventDS.map(new MapFunction<String, MyEvent>() {
            @Override
            public MyEvent map(String value) throws Exception {
                String[] split = value.split(",");
                return new MyEvent(split[0],Long.parseLong(split[1]));
            }
        })
                // TODO 设置事件时间和自定义水平线策略
                .assignTimestampsAndWatermarks(new WatermarkStrategy<MyEvent>() {
            @Override
            public TimestampAssigner<MyEvent> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return new SerializableTimestampAssigner<MyEvent>() {
                    @Override
                    public long extractTimestamp(MyEvent element, long recordTimestamp) {
                        return element.getTs();
                    }
                };
            }

            @Override
            public WatermarkGenerator<MyEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new MyMapWatermarkGenerator();
            }
        }).keyBy(my-> my.getWord())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 当窗口满足执行条件:1、所有线程的水位线都超过了窗口的结束时间 2、窗口有数据 触发一次process方法
                .process(new ProcessWindowFunction<MyEvent, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<MyEvent, String, String, TimeWindow>.Context context, Iterable<MyEvent> elements, Collector<String> out) throws Exception {
                        System.out.println("窗口触发执行了。");
                        System.out.println("当前水位线为:" + context.currentWatermark() + ",当前窗口的开始时间:" + context.window().getStart() + ",当前窗口的结束时间:" + context.window().getEnd());

                        // 基于elements做统计 通过out可以将结果发送到下游
                    }
                }).print();


        env.execute();

    }
}

// 用于map之后指定水位线生成策略
class MyMapWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 0;

    private long currentMaxTimeStamp;

    //TODO 每来一条数据会处理一次,若maxOutOfOrderness为0,则为单调递增时间戳策略;若不为0,则是水位线前移策略
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimeStamp = Math.max(currentMaxTimeStamp, eventTimestamp);
        System.out.println("当前线程编号为:" + Thread.currentThread().getId() + ",当前水位线为:" + (currentMaxTimeStamp - maxOutOfOrderness));
    }

    // 周期性的执行:env.getConfig().getAutoWatermarkInterval(); 默认是200ms
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 发送
        output.emitWatermark(new Watermark(currentMaxTimeStamp - maxOutOfOrderness));
    }
}

执行结果:

在这里插入图片描述

多并行度,source之后设置水位线策略

效果通线程并行度为1的情况

package com.shujia.flink.core;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Demo05WaterMarkStrategy {
    public static void main(String[] args) throws Exception {
        // 自定义水位线策略
        // 参考链接:https://blog.csdn.net/zznanyou/article/details/121666563
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        DataStreamSource<String> eventDS = env.socketTextStream("master", 8888);

        // 在Source之后就指定水位线策略
        eventDS.assignTimestampsAndWatermarks(new WatermarkStrategy<String>() {
                    // 指定时间戳的提取策略
                    @Override
                    public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                return Long.parseLong(element.split(",")[1]);
                            }
                        };
                        // 简写方式
//                return (ele,ts)->Long.parseLong(ele.split(",")[1]);
                    }

                    // 指定水位线的策略
                    @Override
                    public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new MyWatermarkGenerator();
                    }
                })
                // 将数据变成KV格式,即:单词,1
                .map(line -> Tuple2.of(line.split(",")[0], 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 当窗口满足执行条件:1、水位线超过了窗口的结束时间 2、窗口有数据 触发一次process方法
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        System.out.println("窗口触发执行了。");
                        System.out.println("当前水位线为:" + context.currentWatermark() + ",当前窗口的开始时间:" + context.window().getStart() + ",当前窗口的结束时间:" + context.window().getEnd());

                        // 基于elements做统计 通过out可以将结果发送到下游
                    }
                }).print();
        env.execute();

    }
}

// 用于Source之后直接指定水位线生成策略
class MyWatermarkGenerator implements WatermarkGenerator<String> {

    private final long maxOutOfOrderness = 0;

    private long currentMaxTimeStamp;

    // 每来一条数据会处理一次
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimeStamp = Math.max(currentMaxTimeStamp, eventTimestamp);
        System.out.println("当前线程编号为:" + Thread.currentThread().getId() + ",当前水位线为:" + (currentMaxTimeStamp - maxOutOfOrderness));
    }

    // 周期性的执行:env.getConfig().getAutoWatermarkInterval(); 默认是200ms
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(currentMaxTimeStamp - maxOutOfOrderness));
    }
}
10、窗口
1、时间窗口:滚动与滑动窗口

时间窗口:滚动、滑动

时间类型:处理时间、事件时间

package com.shujia.flink.window;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class Demo01TimeWindow {
    public static void main(String[] args) throws Exception {
        /*
         * 时间窗口:滚动、滑动
         * 时间类型:处理时间、事件时间
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new MyEvent(split[0], Long.parseLong(split[1]));
                    }
                });

        // 基于处理时间的滚动、滑动窗口
        SingleOutputStreamOperator<Tuple2<String, Integer>> processDS = myDS
                .map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                // 滚动窗口 每隔5s统计一次
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // 滑动窗口 每隔5s统计最近10s内的数据
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

        // 基于事件时间的滚动、滑动窗口
        SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(
                // 设置水位线策略、指定事件时间
                WatermarkStrategy
                        // Duration.ofSeconds(5):水位线前移5s
                        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, ts) -> event.getTs())
        );
        SingleOutputStreamOperator<Tuple2<String, Integer>> eventDS = assDS
                .map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                // 滚动窗口,由于水位线前移了5s,整体有5s的延时
//                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 滑动窗口
                .window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                .sum(1);

//        processDS.print();
        eventDS.print();
        env.execute();

    }
}
2、会话窗口

基于处理时间的会话窗口,当一段时间没有数据,那么就认定此次会话结束并触发窗口的执行
基于事件时间的会话窗口,连续接收的两条数据的事件时间之差要大于5s(窗口大小),才能触发窗口的执行

package com.shujia.flink.window;

import com.shujia.flink.event.MyEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Demo02Session {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<MyEvent> myDS = env.socketTextStream("master", 8888)
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new MyEvent(split[0], Long.parseLong(split[1]));
                    }
                });


        // 基于处理时间的会话窗口,当一段时间没有数据,那么就认定此次会话结束并触发窗口的执行
        SingleOutputStreamOperator<Tuple2<String, Integer>> processSessionDS = myDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                // 10秒内没有数据,则认定此次会话结束并触发窗口的执行
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum(1);

        //TODO 基于事件时间的会话窗口,连续接收的两条数据的事件时间之差要大于5s(窗口大小),才能触发窗口的执行
        // 指定水位线策略并提供数据中的时间戳解析规则
        SingleOutputStreamOperator<MyEvent> assDS = myDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<MyEvent>forMonotonousTimestamps()
                        .withTimestampAssigner((e, ts) -> e.getTs())
        );

        SingleOutputStreamOperator<Tuple2<String, Integer>> eventSessionDS = assDS.map(e -> Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t2 -> t2.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(1);

//        processSessionDS.print();
        eventSessionDS.print();
        env.execute();
    }
}
3、计数窗口:滚动、滑动

滚动下:每同一个key的5条数据会统计一次

滑动下:每隔同一个key的5条数据,统计最近的同一个key的10条数据

package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo03CountWindow {
    public static void main(String[] args) throws Exception {
        // 计数窗口:滚动、滑动
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds = env.socketTextStream("master", 8888);

        ds.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(t2->t2.f0)
//                .countWindow(5) // 每同一个key的5条数据会统计一次
                .countWindow(10,5) // 每隔同一个key的5条数据,统计最近的同一个key的10条数据
                .sum(1)
                .print();

        env.execute();

        /**
         * 每隔同一个key的5条数据,统计最近的同一个key的10条数据
         * 输入:
         * a
         * a
         * a
         * a
         * a
         * b
         * b
         * b
         * a
         * a
         * a
         * a
         * a
         * 输出:
         * 13> (a,5)
         * 13> (a,10)
         */

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1961885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【HadoopShuffle原理剖析】基础篇二

Shuffle原理剖析 Shuffle&#xff0c;是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。 Shuffle过程 Map端的Shuffle Map的输出结果首先被缓存到内存&#xff0c;当缓存区容量到达80%&#xff08;缓冲区默认100MB&#xff…

[论文笔记]思维链提示的升级版——回退提示

引言 今天又带来一篇提示策略的论文笔记&#xff1a;TAKE A STEP BACK: EVOKING REASONING VIA ABSTRACTION IN LARGE LANGUAGE MODELS。 作者提出了回退提示(STEP-BACK PROMPTING)技术&#xff0c;使大模型能够进行抽象&#xff0c;从包含具体细节的实例中推导出高层次的概念…

centos7 docker空间不足

今天在使用docker安装镜像的时候&#xff0c;出现报错 查看原因&#xff0c;发现是分区空间不足导致的 所以考虑进行扩容 首先在vmware扩容并没有生效 因为只是扩展的虚拟空间&#xff0c;并不支持扩展分区大小&#xff0c;下面对分区进行扩容 参考&#xff1a; 分区扩容 主…

【echarts】echarts-liquidfill 水球图

echarts-liquidfill3兼容echarts5 echarts-liquidfill2兼容echarts4 npm install echarts npm install echarts-liquidfill设置水球图背景色和内边框样式 var option {series: [{type: liquidFill,data: [0.6, 0.5, 0.4, 0.3],backgroundStyle: {borderWidth: 5,//边框宽度bo…

怎样看待AI就业冲击?

技术进步对于就业的影响&#xff0c;从工业革命开始就是社会的焦点和研究的关注点。具有“卢德主义”性质的运动和思潮&#xff0c;曾经以各种面貌反复出现。不过&#xff0c;无论是从原因穷究结果&#xff0c;还是从本质看到表象&#xff0c;AI就业冲击这一次来得真的不同以往…

申请美区 Apple ID 完整步骤图解,轻松免费创建账户

苹果手机在下载一些软件时需要我们登录其 Apple ID 才能下载&#xff0c;但是由于一些限制国内的 Apple ID 在 App Store 中有一些限制不能下载某些软件&#xff0c;如何解决这个问题&#xff1f;那就是申请一个美区 Apple ID&#xff0c;怎么申请国外苹果账户呢&#xff1f;下…

国家超算互联网平台:模型服务体验与本地部署推理实践

目录 前言一、平台显卡选用1、显卡选择2、镜像选择3、实例列表4、登录服务器 二、平台模型服务【Stable Diffusion WebUI】体验1、模型运行2、端口映射配置3、体验测试 三、本地模型【Qwen1.5-7B-Chat】推理体验1、安装依赖2、加载模型3、定义提示消息4、获取model_inputs5、生…

typescript中interface常见3种用法

文章目录 函数类型对象类型【自命名】&#xff1a; (函数)对象类型 函数类型 作用&#xff1a;声明一个函数接口&#xff1a;可用于类型声明 | 不可implements 对象类型 作用&#xff1a;声明对象具备哪些实例接口&#xff1a;可用于类型 | 可implements 【自命名】&…

【C#】ThreadPool的使用

1.Thread的使用 Thread的使用参考&#xff1a;【C#】Thread的使用 2.ThreadPool的使用 .NET Framework 和 .NET Core 提供了 System.Threading.ThreadPool 类来帮助开发者以一种高效的方式管理线程。ThreadPool 是一个线程池&#xff0c;它能够根据需要动态地分配和回收线程…

DATE_ADD、DATE_SUB Function - Mysql

DATE_ADD、DATE_SUB Function - SQL DATE_ADD() 和 DATE_SUB() 用于在日期或日期时间上增加或减少指定的时间间隔。 1. DATE_ADD() DATE_ADD() 函数用于向指定的日期或日期时间值添加一个时间间隔。 DATE_ADD(date, INTERVAL expr unit)date: 要添加时间间隔的日期或日期时间…

【Lampiao靶场渗透】

文章目录 一、IP地址获取 二、信息收集 三、破解SSH密码 四、漏洞利用 五、提权 一、IP地址获取 netdiscover -i eth0 Arp-scan -l Nmap -sP 192.168.78.0/24 靶机地址&#xff1a;192.168.78.177 Kali地址&#xff1a;192.168.78.128 二、信息收集 nmap -sV -p- 192.…

实战:ElasticSearch 索引操作命令(补充)

四.ElasticSearch 操作命令 4.1 集群信息操作命令 4.1.1 查询集群状态 &#xff08;1&#xff09;使用 Postman 客户端直接向 ES 服务器发 GET 请求 http://hlink1:9200/_cat/health?v &#xff08;2&#xff09;使用服务端进行查询 curl -XGET "hlink1:9200/_cat/h…

装饰大师——装饰模式(Python实现)

大家好&#xff0c;今天我们继续来讲结构型设计模式&#xff0c;上一期我们介绍了组合模式&#xff0c;这个模式特别适合用于处理树形结构的问题&#xff0c;它能够让我们像处理单个对象一样来处理对象组合。 装饰模式&#xff08;Decorator Pattern&#xff09;是一种结构型设…

最新彩虹自助下单代发卡码知识付费商城多模板系统完整版去授权源码V6.9

最新彩虹的知识付费商城源码&#xff0c;后台可以选择多套模板&#xff0c;完整版去授权,支持对接多个资源网站&#xff0c;不怕无资源 推荐用宝塔上传后直接访问即可根据提示安装。 后面用户名/密码&#xff1a;admin/123456 PHP推荐使用7.0及以上版本 V6.9 1.修复SQL注入…

k8s 部署RuoYi-Vue-Plus之ingress域名解析

可参看https://blog.csdn.net/weimeibuqieryu/article/details/140798925 搭建ingress 1.创建Ingress对象 ingress-ruoyi.yaml其中host替换为你对应域名&#xff0c;需要解析域名到服务器, 同时为后端服务添加了二级域名解析 api. 访问http://xxx.xyz/就能访问前端&#xff0…

应急靶场(11):【玄机】日志分析-apache日志分析

题目 提交当天访问次数最多的IP&#xff0c;即黑客IP黑客使用的浏览器指纹是什么&#xff0c;提交指纹的md5查看index.php页面被访问的次数&#xff0c;提交次数查看黑客IP访问了多少次&#xff0c;提交次数查看2023年8月03日8时这一个小时内有多少IP访问&#xff0c;提交次数 …

【Redis 初阶】Redis 常见数据类型(Set、Zset、渐进式遍历、数据库管理)

一、Set 集合 集合类型也是保存多个字符串类型的元素的&#xff08;可以使用 json 格式让 string 也能存储结构化数据&#xff09;&#xff0c;但和列表类型不同的是&#xff0c;集合中&#xff1a; 元素之间是无序的。&#xff08;此处的 “无序” 是和 list 的有序相对应的…

重载云台摄像机如何通过国标28181接入到统一视频接入平台(视频国标接入平台)

目录 一、国标GB/T 28181介绍 1、国标GB/T28181 2、内容和特点 二、重载云台摄像机 1、定义 2、结构与设计 3、功能和优势 4、特点 5、应用场景 二、接入准备工作 1、确定网络环境 &#xff08;1&#xff09;公网接入 &#xff08;2&#xff09;专网传输 2、检查重…

软件测试基础1--功能测试

1、什么是软件测试&#xff1f; 软件是控制计算机硬件运行的工具。 软件测试&#xff1a;使用技术手段验证软件是否满足使用需求&#xff0c;为了发现软件功能和需求不相符合的地方&#xff0c;或者寻找实际输出和预期输出之间的差异。 软件测试的目的&#xff1a;减少软件缺陷…