Flink系列二:DataStream API中的Source,Transformation,Sink详解(^_^)

news2025/1/13 19:36:06

在上面篇文章中已经对flink进行了简单的介绍以及了解了Flink API 层级划分,这一章内容我们主要介绍DataStream API

 

流程图解:

 

3908739ad07d4f6b987e0e15b257e0ac.png

 

一、DataStream API Source

Flink 在流处理和批处理上的 source 大概有 4 类:

(1)基于本地集合的 source

(2)基于文件的 source

(3)基于网络套接字的 source,具体来说就是从远程服务器或本地端口上的套接字连接中接收数据,比如上一篇文章中的入门案例就属于这一种。

(4)自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source,灵活度较高,看个人需求。

 

下面就是纯代码演示了,具体细节会在注释中说明

1、本地集合的source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Demo1ListSource {
    public static void main(String[] args) throws Exception{
        //创建flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //创建集合
        ArrayList<String> arrayList = new ArrayList<>();
        arrayList.add("java");
        arrayList.add("java");
        arrayList.add("java");
        arrayList.add("java");
        arrayList.add("java");

        /*
         *基于集合的Source ----- 属于有界流
         */
        DataStream<String> listDS = env.fromCollection(arrayList);
        listDS.print();

        //启动Flink作业执行
        env.execute();
    }
}

结果:

16787a01a3a847538ef32dcecc29f79f.png

在这解释一下结果图中的数字前缀,这个前缀的主要目的是不同并行实例的输出。什么都不设置的话取决于你电脑的内存了,比如我电脑是16G的内存,那么当数据较多时默认分配给该作业分了16个task。

 

2、本地文件的source

注意:同一个File数据源,既能有界读取,也能无界读取

2.1 有界读取

/*
 *流批统一:
 * 1、同一套算子代码既能作流处理也能做批处理
 * 2、同一个File数据源,既能有界读取,也能无界读取
 */
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2FileSource1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         *有界读取
         */
        //老版本方式:简单但不灵活
        DataStream<String> lineDS = env.readTextFile("flink/data/student.csv");
//        lineDS.print();

        //新版本方式:复杂一点但更灵活,使用这种既能有界读取,也能无界读取
        //构建fileSource
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(
                //指定编码
                new TextLineInputFormat("UTF-8")
                //指定路径
                , new Path("flink/data/student.csv")
        ).build();

        //使用fileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
        fileDS.print();

        env.execute();
    }
}

 

2.1 无界读取

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2FileSource2 {
    public static void main(String[] args) throws Exception {
        /*
         *使用无界流读取文件数,很简单,其实就是对上面的代码修改运行模式并加个参数就可以了
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //修改运行模式
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        //构建fileSource
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat("UTF-8"),
                new Path("spark/data/student.csv")).build();

        //使用fileSource
        DataStreamSource<String> linesDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
        linesDS.print();

        env.execute();

    }
}

 

3、本地端口的source

上一篇文章中的入门案例就属于这一种,后面在代码中也会用到,在此不在赘述了。

 

4、自定义的 source

举例:使用自定义source读取mysql中的数据

/*实现方式:
 * 1、实现SourceFunction或ParallelSourceFunction接口来创建自定义的数据源。
 * 2、然后使用env.addSource(new CustomSourceFunction())或DataStreamSource.fromSource添加你自定义的数据源。
 */
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class Demo3MysqlSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //使用自定义的source
        DataStream<Student> studentDSSource = env.addSource(new MysqlSource());
        //统计学生表每个班级的人数

        //取出每一行的班级列并加上人数后缀1
        DataStream<Tuple2<String, Integer>> clazzKvDS = studentDSSource.map(line -> Tuple2.of(line.getClazz(), 1), Types.TUPLE(Types.STRING, Types.INT));
        //分组,将相同的键发送给同一个task中
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = clazzKvDS.keyBy(kv -> kv.f0);
        //求和
        SingleOutputStreamOperator<Tuple2<String, Integer>> clazzSum = keyByDS.sum(1);
        //输出
        clazzSum.print();


        env.execute();

    }

}

