掌握大数据处理利器:Flink 知识点全面总结【上】

news2025/1/4 9:29:32

1.Flink的特点

Apache Flink 是一个框架分布式处理引擎,用于对无界有界数据流进行状态计算

Flink主要特点如下:
  • 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
  • 结果的准确性。Flink提供了事件时间(event--time)和处理时间(processing-time)语义。
对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证。
  • 可以连接到最常用的外部系统,如Kafka、Hive、JDBC、HDFS、Redis等。
  • 高可用。本身高可用的设置,加上与K8s,YARN和Mesos的紧密集成,再加上从故障中
快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7×24全天候运行。

2.分层API

        有状态流处理:通过底层API〔处理函数),对最原始数据加工处理。底层API与DataStream API相集成,可以处理复杂的计算。
        DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map、flatmap等),连接(joins),聚合(aggregations),窗口(windows)操作等。注意:Flink1.l2以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时。
        Table API是以表为中心的声明式编程,其中表可能会动态变化。Table API遵循关系模型:表有二维数据结构,类似于关系数据库中的表:同时API提供可比较的操作,例如select,project、join,group-by,aggregate等。我们可以在表与DataStream/DataSet之间无缝切换,以允许程序将Table API与DataStream以及DataSet混合使用。
        SQL这一层在语法与表达能力上与Table API类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。

3.流式数据,批量数据

批处理的特点是有界、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。

流处理的特点是无界、实时,  无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。

而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流

无界数据流:

无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。

有界数据流:

有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理

4.单作业模式

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式

单作业模式,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发TaskManager 执行。作业作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。

这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式

需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。

5.并行度优先级

每一个算子( operator )可以包含一个或多个子任务( operator subtask ),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。
一个特定算子的 子任务(subtask)的个数 被称为其并行度 parallelism )。

一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。

Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。

  • One-to-one

stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖

  • Redistributing

stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖

 并行度可以有如下几种指定方式

  1. Operator Level(算子级别)(可以使用)

    一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法来指定

    image-20200921112057156

  2. Execution Environment Level(Env级别)(可以使用)

    执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行所有的算子、数据源和data sink, 可以通过如下的方式设置执行环境的并行度:执行环境的并行度可以通过显式设置算子的并行度而被重写

    image-20200921112112743

  3. Client Level(客户端级别,推荐使用)(可以使用)

    并行度可以在客户端将job提交到Flink时设定。

    对于CLI客户端,可以通过-p参数指定并行度 ./bin/flink run -p 10 WordCount-java.jar

  4. System Level(系统默认级别,尽量不使用)

    在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度

并行度的优先级:算子级别 > env级别 > Client级别 > 系统默认级别 (越靠前具体的代码并行度的优先级越高)

6.任务调度执行图

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

1)逻辑流图(StreamGraph

这是根据用户通过 DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构。这一步一般在客户端完成。

2)作业图(JobGraph

StreamGraph经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的,在作业提交时传递给JobMaster。

我们提交作业之后,打开Flink自带的Web UI,点击作业就能看到对应的作业图。

3)执行图(ExecutionGraph

JobMaster收到JobGraph后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。

4)物理图(Physical Graph

JobMaster生成执行图后,会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。

物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算了。

7.转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

基本转换算子

1. 映射(map

map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一 一映射”,消费一个元素就产出一个元素

我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream。

public class TransMapTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        //1.自定义类,实现mapfunction接口
        SingleOutputStreamOperator<String> result1 = stream.map(new UserExtractor());

        // 2.传入匿名类,实现MapFunction
         SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.user;
            }
        });

        //3.传入lambda表达式
        SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);

        result1.print();
        //result2.print();
        //result3.print();
        env.execute();
    }
    public static class UserExtractor implements MapFunction<Event, String> {
        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}

MapFunction 实现类的泛型类型,与输入数据类型和输出数据的类型有关。

在实现 MapFunction 接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还

需要重写一个 map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

2. 过滤(filter)

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

