Flink中核心重点总结

news2024/9/23 15:28:11

目录

1. 算子链

1.1. 一对一(One-to-one, forwarding)

1.2. 重分区(Redistributing)

1.3. 为什么有算子链

2. 物理分区(Physical Partitioning)

2.1. 什么是分区

2.2. 随机分区(shuffle)

2.3. 轮询分区(Round-Robin)

2.4. 重缩放分区(rescale)

2.5. 广播(broadcast)

2.6. 全局分区(global)

2.7. 自定义分区(Custom)

3. 输出算子(Sink)

3.1. 连接到外部系统

3.2. 输出到文件

3.3. 输出到 Kafka

3.4. 输出到 Redis

3.5. 输出到 Elasticsearch

3.6. 输出到 MySQL(JDBC)

3.7. 自定义 Sink 输出

4. 设置watermark以及延迟时间

4.1. 在1.11版本以及之前

4.2. 在1.12版本以及之后

5. Flink中的窗口

5.1. 窗口函数

5.2. 窗口的触发器

6. 多流转换

6.1. 分流

6.2. 基本合流操作

6.3. 基于时间的合流——双流联结(Join)

6.4. 窗口同组联结(Window CoGroup)

7. 状态编程

7.1. 状态的分类

7.2. 状态生存时间(TTL)

7.3. 状态后端

8. 容错机制

8.1. 检查点(Checkpoint )

8.2. 保存点(Savepoint)

8.3. 状态一致性

9. Table API 和 SQL

9.1. TableAPI依赖说明

9.2. 执行环境的创建


1. 算子链

1.1. 一对一(One-to-one, forwarding)

        这种模式下,数据流维护着分区以及元素的顺序。比如图中的 source 和 map 算子, source算子读取数据之后,可以直接发送给 map 算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map 算子的子任务,看到的元素个数和顺序跟 source 算子的子任务产生的完全一样,保证着“一对一”的关系。 map、 filter、 flatMap 等算子都是这种 one-to-one的对应关系。

1.2. 重分区(Redistributing)

        在这种模式下,数据流的分区会发生改变。比图中的 map 和后面的 keyBy/window 算子之间(这里的 keyBy 是数据传输算子,后面的 window、 apply 方法共同构成了 window 算子) ,以及 keyBy/window 算子和 Sink 算子之间,都是这样的关系。

        每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为 2 的 window 算子,要传递到并行度为 1 的 Sink 算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程,这一过程类似于 Spark 中的 shuffle。

1.3. 为什么有算子链

        Flink 为什么要有算子链这样一个设计呢?这是因为将算子链接成 task 是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

Flink 默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:

// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

2. 物理分区(Physical Partitioning)

2.1. 什么是分区

        顾名思义,“分区”(partitioning) 操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步处理。其实我们对分区操作并不陌生,前面介绍聚合算子时,已经提到了 keyBy,它就是一种按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制的——所以我们有时也说, keyBy 是一种逻辑分区(logical partitioning)操作。

        如果说 keyBy 这种逻辑分区是一种“软分区”,那真正硬核的分区就应该是所谓的“物理分区”(physical partitioning)。也就是我们要真正控制分区策略,精准地调配数据,告诉每个数据到底去哪里。其实这种分区方式在一些情况下已经在发生了:例如我们编写的程序可能对多个处理任务设置了不同的并行度,那么当数据执行的上下游任务并行度变化时,数据就不应该还在当前分区以直通(forward)方式传输了——因为如果并行度变小,当前分区可能没有下游任务了;而如果并行度变大,所有数据还在原先的分区处理就会导致资源的浪费。 所以这种情况下,系统会自动地将数据均匀地发往下游所有的并行任务,保证各个分区的负载均衡。

        有些时候,我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候,系统无法自动调整,这时就需要我们重新进行负载均衡,将数据流较为平均地发送到下游任务操作分区中去。 Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作接口,能够帮我们实现数据流的手动重分区。为了同 keyBy 相区别,我们把这些操作统称为“物理分区”操作。物理分区与 keyBy 另一大区别在于, keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式。

        常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做了解。

2.2. 随机分区(shuffle)

        最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

        随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,如图 5-9 所示。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

        经过随机分区之后,得到的依然是一个 DataStream。我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为 4,中间经历一次 shuffle。执行多次,观察结果是否相同。

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShuffleTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        // 读取数据源,并行度为 1
        DataStreamSource<Event> stream = env.addSource(new ClickSource());
    
        // 经洗牌后打印输出,并行度为 4
        stream.shuffle().print("shuffle").setParallelism(4);
        env.execute();
    }
}

可以得到如下形式的输出结果:

shuffle:1> Event{user='Bob', url='./cart', timestamp=...}
shuffle:4> Event{user='Cary', url='./home', timestamp=...}
shuffle:3> Event{user='Alice', url='./fav', timestamp=...}
shuffle:4> Event{user='Cary', url='./cart', timestamp=...}
shuffle:3> Event{user='Cary', url='./fav', timestamp=...}
shuffle:1> Event{user='Cary', url='./home', timestamp=...}
shuffle:2> Event{user='Mary', url='./home', timestamp=...}
shuffle:1> Event{user='Bob', url='./fav', timestamp=...}
shuffle:2> Event{user='Mary', url='./home', timestamp=...}