/**
 * 自定义source读取mysql中的数据
 */
class MysqlSource implements SourceFunction<Student> {
    /**
     * run()方法会在任务启动的时候执行一次
     */
    @Override
    public void run(SourceContext ctx) throws Exception {
        //1、加载mysq驱动
        Class.forName("com.mysql.jdbc.Driver");
        //2、创建数据库连接
        //注意:如果报连不上的错误,将参数补全(useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false)
        Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");
        //3、编写sql查询
        PreparedStatement sql = conn.prepareStatement("select * from students");
        //4、执行查询
        ResultSet resultSet = sql.executeQuery();
        //5、遍历查询出的数据
        while (resultSet.next()) {
            int id = resultSet.getInt("id");
            String name = resultSet.getString("name");
            int age = resultSet.getInt("age");
            String gender = resultSet.getString("gender");
            String clazz = resultSet.getString("clazz");

            //将数据发送到下游
            /*
             * collect():从 DataStream 收集所有的元素,并将它们作为列表或其他集合类型返回给客户端
             */
            ctx.collect(new Student(id, name, age, gender, clazz));
        }

        //6、释放资源
        sql.close();
        conn.close();
    }


    @Override
    public void cancel() {
        /*
         * cancel(),它用于在任务完成后执行清理操作
         */
    }
}

/**
 * 这里使用了lombok插件(小辣椒)
 * 这个插件的作用可以在代码编译的时候增加方法(相当于scala中的case class),就不用我们自己手动添加get、set、toString等方法了。
 * 使用方法:加@就行了
 */
@Data
@AllArgsConstructor
class Student {
    private int id;
    private String name;
    private int age;
    private String gender;
    private String clazz;
}

 

 

二、DataStream API Transformation

Transformation:数据流转换。

常见算子有 Map / FlatMap / Filter /KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据形式。

其实这些算子在功能上与scala或spark中的基本相同,只是形式和细节上会有些差别。

1、map

DataStream → DataStream    输入一个元素同时输出一个元素


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Demo1Map {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //使用nc -lk 8888 模拟实时数据的产生
        DataStreamSource<String> source = env.socketTextStream("master", 8888);
        //方式1:匿名内部类形式
        /*
         * 观察源码map发现:
         * MapFunction<T, O> 是一个函数接口,用于对流中的每个元素的处理
         * 这个接口定义了一个 map 方法,该方法接受一个输入元素(类型为 T)并返回一个输出元素(类型为 O)。
         */
        DataStream<String> map1DS = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String word) throws Exception {
                return word.toUpperCase();
            }
        });
//        map1DS.print();

        //方式2:lambda表达式形式(更简洁常用)
        source.map(String::toUpperCase).print();    //是对source.map(word -> word.toUpperCase())的更简写
        
        env.execute();
        
    }
}

结果:

45374801cb354dccbc098f73af160b18.png

 

 

2、flatMap

DataStream → DataStream

输入一个元素转换为一个或多个元素输出

/*
 *flatMap 方法用于将输入流中的每个元素转换成一个或多个输出元素
 */

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo2FaltMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.socketTextStream("master", 8888);


        //方式1:匿名内部类
        //看源码,这个方法接受一个FlatMapFunction<T, R>类型的参数,其中T是输入元素的类型,R是输出元素的类型
        DataStream<String> out2DS = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String word : line.split(",")) {
                    //循环将数据发送到下游
                    out.collect(word);
                }
            }
        });

//        out2DS.print();


        //方式2:lambda表达式
        DataStream<String> out1DS = source.flatMap((line, out) -> {
            for (String word : line.split(",")) {
                //循环将数据发送到下游
                out.collect(word);
            }
        }, Types.STRING);

        out1DS.print();

        env.execute();
    }
}

结果:

c82a409f9f824b6eba5ee6263c064070.png

 

3、filter

DataStream → DataStream 