public class TransFilterTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1.传入实现filterfunction的自定义类
        SingleOutputStreamOperator<Event> result1 = stream.filter(new UserFilter());

        // 2.传入匿名类实现FilterFunction
        SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Bob");
            }
        });

        //3.传入lambda表达式
        SingleOutputStreamOperator<Event> result3 = stream.filter(data -> data.user.equals("Bob"));

        result1.print();
      //  result2.print();
      //  result3.print();

        env.execute();
    }
    public static class UserFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    }
}

3. 扁平映射(flatMap))

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理.

同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式

来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

public class TransFlatmapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1.实现自定义的flatmapfunction
         stream.flatMap(new MyFlatMap()).print("1");

         //2.传入lambda表达式
        stream.flatMap((Event value,Collector<String> out)-> {
            if (value.user.equals("Marry"))
                out.collect(value.url);
            else if(value.user.equals("Bob"))
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timestamp.toString());
        }).returns(new TypeHint<String>() {
                })
                .print("2");

        env.execute();
    }
    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timestamp.toString());
            }
        }
    }

flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法,

用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个

结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来

指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调

用,也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果

是 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了

简单的转换操作。

聚合算子(Aggregation

1. 按键分区(keyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

2. 简单聚合

有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

  1. sum():在输入流上,对指定的字段做叠加求和的操作。
  2. min():在输入流上,对指定的字段求最小值。
  3. max():在输入流上,对指定的字段求最大值。
  4. minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
  5. maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称

public class TransformSimpleAggTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100", 3000L),
                new Event("Bob","./prod?id=1", 3300L),
                new Event("Bob", "./home", 3500L),
                new Event("Alice","./prod?id=200", 3200L),
                new Event("Bob","./prod?id=2", 3800L),
                new Event("Bob","./prod?id=3", 4200L)
        );

        // 按键分组之后进行聚合,提取当前用户最后一次访问数据
        stream.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event value) throws Exception {
                return value.user;
            }
        }).max("timestamp")
                .print("max:");

        stream.keyBy(data -> data.user)
                .maxBy("timestamp")
                .print("maxBy:");

        env.execute();

    }
}

3. 归约聚合(reduce

与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

public class TransformReduceTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100", 3000L),
                new Event("Bob","./prod?id=1", 3300L),
                new Event("Alice","./prod?id=200", 3200L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob","./prod?id=2", 3800L),
                new Event("Bob","./prod?id=3", 4200L)
        );

        //1. 统计每个用户的访问频次
        SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event value) throws Exception {
                        return Tuple2.of(value.user, 1L);
                    }
                }).keyBy(data -> data.f0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });

        //2. 选取当前最活跃的用户
        SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                });

        result.print();
        env.execute();
    }
}

8.输出算子sink

连接到外部系统

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。

与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction(…));

addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception

SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。

列出了 Flink 官方目前支持的第三方系统连接器:

除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器:

除此以外,就需要用户自定义实现 sink 连接器了

输出到文件

Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkToFileTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=300", 3200L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();

        // 将Event转换成String写入文件
        stream.map(data -> data.toString()).addSink(fileSink);

        env.execute();
    }
}

输出到Kafka

我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:

(1)添加 Kafka 连接器依赖

由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复

介绍了。

(2)启动 Kafka 集群

(3)编写输出到 Kafka 的示例代码

1.启动zookeeper

命令:bin/zkServer.sh start

2.启动kafka

命令:bin/kafka-server-start.sh -daemon config/server.properties

3.创建生产者

命令:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks

addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

4. 运行代码,另开启一个master01窗口,启动一个消费者, 查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server  localhost:9092 --topic events

public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "master01:9092");

        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));

        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        result.addSink(new FlinkKafkaProducer<String>("master01:9092", "events", new SimpleStringSchema()));

        env.execute();
    }
}

结果:


2025年的第一天我居然还在复习【枯死】

新的一年,梦虽遥,追则能达。愿虽艰,持则可圆。祝大家所愿皆所成,多喜乐,长安宁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2269131.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

国产数据库-崖山使用介绍