2.3. 轮询分区(Round-Robin)

        轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图 5-10 所示。通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。 rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

注: Round-Robin 算法用在了很多地方,例如 Kafka 和 Nginx。

我们同样可以在代码中进行测试:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceTest {
    
    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取数据源,并行度为 1
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 经轮询重分区后打印输出,并行度为 4
        stream.rebalance().print("rebalance").setParallelism(4);
        env.execute();
    }
    
}

输出结果的形式如下所示,可以看到,数据被平均分配到所有并行任务中去了。

rebalance:2> Event{user='Cary', url='./fav', timestamp=...}
rebalance:3> Event{user='Mary', url='./cart', timestamp=...}
rebalance:4> Event{user='Mary', url='./fav', timestamp=...}
rebalance:1> Event{user='Cary', url='./home', timestamp=...}
rebalance:2> Event{user='Cary', url='./cart', timestamp=...}
rebalance:3> Event{user='Alice', url='./prod?id=1', timestamp=...}
rebalance:4> Event{user='Cary', url='./prod?id=2', timestamp=...}
rebalance:1> Event{user='Bob', url='./prod?id=2', timestamp=...}
rebalance:2> Event{user='Alice', url='./prod?id=1', timestamp=...}

2.4. 重缩放分区(rescale)

        重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。

        当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时, rescale的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。

        由于 rebalance 是所有分区数据的“重新平衡”,当 TaskManager 数据量较多时,这种跨节点的网络传输必然影响效率;而如果我们配置的 task slot 数量合适,用 rescale 的方式进行“局部重缩放”,就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配,从而避免了网络传输带来的损耗。

        从底层实现上看, rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。 rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。

可以在代码中测试如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RescaleTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 这里使用了并行数据源的富函数版本
        // 这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
        env
                .addSource(new RichParallelSourceFunction<Integer>() {
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        for (int i = 0; i < 8; i++) {

                            // 将奇数发送到索引为 1 的并行子任务
                            // 将偶数发送到索引为 0 的并行子任务
                            if ((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                                sourceContext.collect(i + 1);
                            }
                        }
                    }

                    @Override
                    public void cancel() {
                    }

                })
                .setParallelism(2)
                .rescale()
                .print()
                .setParallelism(4);

        env.execute();

    }
}

这里使用 rescale 方法,来做数据的分区,输出结果是:

4> 3
3> 1
1> 2
1> 6
3> 5
4> 7
2> 4
2> 8

可以将 rescale 方法换成 rebalance 方法,来体会一下这两种方法的区别。

2.5. 广播(broadcast)

        这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

具体代码测试如下:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BroadcastTest {

    public static void main(String[] args) throws Exception {

        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取数据源,并行度为 1
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 经广播后打印输出,并行度为 4
        stream.broadcast().print("broadcast").setParallelism(4);
        env.execute();
    }
}

输出结果的形式如下所示:

broadcast:3> Event{user='Mary', url='./cart', timestamp=...}
broadcast:1> Event{user='Mary', url='./cart', timestamp=...}
broadcast:4> Event{user='Mary', url='./cart', timestamp=...}
broadcast:2> Event{user='Mary', url='./cart', timestamp=...}
broadcast:2> Event{user='Alice', url='./fav', timestamp=...}
broadcast:1> Event{user='Alice', url='./fav', timestamp=...}
broadcast:3> Event{user='Alice', url='./fav', timestamp=...}
broadcast:4> Event{user='Alice', url='./fav', timestamp=...}

可以看到,数据被复制然后广播到了下游的所有并行任务中去了。

2.6. 全局分区(global)

        全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

2.7. 自定义分区(Custom)

        当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
        在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与keyBy指定key基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个KeySelector。

例如,我们可以对一组自然数按照奇偶性进行重分区。代码如下:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 将自然数按照奇偶分区
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .partitionCustom(new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % 2;
                    }
                }, new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return value;
                    }
                })
                .print()
                .setParallelism(2);
        
        env.execute();
    }
}

3. 输出算子(Sink)

3.1. 连接到外部系统

        在 Flink 中,如果我们希望将数据写入外部系统,其实并不是一件难事。我们知道所有算子都可以通过实现函数类来自定义处理逻辑,所以只要有读写客户端,与外部系统的交互在任何一个处理算子中都可以实现。例如在 MapFunction 中,我们完全可以构建一个到 Redis 的连接,然后将当前处理的结果保存到 Redis 中。如果考虑到只需建立一次连接,我们也可以利用RichMapFunction,在 open() 生命周期中做连接操作。

        这样看起来很方便,却会带来很多问题。 Flink 作为一个快速的分布式实时流处理系统,对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果的正确性。这种性质一般被称作“状态一致性”。 Flink 内部提供了一致性检查点(checkpoint)来保障我们可以回滚到正确的状态;但如果我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。

        为了避免这样的问题, Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似, addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的; Flink 程序中所有对外的输出操作,一般都是利用 Sink 算子完成的。

        Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解, Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。

        之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现, print 方法返回的就是一个 DataStreamSink。