为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo3Filter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.readTextFile("spark/data/student.csv");
        //需求:过滤出文科一班的学生的信息
        //方式一:匿名内部类
        source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String lines) throws Exception {
               return "文科一班".equals(lines.split(",")[4]);
            }
        }); //.print();
        
        //方式2:lambda表达式
        source.filter(lines->"文科一班".equals(lines.split(",")[4])).print();

        env.execute();
    }
}

结果:

d3e93c6a422c4b5abbd8ce6822799a0d.png

 

4、keyBy

作用为:分组

DataStream → KeyedStream

在逻辑层面将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。

39272837e4da48c3b64487f8a16d2766.png

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo4KeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("master", 8888);

        //方式1:匿名内部类
        /*
         * public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key)
         * 其中 T 是输入元素的类型,K 是键的类型
         */
        source.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> kv) throws Exception {
                        return kv.f0;
                    }
                });//.print();

        //方式2:lambda表达式
        source.map(word-> Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT))
                        .keyBy(kv->kv.f0).print();

        env.execute();

    }
}

结果: 可以看出的确作了分区

48db7796907641b9b7c8480b6561ab4b.png

 

5、reduce

作用为:聚合

KeyedStream → DataStream

在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Demo5Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.socketTextStream("master", 8888);

        //方式1:匿名内部类
        source.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1,
                                                          Tuple2<String, Integer> kv2) throws Exception {
                        //kv1和kv2的key是一样的
                        String word = kv1.f0;
                        int counts = kv1.f1 + kv2.f1;
                        return Tuple2.of(word,counts);
                    }
                }).print();


        env.execute();

    }
}

结果:从结果来看说明reduce是一个有状态算子。 

6003735a7c804c60b0a2652d4f664ee0.png

 

6、Window

KeyedStream → WindowedStream 

可以在已经分区的 KeyedStreams 上定义 Window,Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。

窗口算子有很多,以后会专门出一章具体说明,下面写一个滑动窗口的案例。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


public class Demo6Window {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * 每隔5秒统计最近15秒每个单词的数量 --- 滑动窗口
         */

        DataStream<String> wordsDS = env.socketTextStream("master", 8888);

        //转换成kv
        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        //按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        //划分窗口
        //SlidingEventTimeWindows:滑动的处理时间窗口
        //前一个参数为窗口大小(window size),后一个参数为滑动大小(window slide)
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));

        //统计单词的数量
        DataStream<Tuple2<String, Integer>> countDS = windowDS.sum(1);

        countDS.print();

        env.execute();
    }
}

 

7、Union

DataStream→ DataStream

将两个或多个数据流联合来创建一个包含所有流中数据的新流。注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo7Union {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source1 = env.socketTextStream("master", 8888);
        DataStream<String> source2 = env.socketTextStream("master", 9999);

        /*
         * 合并两个DataStream
         * 注意:在数据层面并没有合并,只是在逻辑层面合并了
         */
        DataStream<String> unionDS = source1.union(source2);
        unionDS.print();

        env.execute();

    }
}

结果:

5e95fe6957a1463abe7dde2dce4af9d9.png

 

8、process

DataStream→ DataStream