本文档基于崖山数据库23.3 个人版本&#xff0c;单机&#xff08;主备&#xff09;部署模式的情况下的使用介绍。 数据库实例状态&#xff1a; NOMOUNT&#xff1a;仅读取参数文件&#xff0c;不加载数据库 MOUNT&#xff1a;读取控制文件&#xff0c;加载数据库&#xff…

Pytest基础01: 入门demo脚本

目录 1 Pytest接口测试 1.1 最简单版hello world 1.2 pytest.ini 2 pytest兼容unittest 3 封装pytest执行入口 1 Pytest接口测试 Pyest是一个可以用于接口测试的强大框架&#xff0c;开源社区也有非常多的pytest插件。 按江湖传统&#xff0c;学习一个新语言或者新框架&…

如何在没有 iCloud 的情况下将数据从 iPhone 传输到 iPhone

概括 您可能会遇到将数据从 iPhone 转移到 iPhone 的情况&#xff0c;尤其是当您获得新的 iPhone 15/14 时&#xff0c;您会很兴奋并希望将数据转移到它。 使用iCloud最终可以做到这一点&#xff0c;但它的缺点也不容忽视&#xff0c;阻碍了你选择它。例如&#xff0c;您需要…

streamlit、shiny、gradio、fastapi四个web APP平台体验

streamlit、shiny、gradio、fastapi四个web APP平台体验 经常被问的问题就是&#xff1a;web APP平台哪个好&#xff1f;该用哪个&#xff1f;刚开始只有用streamlit和shiny&#xff0c;最近体验了一下gradio和fastapi&#xff0c;今天根据自己的体会尝试着回答一下。 使用R语…

HTML5滑块(Slider)

HTML5 的滑块&#xff08;Slider&#xff09;控件允许用户通过拖动滑块来选择数值。以下是如何实现一个简单的滑块组件的详细说明。 HTML5 滑块组件 1. 基本结构 使用 <input type"range"> 元素可以创建一个滑块。下面是基本实现的代码示例&#xff1a; <…

探索 .idea 文件夹:Java Maven 工程的隐形守护者

一、.idea文件夹深度解析&#xff1a;IntelliJ IDEA项目配置的核心 在Java Maven工程的开发环境中&#xff0c;.idea文件夹扮演着举足轻重的角色。这是IntelliJ IDEA项目特有的一个配置文件夹&#xff0c;它包含了项目所需的各种配置信息&#xff0c;以确保项目能够在不同的开发…

遥感图像车辆检测-目标检测数据集

遥感图像车辆检测-目标检测数据集&#xff08;包括VOC格式、YOLO格式&#xff09; 数据集&#xff1a; 链接: https://pan.baidu.com/s/1XVlRTVWpXZFi6ZL_Xcs7Rg?pwdaa6g 提取码: aa6g 数据集信息介绍&#xff1a; 共有 1035 张图像和一一对应的标注文件 标注文件格式提供了…

[Qt] Qt介绍 | 搭建SDK

目录 1. Qt 简介 什么是 Qt&#xff1f; 1.1 引入 1.2 GUI 1.3 Qt 介绍 2. Qt 发展史 3. Qt 支持的平台 4. Qt 版本信息 5. Qt 的优点 6. Qt 应用场景 7. Qt 成功案例 8. Qt 发展前景及就业分析 二. Qt 开发环境搭建 1. 开发工具概述 2.Qt SDK 安装 3.使用 1. …

【机器学习】机器学习的基本分类-自监督学习-对比学习(Contrastive Learning)

对比学习是一种自监督学习方法&#xff0c;其目标是学习数据的表征&#xff08;representation&#xff09;&#xff0c;使得在表征空间中&#xff0c;相似的样本距离更近&#xff0c;不相似的样本距离更远。通过设计对比损失函数&#xff08;Contrastive Loss&#xff09;&…

xterm + vue3 + websocket 终端界面

xterm.js 下载插件 // xterm npm install --save xterm// xterm-addon-fit 使终端适应包含元素 npm install --save xterm-addon-fit// xterm-addon-attach 通过websocket附加到运行中的服务器进程 npm install --save xterm-addon-attach <template><div :…

记一次护网通过外网弱口令一路到内网