public DataStreamSink<T> print(String sinkIdentifier) {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
    return addSink(printFunction).name("Print to Std. Out");
}

与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction(…));

addSource 的参数需要实现一个 SourceFunction 接口;类似地, addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception

当然, SinkFuntion 多数情况下同样并不需要我们自己实现。 Flink 官方提供了一部分的框架的 Sink 连接器。如下图所示,列出了 Flink 官方目前支持的第三方系统连接器:

        我们可以看到,像 Kafka 之类流式系统, Flink 提供了完美对接, source/sink 两端都能连接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、 JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。

除 Flink 官方之外, Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器,如下图所示:

除此以外,就需要用户自定义实现 sink 连接器了。

3.2. 输出到文件

        最简单的输出方式,当然就是写入文件了。对应着读取文件作为输入数据源, Flink 本来也有一些非常简单粗暴的输出到文件的预实现方法:如 writeAsText()、 writeAsCsv(),可以直接将输出结果保存到文本文件或 Csv 文件。但我们知道,这种方式是不支持同时写入一份文件的;所以我们往往会将最后的 Sink 操作并行度设为 1,这就大大拖慢了系统效率;而且对于故障恢复后的状态一致性,也没有任何保证。所以目前这些简单的方法已经要被弃用。

        Flink 为此专门提供了一个流式文件系统的连接器: StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactlyonce)的一致性语义。

        StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性, 大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作; 默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。

        StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded, 比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

  • 行编码: StreamingFileSink.forRowFormat(basePath, rowEncoder)。
  • 批量编码: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)。

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

下面我们就以行编码为例,将一些测试数据直接写入文件:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;

public class SinkToFileTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L)
        );

        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();

        // 将 Event 转换成 String 写入文件
        stream.map(Event::toString).addSink(fileSink);

        env.execute();

    }

}

        这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

  • 至少包含 15 分钟的数据
  • 最近 5 分钟没有收到新的数据
  • 文件大小已达到 1 GB

3.3. 输出到 Kafka

        Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是, Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。关于这部分内容,我们会在后续章节做更详细的讲解。

现在我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:

步骤一:添加 Kafka 连接器依赖

步骤二:启动 Kafka 集群

步骤三:编写输出到 Kafka 的示例代码(我们可以直接将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka,主题(topic)命名为“clicks”)

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class SinkToKafkaTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop102:9092");
        DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
        stream
                .addSink(new FlinkKafkaProducer<String>(
                        "clicks",
                        new SimpleStringSchema(),
                        properties
                ));
        env.execute();
    }
    
}

        这里我们可以看到, addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。 FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

步骤四:运行代码,在 Linux 主机启动一个消费者, 查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic clicks

        我们可以看到消费者可以正常消费数据,证明向 Kafka 写入数据成功。另外,我们也可以读取其它的任意数据源,进行更多的完整测试。比较有趣的一个实验是,我们可以同时将 Kafka 作为 Flink 程序的数据源和写入结果的外部系统。只要将输入和输出的数据设置为不同的 topic,就可以看到整个系统运行的路径: Flink 从 Kakfa 的一个 topic 读取消费数据,然后进行处理转换,最终将结果数据写入 Kafka 的另一个 topic——数据从 Kafka 流入、经 Flink处理后又流回到 Kafka 去,这就是所谓的“数据管道” 应用。

3.4. 输出到 Redis

        Redis 是一个开源的内存式的数据存储,提供了像字符串(string)、哈希表(hash)、列表(list)、集合(set)、排序集合(sorted set)、位图(bitmap)、地理索引和流(stream)等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富,在实际项目中已经成为了架构优化必不可少的一员,一般用作数据库、缓存,也可以作为消息代理。

        Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具。但版本升级略显滞后,目前连接器版本为 1.0,支持的Scala 版本最新到 2.11。由于我们的测试不涉及到 Scala 的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。

具体测试步骤如下:

步骤一:导入的 Redis 连接器依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

步骤二:启动 Redis 集群

步骤三:编写输出到 Redis 的示例代码

        连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkToRedisTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 创建一个到 redis 连接的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").build();
        env.addSource(new ClickSource())
                .addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
        env.execute();
    }
    
}

这里 RedisSink 的构造方法需要传入两个参数:

  • JFlinkJedisConfigBase: Jedis 的连接配置
  • RedisMapper: Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型

接下来主要就是定义一个 Redis 的映射类,实现 RedisMapper 接口。

public static class MyRedisMapper implements RedisMapper<Event> {
    @Override
    public String getKeyFromData(Event e) {
        return e.user;
    }

    @Override
    public String getValueFromData(Event e) {
        return e.url;
    }

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "clicks");
    }
}