process算子是flink的底层算子,可以用来代替map、faltMap、filter等算子

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Demo8Process {
    public static void main(String[] args) throws Exception {
        /*
         * process算子是flink的底层算子,可以用来代替map、faltMap、filter等算子
         *
         * public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction)
         * 其中 T 是输入数据的类型,R 是输出数据的类型
         */

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> processDS = source.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            /*
             * processElement:在当前代码中相当于flatMap,每一条数据执行一次,可以返回一条或多条数据
             * ctx:上下文对象(代表flink执行环境)
             * out:输出,用于将数据发送到下游
             */
            @Override
            public void processElement(String line, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {
                //这里的逻辑与flatMap的逻辑相同
                for (String word : line.split(",")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        env.execute();


        /*
         * 注意:该算子不能用lambda表达式改写,因为ProcessFunction它包含了一些生命周期方法和状态管理的方法,
         * 这些方法使得它不适合直接简化为lambda表达式的形式。
         *
         * 在底层代码层面来说,ProcessFunction是一个抽象类,该类还有许多复杂的方法,使得它无法直接用lambda表达式来改写
         * 因为 lambda 表达式只能表示简单的函数接口(即那些只包含一个抽象方法的接口)
         * public abstract class ProcessFunction<I, O> extends AbstractRichFunction
         */
    }
}

 

三、DataStream API Sink

Flink 将转换计算后的数据发送的地点 。

Flink 常见的 Sink 大概有如下几类:

(1)打印在控制台、写入文件。

(2)写入 socket(具体指的是将数据发送到网络套接字(例如端口))。

(3)自定义的 sink :常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,当然你也可以根据需求定义自己的 sink。

 

1、写入文件

对于写入文件,是否要将所有数据写入同一个文件?由于是流式写入,该文件就一直处于正在写入的状态,而且可能会造成文件过大的问题,所以DataStream API提供了滚动策略的方式来解决这样的问题。

9a5ed8bc0f1c42a4a16f59c8bcf80165.png

 


import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;


public class Demo1FileSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.socketTextStream("master", 8888);

        //创建fileSink
        /*
        *public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
        *        final Path basePath, final Encoder<IN> encoder)}
        *
        *<IN> : The type of the elements that are being written by the sink.
        */

        FileSink<String> fileSink = FileSink.forRowFormat(new Path("flink/data/words"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                //每10秒进行一次滚动(生成文件)
                                .withRolloverInterval(Duration.ofSeconds(10))
                                //当延迟超过10秒进行一次滚动
                                .withInactivityInterval(Duration.ofSeconds(5))
                                //文件大小达到1MB进行一次滚动
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();


        //使用fileSink,将读取的数据写入另一到文件夹中
        source.sinkTo(fileSink);

        env.execute();

    }
}

结果: 

d9cfbc1a28b94ad79646683db279d104.png

 

2、自定义的 sink

举例:使用自定义sink将数据存到mysql中

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Demo3MySqlSInk {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> wordsDS = env.socketTextStream("master", 8888);

        //统计单词的数量
        DataStream<Tuple2<String, Integer>> countDS = wordsDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                .sum(1);

        //将统计结果保存到数据
        countDS.addSink(new MySQlSink());

        env.execute();

    }
}

/**
 * 自定义sink将数据保存到mysql
 * RichSinkFunction:多了open和close方法,用于打开和关闭连接
 * SinkFunction
 */
class MySQlSink extends RichSinkFunction<Tuple2<String, Integer>> {
    Connection con;
    PreparedStatement stat;

    /**
     * invoke方法每一条数据执行一次
     */
    @Override
    public void invoke(Tuple2<String, Integer> kv, Context context) throws Exception {
        stat.setString(1, kv.f0);
        stat.setInt(2, kv.f1);

        //执行sql
        stat.execute();
    }

    /**
     * open方法会在任务启动的时候,每一个task中执行一次
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("创建数据库连接");
        //1、加载启动
        Class.forName("com.mysql.jdbc.Driver");
        //2、创建数据库连接
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29", "root", "123456");
        //3、编写保存数据的sql
        //replace into 替换插入,如果没有就插入,如果有就更新,表需要有主键
        stat = con.prepareStatement("replace into word_count values(?,?)");
    }

    /**
     * close方法会在任务取消的时候,每一个task中执行一次
     */
    @Override
    public void close() throws Exception {
        //4、关闭数据库连接
        stat.close();
        con.close();
    }

 

 

 

 

 

---------------------------------------------------------------------------------------------------------------------------------

代码注意提示:

如果在写flink代码的过程中出现了以下错误,大概率就是有些算子使用没有写数据类型,与spark不同,spaark底层由scala编写,scala提供了自动类型推断机制,所以不写参数类型也不会报错,但是flink底层是java编写的,java没有这种机制。

23ddb4c8d57549598c2cdb15b640badf.png

 

 

基础的算子到这结束,其他算子后续也会写,以上内容具体详情皆参考apache flink官网,官网详细说明了各种算子的使用,网址贴在下面了:

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/operators/overview/

个人感觉写的很详细了,看不懂建议直接打死作者(^_^)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1720744.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3中实现鼠标点击后出现点击特效

一、效果展示 图片下方为效果体验地址 缓若江海凝清光 二、代码 js中&#xff1a; <script setup lang"ts"> window.addEventListener("click", (e: MouseEvent) > {const pointer document.createElement("div");pointer.classLi…

跨越创作壁垒:利用写作素材库挖掘创作灵感

跨越创作壁垒&#xff1a;利用写作素材库挖掘创作灵感 写作素材在提供支持、丰富内容、激发创作灵感、提升可信度和帮助组织文章等方面&#xff0c;发挥着重要的作用。合理利用和处理素材可以提高你的写作质量&#xff0c;使你的作品更具有说服力和吸引力。 因此&#xff0c;当…

Hadoop+Spark大数据技术 第七次作业

第七次作业 1. 简述Spark SQL使用的数据抽象DataFrame与Dataset的区别。 DataFrame: 基于 Row 对象的二维表格结构&#xff0c;类似于关系型数据库中的表。 行和列都有明确的 Schema&#xff08;模式&#xff09;&#xff0c;可以进行类型推断。 提供了丰富的操作接口&#xff…

vue中大屏可视化适配所有屏幕大小

1. 外部盒子 .screenBox {width: 100vw;height: 100vh;background: url("/assets/images/bg.png") no-repeat;background-size: cover; }2.比例盒子 外层盒子css定义 .boxScale {width: 1920px;height: 1080px;background-color: orange;transform-origin: left top;…

【工具】创客贴会员|创客贴截止2024年6月所有AI功能效果实测(热门推荐和图片编辑部分)

上一篇&#xff1a;【工具】创客贴会员&#xff5c;万字测评&#xff01;前沿设计网站创客贴的 AI 文生图效果测评 上一篇写的时候只测了文生图&#xff0c;因为百度那边活动没和创客贴接洽好&#xff0c;他们不清楚创客贴的AI和其他会员功能分开了&#xff0c;导致只有10次体…

LabVIEW老程序功能升级:重写还是改进?

概述&#xff1a;面对LabVIEW老程序的功能升级&#xff0c;开发者常常面临重写与改进之间的选择。本文从多个角度分析两种方法的利弊&#xff0c;并提供评估方法和解决思路。 重写&#xff08;重新开发&#xff09;的优势和劣势&#xff1a; 优势&#xff1a; 代码清晰度高&a…

【机器学习300问】106、Inception网络结构如何设计的?这么设计的目的是什么?

谷歌的Inception网络&#xff0c;也被称为GoogLeNet&#xff0c;是Google在2014年推出的一种深度卷积神经网络&#xff08;CNN&#xff09;模型&#xff0c;在这之前的AlexNet、VGG等结构都是通过增大网络的深度&#xff08;层数&#xff09;来获得更好的训练效果&#xff0c;但…

Python—面向对象小解(3)

一、多态 多态指的是一类事物的多中形态 相同的方法&#xff0c;产生不同的执行结果 运算符 * 的多态 int int 加法计算 str str 字符串拼接 list list 列表的数据合并 在python中可以使用类实现一个多态效果 在python中使用重写的方式实现多态 &#xff08;1&#xff09;定…

字符串 | 字符串匹配之 KMP 算法以及 C++ 代码实现

目录 1 为什么使用 KMP&#xff1f;2 什么是 next 数组&#xff1f;2.1 什么是字符串的前后缀&#xff1f;2.2 如何计算 next 数组&#xff1f; 3 KMP 部分的算法4 完整代码 &#x1f608;前言&#xff1a;这篇文章比较长&#xff0c;但我感觉自己是讲明白了的 1 为什么…

Vue之组件基础(插槽)

在HTML中&#xff0c;开发者可以在双标签内添加一些信息。而在Vue中&#xff0c;组件以标签的形式引用&#xff0c;那么如何在组件的标签内添加一些信息并将信息渲染到页面中呢?其实&#xff0c;Vue 提供了插槽&#xff0c;专门用来实现这样的效果。 一.什么是插槽 Vue为组件…

怎么使用Python代码在图片里面加文字

在Python中&#xff0c;给图片添加文字可以使用Pillow库&#xff08;PIL的一个分支&#xff09;&#xff0c;它是一个强大的图像处理库。如果你还没有安装Pillow&#xff0c;可以通过pip安装&#xff1a; pip install Pillow下面使用一个简单的示例&#xff0c;演示如何使用Pi…

系统架构设计师【第9章】: 软件可靠性基础知识 (核心总结)

文章目录 9.1 软件可靠性基本概念9.1.1 软件可靠性定义9.1.2 软件可靠性的定量描述9.1.3 可靠性目标9.1.4 可靠性测试的意义9.1.5 广义的可靠性测试与狭义的可靠性测试 9.2 软件可靠性建模9.2.1 影响软件可靠性的因素9.2.2 软件可靠性的建模方法9.2.3 软件的可靠性模…

CISCN 2023 初赛 被加密的生产流量

题目附件给了 modbus.pcap 存在多个协议 但是这道题多半是 考 modbus 会发现 每次的 Query 末尾的两个字符 存在规律 猜测是base家族 可以尝试提取流量中的数据 其中Word Count字段中的22871 是10进制转16进制在转ascii字符串 先提取 过滤器判断字段 tshark -r modbus.pcap …

java线程状态介绍

1.新建&#xff08;New&#xff09;: 线程对象已创建&#xff0c;但还没有调用 start() 方法。 2.可运行&#xff08;Runnable&#xff09;: 线程已启动&#xff0c;处于就绪状态&#xff0c;等待 JVM 的线程调度器分配CPU时间。 3.阻塞&#xff08;Blocked&#xff09;: 线程…

自动化桌面整理新时代:Llama 3驱动的智能文件管理系统

在信息爆炸的时代,个人和企业用户的电脑桌面常常被海量文件占据,导致查找特定文件如同大海捞针。为了解决这一痛点,Llama 3应运而生——一个集成了先进多模态AI技术的智能文件管家,旨在将杂乱无章的文件世界变得井然有序。本文将深入探讨Llama 3如何利用其创新功能,不仅自…

上传RKP 证书签名请求息上传到 Google 的后端服务器

上传证书签名请求 1.准备环境&#xff1a;OK pip3 install google-auth2.13.0 requests2.28下载 device_info_uploader.py 。 没找到先跳过 选项 1&#xff1a;通过 GCP 帐户使用 device_info_uploader.py 运行脚本。 ./device_info_uploader.py --credentials /secure/s…

简要分析学习spring内存马,劫持马

简要分析学习spring内存马&#xff0c;劫持马 本文主要是通过SpringMemShell这个工程&#xff0c;来对spring内存马进行演示&#xff0c;利用。 写在前面&#xff1a; 参考的是大佬给的流程以及思路,其中的解释与分析非常详细 ----->>大佬的链接 这里的内存马文件取自gi…

python采集汽车价格数据

python采集汽车价格数据 一、项目简介二、完整代码一、项目简介 本次数据采集的目标是车主之家汽车价格数据,采集的流程包括寻找数据接口、发送请求获取响应、解析数据和持久化存储,先来看一下数据情况,完整代码附后: 二、完整代码 #输入请求页面url #返回html文档 imp…

Scikit-Learn 基础教程

目录 &#x1f40b;Scikit-Learn 基础教程 &#x1f40b;Scikit-Learn 简介 &#x1f40b; 数据预处理 &#x1f988;数据集导入 &#x1f988;数据清洗 &#x1f988;特征选择 &#x1f988;特征标准化 &#x1f40b; 模型选择 &#x1f988;分类模型 &#x1f988;回…

Resilience4j结合微服务出现的异常

Resilience4j结合微服务出现的异常 1、retry未生效 由于支持aop&#xff0c;所以要引入aop的依赖。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId> </dependency>2、circ…