视频教程在我主页简介或专栏里 目录&#xff1a; 资产收集 前期打点 突破 完结 又是年底护网季&#xff0c;地市护网有玄机&#xff0c;一路磕磕又绊绊&#xff0c;终是不负领导盼。 扯远了-_-!!&#xff0c;年底来了一个地市级护网&#xff0c;开头挺顺利的&#xff0c…

XIAO ESP32 S3网络摄像头——2视频获取

本文主要是使用XIAO Esp32 S3制作网络摄像头的第2步,获取摄像头图像。 1、效果如下: 2、所需硬件 3、代码实现 3.1硬件代码: #include "WiFi.h" #include "WiFiClient.h" #include "esp_camera.h" #include "camera_pins.h"// 设…

uniapp:微信小程序文本长按无法出现复制菜单

一、问题描述 在集成腾讯TUI后&#xff0c;为了能让聊天文本可以复制&#xff0c;对消息组件的样式进行修改&#xff0c;主要是移除下面的user-select属性限制&#xff1a; user-select: none;-webkit-user-select: none;-khtml-user-select: none;-moz-user-select: none;-ms…

2025:OpenAI的“七十二变”?

朋友们&#xff0c;准备好迎接AI的狂欢了吗&#xff1f;&#x1f680; 是不是跟我一样&#xff0c;每天醒来的第一件事就是看看AI领域又有什么新动向&#xff1f; 尤其是那个名字如雷贯耳的 OpenAI&#xff0c;简直就是AI界的弄潮儿&#xff0c;一举一动都牵动着我们这些“AI发…

无人机频射信号检测数据集,平均正确识别率在94.3%,支持yolo,coco json,pasical voc xml格式的标注,364张原始图片

无人机频射信号检测数据集&#xff0c;平均正确识别率在94.3&#xff05;&#xff0c;支持yolo&#xff0c;coco json&#xff0c;pasical voc xml格式的标注&#xff0c;364张原始图片 可识别下面的信号&#xff1a; 图像传输信号LFST &#xff08;Image_Transmission_sign…

柱状图中最大的矩形 - 困难

************* c topic: 84. 柱状图中最大的矩形 - 力扣&#xff08;LeetCode&#xff09; ************* chenck the topic first: Think about the topics I have done before. the rains project comes:盛最多水的容器 - 中等难度-CSDN博客https://blog.csdn.net/ElseWhe…

第17篇 使用数码管实现计数器___ARM汇编语言程序<四>

Q&#xff1a;如何使用定时器实现数码管循环计数器&#xff1f; A&#xff1a;DE1-SoC_Computer系统有许多硬件定时器&#xff0c;本次实验使用A9 Private Timer定时器实现延时&#xff1a;定时器首先向Load寄存器写入计数值&#xff0c;然后向Control寄存器中的使能位E写1来启…

SSM 进销存系统

&#x1f942;(❁◡❁)您的点赞&#x1f44d;➕评论&#x1f4dd;➕收藏⭐是作者创作的最大动力&#x1f91e; &#x1f496;&#x1f4d5;&#x1f389;&#x1f525; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4dd;欢迎留言讨论 &#x1f525;&#x1f525;&…

通过Cephadm工具搭建Ceph分布式存储以及通过文件系统形式进行挂载的步骤

1、什么是Ceph Ceph是一种开源、分布式存储系统&#xff0c;旨在提供卓越的性能、可靠性和可伸缩性。它是为了解决大规模数据存储问题而设计的&#xff0c;使得用户可以在无需特定硬件支持的前提下&#xff0c;通过普通的硬件设备来部署和管理存储解决方案。Ceph的灵活性和设计…

【Rust自学】8.4. String类型 Pt.2:字节、标量值、字形簇以及字符串的各类操作

8.4.0. 本章内容 第八章主要讲的是Rust中常见的集合。Rust中提供了很多集合类型的数据结构&#xff0c;这些集合可以包含很多值。但是第八章所讲的集合与数组和元组有所不同。 第八章中的集合是存储在堆内存上而非栈内存上的&#xff0c;这也意味着这些集合的数据大小无需在编…