在这里我们可以看到,保存到 Redis 时调用的命令是 HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以 user 为 key,以 url 为 value,每来一条数据就会做一次转换。

步骤四:运行代码, Redis 查看是否收到数据

$ redis-cli
hadoop102:6379>hgetall clicks
1) “Mary”
2) “./home”
3) “Bob”
4) “./cart”

我们会发现, 发送了多条数据, Redis 中只有 2 条数据. 原因是 hash 中的 key 重复了, 后面的会把前面的覆盖掉。

3.5. 输出到 Elasticsearch

        ElasticSearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。 ElasticSearch有着简洁的 REST 风格的 API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。

        Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器, Flink 1.13 支持当前最新版本的ElasticSearch。

写入数据的 ElasticSearch 的测试步骤如下:

步骤一:添加 Elasticsearch 连接器依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

步骤二:启动 Elasticsearch 集群

步骤三:编写输出到 Elasticsearch 的示例代码

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;

public class SinkToEsTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L)
        );
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102", 9200, "http"));

        // 创建一个 ElasticsearchSinkFunction
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> data = new HashMap<>();
                data.put(element.user, element.url);
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type") // Es 6 必须定义 type
                        .source(data);
                indexer.add(request);
            }
        };
        stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());
        stream.addSink(esBuilder.build());
        env.execute();

    }
}

        与RedisSink类似,连接器也为我们实现了写入到Elasticsearch的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用ElasticsearchSink的Builder内部静态类,调用它的build()方法才能创建出真正的SinkFunction。

而Builder的构造方法中又有两个参数:

  • httpHosts:连接到的 Elasticsearch 集群主机列表
  • elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数

具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。

步骤四:运行代码,访问 Elasticsearch 查看是否收到数据,查询结果如下所示

{
    "_shards": {
        "failed": 0,
        "skipped": 0,
        "successful": 1,
        "total": 1
    },
    "hits": {
        "hits": [
            {
                "_id": "dAxBYHoB7eAyu-y5suyU",
                "_index": "clicks",
                "_score": 1.0,
                "_source": {
                    "Mary": "./home"
                },
                "_type": "_doc"
            },
            {},
            {}
        ],
        "max_score": 1.0,
        "total": {
            "relation": "eq",
            "value": 9
        }
    },
    "timed_out": false,
    "took": 5
}

3.6. 输出到 MySQL(JDBC)

        关系型数据库有着非常好的结构化数据设计、方便的 SQL 查询,是很多企业中业务数据存储的主要形式。 MySQL 就是其中的典型代表。尽管在大数据处理中直接与 MySQL 交互的场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。

写入数据的 MySQL 的测试步骤如下:

步骤一:添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

步骤二:启动 MySQL,在 database 库下建表 clicks

create table clicks(
  user varchar(20) not null,
  url varchar(100) not null
);

步骤三:编写输出到 MySQL 的示例代码

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkToMySQL {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 3600L),
                new Event("Bob", "./home", 3000L),
                new Event("Bob", "./prod?id=1", 2300L),
                new Event("Bob", "./prod?id=3", 3300L)
        );

        stream.addSink(
                JdbcSink.sink(
                        "INSERT INTO clicks (user, url) VALUES (?, ?)",
                        (statement, r) -> {
                            statement.setString(1, r.user);
                            statement.setString(2, r.url);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions
                                .JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/userbehavior")  // 对于 MySQL 5.7, 用"com.mysql.jdbc.Driver"
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("username")
                                .withPassword("password")
                                .build()
                )
        );

        env.execute();

    }

}

步骤四:运行代码,用客户端连接 MySQL,查看是否成功写入数据

mysql> select * from clicks;
+------+--------------+
| user | url |
+------+--------------+
| Mary | ./home |
| Alice| ./prod?id=300 |
| Bob | ./prod?id=3 |
+------+---------------+
3 rows in set (0.00 sec)

3.7. 自定义 Sink 输出

        如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?

        与 Source 类似, Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction,现在既然没有现成的,我们就只好自力更生了。 例如, Flink 并没有提供 HBase 的连接器,所以需要我们自己写。

        在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

        我们这里使用了 SinkFunction 的富函数版本,因为这里我们又使用到了生命周期的概念,创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。

步骤一:导入依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>

步骤二:编写输出到 HBase 的示例代码

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.nio.charset.StandardCharsets;

public class SinkCustomtoHBase {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .fromElements("hello", "world")
                .addSink(
                        new RichSinkFunction<String>() {

                            // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题,将类以完整路径导入
                            public org.apache.hadoop.conf.Configuration configuration;

                            // 管理 Hbase 连接
                            public Connection connection;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                configuration = HBaseConfiguration.create();
                                configuration.set("hbase.zookeeper.quorum", "hadoop102:2181");
                                connection = ConnectionFactory.createConnection(configuration);
                            }

                            @Override
                            public void invoke(String value, Context context) throws Exception {

                                // 表名为 test
                                Table table = connection.getTable(TableName.valueOf("test"));

                                // 指定 rowkey
                                Put put = new Put("rowkey".getBytes(StandardCharsets.UTF_8));

                                // 添加数据
                                put.addColumn(
                                        "info".getBytes(StandardCharsets.UTF_8),  // 指定列名
                                        value.getBytes(StandardCharsets.UTF_8),  // 写入的数据
                                        "1".getBytes(StandardCharsets.UTF_8) // 写入的数据
                                );

                                // 执行 put 操作
                                table.put(put);

                                // 将表关闭
                                table.close();

                            }

                            @Override
                            public void close() throws Exception {
                                // 关闭连接
                                super.close();
                                connection.close();
                            }

                        }

                );

        env.execute();

    }
    
}

步骤三:可以在 HBase 查看插入的数据

4. 设置watermark以及延迟时间

注意:在Flink中,1.12版本之前需要手动指定时间类型(EventTime还是ProcessTime等),但是在1.12版本及以后,就默认使用事件时间,当使用事件时间时就不要去手动指定了。

4.1. 在1.11版本以及之前

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(60)) {
    @Override
    public long extractTimestamp(JSONObject element) {
        return element.getLongValue("time") * 1000L;
    }
})

4.2. 在1.12版本以及之后

.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                .withTimestampAssigner((SerializableTimestampAssigner<JSONObject>) (element, recordTimestamp) -> element.getLongValue("time") * 1000L)
)

5. Flink中的窗口

5.1. 窗口函数

        Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

各流之间的转换:

  • 增量聚合函数(incremental aggregation functions)
    • 归约函数(ReduceFunction):输入类型、中间状态和输出类型一致;
    • AggregateFunction:输入类型、中间状态和输出类型可以不一致;
  • 全窗口函数(full window functions)
    • 窗口函数(WindowFunction):能进行基本事件处理;
    • 处理窗口函数(ProcessWindowFunction):在基本事件处理上还有窗口的上下文环境;

窗口API总结(下图):

5.2. 窗口的触发器

Trigger 源码:

import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;

public abstract class Trigger<T, W extends Window> implements Serializable {
    /**
     * 只要有元素落⼊到当前窗⼝, 就会调⽤该⽅法
     *
     * @param element   收到的元素
     * @param timestamp 元素抵达时间.
     * @param window    元素所属的window窗口.
     * @param ctx       ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
     */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;

    /**
     * processing-time 定时器回调函数
     *
     * @param time   定时器触发的时间.
     * @param window 定时器触发的窗口对象.
     * @param ctx    ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
     */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

    /**
     * event-time 定时器回调函数
     *
     * @param time   定时器触发的时间.
     * @param window 定时器触发的窗口对象.
     * @param ctx    ⼀个上下⽂对象,通常⽤该对象注册 timer(ProcessingTime/EventTime) 回调.
     */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

    /**
     * 当 多个窗口合并到⼀个窗⼝的时候,调用该方法法,例如系统SessionWindow
     *
     * @param window 合并后的新窗口对象
     * @param ctx    ⼀个上下⽂对象,通常用该对象注册 timer(ProcessingTime/EventTime)回调以及访问状态
     */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * 当窗口被删除后执⾏所需的任何操作。例如:可以清除定时器或者删除状态数据
     */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
}

TriggerResult 源码:

public enum TriggerResult {
	// 表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
    CONTINUE(false, false),
    // 触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。
    FIRE_AND_PURGE(true, true),
    // 触发窗口计算,但是保留窗口元素
    FIRE(true, false),
    // 不触发窗口计算,丢弃窗口,并且删除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

Flink 预置的Trigger:

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

使用示例:

  • 按次数触发:        .trigger(CountTrigger.of(1))
  • 按时间触发:        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))

6. 多流转换

6.1. 分流

        所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子 DataStream。一般来说,我们会定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

  • 实现方式一:使用filter() 方法进行过滤
  • 实现方式二:使用侧输出流( ctx.output() )

方式一会造成流数据重复,所以一般情况下,是使用方式二的侧输出流。

6.2. 基本合流操作

方式一:联合(Union)

        最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”( union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。这种合流方式非常简单粗暴,就像公路上多个车道汇在一起一样。

实现方式: stream1.union(stream2, stream3, ...) 

注意:多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程。所以水位线也使用最小的那个为准。

方式二:连接(Connect)

  • 连接流(ConnectedStreams):两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个 DataStream 中。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。因为2条流的类型不同,所以在处理算子中有2个处理方法,分别对2条流的数据进行处理。
  • CoProcessFunction:在通过连接流后的处理方法中,还可以使用 CoProcessFunction 方法来处理。在这个方法中,可以分别对2条流进行处理,并且可以设置状态和定时器。可以用来实现实时对账等功能。注意:使用此方法之前需要进行keyBy。

方式三:广播连接流(BroadcastConnectedStream)

        关于两条流的连接,还有一种比较特殊的用法: DataStream 调用.connect()方法时,传入的参数也可以不是一个 DataStream,而是一个“广播流”(BroadcastStream),这时合并两流得到的就变成了一个“广播连接流”(BroadcastConnectedStream)。
        这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)。

DataStream<String> output=stream
        .connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<>(){...});
import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    
    ...

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    ...
    
}

6.3. 基于时间的合流——双流联结(Join)

  • 方式一:窗口联结(Window Join)
stream1.join(stream2)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
  • 方式二:间隔联结(Interval Join)
stream1
        .keyBy(<KeySelector>)
        .intervalJoin(stream2.keyBy(<KeySelector>))
        .between(Time.milliseconds(-2),Time.milliseconds(1))
        .process(new ProcessJoinFunction<Integer, Integer, String(){

            @Override
            public void processElement(Integer left,Integer right,Context ctx,
                Collector<String> out){
                out.collect(left+","+right);
            }
        });

6.4. 窗口同组联结(Window CoGroup)

        除窗口联结和间隔联结之外, Flink 还提供了一个“窗口同组联结”(window coGroup)操作。它的用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了。

stream1.coGroup(stream2)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .apply(<CoGroupFunction>)

        与 window join 的区别在于,调用.apply()方法定义具体操作时,传入的是一个CoGroupFunction。这也是一个函数类接口,源码中定义如下:

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

7. 状态编程

7.1. 状态的分类

分类一:托管状态(Managed State)和原始状态(Raw State)

        托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

  • 托管状态:Flink提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。
  • 原始状态:Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护。(不建议使用,也一般不会使用)

分类二:算子状态(Operator State)和按键分区状态(Keyed State)

        这两个状态都是托管状态(Managed State)。不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。

  • 算子状态(Operator State):状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。

  • 按键分区状态(Keyed State):状态是根据输入流中定义的键( key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用。除了聚合的结果是以 Keyed State 的形式保存的,通过富函数类(Rich Function)来自定义的状态也是 Keyed State (在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文( RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State) 。所以即使是 map、 filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加” Keyed State,或者实现 CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,不愧是“有状态的流处理”。需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

7.2. 状态生存时间(TTL)

        在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live, TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

        具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

        配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的.enableTimeToLive()方法启动 TTL 功能。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

  •  .newBuilder():状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
  •  .setUpdateType():设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
  •  .setStateVisibility():设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

        除此之外, TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compactionfilter)进行后台清理。

        这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、 MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。

7.3. 状态后端

1. 状态后端的分类:

  • 哈希表状态后端(HashMapStateBackend):把状态存放在内存里。
  • 内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend):可以把数据持久化到本地硬盘。

2. 在flink-conf.yaml 中使用state.backend 来配置默认状态后端:

  • state.backend: hashmap
  • state.backend: rocksdb
# 默认状态后端
state.backend: hashmap
# 存放检查点的文件路径
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

3. 为每个作业(Per-job)单独配置状态后端:

  • env.setStateBackend(new HashMapStateBackend());
  • env.setStateBackend(new EmbeddedRocksDBStateBackend());
<!--需要在idea中导入rocksdb包,但Flink 发行版中默认就包含了 RocksDB,所以打包时不需要导入-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>1.13.0</version>
</dependency>

8. 容错机制

8.1. 检查点(Checkpoint )

1. 启用检查点:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒启动一次检查点保存(单位:毫秒)
env.enableCheckpointing(1000);

2. 检查点存储(Checkpoint Storage):

  • 配置存储检查点到 JobManager 堆内存:env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
  • 配置存储检查点到文件系统:env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

3. 示例代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用检查点,间隔时间 1 秒
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();

// 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 最小间隔时间 500 毫秒
checkpointConfig.setMinPauseBetweenCheckpoints(500);

// 超时时间 1 分钟
checkpointConfig.setCheckpointTimeout(60000);

// 同时只能有一个检查点
checkpointConfig.setMaxConcurrentCheckpoints(1);

// 开启检查点的外部持久化保存,作业取消后依然保留
checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();

// 设置检查点存储,可以直接传入一个 String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")

8.2. 保存点(Savepoint)

        保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。

保存点适用的具体场景有:

  • 版本管理和归档存储
  • 更新 Flink 版本
  • 更新应用程序
  • 调整并行度
  • 暂停应用程序

需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子ID 可以在代码中直接调用 SingleOutputStreamOperator 的.uid()方法来进行指定:

DataStream<String> stream = env
    .addSource(new StatefulSource())
    .uid("source-id")
    .map(new StatefulMapper())
    .uid("mapper-id")
    .print();

        对于没有设置 ID 的算子, Flink 默认会自动进行设置,所以在重新启动应用后可能会导致ID 不同而无法兼容以前的状态。 所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定 ID。

创建保存点操作如下:

# 要在命令行中为运行的作业创建一个保存点镜像,只需要执行:
bin/flink savepoint :jobId [:targetDirectory]

# 这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径。
# 对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:
state.savepoints.dir: hdfs:///flink/savepoints

# 当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");

# 由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:
bin/flink stop --savepointPath [:targetDirectory] :jobId

从保存点重启应用操作如下:

# 从保存点重启应用,只需要增加一个-s 参数,指定保存点的路径就可以了
bin/flink run -s :savepointPath [:runArgs]

8.3. 状态一致性

1. 一致性的级别

  • 最多一次(AT-MOST-ONCE)
  • 至少一次(AT-LEAST-ONCE)
  • 精确一次(EXACTLY-ONCE)

2. 端到端精确一次(end-to-end exactly-once)

  • 输入端保证:外部数据源就必须拥有重放数据的能力
  • 输出端保证:幂等(idempotent)写入 和 事务(transactional)写入

3.  Flink 和 Kafka 连接时的精确一次保证

  • Flink 内部:Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义
  • 输入端:输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)
  • 输出端:输出端保证 exactly-once 的最佳实现,当然就是两阶段提交(2PC)

4. Flink 和 Kafka 连接时的精确一次保证需要的配置

  • 必须启用检查点
  • 在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE
  • 配置 Kafka 读取数据的消费者的隔离级别

        这里所说的 Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而 Kafka 中默认的隔离级别 isolation.level 是 read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置为read_committed, 表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。

  • 事务超时配置

        Flink 的 Kafka连接器中配置的事务超时时间 transaction.timeout.ms 默认是 1小时,而Kafka集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。所以在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而 Sink 任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。

9. Table API 和 SQL

9.1. TableAPI依赖说明

Java 的“桥接器”(bridge),主要就是负责 Table API 和下层 DataStreamAPI 的连接支持,按照不同的语言分为 Java 版和 Scala 版:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

如果我们希望在本地的集成开发环境(IDE)里运行 Table API 和 SQL,还需要引入以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

        这里主要添加的依赖是一个“计划器”(planner),它是 Table API 的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner,所以在生产集群环境中提交的作业不需要打包这个依赖。

        而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个Scala 版流处理的相关依赖。

另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>

9.2. 执行环境的创建

方案一:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode() // 使用流处理模式
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

方案二:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


TableAPI、FlinkSQL和FlinkCEP还未完全完成,待续...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342749.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python学习笔记】30.Python3 命名空间和作用域

前言 本章介绍Python的命名空间和作用域。 命名空间 先看看官方文档的一段话&#xff1a; A namespace is a mapping from names to objects.Most namespaces are currently implemented as Python dictionaries。 命名空间(Namespace)是从名称到对象的映射&#xff0c;大…

在中国程序员工作是青春饭吗?

上个月公司告诉我毕业了。 我打开boss直聘&#xff0c;一溜溜的外包公司和我打招呼。 我寻思我说不定啥时候就离开深圳了&#xff0c;外包不外包也无所谓钱到位就行。&#xff08;大公司学历不够格也进不去&#xff09; 结果华为、平安的外包告诉我&#xff0c;不好意思呀&a…

配置MyBatis Plus 的分页查询功能

配置MyBatis Plus 的分页查询功能一. 回顾Mysql分页查询二. 配置MyBatis Plus 分页功能2.1 配置分页拦截器2.2 进行分页查询三. 开启MyBatis Plus的运行日志一. 回顾Mysql分页查询 limit 是MySQL当中特有的&#xff01;其他数据库没有&#xff01;不通用&#xff1b;limit 是M…

ES6新增特性总结

目录 let和const命令 模板字符串 扩展运算符 解构赋值 对象解构 数组解构 扩展的函数 带参数默认值的函数 剩余参数表示不定参 箭头函数 扩展的对象 直接写入变量和函数&#xff0c;作为对象的属性和方法 新增Object.is()/Object.assign()/Object.keys/Object.val…

内网渗透(二十一)之Windows协议认证和密码抓取-Golden Ticket黄金票据制作原理及利用方式

系列文章第一章节之基础知识篇 内网渗透(一)之基础知识-内网渗透介绍和概述 内网渗透(二)之基础知识-工作组介绍 内网渗透(三)之基础知识-域环境的介绍和优点 内网渗透(四)之基础知识-搭建域环境 内网渗透(五)之基础知识-Active Directory活动目录介绍和使用 内网渗透(六)之基…

UI自动化测试是什么?什么项目适合做UI自动化测试

1. 页面对象设计模式的优势(1) 创建可以跨多个测试用例共享的代码(2) 减少代码的重复性(3) 如果界面需要维护&#xff0c;只需要修改一个地方&#xff0c;修改以及维护的成本减少2. 每个目录结构表达的意思(1) Base:基础层&#xff0c;是用来编写定位元素(2) Common&#xff1a…

[chatGPT] 如何通过JNI在Android上显示实时视频流

目录背景正文layout xmljavaCjava总结一&#xff1a;追问&#xff1a;CC总结二&#xff1a;答疑解惑C画蛇添足 视频不显示黑屏最后感叹科技的更新速度&#xff0c;真的程序员都可能会被替代&#xff0c;下一个时代最大的问题应该是劳动力过剩&#xff0c;导致社会性结构改变&am…

Hudi-集成 Hive

集成 Hive Hudi 源表对应一份 HDFS 数据&#xff0c;通过 Spark&#xff0c;Flink 组件或者 Hudi CLI&#xff0c;可以将 Hudi 表的数据映射为 *Hive 外部表*&#xff0c;基于该外部表&#xff0c; Hive可以方便的进行实时视图&#xff0c;读优化视图以及增量视图的查询。 集…

【软件相关】文献管理工具——Zotero

文章目录0 前期教程1 前言2 一些说明3 下载安装4 功能一&#xff1a;插入文献引用格式5 功能二&#xff1a;从网页下载文献pdf和题录6 功能三&#xff1a;数据多平台同步7 功能四&#xff1a;通过DOI添加条目及添加订阅8 安装xpi插件9 功能五&#xff1a;智能识别中英文文献10 …

The Number Of ThreadPoolExecutor

序言整理下Java 线程池中线程数量如何设置的依据巨人肩膀:https://blog.csdn.net/weilaizhixing007/article/details/125955693https://blog.csdn.net/yuyan_jia/article/details/120298564#:~:text%E4%B8%80%E4%B8%AA%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%A4%84%E7%90%86%E8%AE%A1,…

MongoDB--》文档查询的详细具体操作

目录 统计查询 分页列表查询 排序查询 正则的复杂条件查询 比较查询 包含查询 条件连接查询 统计查询 统计查询使用count()方法&#xff0c;其语法格式如下&#xff1a; db.collection.count(query,options) ParameterTypeDescriptionquerydocument查询选择条件optio…

int和Integer有什么区别?

第7讲 | int和Integer有什么区别&#xff1f; Java 虽然号称是面向对象的语言&#xff0c;但是原始数据类型仍然是重要的组成元素&#xff0c;所以在面试中&#xff0c;经常考察原始数据类型和包装类等 Java 语言特性。 今天我要问你的问题是&#xff0c;int 和 Integer 有什么…

JUC并发编程 Ⅱ -- 共享模型之管程(下)

文章目录wait notifywait / notify的原理API 介绍sleep与wait辨析优雅地使用wait/notify保护性暂停模式超时版 GuardedObjectjoin原理多任务版GuardedObject生产者-消费者模式定义实现Park & Unpark基本使用特点原理重新理解线程状态转换线程的活跃性死锁定位死锁活锁饥饿R…

vs2019 winform安装包,安装完成后默认启动

vs2019 winform安装包&#xff0c;安装完成后默认启动在要打包的项目中&#xff0c;新建安装类选中打包项目&#xff0c;右键&#xff0c;进入文件系统![在这里插入图片描述](https://img-blog.csdnimg.cn/62647c550ffa4d489f1d19f19bbd99b1.png)选中application folder 右键&a…

java微信小程序校园二手闲置商品交易跳蚤市场

基于跳蚤市场小程序的设计基于现有的手机,可以实现个人中心、用户管理、卖家管理、商品类型管理、商品信息管理、购物车管理、私聊信息管理、聊天回复管理、留言板管理、我的收藏管理、系统管理等功能。方便用户对商品信息等详细的了解及统计分析。根据系统功能需求建立的模块关…

后量子 KEM 方案:Kyber

参考文献&#xff1a; Bos J, Ducas L, Kiltz E, et al. CRYSTALS-Kyber: a CCA-secure module-lattice-based KEM[C]//2018 IEEE European Symposium on Security and Privacy (EuroS&P). IEEE, 2018: 353-367.Avanzi R, Bos J, Ducas L, et al. Crystals-kyber[J]. NIST…

AlphaFold 2 处理蛋白质折叠问题

蛋白质是一个较长的氨基酸序列&#xff0c;比如100个氨基酸的规模&#xff0c;如此长的氨基酸序列连在一起是不稳定的&#xff0c;它们会卷在一起&#xff0c;形成一个独特的3D结构&#xff0c;这个3D结构的形状决定了蛋白质的功能。 蛋白质结构预测&#xff08;蛋白质折叠问题…

Android 一体机研发之修改系统设置————屏幕亮度

Android 一体机研发之修改系统设置————屏幕亮度 Android 一体机研发之修改系统设置————声音 Android 一体机研发之修改系统设置————自动锁屏 前言 最近工作略微有点儿空闲&#xff0c;抽空给大家总结一下&#xff1a;近期一直搞得一体机app研发&#xff0c;适用…

recv和明文收包分析

我们CTRLg 跳到recv 分析收包函数 发现函数会断并且收包函数返回值(收包包长)也会不断变化 那么证明recv是真正的收包函数&#xff0c;游戏没有重新实现该函数 我们只要分析该函数即可 在recv函数执行完毕以后下断 eax是包长,esi28是包指针 我们上2个号&#xff0c;让另外…

项目(今日指数之环境搭建)

一 项目架构1.1 今日指数技术选型【1】前端技术【2】后端技术栈【3】整体概览1.2 核心业务介绍【1】业务结构预览【2】业务结构预览1.定时任务调度服务XXL-JOB通过RestTemplate多线程动态拉去股票接口数据&#xff0c;刷入数据库&#xff1b; 2.国内指数服务 3.板块指数服务 4.…